mirror of
https://github.com/servo/servo.git
synced 2025-08-04 13:10:20 +01:00
Replace mpsc with crossbeam/servo channel, update ipc-channel
Co-authored-by: Gregory Terzian <gterzian@users.noreply.github.com>
This commit is contained in:
parent
b977b4994c
commit
2a996fbc8f
89 changed files with 341 additions and 377 deletions
|
@ -53,7 +53,7 @@ html5ever = "0.22"
|
|||
hyper = "0.10"
|
||||
hyper_serde = "0.8"
|
||||
image = "0.19"
|
||||
ipc-channel = "0.10"
|
||||
ipc-channel = "0.11"
|
||||
itertools = "0.7.6"
|
||||
jstraceable_derive = {path = "../jstraceable_derive"}
|
||||
lazy_static = "1"
|
||||
|
@ -85,6 +85,7 @@ serde_bytes = "0.10"
|
|||
servo_allocator = {path = "../allocator"}
|
||||
servo_arc = {path = "../servo_arc"}
|
||||
servo_atoms = {path = "../atoms"}
|
||||
servo_channel = {path = "../channel"}
|
||||
servo_config = {path = "../config"}
|
||||
servo_geometry = {path = "../geometry" }
|
||||
servo-media = {git = "https://github.com/servo/media"}
|
||||
|
|
|
@ -11,7 +11,7 @@ use dom::globalscope::GlobalScope;
|
|||
use dom::worker::TrustedWorkerAddress;
|
||||
use dom::workerglobalscope::WorkerGlobalScope;
|
||||
use script_runtime::{ScriptChan, CommonScriptMsg, ScriptPort};
|
||||
use std::sync::mpsc::{Receiver, Select, Sender};
|
||||
use servo_channel::{Receiver, Sender};
|
||||
use task_queue::{QueuedTaskConversion, TaskQueue};
|
||||
|
||||
/// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with
|
||||
|
@ -65,9 +65,9 @@ impl ScriptChan for WorkerThreadWorkerChan {
|
|||
impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
let common_msg = match self.recv() {
|
||||
Ok(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg,
|
||||
Err(_) => return Err(()),
|
||||
Ok(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!")
|
||||
Some(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg,
|
||||
None => return Err(()),
|
||||
Some(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!")
|
||||
};
|
||||
match common_msg {
|
||||
WorkerScriptMsg::Common(script_msg) => Ok(script_msg),
|
||||
|
@ -89,7 +89,6 @@ pub trait WorkerEventLoopMethods {
|
|||
fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> Self::Event;
|
||||
}
|
||||
|
||||
#[allow(unsafe_code)]
|
||||
// https://html.spec.whatwg.org/multipage/#worker-event-loop
|
||||
pub fn run_worker_event_loop<T, TimerMsg, WorkerMsg, Event>(worker_scope: &T,
|
||||
worker: Option<&TrustedWorkerAddress>)
|
||||
|
@ -101,31 +100,18 @@ where
|
|||
+ DomObject {
|
||||
let scope = worker_scope.upcast::<WorkerGlobalScope>();
|
||||
let timer_event_port = worker_scope.timer_event_port();
|
||||
let devtools_port = scope.from_devtools_receiver();
|
||||
let devtools_port = match scope.from_devtools_sender() {
|
||||
Some(_) => Some(scope.from_devtools_receiver().select()),
|
||||
None => None,
|
||||
};
|
||||
let task_queue = worker_scope.task_queue();
|
||||
let sel = Select::new();
|
||||
let mut worker_handle = sel.handle(task_queue.select());
|
||||
let mut timer_event_handle = sel.handle(timer_event_port);
|
||||
let mut devtools_handle = sel.handle(devtools_port);
|
||||
unsafe {
|
||||
worker_handle.add();
|
||||
timer_event_handle.add();
|
||||
if scope.from_devtools_sender().is_some() {
|
||||
devtools_handle.add();
|
||||
}
|
||||
}
|
||||
let ret = sel.wait();
|
||||
let event = {
|
||||
if ret == worker_handle.id() {
|
||||
task_queue.take_tasks();
|
||||
let event = select! {
|
||||
recv(task_queue.select(), msg) => {
|
||||
task_queue.take_tasks(msg.unwrap());
|
||||
worker_scope.from_worker_msg(task_queue.recv().unwrap())
|
||||
} else if ret == timer_event_handle.id() {
|
||||
worker_scope.from_timer_msg(timer_event_port.recv().unwrap())
|
||||
} else if ret == devtools_handle.id() {
|
||||
worker_scope.from_devtools_msg(devtools_port.recv().unwrap())
|
||||
} else {
|
||||
panic!("unexpected select result!")
|
||||
}
|
||||
},
|
||||
recv(timer_event_port.select(), msg) => worker_scope.from_timer_msg(msg.unwrap()),
|
||||
recv(devtools_port, msg) => worker_scope.from_devtools_msg(msg.unwrap()),
|
||||
};
|
||||
let mut sequential = vec![];
|
||||
sequential.push(event);
|
||||
|
@ -138,14 +124,14 @@ where
|
|||
// Batch all events that are ready.
|
||||
// The task queue will throttle non-priority tasks if necessary.
|
||||
match task_queue.try_recv() {
|
||||
Err(_) => match timer_event_port.try_recv() {
|
||||
Err(_) => match devtools_port.try_recv() {
|
||||
Err(_) => break,
|
||||
Ok(ev) => sequential.push(worker_scope.from_devtools_msg(ev)),
|
||||
None => match timer_event_port.try_recv() {
|
||||
None => match devtools_port.and_then(|port| port.try_recv()) {
|
||||
None => break,
|
||||
Some(ev) => sequential.push(worker_scope.from_devtools_msg(ev)),
|
||||
},
|
||||
Ok(ev) => sequential.push(worker_scope.from_timer_msg(ev)),
|
||||
Some(ev) => sequential.push(worker_scope.from_timer_msg(ev)),
|
||||
},
|
||||
Ok(ev) => sequential.push(worker_scope.from_worker_msg(ev)),
|
||||
Some(ev) => sequential.push(worker_scope.from_worker_msg(ev)),
|
||||
}
|
||||
}
|
||||
// Step 3
|
||||
|
|
|
@ -88,6 +88,7 @@ use selectors::matching::ElementSelectorFlags;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use servo_arc::Arc as ServoArc;
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::{Receiver, Sender};
|
||||
use servo_media::Backend;
|
||||
use servo_media::audio::buffer_source_node::AudioBuffer;
|
||||
use servo_media::audio::context::AudioContext;
|
||||
|
@ -104,7 +105,6 @@ use std::path::PathBuf;
|
|||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::time::{SystemTime, Instant};
|
||||
use style::attr::{AttrIdentifier, AttrValue, LengthOrPercentageOrAuto};
|
||||
use style::context::QuirksMode;
|
||||
|
|
|
@ -36,12 +36,12 @@ use net_traits::request::{CredentialsMode, Destination, RequestInit};
|
|||
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, new_rt_and_cx, Runtime};
|
||||
use script_runtime::ScriptThreadEventCategory::WorkerEvent;
|
||||
use script_traits::{TimerEvent, TimerSource, WorkerGlobalScopeInit, WorkerScriptLoadOrigin};
|
||||
use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Sender, Receiver};
|
||||
use servo_rand::random;
|
||||
use servo_url::ServoUrl;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::{Receiver, Sender, channel};
|
||||
use std::thread;
|
||||
use style::thread_state::{self, ThreadState};
|
||||
use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue};
|
||||
|
@ -313,7 +313,7 @@ impl DedicatedWorkerGlobalScope {
|
|||
let runtime = unsafe { new_rt_and_cx() };
|
||||
|
||||
let (devtools_mpsc_chan, devtools_mpsc_port) = channel();
|
||||
ROUTER.route_ipc_receiver_to_mpsc_sender(from_devtools_receiver, devtools_mpsc_chan);
|
||||
route_ipc_receiver_to_new_servo_sender(from_devtools_receiver, devtools_mpsc_chan);
|
||||
|
||||
let (timer_tx, timer_rx) = channel();
|
||||
let (timer_ipc_chan, timer_ipc_port) = ipc::channel().unwrap();
|
||||
|
|
|
@ -60,8 +60,6 @@ use std::ptr::null_mut;
|
|||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use style_traits::CSSPixel;
|
||||
|
@ -352,7 +350,7 @@ impl PaintWorkletGlobalScope {
|
|||
arguments: Vec<String>)
|
||||
-> Result<DrawAPaintImageResult, PaintWorkletError> {
|
||||
let name = self.name.clone();
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let (sender, receiver) = channel();
|
||||
let task = PaintWorkletTask::DrawAPaintImage(name,
|
||||
size,
|
||||
device_pixel_ratio,
|
||||
|
|
|
@ -22,17 +22,16 @@ use dom::worker::TrustedWorkerAddress;
|
|||
use dom::workerglobalscope::WorkerGlobalScope;
|
||||
use dom_struct::dom_struct;
|
||||
use ipc_channel::ipc::{self, IpcSender, IpcReceiver};
|
||||
use ipc_channel::router::ROUTER;
|
||||
use js::jsapi::{JSAutoCompartment, JSContext, JS_AddInterruptCallback};
|
||||
use js::jsval::UndefinedValue;
|
||||
use net_traits::{load_whole_resource, IpcSend, CustomResponseMediator};
|
||||
use net_traits::request::{CredentialsMode, Destination, RequestInit};
|
||||
use script_runtime::{CommonScriptMsg, ScriptChan, new_rt_and_cx, Runtime};
|
||||
use script_traits::{TimerEvent, WorkerGlobalScopeInit, ScopeThings, ServiceWorkerMsg, WorkerScriptLoadOrigin};
|
||||
use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Receiver, Sender};
|
||||
use servo_config::prefs::PREFS;
|
||||
use servo_rand::random;
|
||||
use servo_url::ServoUrl;
|
||||
use std::sync::mpsc::{Receiver, Sender, channel};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use style::thread_state::{self, ThreadState};
|
||||
|
@ -272,7 +271,7 @@ impl ServiceWorkerGlobalScope {
|
|||
let runtime = unsafe { new_rt_and_cx() };
|
||||
|
||||
let (devtools_mpsc_chan, devtools_mpsc_port) = channel();
|
||||
ROUTER.route_ipc_receiver_to_mpsc_sender(devtools_receiver, devtools_mpsc_chan);
|
||||
route_ipc_receiver_to_new_servo_sender(devtools_receiver, devtools_mpsc_chan);
|
||||
// TODO XXXcreativcoder use this timer_ipc_port, when we have a service worker instance here
|
||||
let (timer_ipc_chan, _timer_ipc_port) = ipc::channel().unwrap();
|
||||
let (timer_chan, timer_port) = channel();
|
||||
|
|
|
@ -27,12 +27,12 @@ use html5ever::tendril::fmt::UTF8;
|
|||
use html5ever::tokenizer::{Tokenizer as HtmlTokenizer, TokenizerOpts, TokenizerResult};
|
||||
use html5ever::tree_builder::{ElementFlags, NodeOrText as HtmlNodeOrText, NextParserState, QuirksMode, TreeSink};
|
||||
use html5ever::tree_builder::{TreeBuilder, TreeBuilderOpts};
|
||||
use servo_channel::{channel, Receiver, Sender};
|
||||
use servo_url::ServoUrl;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::Cell;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::vec_deque::VecDeque;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::thread;
|
||||
use style::context::QuirksMode as ServoQuirksMode;
|
||||
|
||||
|
|
|
@ -13,9 +13,9 @@ use dom::workletglobalscope::WorkletGlobalScopeInit;
|
|||
use dom_struct::dom_struct;
|
||||
use js::rust::Runtime;
|
||||
use msg::constellation_msg::PipelineId;
|
||||
use servo_channel::Sender;
|
||||
use servo_url::ServoUrl;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
// check-tidy: no specs after this line
|
||||
|
||||
|
|
|
@ -36,11 +36,11 @@ use profile_traits::ipc;
|
|||
use script_runtime::CommonScriptMsg;
|
||||
use script_runtime::ScriptThreadEventCategory::WebVREvent;
|
||||
use serde_bytes::ByteBuf;
|
||||
use servo_channel::{channel, Sender};
|
||||
use std::cell::Cell;
|
||||
use std::mem;
|
||||
use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use task_source::TaskSourceName;
|
||||
use webvr_traits::{WebVRDisplayData, WebVRDisplayEvent, WebVRFrameData, WebVRLayer, WebVRMsg};
|
||||
|
@ -505,7 +505,7 @@ impl VRDisplay {
|
|||
// while the render thread is syncing the VRFrameData to be used for the current frame.
|
||||
// This thread runs until the user calls ExitPresent, the tab is closed or some unexpected error happened.
|
||||
thread::Builder::new().name("WebVR_RAF".into()).spawn(move || {
|
||||
let (raf_sender, raf_receiver) = mpsc::channel();
|
||||
let (raf_sender, raf_receiver) = channel();
|
||||
let mut near = near_init;
|
||||
let mut far = far_init;
|
||||
|
||||
|
@ -581,7 +581,7 @@ impl VRDisplay {
|
|||
self.frame_data_status.set(status);
|
||||
}
|
||||
|
||||
fn handle_raf(&self, end_sender: &mpsc::Sender<Result<(f64, f64), ()>>) {
|
||||
fn handle_raf(&self, end_sender: &Sender<Result<(f64, f64), ()>>) {
|
||||
self.frame_data_status.set(VRFrameDataStatus::Waiting);
|
||||
self.running_display_raf.set(true);
|
||||
|
||||
|
|
|
@ -94,6 +94,7 @@ use script_traits::{TimerSchedulerMsg, UntrustedNodeAddress, WindowSizeData, Win
|
|||
use script_traits::webdriver_msg::{WebDriverJSError, WebDriverJSResult};
|
||||
use selectors::attr::CaseSensitivity;
|
||||
use servo_arc;
|
||||
use servo_channel::{channel, Sender};
|
||||
use servo_config::opts;
|
||||
use servo_geometry::{f32_rect_to_au_rect, MaxRect};
|
||||
use servo_url::{Host, MutableOrigin, ImmutableOrigin, ServoUrl};
|
||||
|
@ -109,8 +110,6 @@ use std::mem;
|
|||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{Sender, channel};
|
||||
use std::sync::mpsc::TryRecvError::{Disconnected, Empty};
|
||||
use style::error_reporting::ParseErrorReporter;
|
||||
use style::media_queries;
|
||||
use style::parser::ParserContext as CssParserContext;
|
||||
|
@ -1376,15 +1375,16 @@ impl Window {
|
|||
|
||||
debug!("script: layout forked");
|
||||
|
||||
let complete = match join_port.try_recv() {
|
||||
Err(Empty) => {
|
||||
let complete = select! {
|
||||
recv(join_port.select(), msg) => if let Some(reflow_complete) = msg {
|
||||
reflow_complete
|
||||
} else {
|
||||
panic!("Layout thread failed while script was waiting for a result.");
|
||||
},
|
||||
default => {
|
||||
info!("script: waiting on layout");
|
||||
join_port.recv().unwrap()
|
||||
}
|
||||
Ok(reflow_complete) => reflow_complete,
|
||||
Err(Disconnected) => {
|
||||
panic!("Layout thread failed while script was waiting for a result.");
|
||||
}
|
||||
};
|
||||
|
||||
debug!("script: layout joined");
|
||||
|
|
|
@ -25,10 +25,10 @@ use js::jsapi::{JSAutoCompartment, JSContext, JS_RequestInterruptCallback};
|
|||
use js::jsval::UndefinedValue;
|
||||
use js::rust::HandleValue;
|
||||
use script_traits::WorkerScriptLoadOrigin;
|
||||
use servo_channel::{channel, Sender};
|
||||
use std::cell::Cell;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{Sender, channel};
|
||||
use task::TaskOnce;
|
||||
|
||||
pub type TrustedWorkerAddress = Trusted<Worker>;
|
||||
|
|
|
@ -36,12 +36,12 @@ use net_traits::request::{CredentialsMode, Destination, RequestInit as NetReques
|
|||
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort, get_reports, Runtime};
|
||||
use script_traits::{TimerEvent, TimerEventId};
|
||||
use script_traits::WorkerGlobalScopeInit;
|
||||
use servo_channel::Receiver;
|
||||
use servo_url::{MutableOrigin, ServoUrl};
|
||||
use std::default::Default;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::Receiver;
|
||||
use task::TaskCanceller;
|
||||
use task_source::file_reading::FileReadingTaskSource;
|
||||
use task_source::networking::NetworkingTaskSource;
|
||||
|
|
|
@ -48,6 +48,7 @@ use script_runtime::Runtime;
|
|||
use script_runtime::ScriptThreadEventCategory;
|
||||
use script_runtime::new_rt_and_cx;
|
||||
use script_thread::{MainThreadScriptMsg, ScriptThread};
|
||||
use servo_channel::{channel, Sender, Receiver};
|
||||
use servo_rand;
|
||||
use servo_url::ImmutableOrigin;
|
||||
use servo_url::ServoUrl;
|
||||
|
@ -58,9 +59,6 @@ use std::rc::Rc;
|
|||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicIsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::thread;
|
||||
use style::thread_state::{self, ThreadState};
|
||||
use swapper::Swapper;
|
||||
|
@ -309,7 +307,7 @@ impl WorkletThreadPool {
|
|||
|
||||
/// For testing.
|
||||
pub fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let (sender, receiver) = channel();
|
||||
let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
|
||||
let _ = self.primary_sender.send(msg);
|
||||
receiver.recv().expect("Test worklet has died?")
|
||||
|
@ -355,7 +353,7 @@ struct WorkletThreadRole {
|
|||
|
||||
impl WorkletThreadRole {
|
||||
fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let (sender, receiver) = channel();
|
||||
WorkletThreadRole {
|
||||
sender: sender,
|
||||
receiver: receiver,
|
||||
|
@ -419,7 +417,7 @@ impl WorkletThread {
|
|||
#[allow(unsafe_code)]
|
||||
#[allow(unrooted_must_root)]
|
||||
fn spawn(role: WorkletThreadRole, init: WorkletThreadInit) -> Sender<WorkletControl> {
|
||||
let (control_sender, control_receiver) = mpsc::channel();
|
||||
let (control_sender, control_receiver) = channel();
|
||||
// TODO: name this thread
|
||||
thread::spawn(move || {
|
||||
// TODO: add a new IN_WORKLET thread state?
|
||||
|
@ -488,12 +486,12 @@ impl WorkletThread {
|
|||
if let Some(control) = self.control_buffer.take() {
|
||||
self.process_control(control);
|
||||
}
|
||||
while let Ok(control) = self.control_receiver.try_recv() {
|
||||
while let Some(control) = self.control_receiver.try_recv() {
|
||||
self.process_control(control);
|
||||
}
|
||||
self.gc();
|
||||
} else if self.control_buffer.is_none() {
|
||||
if let Ok(control) = self.control_receiver.try_recv() {
|
||||
if let Some(control) = self.control_receiver.try_recv() {
|
||||
self.control_buffer = Some(control);
|
||||
let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
|
||||
let _ = self.cold_backup_sender.send(msg);
|
||||
|
|
|
@ -26,11 +26,11 @@ use script_thread::MainThreadScriptMsg;
|
|||
use script_traits::{Painter, ScriptMsg};
|
||||
use script_traits::{ScriptToConstellationChan, TimerSchedulerMsg};
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::Sender;
|
||||
use servo_url::ImmutableOrigin;
|
||||
use servo_url::MutableOrigin;
|
||||
use servo_url::ServoUrl;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
#[dom_struct]
|
||||
/// <https://drafts.css-houdini.org/worklets/#workletglobalscope>
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
#![cfg_attr(feature = "unstable", feature(on_unimplemented))]
|
||||
#![feature(const_fn)]
|
||||
#![feature(drain_filter)]
|
||||
#![feature(mpsc_select)]
|
||||
#![feature(plugin)]
|
||||
#![feature(try_from)]
|
||||
|
||||
|
|
|
@ -70,7 +70,6 @@ use hyper::header::ReferrerPolicy as ReferrerPolicyHeader;
|
|||
use hyper::mime::{Mime, SubLevel, TopLevel};
|
||||
use hyper_serde::Serde;
|
||||
use ipc_channel::ipc::{self, IpcSender};
|
||||
use ipc_channel::router::ROUTER;
|
||||
use js::glue::GetWindowProxyClass;
|
||||
use js::jsapi::{JSAutoCompartment, JSContext, JS_SetWrapObjectCallbacks};
|
||||
use js::jsapi::{JSTracer, SetWindowProxyClass};
|
||||
|
@ -101,6 +100,8 @@ use script_traits::CompositorEvent::{KeyEvent, MouseButtonEvent, MouseMoveEvent,
|
|||
use script_traits::webdriver_msg::WebDriverScriptCommand;
|
||||
use serviceworkerjob::{Job, JobQueue};
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::{channel, Receiver, Sender};
|
||||
use servo_channel::{route_ipc_receiver_to_new_servo_receiver, route_ipc_receiver_to_new_servo_sender};
|
||||
use servo_config::opts;
|
||||
use servo_url::{ImmutableOrigin, MutableOrigin, ServoUrl};
|
||||
use std::cell::Cell;
|
||||
|
@ -113,7 +114,6 @@ use std::ptr;
|
|||
use std::rc::Rc;
|
||||
use std::result::Result;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{Receiver, Select, Sender, channel};
|
||||
use std::thread;
|
||||
use style::thread_state::{self, ThreadState};
|
||||
use task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue};
|
||||
|
@ -300,39 +300,39 @@ impl OpaqueSender<CommonScriptMsg> for Box<ScriptChan + Send> {
|
|||
|
||||
impl ScriptPort for Receiver<CommonScriptMsg> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
self.recv().map_err(|_| ())
|
||||
self.recv().ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ScriptPort for Receiver<MainThreadScriptMsg> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
match self.recv() {
|
||||
Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
|
||||
Ok(_) => panic!("unexpected main thread event message!"),
|
||||
_ => Err(()),
|
||||
Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
|
||||
Some(_) => panic!("unexpected main thread event message!"),
|
||||
None => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ScriptPort for Receiver<(TrustedWorkerAddress, CommonScriptMsg)> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
self.recv().map(|(_, msg)| msg).map_err(|_| ())
|
||||
self.recv().map(|(_, msg)| msg).ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ScriptPort for Receiver<(TrustedWorkerAddress, MainThreadScriptMsg)> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
match self.recv().map(|(_, msg)| msg) {
|
||||
Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
|
||||
Ok(_) => panic!("unexpected main thread event message!"),
|
||||
_ => Err(()),
|
||||
Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
|
||||
Some(_) => panic!("unexpected main thread event message!"),
|
||||
None => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ScriptPort for Receiver<(TrustedServiceWorkerAddress, CommonScriptMsg)> {
|
||||
fn recv(&self) -> Result<CommonScriptMsg, ()> {
|
||||
self.recv().map(|(_, msg)| msg).map_err(|_| ())
|
||||
self.recv().map(|(_, msg)| msg).ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -896,12 +896,12 @@ impl ScriptThread {
|
|||
|
||||
// Ask the router to proxy IPC messages from the devtools to us.
|
||||
let (ipc_devtools_sender, ipc_devtools_receiver) = ipc::channel().unwrap();
|
||||
let devtools_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_devtools_receiver);
|
||||
let devtools_port = route_ipc_receiver_to_new_servo_receiver(ipc_devtools_receiver);
|
||||
|
||||
let (timer_event_chan, timer_event_port) = channel();
|
||||
|
||||
// Ask the router to proxy IPC messages from the control port to us.
|
||||
let control_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(state.control_port);
|
||||
let control_port = route_ipc_receiver_to_new_servo_receiver(state.control_port);
|
||||
|
||||
let boxed_script_sender = Box::new(MainThreadScriptChan(chan.clone()));
|
||||
|
||||
|
@ -1019,37 +1019,15 @@ impl ScriptThread {
|
|||
|
||||
// Receive at least one message so we don't spinloop.
|
||||
debug!("Waiting for event.");
|
||||
let mut event = {
|
||||
let sel = Select::new();
|
||||
let mut script_port = sel.handle(self.task_queue.select());
|
||||
let mut control_port = sel.handle(&self.control_port);
|
||||
let mut timer_event_port = sel.handle(&self.timer_event_port);
|
||||
let mut devtools_port = sel.handle(&self.devtools_port);
|
||||
let mut image_cache_port = sel.handle(&self.image_cache_port);
|
||||
unsafe {
|
||||
script_port.add();
|
||||
control_port.add();
|
||||
timer_event_port.add();
|
||||
if self.devtools_chan.is_some() {
|
||||
devtools_port.add();
|
||||
}
|
||||
image_cache_port.add();
|
||||
}
|
||||
let ret = sel.wait();
|
||||
if ret == script_port.id() {
|
||||
self.task_queue.take_tasks();
|
||||
let mut event = select! {
|
||||
recv(self.task_queue.select(), msg) => {
|
||||
self.task_queue.take_tasks(msg.unwrap());
|
||||
FromScript(self.task_queue.recv().unwrap())
|
||||
} else if ret == control_port.id() {
|
||||
FromConstellation(self.control_port.recv().unwrap())
|
||||
} else if ret == timer_event_port.id() {
|
||||
FromScheduler(self.timer_event_port.recv().unwrap())
|
||||
} else if ret == devtools_port.id() {
|
||||
FromDevtools(self.devtools_port.recv().unwrap())
|
||||
} else if ret == image_cache_port.id() {
|
||||
FromImageCache(self.image_cache_port.recv().unwrap())
|
||||
} else {
|
||||
panic!("unexpected select result")
|
||||
}
|
||||
},
|
||||
recv(self.control_port.select(), msg) => FromConstellation(msg.unwrap()),
|
||||
recv(self.timer_event_port.select(), msg) => FromScheduler(msg.unwrap()),
|
||||
recv(self.devtools_chan.as_ref().map(|_| self.devtools_port.select()), msg) => FromDevtools(msg.unwrap()),
|
||||
recv(self.image_cache_port.select(), msg) => FromImageCache(msg.unwrap()),
|
||||
};
|
||||
debug!("Got event.");
|
||||
|
||||
|
@ -1131,20 +1109,20 @@ impl ScriptThread {
|
|||
// and check for more resize events. If there are no events pending, we'll move
|
||||
// on and execute the sequential non-resize events we've seen.
|
||||
match self.control_port.try_recv() {
|
||||
Err(_) => match self.task_queue.try_recv() {
|
||||
Err(_) => match self.timer_event_port.try_recv() {
|
||||
Err(_) => match self.devtools_port.try_recv() {
|
||||
Err(_) => match self.image_cache_port.try_recv() {
|
||||
Err(_) => break,
|
||||
Ok(ev) => event = FromImageCache(ev),
|
||||
None => match self.task_queue.try_recv() {
|
||||
None => match self.timer_event_port.try_recv() {
|
||||
None => match self.devtools_port.try_recv() {
|
||||
None => match self.image_cache_port.try_recv() {
|
||||
None => break,
|
||||
Some(ev) => event = FromImageCache(ev),
|
||||
},
|
||||
Ok(ev) => event = FromDevtools(ev),
|
||||
Some(ev) => event = FromDevtools(ev),
|
||||
},
|
||||
Ok(ev) => event = FromScheduler(ev),
|
||||
Some(ev) => event = FromScheduler(ev),
|
||||
},
|
||||
Ok(ev) => event = FromScript(ev),
|
||||
Some(ev) => event = FromScript(ev),
|
||||
},
|
||||
Ok(ev) => event = FromConstellation(ev),
|
||||
Some(ev) => event = FromConstellation(ev),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2200,7 +2178,7 @@ impl ScriptThread {
|
|||
let HistoryTraversalTaskSource(ref history_sender) = self.history_traversal_task_source;
|
||||
|
||||
let (ipc_timer_event_chan, ipc_timer_event_port) = ipc::channel().unwrap();
|
||||
ROUTER.route_ipc_receiver_to_mpsc_sender(ipc_timer_event_port,
|
||||
route_ipc_receiver_to_new_servo_sender(ipc_timer_event_port,
|
||||
self.timer_event_chan.clone());
|
||||
|
||||
let origin = if final_url.as_str() == "about:blank" {
|
||||
|
|
|
@ -13,13 +13,12 @@ use dom::bindings::structuredclone::StructuredCloneData;
|
|||
use dom::serviceworkerglobalscope::{ServiceWorkerGlobalScope, ServiceWorkerScriptMsg};
|
||||
use dom::serviceworkerregistration::longest_prefix_match;
|
||||
use ipc_channel::ipc::{self, IpcSender};
|
||||
use ipc_channel::router::ROUTER;
|
||||
use net_traits::{CustomResponseMediator, CoreResourceMsg};
|
||||
use script_traits::{ServiceWorkerMsg, ScopeThings, SWManagerMsg, SWManagerSenders, DOMMessage};
|
||||
use servo_channel::{channel, route_ipc_receiver_to_new_servo_receiver, Sender, Receiver};
|
||||
use servo_config::prefs::PREFS;
|
||||
use servo_url::ServoUrl;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::mpsc::{channel, Sender, Receiver, RecvError};
|
||||
use std::thread;
|
||||
|
||||
enum Message {
|
||||
|
@ -56,8 +55,8 @@ impl ServiceWorkerManager {
|
|||
pub fn spawn_manager(sw_senders: SWManagerSenders) {
|
||||
let (own_sender, from_constellation_receiver) = ipc::channel().unwrap();
|
||||
let (resource_chan, resource_port) = ipc::channel().unwrap();
|
||||
let from_constellation = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(from_constellation_receiver);
|
||||
let resource_port = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(resource_port);
|
||||
let from_constellation = route_ipc_receiver_to_new_servo_receiver(from_constellation_receiver);
|
||||
let resource_port = route_ipc_receiver_to_new_servo_receiver(resource_port);
|
||||
let _ = sw_senders.resource_sender.send(CoreResourceMsg::NetworkMediator(resource_chan));
|
||||
let _ = sw_senders.swmanager_sender.send(SWManagerMsg::OwnSender(own_sender.clone()));
|
||||
thread::Builder::new().name("ServiceWorkerManager".to_owned()).spawn(move || {
|
||||
|
@ -108,7 +107,7 @@ impl ServiceWorkerManager {
|
|||
}
|
||||
|
||||
fn handle_message(&mut self) {
|
||||
while let Ok(message) = self.receive_message() {
|
||||
while let Some(message) = self.receive_message() {
|
||||
let should_continue = match message {
|
||||
Message::FromConstellation(msg) => {
|
||||
self.handle_message_from_constellation(msg)
|
||||
|
@ -184,13 +183,10 @@ impl ServiceWorkerManager {
|
|||
true
|
||||
}
|
||||
|
||||
#[allow(unsafe_code)]
|
||||
fn receive_message(&mut self) -> Result<Message, RecvError> {
|
||||
let msg_from_constellation = &self.own_port;
|
||||
let msg_from_resource = &self.resource_receiver;
|
||||
fn receive_message(&mut self) -> Option<Message> {
|
||||
select! {
|
||||
msg = msg_from_constellation.recv() => msg.map(Message::FromConstellation),
|
||||
msg = msg_from_resource.recv() => msg.map(Message::FromResource)
|
||||
recv(self.own_port.select(), msg) => msg.map(Message::FromConstellation),
|
||||
recv(self.resource_receiver.select(), msg) => msg.map(Message::FromResource),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,10 +8,10 @@ use dom::bindings::cell::DomRefCell;
|
|||
use dom::worker::TrustedWorkerAddress;
|
||||
use msg::constellation_msg::PipelineId;
|
||||
use script_runtime::ScriptThreadEventCategory;
|
||||
use servo_channel::{Receiver, Sender, base_channel};
|
||||
use std::cell::Cell;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::default::Default;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use task::TaskBox;
|
||||
use task_source::TaskSourceName;
|
||||
|
||||
|
@ -59,13 +59,18 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
|
||||
/// Process incoming tasks, immediately sending priority ones downstream,
|
||||
/// and categorizing potential throttles.
|
||||
fn process_incoming_tasks(&self) {
|
||||
let mut non_throttled: Vec<T> = self.port
|
||||
.try_iter()
|
||||
.filter(|msg| !msg.is_wake_up())
|
||||
.collect();
|
||||
fn process_incoming_tasks(&self, first_msg: T) {
|
||||
let mut incoming = Vec::with_capacity(self.port.len() + 1);
|
||||
if !first_msg.is_wake_up() {
|
||||
incoming.push(first_msg);
|
||||
}
|
||||
while let Some(msg) = self.port.try_recv() {
|
||||
if !msg.is_wake_up() {
|
||||
incoming.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
let to_be_throttled: Vec<T> = non_throttled.drain_filter(|msg|{
|
||||
let to_be_throttled: Vec<T> = incoming.drain_filter(|msg|{
|
||||
let task_source = match msg.task_source_name() {
|
||||
Some(task_source) => task_source,
|
||||
None => return false,
|
||||
|
@ -80,7 +85,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
}
|
||||
}).collect();
|
||||
|
||||
for msg in non_throttled {
|
||||
for msg in incoming {
|
||||
// Immediately send non-throttled tasks for processing.
|
||||
let _ = self.msg_queue.borrow_mut().push_back(msg);
|
||||
}
|
||||
|
@ -101,31 +106,31 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
|
|||
|
||||
/// Reset the queue for a new iteration of the event-loop,
|
||||
/// returning the port about whose readiness we want to be notified.
|
||||
pub fn select(&self) -> &Receiver<T> {
|
||||
pub fn select(&self) -> &base_channel::Receiver<T> {
|
||||
// This is a new iteration of the event-loop, so we reset the "business" counter.
|
||||
self.taken_task_counter.set(0);
|
||||
// We want to be notified when the script-port is ready to receive.
|
||||
// Hence that's the one we need to include in the select.
|
||||
&self.port
|
||||
self.port.select()
|
||||
}
|
||||
|
||||
/// Take a message from the front of the queue, without waiting if empty.
|
||||
pub fn recv(&self) -> Result<T, ()> {
|
||||
self.msg_queue.borrow_mut().pop_front().ok_or(())
|
||||
pub fn recv(&self) -> Option<T> {
|
||||
self.msg_queue.borrow_mut().pop_front()
|
||||
}
|
||||
|
||||
/// Same as recv.
|
||||
pub fn try_recv(&self) -> Result<T, ()> {
|
||||
pub fn try_recv(&self) -> Option<T> {
|
||||
self.recv()
|
||||
}
|
||||
|
||||
/// Drain the queue for the current iteration of the event-loop.
|
||||
/// Holding-back throttles above a given high-water mark.
|
||||
pub fn take_tasks(&self) {
|
||||
pub fn take_tasks(&self, first_msg: T) {
|
||||
// High-watermark: once reached, throttled tasks will be held-back.
|
||||
const PER_ITERATION_MAX: u64 = 5;
|
||||
// Always first check for new tasks, but don't reset 'taken_task_counter'.
|
||||
self.process_incoming_tasks();
|
||||
self.process_incoming_tasks(first_msg);
|
||||
let mut throttled = self.throttled.borrow_mut();
|
||||
let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
|
||||
let task_source_names = TaskSourceName::all();
|
||||
|
|
|
@ -11,9 +11,9 @@ use msg::constellation_msg::PipelineId;
|
|||
use script_runtime::{CommonScriptMsg, ScriptThreadEventCategory};
|
||||
use script_thread::MainThreadScriptMsg;
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::Sender;
|
||||
use std::fmt;
|
||||
use std::result::Result;
|
||||
use std::sync::mpsc::Sender;
|
||||
use task::{TaskCanceller, TaskOnce};
|
||||
use task_source::{TaskSource, TaskSourceName};
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
use script_runtime::{ScriptChan, CommonScriptMsg};
|
||||
use script_thread::MainThreadScriptMsg;
|
||||
use std::sync::mpsc::Sender;
|
||||
use servo_channel::Sender;
|
||||
|
||||
#[derive(JSTraceable)]
|
||||
pub struct HistoryTraversalTaskSource(pub Sender<MainThreadScriptMsg>);
|
||||
|
|
|
@ -11,9 +11,9 @@ use msg::constellation_msg::PipelineId;
|
|||
use script_runtime::{CommonScriptMsg, ScriptThreadEventCategory};
|
||||
use script_thread::MainThreadScriptMsg;
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::Sender;
|
||||
use std::fmt;
|
||||
use std::result::Result;
|
||||
use std::sync::mpsc::Sender;
|
||||
use task::{TaskCanceller, TaskOnce};
|
||||
use task_source::{TaskSource, TaskSourceName};
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue