mirror of
https://github.com/servo/servo.git
synced 2025-07-22 23:03:42 +01:00
introduce task-queues, and throttling the performance-timeline task-source, in script and worker threads.
queue
This commit is contained in:
parent
da36740f0b
commit
ca6306c430
10 changed files with 540 additions and 93 deletions
165
components/script/task_queue.rs
Normal file
165
components/script/task_queue.rs
Normal file
|
@ -0,0 +1,165 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
//! Machinery for [task-queue](https://html.spec.whatwg.org/multipage/#task-queue).
|
||||
|
||||
use dom::bindings::cell::DomRefCell;
|
||||
use dom::worker::TrustedWorkerAddress;
|
||||
use msg::constellation_msg::PipelineId;
|
||||
use script_runtime::ScriptThreadEventCategory;
|
||||
use std::cell::Cell;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::default::Default;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use task::TaskBox;
|
||||
use task_source::TaskSourceName;
|
||||
|
||||
|
||||
pub type QueuedTask = (Option<TrustedWorkerAddress>, ScriptThreadEventCategory, Box<TaskBox>, Option<PipelineId>);
|
||||
|
||||
/// Defining the operations used to convert from a msg T to a QueuedTask.
|
||||
pub trait QueuedTaskConversion {
|
||||
fn task_category(&self) -> Option<&ScriptThreadEventCategory>;
|
||||
fn into_queued_task(self) -> Option<QueuedTask>;
|
||||
fn from_queued_task(queued_task: QueuedTask) -> Self;
|
||||
fn wake_up_msg() -> Self;
|
||||
fn is_wake_up(&self) -> bool;
|
||||
}
|
||||
|
||||
pub struct TaskQueue<T> {
|
||||
/// The original port on which the task-sources send tasks as messages.
|
||||
port: Receiver<T>,
|
||||
/// A sender to ensure the port doesn't block on select while there are throttled tasks.
|
||||
wake_up_sender: Sender<T>,
|
||||
/// A queue from which the event-loop can drain tasks.
|
||||
msg_queue: DomRefCell<VecDeque<T>>,
|
||||
/// A "business" counter, reset for each iteration of the event-loop
|
||||
taken_task_counter: Cell<u64>,
|
||||
/// Tasks that will be throttled for as long as we are "busy".
|
||||
throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>>
|
||||
}
|
||||
|
||||
impl<T: QueuedTaskConversion> TaskQueue<T> {
|
||||
pub fn new(port: Receiver<T>, wake_up_sender: Sender<T>) -> TaskQueue<T> {
|
||||
TaskQueue {
|
||||
port,
|
||||
wake_up_sender,
|
||||
msg_queue: DomRefCell::new(VecDeque::new()),
|
||||
taken_task_counter: Default::default(),
|
||||
throttled: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process incoming tasks, immediately sending priority ones downstream,
|
||||
/// and categorizing potential throttles.
|
||||
fn process_incoming_tasks(&self) {
|
||||
let mut non_throttled: Vec<T> = self.port
|
||||
.try_iter()
|
||||
.filter(|msg| !msg.is_wake_up())
|
||||
.collect();
|
||||
|
||||
let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{
|
||||
let category = match msg.task_category() {
|
||||
Some(category) => category,
|
||||
None => return false,
|
||||
};
|
||||
match category {
|
||||
ScriptThreadEventCategory::PerformanceTimelineTask => return true,
|
||||
_ => {
|
||||
// A task that will not be throttled, start counting "business"
|
||||
self.taken_task_counter.set(self.taken_task_counter.get() + 1);
|
||||
return false
|
||||
},
|
||||
}
|
||||
}).collect();
|
||||
|
||||
for msg in non_throttled {
|
||||
// Immediately send non-throttled tasks for processing.
|
||||
let _ = self.msg_queue.borrow_mut().push_back(msg);
|
||||
}
|
||||
|
||||
for msg in to_be_throttled {
|
||||
// Categorize tasks per task queue.
|
||||
let (worker, category, boxed, pipeline_id) = match msg.into_queued_task() {
|
||||
Some((worker, category, boxed, pipeline_id)) => (worker, category, boxed, pipeline_id),
|
||||
None => unreachable!("A message to be throttled should always be convertible into a queued task"),
|
||||
};
|
||||
// FIXME: Add the task-source name directly to CommonScriptMsg::Task.
|
||||
let task_source = match category {
|
||||
ScriptThreadEventCategory::PerformanceTimelineTask => TaskSourceName::PerformanceTimeline,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut throttled_tasks = self.throttled.borrow_mut();
|
||||
throttled_tasks
|
||||
.entry(task_source)
|
||||
.or_insert(VecDeque::new())
|
||||
.push_back((worker, category, boxed, pipeline_id));
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset the queue for a new iteration of the event-loop,
|
||||
/// returning the port about whose readiness we want to be notified.
|
||||
pub fn select(&self) -> &Receiver<T> {
|
||||
// This is a new iteration of the event-loop, so we reset the "business" counter.
|
||||
self.taken_task_counter.set(0);
|
||||
// We want to be notified when the script-port is ready to receive.
|
||||
// Hence that's the one we need to include in the select.
|
||||
&self.port
|
||||
}
|
||||
|
||||
/// Take a message from the front of the queue, without waiting if empty.
|
||||
pub fn recv(&self) -> Result<T, ()> {
|
||||
self.msg_queue.borrow_mut().pop_front().ok_or(())
|
||||
}
|
||||
|
||||
/// Same as recv.
|
||||
pub fn try_recv(&self) -> Result<T, ()> {
|
||||
self.recv()
|
||||
}
|
||||
|
||||
/// Drain the queue for the current iteration of the event-loop.
|
||||
/// Holding-back throttles above a given high-water mark.
|
||||
pub fn take_tasks(&self) {
|
||||
// High-watermark: once reached, throttled tasks will be held-back.
|
||||
const PER_ITERATION_MAX: u64 = 5;
|
||||
// Always first check for new tasks, but don't reset 'taken_task_counter'.
|
||||
self.process_incoming_tasks();
|
||||
let mut throttled = self.throttled.borrow_mut();
|
||||
let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
|
||||
let task_source_names = TaskSourceName::all();
|
||||
let mut task_source_cycler = task_source_names.iter().cycle();
|
||||
// "being busy", is defined as having more than x tasks for this loop's iteration.
|
||||
// As long as we're not busy, and there are throttled tasks left:
|
||||
loop {
|
||||
let max_reached = self.taken_task_counter.get() > PER_ITERATION_MAX;
|
||||
let none_left = throttled_length == 0;
|
||||
match (max_reached, none_left) {
|
||||
(_, true) => break,
|
||||
(true, false) => {
|
||||
// We have reached the high-watermark for this iteration of the event-loop,
|
||||
// yet also have throttled messages left in the queue.
|
||||
// Ensure the select wakes up in the next iteration of the event-loop
|
||||
let _ = self.wake_up_sender.send(T::wake_up_msg());
|
||||
break;
|
||||
},
|
||||
(false, false) => {
|
||||
// Cycle through non-priority task sources, taking one throttled task from each.
|
||||
let task_source = task_source_cycler.next().unwrap();
|
||||
let throttled_queue = match throttled.get_mut(&task_source) {
|
||||
Some(queue) => queue,
|
||||
None => continue,
|
||||
};
|
||||
let queued_task = match throttled_queue.pop_front() {
|
||||
Some(queued_task) => queued_task,
|
||||
None => continue,
|
||||
};
|
||||
let msg = T::from_queued_task(queued_task);
|
||||
let _ = self.msg_queue.borrow_mut().push_back(msg);
|
||||
self.taken_task_counter.set(self.taken_task_counter.get() + 1);
|
||||
throttled_length = throttled_length - 1;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue