Update crossbeam-channel to 0.3

This commit is contained in:
Bastien Orivel 2018-11-07 19:48:07 +01:00
parent 76195e0779
commit 9a7eeb349a
74 changed files with 303 additions and 521 deletions

View file

@ -42,6 +42,7 @@ canvas_traits = {path = "../canvas_traits"}
caseless = "0.2"
cookie = "0.11"
chrono = "0.4"
crossbeam-channel = "0.3"
cssparser = "0.25"
deny_public_fields = {path = "../deny_public_fields"}
devtools_traits = {path = "../devtools_traits"}
@ -94,7 +95,6 @@ serde_bytes = "0.10"
servo_allocator = {path = "../allocator"}
servo_arc = {path = "../servo_arc"}
servo_atoms = {path = "../atoms"}
servo_channel = {path = "../channel"}
servo_config = {path = "../config"}
servo_geometry = {path = "../geometry" }
servo-media = {git = "https://github.com/servo/media"}

View file

@ -11,8 +11,8 @@ use crate::dom::worker::TrustedWorkerAddress;
use crate::dom::workerglobalscope::WorkerGlobalScope;
use crate::script_runtime::{CommonScriptMsg, ScriptChan, ScriptPort};
use crate::task_queue::{QueuedTaskConversion, TaskQueue};
use crossbeam_channel::{Receiver, Sender};
use devtools_traits::DevtoolScriptControlMsg;
use servo_channel::{Receiver, Sender};
/// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with
/// common event loop messages. While this SendableWorkerScriptChan is alive, the associated
@ -69,9 +69,9 @@ impl ScriptChan for WorkerThreadWorkerChan {
impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
let common_msg = match self.recv() {
Some(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg,
None => return Err(()),
Some(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!"),
Ok(DedicatedWorkerScriptMsg::CommonWorker(_worker, common_msg)) => common_msg,
Err(_) => return Err(()),
Ok(DedicatedWorkerScriptMsg::WakeUp) => panic!("unexpected worker event message!"),
};
match common_msg {
WorkerScriptMsg::Common(script_msg) => Ok(script_msg),
@ -108,17 +108,18 @@ pub fn run_worker_event_loop<T, TimerMsg, WorkerMsg, Event>(
let scope = worker_scope.upcast::<WorkerGlobalScope>();
let timer_event_port = worker_scope.timer_event_port();
let devtools_port = match scope.from_devtools_sender() {
Some(_) => Some(scope.from_devtools_receiver().select()),
Some(_) => Some(scope.from_devtools_receiver()),
None => None,
};
let task_queue = worker_scope.task_queue();
let event = select! {
recv(task_queue.select(), msg) => {
recv(task_queue.select()) -> msg => {
task_queue.take_tasks(msg.unwrap());
worker_scope.from_worker_msg(task_queue.recv().unwrap())
},
recv(timer_event_port.select(), msg) => worker_scope.from_timer_msg(msg.unwrap()),
recv(devtools_port, msg) => worker_scope.from_devtools_msg(msg.unwrap()),
recv(timer_event_port) -> msg => worker_scope.from_timer_msg(msg.unwrap()),
recv(devtools_port.unwrap_or(&crossbeam_channel::never())) -> msg =>
worker_scope.from_devtools_msg(msg.unwrap()),
};
let mut sequential = vec![];
sequential.push(event);
@ -131,14 +132,15 @@ pub fn run_worker_event_loop<T, TimerMsg, WorkerMsg, Event>(
// Batch all events that are ready.
// The task queue will throttle non-priority tasks if necessary.
match task_queue.try_recv() {
None => match timer_event_port.try_recv() {
None => match devtools_port.and_then(|port| port.try_recv()) {
None => break,
Some(ev) => sequential.push(worker_scope.from_devtools_msg(ev)),
Err(_) => match timer_event_port.try_recv() {
Err(_) => match devtools_port.map(|port| port.try_recv()) {
None => {},
Some(Err(_)) => break,
Some(Ok(ev)) => sequential.push(worker_scope.from_devtools_msg(ev)),
},
Some(ev) => sequential.push(worker_scope.from_timer_msg(ev)),
Ok(ev) => sequential.push(worker_scope.from_timer_msg(ev)),
},
Some(ev) => sequential.push(worker_scope.from_worker_msg(ev)),
Ok(ev) => sequential.push(worker_scope.from_worker_msg(ev)),
}
}
// Step 3

View file

@ -49,6 +49,7 @@ use crate::dom::bindings::utils::WindowProxyHandler;
use crate::dom::document::PendingRestyle;
use crate::dom::htmlimageelement::SourceSet;
use crate::dom::htmlmediaelement::MediaFrameRenderer;
use crossbeam_channel::{Receiver, Sender};
use cssparser::RGBA;
use devtools_traits::{CSSError, TimelineMarkerType, WorkerId};
use encoding_rs::{Decoder, Encoding};
@ -93,7 +94,6 @@ use selectors::matching::ElementSelectorFlags;
use serde::{Deserialize, Serialize};
use servo_arc::Arc as ServoArc;
use servo_atoms::Atom;
use servo_channel::{Receiver, Sender};
use servo_media::audio::analyser_node::AnalysisEngine;
use servo_media::audio::buffer_source_node::AudioBuffer;
use servo_media::audio::context::AudioContext;

View file

@ -26,6 +26,7 @@ use crate::script_runtime::ScriptThreadEventCategory::WorkerEvent;
use crate::script_runtime::{new_rt_and_cx, CommonScriptMsg, Runtime, ScriptChan, ScriptPort};
use crate::task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue};
use crate::task_source::TaskSourceName;
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::DevtoolScriptControlMsg;
use dom_struct::dom_struct;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
@ -38,7 +39,6 @@ use msg::constellation_msg::TopLevelBrowsingContextId;
use net_traits::request::{CredentialsMode, Destination, RequestInit};
use net_traits::{load_whole_resource, IpcSend};
use script_traits::{TimerEvent, TimerSource, WorkerGlobalScopeInit, WorkerScriptLoadOrigin};
use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Receiver, Sender};
use servo_rand::random;
use servo_url::ServoUrl;
use std::mem::replace;
@ -329,10 +329,13 @@ impl DedicatedWorkerGlobalScope {
let runtime = unsafe { new_rt_and_cx() };
let (devtools_mpsc_chan, devtools_mpsc_port) = channel();
route_ipc_receiver_to_new_servo_sender(from_devtools_receiver, devtools_mpsc_chan);
let (devtools_mpsc_chan, devtools_mpsc_port) = unbounded();
ROUTER.route_ipc_receiver_to_crossbeam_sender(
from_devtools_receiver,
devtools_mpsc_chan,
);
let (timer_tx, timer_rx) = channel();
let (timer_tx, timer_rx) = unbounded();
let (timer_ipc_chan, timer_ipc_port) = ipc::channel().unwrap();
let worker_for_route = worker.clone();
ROUTER.add_route(
@ -404,7 +407,7 @@ impl DedicatedWorkerGlobalScope {
}
pub fn new_script_pair(&self) -> (Box<dyn ScriptChan + Send>, Box<dyn ScriptPort + Send>) {
let (tx, rx) = channel();
let (tx, rx) = unbounded();
let chan = Box::new(SendableWorkerScriptChan {
sender: tx,
worker: self.worker.borrow().as_ref().unwrap().clone(),

View file

@ -23,6 +23,7 @@ use crate::dom::worklet::WorkletExecutor;
use crate::dom::workletglobalscope::WorkletGlobalScope;
use crate::dom::workletglobalscope::WorkletGlobalScopeInit;
use crate::dom::workletglobalscope::WorkletTask;
use crossbeam_channel::{unbounded, Sender};
use dom_struct::dom_struct;
use euclid::TypedScale;
use euclid::TypedSize2D;
@ -49,8 +50,6 @@ use profile_traits::ipc;
use script_traits::Painter;
use script_traits::{DrawAPaintImageResult, PaintWorkletError};
use servo_atoms::Atom;
use servo_channel::base_channel;
use servo_channel::{channel, Sender};
use servo_config::prefs::PREFS;
use servo_url::ServoUrl;
use std::cell::Cell;
@ -426,7 +425,7 @@ impl PaintWorkletGlobalScope {
arguments: Vec<String>,
) -> Result<DrawAPaintImageResult, PaintWorkletError> {
let name = self.name.clone();
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
let task = PaintWorkletTask::DrawAPaintImage(
name,
size,
@ -445,12 +444,9 @@ impl PaintWorkletGlobalScope {
.as_u64()
.unwrap_or(10u64);
select! {
recv(base_channel::after(Duration::from_millis(timeout))) => {
Err(PaintWorkletError::Timeout)
}
recv(receiver.select(), msg) => msg.ok_or(PaintWorkletError::Timeout)
}
receiver
.recv_timeout(Duration::from_millis(timeout))
.map_err(|e| PaintWorkletError::from(e))
}
}
Box::new(WorkletPainter {

View file

@ -22,9 +22,11 @@ use crate::dom::workerglobalscope::WorkerGlobalScope;
use crate::script_runtime::{new_rt_and_cx, CommonScriptMsg, Runtime, ScriptChan};
use crate::task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue};
use crate::task_source::TaskSourceName;
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::DevtoolScriptControlMsg;
use dom_struct::dom_struct;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::{JSAutoCompartment, JSContext, JS_AddInterruptCallback};
use js::jsval::UndefinedValue;
use net_traits::request::{CredentialsMode, Destination, RequestInit};
@ -32,7 +34,6 @@ use net_traits::{load_whole_resource, CustomResponseMediator, IpcSend};
use script_traits::{
ScopeThings, ServiceWorkerMsg, TimerEvent, WorkerGlobalScopeInit, WorkerScriptLoadOrigin,
};
use servo_channel::{channel, route_ipc_receiver_to_new_servo_sender, Receiver, Sender};
use servo_config::prefs::PREFS;
use servo_rand::random;
use servo_url::ServoUrl;
@ -293,11 +294,12 @@ impl ServiceWorkerGlobalScope {
let runtime = unsafe { new_rt_and_cx() };
let (devtools_mpsc_chan, devtools_mpsc_port) = channel();
route_ipc_receiver_to_new_servo_sender(devtools_receiver, devtools_mpsc_chan);
let (devtools_mpsc_chan, devtools_mpsc_port) = unbounded();
ROUTER
.route_ipc_receiver_to_crossbeam_sender(devtools_receiver, devtools_mpsc_chan);
// TODO XXXcreativcoder use this timer_ipc_port, when we have a service worker instance here
let (timer_ipc_chan, _timer_ipc_port) = ipc::channel().unwrap();
let (timer_chan, timer_port) = channel();
let (timer_chan, timer_port) = unbounded();
let global = ServiceWorkerGlobalScope::new(
init,
url,

View file

@ -20,6 +20,7 @@ use crate::dom::node::Node;
use crate::dom::processinginstruction::ProcessingInstruction;
use crate::dom::servoparser::{create_element_for_token, ElementAttribute, ParsingAlgorithm};
use crate::dom::virtualmethods::vtable_for;
use crossbeam_channel::{unbounded, Receiver, Sender};
use html5ever::buffer_queue::BufferQueue;
use html5ever::tendril::fmt::UTF8;
use html5ever::tendril::{SendTendril, StrTendril, Tendril};
@ -29,7 +30,6 @@ use html5ever::tree_builder::{
};
use html5ever::tree_builder::{TreeBuilder, TreeBuilderOpts};
use html5ever::{Attribute as HtmlAttribute, ExpandedName, QualName};
use servo_channel::{channel, Receiver, Sender};
use servo_url::ServoUrl;
use std::borrow::Cow;
use std::cell::Cell;
@ -215,9 +215,9 @@ impl Tokenizer {
fragment_context: Option<super::FragmentContext>,
) -> Self {
// Messages from the Tokenizer (main thread) to HtmlTokenizer (parser thread)
let (to_html_tokenizer_sender, html_tokenizer_receiver) = channel();
let (to_html_tokenizer_sender, html_tokenizer_receiver) = unbounded();
// Messages from HtmlTokenizer and Sink (parser thread) to Tokenizer (main thread)
let (to_tokenizer_sender, tokenizer_receiver) = channel();
let (to_tokenizer_sender, tokenizer_receiver) = unbounded();
let mut tokenizer = Tokenizer {
document: Dom::from_ref(document),

View file

@ -10,10 +10,10 @@ use crate::dom::bindings::str::DOMString;
use crate::dom::worklet::WorkletExecutor;
use crate::dom::workletglobalscope::WorkletGlobalScope;
use crate::dom::workletglobalscope::WorkletGlobalScopeInit;
use crossbeam_channel::Sender;
use dom_struct::dom_struct;
use js::rust::Runtime;
use msg::constellation_msg::PipelineId;
use servo_channel::Sender;
use servo_url::ServoUrl;
use std::collections::HashMap;

View file

@ -33,11 +33,11 @@ use crate::dom::webglrenderingcontext::WebGLRenderingContext;
use crate::script_runtime::CommonScriptMsg;
use crate::script_runtime::ScriptThreadEventCategory::WebVREvent;
use crate::task_source::TaskSourceName;
use crossbeam_channel::{unbounded, Sender};
use dom_struct::dom_struct;
use ipc_channel::ipc::IpcSender;
use profile_traits::ipc;
use serde_bytes::ByteBuf;
use servo_channel::{channel, Sender};
use std::cell::Cell;
use std::mem;
use std::ops::Deref;
@ -538,7 +538,7 @@ impl VRDisplay {
thread::Builder::new()
.name("WebVR_RAF".into())
.spawn(move || {
let (raf_sender, raf_receiver) = channel();
let (raf_sender, raf_receiver) = unbounded();
let mut near = near_init;
let mut far = far_init;

View file

@ -68,6 +68,7 @@ use crate::task_manager::TaskManager;
use crate::task_source::TaskSourceName;
use crate::timers::{IsInterval, TimerCallback};
use crate::webdriver_handlers::jsval_to_webdriver;
use crossbeam_channel::{unbounded, Sender, TryRecvError};
use cssparser::{Parser, ParserInput};
use devtools_traits::{ScriptToDevtoolsControlMsg, TimelineMarker, TimelineMarkerType};
use dom_struct::dom_struct;
@ -104,7 +105,6 @@ use script_traits::{ConstellationControlMsg, DocumentState, LoadData};
use script_traits::{ScriptMsg, ScriptToConstellationChan, ScrollState, TimerEvent, TimerEventId};
use script_traits::{TimerSchedulerMsg, UntrustedNodeAddress, WindowSizeData, WindowSizeType};
use selectors::attr::CaseSensitivity;
use servo_channel::{channel, Sender};
use servo_config::opts;
use servo_geometry::{f32_rect_to_au_rect, MaxRect};
use servo_url::{Host, ImmutableOrigin, MutableOrigin, ServoUrl};
@ -348,7 +348,7 @@ impl Window {
}
pub fn new_script_pair(&self) -> (Box<dyn ScriptChan + Send>, Box<dyn ScriptPort + Send>) {
let (tx, rx) = channel();
let (tx, rx) = unbounded();
(Box::new(SendableMainThreadScriptChan(tx)), Box::new(rx))
}
@ -1394,7 +1394,7 @@ impl Window {
};
// Layout will let us know when it's done.
let (join_chan, join_port) = channel();
let (join_chan, join_port) = unbounded();
// On debug mode, print the reflow event information.
if opts::get().relayout_event {
@ -1427,16 +1427,15 @@ impl Window {
debug!("script: layout forked");
let complete = select! {
recv(join_port.select(), msg) => if let Some(reflow_complete) = msg {
reflow_complete
} else {
panic!("Layout thread failed while script was waiting for a result.");
},
default => {
let complete = match join_port.try_recv() {
Err(TryRecvError::Empty) => {
info!("script: waiting on layout");
join_port.recv().unwrap()
}
},
Ok(reflow_complete) => reflow_complete,
Err(TryRecvError::Disconnected) => {
panic!("Layout thread failed while script was waiting for a result.");
},
};
debug!("script: layout joined");
@ -2033,7 +2032,7 @@ impl Window {
webrender_api_sender: RenderApiSender,
) -> DomRoot<Self> {
let layout_rpc: Box<dyn LayoutRPC + Send> = {
let (rpc_send, rpc_recv) = channel();
let (rpc_send, rpc_recv) = unbounded();
layout_chan.send(Msg::GetRPC(rpc_send)).unwrap();
rpc_recv.recv().unwrap()
};

View file

@ -21,6 +21,7 @@ use crate::dom::globalscope::GlobalScope;
use crate::dom::messageevent::MessageEvent;
use crate::dom::workerglobalscope::prepare_workerscope_init;
use crate::task::TaskOnce;
use crossbeam_channel::{unbounded, Sender};
use devtools_traits::{DevtoolsPageInfo, ScriptToDevtoolsControlMsg};
use dom_struct::dom_struct;
use ipc_channel::ipc;
@ -28,7 +29,6 @@ use js::jsapi::{JSAutoCompartment, JSContext, JS_RequestInterruptCallback};
use js::jsval::UndefinedValue;
use js::rust::HandleValue;
use script_traits::WorkerScriptLoadOrigin;
use servo_channel::{channel, Sender};
use std::cell::Cell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -79,7 +79,7 @@ impl Worker {
Err(_) => return Err(Error::Syntax),
};
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
let closing = Arc::new(AtomicBool::new(false));
let worker = Worker::new(global, sender.clone(), closing.clone());
global.track_worker(closing.clone());

View file

@ -32,6 +32,7 @@ use crate::task_source::performance_timeline::PerformanceTimelineTaskSource;
use crate::task_source::remote_event::RemoteEventTaskSource;
use crate::task_source::websocket::WebsocketTaskSource;
use crate::timers::{IsInterval, TimerCallback};
use crossbeam_channel::Receiver;
use devtools_traits::{DevtoolScriptControlMsg, WorkerId};
use dom_struct::dom_struct;
use ipc_channel::ipc::IpcSender;
@ -44,7 +45,6 @@ use net_traits::request::{CredentialsMode, Destination, RequestInit as NetReques
use net_traits::{load_whole_resource, IpcSend};
use script_traits::WorkerGlobalScopeInit;
use script_traits::{TimerEvent, TimerEventId};
use servo_channel::Receiver;
use servo_url::{MutableOrigin, ServoUrl};
use std::default::Default;
use std::rc::Rc;

View file

@ -39,6 +39,7 @@ use crate::script_runtime::ScriptThreadEventCategory;
use crate::script_thread::{MainThreadScriptMsg, ScriptThread};
use crate::task::TaskBox;
use crate::task_source::TaskSourceName;
use crossbeam_channel::{unbounded, Receiver, Sender};
use dom_struct::dom_struct;
use js::jsapi::JSGCParamKey;
use js::jsapi::JSTracer;
@ -50,7 +51,6 @@ use net_traits::request::Destination;
use net_traits::request::RequestInit;
use net_traits::request::RequestMode;
use net_traits::IpcSend;
use servo_channel::{channel, Receiver, Sender};
use servo_url::ImmutableOrigin;
use servo_url::ServoUrl;
use std::cmp::max;
@ -334,7 +334,7 @@ impl WorkletThreadPool {
/// For testing.
pub fn test_worklet_lookup(&self, id: WorkletId, key: String) -> Option<String> {
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
let msg = WorkletData::Task(id, WorkletTask::Test(TestWorkletTask::Lookup(key, sender)));
let _ = self.primary_sender.send(msg);
receiver.recv().expect("Test worklet has died?")
@ -388,7 +388,7 @@ struct WorkletThreadRole {
impl WorkletThreadRole {
fn new(is_hot_backup: bool, is_cold_backup: bool) -> WorkletThreadRole {
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
WorkletThreadRole {
sender: sender,
receiver: receiver,
@ -452,7 +452,7 @@ impl WorkletThread {
#[allow(unsafe_code)]
#[allow(unrooted_must_root)]
fn spawn(role: WorkletThreadRole, init: WorkletThreadInit) -> Sender<WorkletControl> {
let (control_sender, control_receiver) = channel();
let (control_sender, control_receiver) = unbounded();
// TODO: name this thread
thread::spawn(move || {
// TODO: add a new IN_WORKLET thread state?
@ -522,12 +522,12 @@ impl WorkletThread {
if let Some(control) = self.control_buffer.take() {
self.process_control(control);
}
while let Some(control) = self.control_receiver.try_recv() {
while let Ok(control) = self.control_receiver.try_recv() {
self.process_control(control);
}
self.gc();
} else if self.control_buffer.is_none() {
if let Some(control) = self.control_receiver.try_recv() {
if let Ok(control) = self.control_receiver.try_recv() {
self.control_buffer = Some(control);
let msg = WorkletData::StartSwapRoles(self.role.sender.clone());
let _ = self.cold_backup_sender.send(msg);

View file

@ -11,6 +11,7 @@ use crate::dom::testworkletglobalscope::TestWorkletGlobalScope;
use crate::dom::testworkletglobalscope::TestWorkletTask;
use crate::dom::worklet::WorkletExecutor;
use crate::script_thread::MainThreadScriptMsg;
use crossbeam_channel::Sender;
use devtools_traits::ScriptToDevtoolsControlMsg;
use dom_struct::dom_struct;
use ipc_channel::ipc;
@ -26,7 +27,6 @@ use profile_traits::time;
use script_traits::{Painter, ScriptMsg};
use script_traits::{ScriptToConstellationChan, TimerSchedulerMsg};
use servo_atoms::Atom;
use servo_channel::Sender;
use servo_url::ImmutableOrigin;
use servo_url::MutableOrigin;
use servo_url::ServoUrl;

View file

@ -20,6 +20,8 @@
#[macro_use]
extern crate bitflags;
#[macro_use]
extern crate crossbeam_channel;
#[macro_use]
extern crate cssparser;
#[macro_use]
extern crate deny_public_fields;
@ -46,8 +48,6 @@ extern crate profile_traits;
#[macro_use]
extern crate servo_atoms;
#[macro_use]
extern crate servo_channel;
#[macro_use]
extern crate style;
#[macro_use]

View file

@ -83,6 +83,7 @@ use crate::task_source::user_interaction::UserInteractionTaskSource;
use crate::task_source::websocket::WebsocketTaskSource;
use crate::task_source::TaskSourceName;
use crate::webdriver_handlers;
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::CSSError;
use devtools_traits::{DevtoolScriptControlMsg, DevtoolsPageInfo};
use devtools_traits::{ScriptToDevtoolsControlMsg, WorkerId};
@ -93,6 +94,7 @@ use headers_ext::LastModified;
use headers_ext::ReferrerPolicy as ReferrerPolicyHeader;
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use js::glue::GetWindowProxyClass;
use js::jsapi::{JSAutoCompartment, JSContext, JS_SetWrapObjectCallbacks};
use js::jsapi::{JSTracer, SetWindowProxyClass};
@ -122,10 +124,6 @@ use script_traits::{ScriptToConstellationChan, TimerEvent, TimerSchedulerMsg};
use script_traits::{TimerSource, TouchEventType, TouchId, UntrustedNodeAddress};
use script_traits::{UpdatePipelineIdReason, WindowSizeData, WindowSizeType};
use servo_atoms::Atom;
use servo_channel::{channel, Receiver, Sender};
use servo_channel::{
route_ipc_receiver_to_new_servo_receiver, route_ipc_receiver_to_new_servo_sender,
};
use servo_config::opts;
use servo_url::{ImmutableOrigin, MutableOrigin, ServoUrl};
use std::cell::Cell;
@ -321,39 +319,39 @@ impl OpaqueSender<CommonScriptMsg> for Box<dyn ScriptChan + Send> {
impl ScriptPort for Receiver<CommonScriptMsg> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
self.recv().ok_or(())
self.recv().map_err(|_| ())
}
}
impl ScriptPort for Receiver<MainThreadScriptMsg> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
match self.recv() {
Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
Some(_) => panic!("unexpected main thread event message!"),
None => Err(()),
Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
Ok(_) => panic!("unexpected main thread event message!"),
Err(_) => Err(()),
}
}
}
impl ScriptPort for Receiver<(TrustedWorkerAddress, CommonScriptMsg)> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
self.recv().map(|(_, msg)| msg).ok_or(())
self.recv().map(|(_, msg)| msg).map_err(|_| ())
}
}
impl ScriptPort for Receiver<(TrustedWorkerAddress, MainThreadScriptMsg)> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
match self.recv().map(|(_, msg)| msg) {
Some(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
Some(_) => panic!("unexpected main thread event message!"),
None => Err(()),
Ok(MainThreadScriptMsg::Common(script_msg)) => Ok(script_msg),
Ok(_) => panic!("unexpected main thread event message!"),
Err(_) => Err(()),
}
}
}
impl ScriptPort for Receiver<(TrustedServiceWorkerAddress, CommonScriptMsg)> {
fn recv(&self) -> Result<CommonScriptMsg, ()> {
self.recv().map(|(_, msg)| msg).ok_or(())
self.recv().map(|(_, msg)| msg).map_err(|_| ())
}
}
@ -648,9 +646,9 @@ impl ScriptThreadFactory for ScriptThread {
state: InitialScriptState,
load_data: LoadData,
) -> (Sender<message::Msg>, Receiver<message::Msg>) {
let (script_chan, script_port) = channel();
let (script_chan, script_port) = unbounded();
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
let layout_chan = sender.clone();
thread::Builder::new()
.name(format!("ScriptThread {:?}", state.id))
@ -1004,16 +1002,17 @@ impl ScriptThread {
// Ask the router to proxy IPC messages from the devtools to us.
let (ipc_devtools_sender, ipc_devtools_receiver) = ipc::channel().unwrap();
let devtools_port = route_ipc_receiver_to_new_servo_receiver(ipc_devtools_receiver);
let devtools_port =
ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(ipc_devtools_receiver);
let (timer_event_chan, timer_event_port) = channel();
let (timer_event_chan, timer_event_port) = unbounded();
// Ask the router to proxy IPC messages from the control port to us.
let control_port = route_ipc_receiver_to_new_servo_receiver(state.control_port);
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(state.control_port);
let boxed_script_sender = Box::new(MainThreadScriptChan(chan.clone()));
let (image_cache_channel, image_cache_port) = channel();
let (image_cache_channel, image_cache_port) = unbounded();
let task_queue = TaskQueue::new(port, chan.clone());
@ -1130,14 +1129,15 @@ impl ScriptThread {
// Receive at least one message so we don't spinloop.
debug!("Waiting for event.");
let mut event = select! {
recv(self.task_queue.select(), msg) => {
recv(self.task_queue.select()) -> msg => {
self.task_queue.take_tasks(msg.unwrap());
FromScript(self.task_queue.recv().unwrap())
},
recv(self.control_port.select(), msg) => FromConstellation(msg.unwrap()),
recv(self.timer_event_port.select(), msg) => FromScheduler(msg.unwrap()),
recv(self.devtools_chan.as_ref().map(|_| self.devtools_port.select()), msg) => FromDevtools(msg.unwrap()),
recv(self.image_cache_port.select(), msg) => FromImageCache(msg.unwrap()),
recv(self.control_port) -> msg => FromConstellation(msg.unwrap()),
recv(self.timer_event_port) -> msg => FromScheduler(msg.unwrap()),
recv(self.devtools_chan.as_ref().map(|_| &self.devtools_port).unwrap_or(&crossbeam_channel::never())) -> msg
=> FromDevtools(msg.unwrap()),
recv(self.image_cache_port) -> msg => FromImageCache(msg.unwrap()),
};
debug!("Got event.");
@ -1221,20 +1221,20 @@ impl ScriptThread {
// and check for more resize events. If there are no events pending, we'll move
// on and execute the sequential non-resize events we've seen.
match self.control_port.try_recv() {
None => match self.task_queue.try_recv() {
None => match self.timer_event_port.try_recv() {
None => match self.devtools_port.try_recv() {
None => match self.image_cache_port.try_recv() {
None => break,
Some(ev) => event = FromImageCache(ev),
Err(_) => match self.task_queue.try_recv() {
Err(_) => match self.timer_event_port.try_recv() {
Err(_) => match self.devtools_port.try_recv() {
Err(_) => match self.image_cache_port.try_recv() {
Err(_) => break,
Ok(ev) => event = FromImageCache(ev),
},
Some(ev) => event = FromDevtools(ev),
Ok(ev) => event = FromDevtools(ev),
},
Some(ev) => event = FromScheduler(ev),
Ok(ev) => event = FromScheduler(ev),
},
Some(ev) => event = FromScript(ev),
Ok(ev) => event = FromScript(ev),
},
Some(ev) => event = FromConstellation(ev),
Ok(ev) => event = FromConstellation(ev),
}
}
@ -1843,7 +1843,7 @@ impl ScriptThread {
layout_threads,
} = new_layout_info;
let layout_pair = channel();
let layout_pair = unbounded();
let layout_chan = layout_pair.0.clone();
let msg = message::Msg::CreateLayoutThread(NewLayoutThreadInfo {
@ -2253,7 +2253,7 @@ impl ScriptThread {
// We shut down layout before removing the document,
// since layout might still be in the middle of laying it out.
debug!("preparing to shut down layout for page {}", id);
let (response_chan, response_port) = channel();
let (response_chan, response_port) = unbounded();
chan.send(message::Msg::PrepareToExit(response_chan)).ok();
let _ = response_port.recv();
@ -2571,7 +2571,10 @@ impl ScriptThread {
let MainThreadScriptChan(ref sender) = self.chan;
let (ipc_timer_event_chan, ipc_timer_event_port) = ipc::channel().unwrap();
route_ipc_receiver_to_new_servo_sender(ipc_timer_event_port, self.timer_event_chan.clone());
ROUTER.route_ipc_receiver_to_crossbeam_sender(
ipc_timer_event_port,
self.timer_event_chan.clone(),
);
let origin = if final_url.as_str() == "about:blank" {
incomplete.origin.clone()

View file

@ -11,11 +11,12 @@ use crate::dom::abstractworker::WorkerScriptMsg;
use crate::dom::bindings::structuredclone::StructuredCloneData;
use crate::dom::serviceworkerglobalscope::{ServiceWorkerGlobalScope, ServiceWorkerScriptMsg};
use crate::dom::serviceworkerregistration::longest_prefix_match;
use crossbeam_channel::{unbounded, Receiver, RecvError, Sender};
use devtools_traits::{DevtoolsPageInfo, ScriptToDevtoolsControlMsg};
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use net_traits::{CoreResourceMsg, CustomResponseMediator};
use script_traits::{DOMMessage, SWManagerMsg, SWManagerSenders, ScopeThings, ServiceWorkerMsg};
use servo_channel::{channel, route_ipc_receiver_to_new_servo_receiver, Receiver, Sender};
use servo_config::prefs::PREFS;
use servo_url::ServoUrl;
use std::collections::HashMap;
@ -58,8 +59,8 @@ impl ServiceWorkerManager {
let (own_sender, from_constellation_receiver) = ipc::channel().unwrap();
let (resource_chan, resource_port) = ipc::channel().unwrap();
let from_constellation =
route_ipc_receiver_to_new_servo_receiver(from_constellation_receiver);
let resource_port = route_ipc_receiver_to_new_servo_receiver(resource_port);
ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(from_constellation_receiver);
let resource_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(resource_port);
let _ = sw_senders
.resource_sender
.send(CoreResourceMsg::NetworkMediator(resource_chan));
@ -90,7 +91,7 @@ impl ServiceWorkerManager {
) -> Option<Sender<ServiceWorkerScriptMsg>> {
let scope_things = self.registered_workers.get(&scope_url);
if let Some(scope_things) = scope_things {
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();
let (devtools_sender, devtools_receiver) = ipc::channel().unwrap();
if let Some(ref chan) = scope_things.devtools_chan {
let title = format!("ServiceWorker for {}", scope_things.script_url);
@ -122,7 +123,7 @@ impl ServiceWorkerManager {
}
fn handle_message(&mut self) {
while let Some(message) = self.receive_message() {
while let Ok(message) = self.receive_message() {
let should_continue = match message {
Message::FromConstellation(msg) => self.handle_message_from_constellation(msg),
Message::FromResource(msg) => self.handle_message_from_resource(msg),
@ -196,10 +197,10 @@ impl ServiceWorkerManager {
true
}
fn receive_message(&mut self) -> Option<Message> {
fn receive_message(&mut self) -> Result<Message, RecvError> {
select! {
recv(self.own_port.select(), msg) => msg.map(Message::FromConstellation),
recv(self.resource_receiver.select(), msg) => msg.map(Message::FromResource),
recv(self.own_port) -> msg => msg.map(Message::FromConstellation),
recv(self.resource_receiver) -> msg => msg.map(Message::FromResource),
}
}
}

View file

@ -9,8 +9,8 @@ use crate::dom::worker::TrustedWorkerAddress;
use crate::script_runtime::ScriptThreadEventCategory;
use crate::task::TaskBox;
use crate::task_source::TaskSourceName;
use crossbeam_channel::{self, Receiver, Sender};
use msg::constellation_msg::PipelineId;
use servo_channel::{base_channel, Receiver, Sender};
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::default::Default;
@ -63,7 +63,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
if !first_msg.is_wake_up() {
incoming.push(first_msg);
}
while let Some(msg) = self.port.try_recv() {
while let Ok(msg) = self.port.try_recv() {
if !msg.is_wake_up() {
incoming.push(msg);
}
@ -110,21 +110,21 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
/// Reset the queue for a new iteration of the event-loop,
/// returning the port about whose readiness we want to be notified.
pub fn select(&self) -> &base_channel::Receiver<T> {
pub fn select(&self) -> &crossbeam_channel::Receiver<T> {
// This is a new iteration of the event-loop, so we reset the "business" counter.
self.taken_task_counter.set(0);
// We want to be notified when the script-port is ready to receive.
// Hence that's the one we need to include in the select.
self.port.select()
&self.port
}
/// Take a message from the front of the queue, without waiting if empty.
pub fn recv(&self) -> Option<T> {
self.msg_queue.borrow_mut().pop_front()
pub fn recv(&self) -> Result<T, ()> {
self.msg_queue.borrow_mut().pop_front().ok_or(())
}
/// Same as recv.
pub fn try_recv(&self) -> Option<T> {
pub fn try_recv(&self) -> Result<T, ()> {
self.recv()
}

View file

@ -6,8 +6,8 @@ use crate::script_runtime::{CommonScriptMsg, ScriptThreadEventCategory};
use crate::script_thread::MainThreadScriptMsg;
use crate::task::{TaskCanceller, TaskOnce};
use crate::task_source::{TaskSource, TaskSourceName};
use crossbeam_channel::Sender;
use msg::constellation_msg::PipelineId;
use servo_channel::Sender;
#[derive(Clone, JSTraceable)]
pub struct HistoryTraversalTaskSource(pub Sender<MainThreadScriptMsg>, pub PipelineId);

View file

@ -11,9 +11,9 @@ use crate::script_runtime::{CommonScriptMsg, ScriptThreadEventCategory};
use crate::script_thread::MainThreadScriptMsg;
use crate::task::{TaskCanceller, TaskOnce};
use crate::task_source::{TaskSource, TaskSourceName};
use crossbeam_channel::Sender;
use msg::constellation_msg::PipelineId;
use servo_atoms::Atom;
use servo_channel::Sender;
use std::fmt;
use std::result::Result;

View file

@ -11,9 +11,9 @@ use crate::script_runtime::{CommonScriptMsg, ScriptThreadEventCategory};
use crate::script_thread::MainThreadScriptMsg;
use crate::task::{TaskCanceller, TaskOnce};
use crate::task_source::{TaskSource, TaskSourceName};
use crossbeam_channel::Sender;
use msg::constellation_msg::PipelineId;
use servo_atoms::Atom;
use servo_channel::Sender;
use std::fmt;
use std::result::Result;