mirror of
https://github.com/servo/servo.git
synced 2025-07-22 23:03:42 +01:00
in BC event-loop, only run tasks related to fully-active documents
This commit is contained in:
parent
6b648429f5
commit
ecfb9c639a
9 changed files with 219 additions and 14 deletions
|
@ -7,12 +7,13 @@
|
|||
use crate::dom::bindings::cell::DomRefCell;
|
||||
use crate::dom::worker::TrustedWorkerAddress;
|
||||
use crate::script_runtime::ScriptThreadEventCategory;
|
||||
use crate::script_thread::ScriptThread;
|
||||
use crate::task::TaskBox;
|
||||
use crate::task_source::TaskSourceName;
|
||||
use crossbeam_channel::{self, Receiver, Sender};
|
||||
use msg::constellation_msg::PipelineId;
|
||||
use std::cell::Cell;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::default::Default;
|
||||
|
||||
pub type QueuedTask = (
|
||||
|
@ -26,8 +27,10 @@ pub type QueuedTask = (
|
|||
/// Defining the operations used to convert from a msg T to a QueuedTask.
|
||||
pub trait QueuedTaskConversion {
|
||||
fn task_source_name(&self) -> Option<&TaskSourceName>;
|
||||
fn pipeline_id(&self) -> Option<PipelineId>;
|
||||
fn into_queued_task(self) -> Option<QueuedTask>;
|
||||
fn from_queued_task(queued_task: QueuedTask) -> Self;
|
||||
fn inactive_msg() -> Self;
|
||||
fn wake_up_msg() -> Self;
|
||||
fn is_wake_up(&self) -> bool;
|
||||
}
|
||||
|
@ -43,6 +46,8 @@ pub struct TaskQueue<T> {
|
|||
taken_task_counter: Cell<u64>,
|
||||
/// Tasks that will be throttled for as long as we are "busy".
|
||||
throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>>,
|
||||
/// Tasks for not fully-active documents.
|
||||
inactive: DomRefCell<HashMap<PipelineId, VecDeque<QueuedTask>>>,
|
||||
}
|
||||
|
||||
impl<T: QueuedTaskConversion> TaskQueue<T> {
|
||||
|
@ -53,22 +58,66 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
msg_queue: DomRefCell::new(VecDeque::new()),
|
||||
taken_task_counter: Default::default(),
|
||||
throttled: Default::default(),
|
||||
inactive: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Release previously held-back tasks for documents that are now fully-active.
|
||||
/// https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active
|
||||
fn release_tasks_for_fully_active_documents(
|
||||
&self,
|
||||
fully_active: &HashSet<PipelineId>,
|
||||
) -> Vec<T> {
|
||||
self.inactive
|
||||
.borrow_mut()
|
||||
.iter_mut()
|
||||
.filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
|
||||
.flat_map(|(_, inactive_queue)| {
|
||||
inactive_queue
|
||||
.drain(0..)
|
||||
.map(|queued_task| T::from_queued_task(queued_task))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Hold back tasks for currently not fully-active documents.
|
||||
/// https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active
|
||||
fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
|
||||
let mut inactive = self.inactive.borrow_mut();
|
||||
let inactive_queue = inactive.entry(pipeline_id.clone()).or_default();
|
||||
inactive_queue.push_back(
|
||||
msg.into_queued_task()
|
||||
.expect("Incoming messages should always be convertible into queued tasks"),
|
||||
);
|
||||
let mut msg_queue = self.msg_queue.borrow_mut();
|
||||
if msg_queue.is_empty() {
|
||||
// Ensure there is at least one message.
|
||||
// Otherwise if the just stored inactive message
|
||||
// was the first and last of this iteration,
|
||||
// it will result in a spurious wake-up of the event-loop.
|
||||
msg_queue.push_back(T::inactive_msg());
|
||||
}
|
||||
}
|
||||
|
||||
/// Process incoming tasks, immediately sending priority ones downstream,
|
||||
/// and categorizing potential throttles.
|
||||
fn process_incoming_tasks(&self, first_msg: T) {
|
||||
let mut incoming = Vec::with_capacity(self.port.len() + 1);
|
||||
fn process_incoming_tasks(&self, first_msg: T, fully_active: &HashSet<PipelineId>) {
|
||||
// 1. Make any previously stored task from now fully-active document available.
|
||||
let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);
|
||||
|
||||
// 2. Process the first message(artifact of the fact that select always returns a message).
|
||||
if !first_msg.is_wake_up() {
|
||||
incoming.push(first_msg);
|
||||
}
|
||||
|
||||
// 3. Process any other incoming message.
|
||||
while let Ok(msg) = self.port.try_recv() {
|
||||
if !msg.is_wake_up() {
|
||||
incoming.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Filter tasks from non-priority task-sources.
|
||||
let to_be_throttled: Vec<T> = incoming
|
||||
.drain_filter(|msg| {
|
||||
let task_source = match msg.task_source_name() {
|
||||
|
@ -88,6 +137,12 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
.collect();
|
||||
|
||||
for msg in incoming {
|
||||
if let Some(pipeline_id) = msg.pipeline_id() {
|
||||
if !fully_active.contains(&pipeline_id) {
|
||||
self.store_task_for_inactive_pipeline(msg, &pipeline_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// Immediately send non-throttled tasks for processing.
|
||||
let _ = self.msg_queue.borrow_mut().push_back(msg);
|
||||
}
|
||||
|
@ -103,7 +158,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
let mut throttled_tasks = self.throttled.borrow_mut();
|
||||
throttled_tasks
|
||||
.entry(task_source.clone())
|
||||
.or_insert(VecDeque::new())
|
||||
.or_default()
|
||||
.push_back((worker, category, boxed, pipeline_id, task_source));
|
||||
}
|
||||
}
|
||||
|
@ -133,8 +188,9 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
pub fn take_tasks(&self, first_msg: T) {
|
||||
// High-watermark: once reached, throttled tasks will be held-back.
|
||||
const PER_ITERATION_MAX: u64 = 5;
|
||||
let fully_active = ScriptThread::get_fully_active_document_ids();
|
||||
// Always first check for new tasks, but don't reset 'taken_task_counter'.
|
||||
self.process_incoming_tasks(first_msg);
|
||||
self.process_incoming_tasks(first_msg, &fully_active);
|
||||
let mut throttled = self.throttled.borrow_mut();
|
||||
let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
|
||||
let task_source_names = TaskSourceName::all();
|
||||
|
@ -165,6 +221,20 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
None => continue,
|
||||
};
|
||||
let msg = T::from_queued_task(queued_task);
|
||||
|
||||
// Hold back tasks for currently inactive documents.
|
||||
if let Some(pipeline_id) = msg.pipeline_id() {
|
||||
if !fully_active.contains(&pipeline_id) {
|
||||
self.store_task_for_inactive_pipeline(msg, &pipeline_id);
|
||||
// Reduce the length of throttles,
|
||||
// but don't add the task to "msg_queue",
|
||||
// and neither increment "taken_task_counter".
|
||||
throttled_length = throttled_length - 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Make the task available for the event-loop to handle as a message.
|
||||
let _ = self.msg_queue.borrow_mut().push_back(msg);
|
||||
self.taken_task_counter
|
||||
.set(self.taken_task_counter.get() + 1);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue