From 029715aba6f4a4c62b5a2759d47dda57954cc433 Mon Sep 17 00:00:00 2001 From: Gregory Terzian Date: Sat, 18 Aug 2018 20:28:42 +0200 Subject: [PATCH] introduce a generic worker event-loop --- .../script/dom/abstractworkerglobalscope.rs | 94 ++++++++++++++- .../script/dom/dedicatedworkerglobalscope.rs | 107 +++++++----------- .../script/dom/serviceworkerglobalscope.rs | 105 +++++++---------- 3 files changed, 175 insertions(+), 131 deletions(-) diff --git a/components/script/dom/abstractworkerglobalscope.rs b/components/script/dom/abstractworkerglobalscope.rs index 99c08dc2c38..45c594dbaee 100644 --- a/components/script/dom/abstractworkerglobalscope.rs +++ b/components/script/dom/abstractworkerglobalscope.rs @@ -2,11 +2,17 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use devtools_traits::DevtoolScriptControlMsg; use dom::abstractworker::WorkerScriptMsg; -use dom::dedicatedworkerglobalscope::DedicatedWorkerScriptMsg; +use dom::bindings::conversions::DerivedFrom; +use dom::bindings::reflector::DomObject; +use dom::dedicatedworkerglobalscope::{AutoWorkerReset, DedicatedWorkerScriptMsg}; +use dom::globalscope::GlobalScope; use dom::worker::TrustedWorkerAddress; +use dom::workerglobalscope::WorkerGlobalScope; use script_runtime::{ScriptChan, CommonScriptMsg, ScriptPort}; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, Select, Sender}; +use task_queue::{QueuedTaskConversion, TaskQueue}; /// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with /// common event loop messages. While this SendableWorkerScriptChan is alive, the associated @@ -69,3 +75,87 @@ impl ScriptPort for Receiver { } } } + +pub trait WorkerEventLoopMethods { + type TimerMsg: Send; + type WorkerMsg: QueuedTaskConversion + Send; + type Event; + fn timer_event_port(&self) -> &Receiver; + fn task_queue(&self) -> &TaskQueue; + fn handle_event(&self, event: Self::Event); + fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option; + fn from_worker_msg(&self, msg: Self::WorkerMsg) -> Self::Event; + fn from_timer_msg(&self, msg: Self::TimerMsg) -> Self::Event; + fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> Self::Event; +} + +#[allow(unsafe_code)] +// https://html.spec.whatwg.org/multipage/#worker-event-loop +pub fn run_worker_event_loop(worker_scope: &T, + worker: Option<&TrustedWorkerAddress>) +where + TimerMsg: Send, + WorkerMsg: QueuedTaskConversion + Send, + T: WorkerEventLoopMethods + + DerivedFrom + DerivedFrom + + DomObject { + let scope = worker_scope.upcast::(); + let timer_event_port = worker_scope.timer_event_port(); + let devtools_port = scope.from_devtools_receiver(); + let task_queue = worker_scope.task_queue(); + let sel = Select::new(); + let mut worker_handle = sel.handle(task_queue.select()); + let mut timer_event_handle = sel.handle(timer_event_port); + let mut devtools_handle = sel.handle(devtools_port); + unsafe { + worker_handle.add(); + timer_event_handle.add(); + if scope.from_devtools_sender().is_some() { + devtools_handle.add(); + } + } + let ret = sel.wait(); + let event = { + if ret == worker_handle.id() { + task_queue.take_tasks(); + worker_scope.from_worker_msg(task_queue.recv().unwrap()) + } else if ret == timer_event_handle.id() { + worker_scope.from_timer_msg(timer_event_port.recv().unwrap()) + } else if ret == devtools_handle.id() { + worker_scope.from_devtools_msg(devtools_port.recv().unwrap()) + } else { + panic!("unexpected select result!") + } + }; + let mut sequential = vec![]; + sequential.push(event); + // https://html.spec.whatwg.org/multipage/#worker-event-loop + // Once the WorkerGlobalScope's closing flag is set to true, + // the event loop's task queues must discard any further tasks + // that would be added to them + // (tasks already on the queue are unaffected except where otherwise specified). + while !scope.is_closing() { + // Batch all events that are ready. + // The task queue will throttle non-priority tasks if necessary. + match task_queue.try_recv() { + Err(_) => match timer_event_port.try_recv() { + Err(_) => match devtools_port.try_recv() { + Err(_) => break, + Ok(ev) => sequential.push(worker_scope.from_devtools_msg(ev)), + }, + Ok(ev) => sequential.push(worker_scope.from_timer_msg(ev)), + }, + Ok(ev) => sequential.push(worker_scope.from_worker_msg(ev)), + } + } + // Step 3 + for event in sequential { + worker_scope.handle_event(event); + // Step 6 + let _ar = match worker { + Some(worker) => worker_scope.handle_worker_post_event(worker), + None => None + }; + worker_scope.upcast::().perform_a_microtask_checkpoint(); + } +} diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index b71f9cd69b7..7cb23fbd7f1 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -6,6 +6,7 @@ use devtools; use devtools_traits::DevtoolScriptControlMsg; use dom::abstractworker::{SimpleWorkerErrorHandler, WorkerScriptMsg}; use dom::abstractworkerglobalscope::{SendableWorkerScriptChan, WorkerThreadWorkerChan}; +use dom::abstractworkerglobalscope::{WorkerEventLoopMethods, run_worker_event_loop}; use dom::bindings::cell::DomRefCell; use dom::bindings::codegen::Bindings::DedicatedWorkerGlobalScopeBinding; use dom::bindings::codegen::Bindings::DedicatedWorkerGlobalScopeBinding::DedicatedWorkerGlobalScopeMethods; @@ -40,7 +41,7 @@ use servo_url::ServoUrl; use std::mem::replace; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{Receiver, Select, Sender, channel}; +use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use style::thread_state::{self, ThreadState}; use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; @@ -49,7 +50,7 @@ use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; /// value for the duration of this object's lifetime. This ensures that the related Worker /// object only lives as long as necessary (ie. while events are being executed), while /// providing a reference that can be cloned freely. -struct AutoWorkerReset<'a> { +pub struct AutoWorkerReset<'a> { workerscope: &'a DedicatedWorkerGlobalScope, old_worker: Option, } @@ -78,7 +79,7 @@ pub enum DedicatedWorkerScriptMsg { WakeUp, } -enum MixedMessage { +pub enum MixedMessage { FromWorker(DedicatedWorkerScriptMsg), FromScheduler((TrustedWorkerAddress, TimerEvent)), FromDevtools(DevtoolScriptControlMsg) @@ -155,6 +156,41 @@ pub struct DedicatedWorkerGlobalScope { parent_sender: Box, } +impl WorkerEventLoopMethods for DedicatedWorkerGlobalScope { + type TimerMsg = (TrustedWorkerAddress, TimerEvent); + type WorkerMsg = DedicatedWorkerScriptMsg; + type Event = MixedMessage; + + fn timer_event_port(&self) -> &Receiver<(TrustedWorkerAddress, TimerEvent)> { + &self.timer_event_port + } + + fn task_queue(&self) -> &TaskQueue { + &self.task_queue + } + + fn handle_event(&self, event: MixedMessage) { + self.handle_mixed_message(event); + } + + fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option { + let ar = AutoWorkerReset::new(&self, worker.clone()); + Some(ar) + } + + fn from_worker_msg(&self, msg: DedicatedWorkerScriptMsg) -> MixedMessage { + MixedMessage::FromWorker(msg) + } + + fn from_timer_msg(&self, msg: (TrustedWorkerAddress, TimerEvent)) -> MixedMessage { + MixedMessage::FromScheduler(msg) + } + + fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> MixedMessage { + MixedMessage::FromDevtools(msg) + } +} + impl DedicatedWorkerGlobalScope { fn new_inherited(init: WorkerGlobalScopeInit, worker_url: ServoUrl, @@ -309,7 +345,7 @@ impl DedicatedWorkerGlobalScope { // The worker processing model remains on this step until the event loop is destroyed, // which happens after the closing flag is set to true. while !scope.is_closing() { - global.run_event_loop(worker.clone()); + run_worker_event_loop(&*global, Some(&worker)); } }, reporter_name, parent_sender, CommonScriptMsg::CollectReports); }).expect("Thread spawning failed"); @@ -331,65 +367,6 @@ impl DedicatedWorkerGlobalScope { (chan, Box::new(rx)) } - #[allow(unsafe_code)] - fn run_event_loop(&self, worker: TrustedWorkerAddress) { - let scope = self.upcast::(); - let timer_event_port = &self.timer_event_port; - let devtools_port = scope.from_devtools_receiver(); - - let sel = Select::new(); - let mut worker_handle = sel.handle(self.task_queue.select()); - let mut timer_event_handle = sel.handle(timer_event_port); - let mut devtools_handle = sel.handle(devtools_port); - unsafe { - worker_handle.add(); - timer_event_handle.add(); - if scope.from_devtools_sender().is_some() { - devtools_handle.add(); - } - } - let ret = sel.wait(); - let event = { - if ret == worker_handle.id() { - MixedMessage::FromWorker(self.task_queue.take_tasks().recv().unwrap()) - } else if ret == timer_event_handle.id() { - MixedMessage::FromScheduler(timer_event_port.recv().unwrap()) - } else if ret == devtools_handle.id() { - MixedMessage::FromDevtools(devtools_port.recv().unwrap()) - } else { - panic!("unexpected select result!") - } - }; - let mut sequential = vec![]; - sequential.push(event); - // https://html.spec.whatwg.org/multipage/#worker-event-loop - // Once the WorkerGlobalScope's closing flag is set to true, - // the event loop's task queues must discard any further tasks - // that would be added to them - // (tasks already on the queue are unaffected except where otherwise specified). - while !scope.is_closing() { - // Batch all events that are ready. - // The task queue will throttle non-priority tasks if necessary. - match self.task_queue.take_tasks().try_recv() { - Err(_) => match timer_event_port.try_recv() { - Err(_) => match devtools_port.try_recv() { - Err(_) => break, - Ok(ev) => sequential.push(MixedMessage::FromDevtools(ev)), - }, - Ok(ev) => sequential.push(MixedMessage::FromScheduler(ev)), - }, - Ok(ev) => sequential.push(MixedMessage::FromWorker(ev)), - } - } - // Step 3 - for event in sequential { - self.handle_event(event); - // Step 6 - let _ar = AutoWorkerReset::new(&self, worker.clone()); - self.upcast::().perform_a_microtask_checkpoint(); - } - } - fn handle_script_event(&self, msg: WorkerScriptMsg) { match msg { WorkerScriptMsg::DOMMessage(data) => { @@ -407,8 +384,8 @@ impl DedicatedWorkerGlobalScope { } } - fn handle_event(&self, event: MixedMessage) { - match event { + fn handle_mixed_message(&self, msg: MixedMessage) { + match msg { MixedMessage::FromDevtools(msg) => { match msg { DevtoolScriptControlMsg::EvaluateJS(_pipe_id, string, sender) => diff --git a/components/script/dom/serviceworkerglobalscope.rs b/components/script/dom/serviceworkerglobalscope.rs index 1a95d3adba5..97573df279a 100644 --- a/components/script/dom/serviceworkerglobalscope.rs +++ b/components/script/dom/serviceworkerglobalscope.rs @@ -5,17 +5,20 @@ use devtools; use devtools_traits::DevtoolScriptControlMsg; use dom::abstractworker::WorkerScriptMsg; +use dom::abstractworkerglobalscope::{WorkerEventLoopMethods, run_worker_event_loop}; use dom::bindings::codegen::Bindings::ServiceWorkerGlobalScopeBinding; use dom::bindings::codegen::Bindings::ServiceWorkerGlobalScopeBinding::ServiceWorkerGlobalScopeMethods; use dom::bindings::inheritance::Castable; use dom::bindings::reflector::DomObject; use dom::bindings::root::{DomRoot, RootCollection, ThreadLocalStackRoots}; use dom::bindings::str::DOMString; +use dom::dedicatedworkerglobalscope::AutoWorkerReset; use dom::event::Event; use dom::eventtarget::EventTarget; use dom::extendableevent::ExtendableEvent; use dom::extendablemessageevent::ExtendableMessageEvent; use dom::globalscope::GlobalScope; +use dom::worker::TrustedWorkerAddress; use dom::workerglobalscope::WorkerGlobalScope; use dom_struct::dom_struct; use ipc_channel::ipc::{self, IpcSender, IpcReceiver}; @@ -29,7 +32,7 @@ use script_traits::{TimerEvent, WorkerGlobalScopeInit, ScopeThings, ServiceWorke use servo_config::prefs::PREFS; use servo_rand::random; use servo_url::ServoUrl; -use std::sync::mpsc::{Receiver, Select, Sender, channel}; +use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use std::time::Duration; use style::thread_state::{self, ThreadState}; @@ -130,6 +133,40 @@ pub struct ServiceWorkerGlobalScope { scope_url: ServoUrl, } +impl WorkerEventLoopMethods for ServiceWorkerGlobalScope { + type TimerMsg = (); + type WorkerMsg = ServiceWorkerScriptMsg; + type Event = MixedMessage; + + fn timer_event_port(&self) -> &Receiver<()> { + &self.timer_event_port + } + + fn task_queue(&self) -> &TaskQueue { + &self.task_queue + } + + fn handle_event(&self, event: MixedMessage) { + self.handle_mixed_message(event); + } + + fn handle_worker_post_event(&self, _worker: &TrustedWorkerAddress) -> Option { + None + } + + fn from_worker_msg(&self, msg: ServiceWorkerScriptMsg) -> MixedMessage { + MixedMessage::FromServiceWorker(msg) + } + + fn from_timer_msg(&self, msg: ()) -> MixedMessage { + MixedMessage::FromTimeoutThread(msg) + } + + fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> MixedMessage { + MixedMessage::FromDevtools(msg) + } +} + impl ServiceWorkerGlobalScope { fn new_inherited(init: WorkerGlobalScopeInit, worker_url: ServoUrl, @@ -265,14 +302,14 @@ impl ServiceWorkerGlobalScope { // The worker processing model remains on this step until the event loop is destroyed, // which happens after the closing flag is set to true. while !scope.is_closing() { - global.run_event_loop(); + run_worker_event_loop(&*global, None); } }, reporter_name, scope.script_chan(), CommonScriptMsg::CollectReports); }).expect("Thread spawning failed"); } - fn handle_event(&self, event: MixedMessage) -> bool { - match event { + fn handle_mixed_message(&self, msg: MixedMessage) -> bool { + match msg { MixedMessage::FromDevtools(msg) => { match msg { DevtoolScriptControlMsg::EvaluateJS(_pipe_id, string, sender) => @@ -322,66 +359,6 @@ impl ServiceWorkerGlobalScope { } } - #[allow(unsafe_code)] - fn run_event_loop(&self) { - let scope = self.upcast::(); - let devtools_port = scope.from_devtools_receiver(); - let timer_event_port = &self.timer_event_port; - - let sel = Select::new(); - let mut worker_handle = sel.handle(self.task_queue.select()); - let mut devtools_handle = sel.handle(devtools_port); - let mut timer_port_handle = sel.handle(timer_event_port); - unsafe { - worker_handle.add(); - if scope.from_devtools_sender().is_some() { - devtools_handle.add(); - } - timer_port_handle.add(); - } - - let ret = sel.wait(); - let event = { - if ret == worker_handle.id() { - MixedMessage::FromServiceWorker(self.task_queue.take_tasks().recv().unwrap()) - } else if ret == devtools_handle.id() { - MixedMessage::FromDevtools(devtools_port.recv().unwrap()) - } else if ret == timer_port_handle.id() { - MixedMessage::FromTimeoutThread(timer_event_port.recv().unwrap()) - } else { - panic!("unexpected select result!") - } - }; - - let mut sequential = vec![]; - sequential.push(event); - // https://html.spec.whatwg.org/multipage/#worker-event-loop - // Once the WorkerGlobalScope's closing flag is set to true, - // the event loop's task queues must discard any further tasks - // that would be added to them - // (tasks already on the queue are unaffected except where otherwise specified). - while !scope.is_closing() { - // Batch all events that are ready. - // The task queue will throttle non-priority tasks if necessary. - match self.task_queue.take_tasks().try_recv() { - Err(_) => match timer_event_port.try_recv() { - Err(_) => match devtools_port.try_recv() { - Err(_) => break, - Ok(ev) => sequential.push(MixedMessage::FromDevtools(ev)), - }, - Ok(ev) => sequential.push(MixedMessage::FromTimeoutThread(ev)), - }, - Ok(ev) => sequential.push(MixedMessage::FromServiceWorker(ev)), - } - } - // Step 3 - for event in sequential { - self.handle_event(event); - // Step 6 - self.upcast::().perform_a_microtask_checkpoint(); - } - } - pub fn script_chan(&self) -> Box { Box::new(ServiceWorkerChan { sender: self.own_sender.clone()