diff --git a/components/compositing/compositor.rs b/components/compositing/compositor.rs index 7b0d18b6329..3943da095d8 100644 --- a/components/compositing/compositor.rs +++ b/components/compositing/compositor.rs @@ -11,9 +11,9 @@ use std::rc::Rc; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use base::Epoch; use base::cross_process_instant::CrossProcessInstant; use base::id::{PipelineId, WebViewId}; +use base::{Epoch, generic_channel}; use bitflags::bitflags; use compositing_traits::display_list::{CompositorDisplayListInfo, ScrollTree, ScrollType}; use compositing_traits::rendering_context::RenderingContext; @@ -22,7 +22,7 @@ use compositing_traits::{ WebViewTrait, }; use constellation_traits::{EmbedderToConstellationMessage, PaintMetricEvent}; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Sender; use dpi::PhysicalSize; use embedder_traits::{CompositorHitTestResult, InputEvent, ShutdownState, ViewportDetails}; use euclid::{Point2D, Rect, Scale, Size2D, Transform3D}; @@ -90,7 +90,7 @@ pub struct ServoRenderer { shutdown_state: Rc>, /// The port on which we receive messages. - compositor_receiver: Receiver, + compositor_receiver: generic_channel::RoutedReceiver, /// The channel on which messages can be sent to the constellation. pub(crate) constellation_sender: Sender, @@ -1377,7 +1377,7 @@ impl IOCompositor { } /// Get the message receiver for this [`IOCompositor`]. - pub fn receiver(&self) -> Ref<'_, Receiver> { + pub fn receiver(&self) -> Ref<'_, generic_channel::RoutedReceiver> { Ref::map(self.global.borrow(), |global| &global.compositor_receiver) } diff --git a/components/compositing/lib.rs b/components/compositing/lib.rs index 4faeadf5ba6..c2bfd8eb9d7 100644 --- a/components/compositing/lib.rs +++ b/components/compositing/lib.rs @@ -7,10 +7,11 @@ use std::cell::Cell; use std::rc::Rc; +use base::generic_channel; use compositing_traits::rendering_context::RenderingContext; use compositing_traits::{CompositorMsg, CompositorProxy}; use constellation_traits::EmbedderToConstellationMessage; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Sender; use embedder_traits::{EventLoopWaker, ShutdownState}; use profile_traits::{mem, time}; use webrender::RenderApi; @@ -32,7 +33,7 @@ pub struct InitialCompositorState { /// A channel to the compositor. pub sender: CompositorProxy, /// A port on which messages inbound to the compositor can be received. - pub receiver: Receiver, + pub receiver: generic_channel::RoutedReceiver, /// A channel to the constellation. pub constellation_chan: Sender, /// A channel to the time profiler thread. diff --git a/components/constellation/constellation.rs b/components/constellation/constellation.rs index ebc2d55ce09..accfa69e1ec 100644 --- a/components/constellation/constellation.rs +++ b/components/constellation/constellation.rs @@ -99,11 +99,12 @@ use background_hang_monitor::HangMonitorRegister; use background_hang_monitor_api::{ BackgroundHangMonitorControlMsg, BackgroundHangMonitorRegister, HangMonitorAlert, }; -use base::Epoch; +use base::generic_channel::GenericSender; use base::id::{ BrowsingContextGroupId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId, PipelineId, PipelineNamespace, PipelineNamespaceId, PipelineNamespaceRequest, WebViewId, }; +use base::{Epoch, generic_channel}; #[cfg(feature = "bluetooth")] use bluetooth_traits::BluetoothRequest; use canvas::canvas_paint_thread::CanvasPaintThread; @@ -288,11 +289,11 @@ pub struct Constellation { /// An IPC channel for script threads to send messages to the constellation. /// This is the script threads' view of `script_receiver`. - script_sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, + script_sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>, /// A channel for the constellation to receive messages from script threads. /// This is the constellation's view of `script_sender`. - script_receiver: Receiver>, + script_receiver: generic_channel::RoutedReceiver<(PipelineId, ScriptToConstellationMessage)>, /// A handle to register components for hang monitoring. /// None when in multiprocess mode. @@ -605,11 +606,8 @@ where .name("Constellation".to_owned()) .spawn(move || { let (script_ipc_sender, script_ipc_receiver) = - ipc::channel().expect("ipc channel failure"); - let script_receiver = - route_ipc_receiver_to_new_crossbeam_receiver_preserving_errors( - script_ipc_receiver, - ); + generic_channel::channel().expect("ipc channel failure"); + let script_receiver = script_ipc_receiver.route_preserving_errors(); let (namespace_ipc_sender, namespace_ipc_receiver) = ipc::channel().expect("ipc channel failure"); @@ -1243,7 +1241,7 @@ where let request = match request { Ok(request) => request, - Err(err) => return error!("Deserialization failed ({}).", err), + Err(err) => return error!("Deserialization failed ({err:?})."), }; match request { diff --git a/components/script/dom/workletglobalscope.rs b/components/script/dom/workletglobalscope.rs index 80767ac2e21..5c185fae919 100644 --- a/components/script/dom/workletglobalscope.rs +++ b/components/script/dom/workletglobalscope.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use base::generic_channel::GenericSender; use base::id::PipelineId; use constellation_traits::{ScriptToConstellationChan, ScriptToConstellationMessage}; use crossbeam_channel::Sender; @@ -199,7 +200,7 @@ pub(crate) struct WorkletGlobalScopeInit { /// Channel to devtools pub(crate) devtools_chan: Option>, /// Messages to send to constellation - pub(crate) to_constellation_sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, + pub(crate) to_constellation_sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>, /// The image cache pub(crate) image_cache: Arc, /// Identity manager for WebGPU resources diff --git a/components/script/messaging.rs b/components/script/messaging.rs index fb295a0f0d3..6104cdfaecc 100644 --- a/components/script/messaging.rs +++ b/components/script/messaging.rs @@ -340,7 +340,7 @@ pub(crate) struct ScriptThreadSenders { /// particular pipelines. #[no_trace] pub(crate) pipeline_to_constellation_sender: - IpcSender<(PipelineId, ScriptToConstellationMessage)>, + GenericSender<(PipelineId, ScriptToConstellationMessage)>, /// The shared [`IpcSender`] which is sent to the `ImageCache` when requesting an image. The /// messages on this channel are routed to crossbeam [`Sender`] on the router thread, which diff --git a/components/servo/lib.rs b/components/servo/lib.rs index 37ce2019c7a..9b3fff5f2bf 100644 --- a/components/servo/lib.rs +++ b/components/servo/lib.rs @@ -33,6 +33,7 @@ use std::rc::{Rc, Weak}; use std::sync::{Arc, Mutex}; use std::thread; +use base::generic_channel; pub use base::id::WebViewId; use base::id::{PipelineNamespace, PipelineNamespaceId}; #[cfg(feature = "bluetooth")] @@ -82,7 +83,6 @@ use gaol::sandbox::{ChildSandbox, ChildSandboxMethods}; pub use gleam::gl; use gleam::gl::RENDERER; use ipc_channel::ipc::{self, IpcSender}; -use ipc_channel::router::ROUTER; use javascript_evaluator::JavaScriptEvaluator; pub use keyboard_types::{ Code, CompositionEvent, CompositionState, Key, KeyState, Location, Modifiers, NamedKey, @@ -543,7 +543,7 @@ impl Servo { let mut compositor = self.compositor.borrow_mut(); let mut messages = Vec::new(); while let Ok(message) = compositor.receiver().try_recv() { - messages.push(message); + messages.push(message.expect("IPC serialization error")); } compositor.handle_messages(messages); } @@ -1094,28 +1094,23 @@ fn create_embedder_channel( fn create_compositor_channel( event_loop_waker: Box, -) -> (CompositorProxy, Receiver) { - let (sender, receiver) = unbounded(); +) -> ( + CompositorProxy, + generic_channel::RoutedReceiver, +) { + let routed_channel = + generic_channel::routed_channel_with_local_sender().expect("Create channel failure"); - let (compositor_ipc_sender, compositor_ipc_receiver) = - ipc::channel().expect("ipc channel failure"); + let cross_process_compositor_api = + CrossProcessCompositorApi(routed_channel.generic_sender.clone()); - let cross_process_compositor_api = CrossProcessCompositorApi(compositor_ipc_sender); - let compositor_proxy = CompositorProxy { - sender, + let compositor_proxy = CompositorProxy::new( + routed_channel.local_sender, cross_process_compositor_api, event_loop_waker, - }; - - let compositor_proxy_clone = compositor_proxy.clone(); - ROUTER.add_typed_route( - compositor_ipc_receiver, - Box::new(move |message| { - compositor_proxy_clone.send(message.expect("Could not convert Compositor message")); - }), ); - (compositor_proxy, receiver) + (compositor_proxy, routed_channel.local_receiver) } #[allow(clippy::too_many_arguments)] diff --git a/components/shared/base/generic_channel.rs b/components/shared/base/generic_channel.rs index ce336f0d174..2dd0c2a5b7c 100644 --- a/components/shared/base/generic_channel.rs +++ b/components/shared/base/generic_channel.rs @@ -395,6 +395,18 @@ where } } +/// A GenericChannel, which was routed via the Router. +pub struct GenericRoutedChannel Deserialize<'de>> { + /// A GenericSender of the channel, i.e. it may be sent to other processes in multiprocess mode. + /// Connected to `local_receiver` via the [ROUTER]. + pub generic_sender: GenericSender, + /// A sender that directly sends to the local_receiver. Can only be used in the same process + /// as the `local_receiver`. + pub local_sender: crossbeam_channel::Sender>, + /// The receiving end of the channel. Only usable in the current process. + pub local_receiver: RoutedReceiver, +} + /// Private helper function to create a crossbeam based channel. /// /// Do NOT make this function public! @@ -435,6 +447,34 @@ where } } +/// Returns a [GenericRoutedChannel], where the receiver is usable in the current process, +/// and sending is possible both in remote and local process, via the generic_sender and the +/// local_sender. +pub fn routed_channel_with_local_sender() -> Option> +where + T: for<'de> Deserialize<'de> + Serialize + Send + 'static, +{ + let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded(); + let generic_sender = if opts::get().multiprocess || opts::get().force_ipc { + let (ipc_sender, ipc_receiver) = ipc_channel::ipc::channel().ok()?; + let crossbeam_sender_clone = crossbeam_sender.clone(); + ROUTER.add_typed_route( + ipc_receiver, + Box::new(move |message| { + let _ = crossbeam_sender_clone.send(message); + }), + ); + GenericSender(GenericSenderVariants::Ipc(ipc_sender)) + } else { + GenericSender(GenericSenderVariants::Crossbeam(crossbeam_sender.clone())) + }; + Some(GenericRoutedChannel { + generic_sender, + local_sender: crossbeam_sender, + local_receiver: crossbeam_receiver, + }) +} + #[cfg(test)] mod single_process_channel_tests { //! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers diff --git a/components/shared/compositing/lib.rs b/components/shared/compositing/lib.rs index f61dbbb6711..8345b0b9e83 100644 --- a/components/shared/compositing/lib.rs +++ b/components/shared/compositing/lib.rs @@ -23,6 +23,7 @@ pub mod viewport_description; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use base::generic_channel::GenericSender; use bitflags::bitflags; use display_list::CompositorDisplayListInfo; use embedder_traits::ScreenGeometry; @@ -44,7 +45,12 @@ use crate::viewport_description::ViewportDescription; /// Sends messages to the compositor. #[derive(Clone)] pub struct CompositorProxy { - pub sender: Sender, + /// A sender optimised for sending in the local process. + /// The type is a Result to match the API of ipc_channel after routing, + /// which contains a Result to propagate deserialization errors to the + /// recipient. + /// The field is private to hide the inner `Result` type. + sender: Sender>, /// Access to [`Self::sender`] that is possible to send across an IPC /// channel. These messages are routed via the router thread to /// [`Self::sender`]. @@ -52,6 +58,20 @@ pub struct CompositorProxy { pub event_loop_waker: Box, } +impl CompositorProxy { + pub fn new( + local_process_sender: Sender>, + cross_process_compositor_api: CrossProcessCompositorApi, + event_loop_waker: Box, + ) -> Self { + Self { + sender: local_process_sender, + cross_process_compositor_api, + event_loop_waker, + } + } +} + impl OpaqueSender for CompositorProxy { fn send(&self, message: CompositorMsg) { CompositorProxy::send(self, message) @@ -60,7 +80,7 @@ impl OpaqueSender for CompositorProxy { impl CompositorProxy { pub fn send(&self, msg: CompositorMsg) { - if let Err(err) = self.sender.send(msg) { + if let Err(err) = self.sender.send(Ok(msg)) { warn!("Failed to send response ({:?}).", err); } self.event_loop_waker.wake(); @@ -170,18 +190,18 @@ pub struct CompositionPipeline { /// A mechanism to send messages from ScriptThread to the parent process' WebRender instance. #[derive(Clone, Deserialize, MallocSizeOf, Serialize)] -pub struct CrossProcessCompositorApi(pub IpcSender); +pub struct CrossProcessCompositorApi(pub GenericSender); impl CrossProcessCompositorApi { /// Create a new [`CrossProcessCompositorApi`] struct that does not have a listener on the other /// end to use for unit testing. pub fn dummy() -> Self { - let (sender, _) = ipc::channel().unwrap(); + let (sender, _) = base::generic_channel::channel().unwrap(); Self(sender) } /// Get the sender for this proxy. - pub fn sender(&self) -> &IpcSender { + pub fn sender(&self) -> &GenericSender { &self.0 } diff --git a/components/shared/constellation/from_script_message.rs b/components/shared/constellation/from_script_message.rs index 1f96d933a93..c55e08826e1 100644 --- a/components/shared/constellation/from_script_message.rs +++ b/components/shared/constellation/from_script_message.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::fmt; use base::Epoch; +use base::generic_channel::{GenericSender, SendResult}; use base::id::{ BroadcastChannelRouterId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId, PipelineId, ServiceWorkerId, ServiceWorkerRegistrationId, WebViewId, @@ -21,7 +22,6 @@ use embedder_traits::{ }; use euclid::default::Size2D as UntypedSize2D; use http::{HeaderMap, Method}; -use ipc_channel::Error as IpcError; use ipc_channel::ipc::{IpcReceiver, IpcSender}; use malloc_size_of_derive::MallocSizeOf; use net_traits::policy_container::PolicyContainer; @@ -46,14 +46,14 @@ use crate::{ #[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)] pub struct ScriptToConstellationChan { /// Sender for communicating with constellation thread. - pub sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, + pub sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>, /// Used to identify the origin of the message. pub pipeline_id: PipelineId, } impl ScriptToConstellationChan { /// Send ScriptMsg and attach the pipeline_id to the message. - pub fn send(&self, msg: ScriptToConstellationMessage) -> Result<(), IpcError> { + pub fn send(&self, msg: ScriptToConstellationMessage) -> SendResult { self.sender.send((self.pipeline_id, msg)) } }