GenericChannel: Migrate compositor channels to GenericChannel (#38782)

Besides migrating the channel to GenericChannel, this PR adds
`routed_channel_with_local_sender()` to `generic_channel`. This is for
existing use-cases, where we want to provide both an IPC capable
GenericSender, as well as a crossbeam Sender, for efficient sending if
the sender is in the same process.

Testing: All of our channels should send / receive at least some
messages during WPT tests.

Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
This commit is contained in:
Jonathan Schwender 2025-08-25 13:05:21 +02:00 committed by GitHub
parent 7441944e36
commit fb1c0a4c48
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 98 additions and 43 deletions

View file

@ -11,9 +11,9 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use base::Epoch;
use base::cross_process_instant::CrossProcessInstant; use base::cross_process_instant::CrossProcessInstant;
use base::id::{PipelineId, WebViewId}; use base::id::{PipelineId, WebViewId};
use base::{Epoch, generic_channel};
use bitflags::bitflags; use bitflags::bitflags;
use compositing_traits::display_list::{CompositorDisplayListInfo, ScrollTree, ScrollType}; use compositing_traits::display_list::{CompositorDisplayListInfo, ScrollTree, ScrollType};
use compositing_traits::rendering_context::RenderingContext; use compositing_traits::rendering_context::RenderingContext;
@ -22,7 +22,7 @@ use compositing_traits::{
WebViewTrait, WebViewTrait,
}; };
use constellation_traits::{EmbedderToConstellationMessage, PaintMetricEvent}; use constellation_traits::{EmbedderToConstellationMessage, PaintMetricEvent};
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Sender;
use dpi::PhysicalSize; use dpi::PhysicalSize;
use embedder_traits::{CompositorHitTestResult, InputEvent, ShutdownState, ViewportDetails}; use embedder_traits::{CompositorHitTestResult, InputEvent, ShutdownState, ViewportDetails};
use euclid::{Point2D, Rect, Scale, Size2D, Transform3D}; use euclid::{Point2D, Rect, Scale, Size2D, Transform3D};
@ -90,7 +90,7 @@ pub struct ServoRenderer {
shutdown_state: Rc<Cell<ShutdownState>>, shutdown_state: Rc<Cell<ShutdownState>>,
/// The port on which we receive messages. /// The port on which we receive messages.
compositor_receiver: Receiver<CompositorMsg>, compositor_receiver: generic_channel::RoutedReceiver<CompositorMsg>,
/// The channel on which messages can be sent to the constellation. /// The channel on which messages can be sent to the constellation.
pub(crate) constellation_sender: Sender<EmbedderToConstellationMessage>, pub(crate) constellation_sender: Sender<EmbedderToConstellationMessage>,
@ -1377,7 +1377,7 @@ impl IOCompositor {
} }
/// Get the message receiver for this [`IOCompositor`]. /// Get the message receiver for this [`IOCompositor`].
pub fn receiver(&self) -> Ref<'_, Receiver<CompositorMsg>> { pub fn receiver(&self) -> Ref<'_, generic_channel::RoutedReceiver<CompositorMsg>> {
Ref::map(self.global.borrow(), |global| &global.compositor_receiver) Ref::map(self.global.borrow(), |global| &global.compositor_receiver)
} }

View file

@ -7,10 +7,11 @@
use std::cell::Cell; use std::cell::Cell;
use std::rc::Rc; use std::rc::Rc;
use base::generic_channel;
use compositing_traits::rendering_context::RenderingContext; use compositing_traits::rendering_context::RenderingContext;
use compositing_traits::{CompositorMsg, CompositorProxy}; use compositing_traits::{CompositorMsg, CompositorProxy};
use constellation_traits::EmbedderToConstellationMessage; use constellation_traits::EmbedderToConstellationMessage;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Sender;
use embedder_traits::{EventLoopWaker, ShutdownState}; use embedder_traits::{EventLoopWaker, ShutdownState};
use profile_traits::{mem, time}; use profile_traits::{mem, time};
use webrender::RenderApi; use webrender::RenderApi;
@ -32,7 +33,7 @@ pub struct InitialCompositorState {
/// A channel to the compositor. /// A channel to the compositor.
pub sender: CompositorProxy, pub sender: CompositorProxy,
/// A port on which messages inbound to the compositor can be received. /// A port on which messages inbound to the compositor can be received.
pub receiver: Receiver<CompositorMsg>, pub receiver: generic_channel::RoutedReceiver<CompositorMsg>,
/// A channel to the constellation. /// A channel to the constellation.
pub constellation_chan: Sender<EmbedderToConstellationMessage>, pub constellation_chan: Sender<EmbedderToConstellationMessage>,
/// A channel to the time profiler thread. /// A channel to the time profiler thread.

View file

@ -99,11 +99,12 @@ use background_hang_monitor::HangMonitorRegister;
use background_hang_monitor_api::{ use background_hang_monitor_api::{
BackgroundHangMonitorControlMsg, BackgroundHangMonitorRegister, HangMonitorAlert, BackgroundHangMonitorControlMsg, BackgroundHangMonitorRegister, HangMonitorAlert,
}; };
use base::Epoch; use base::generic_channel::GenericSender;
use base::id::{ use base::id::{
BrowsingContextGroupId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId, BrowsingContextGroupId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId,
PipelineId, PipelineNamespace, PipelineNamespaceId, PipelineNamespaceRequest, WebViewId, PipelineId, PipelineNamespace, PipelineNamespaceId, PipelineNamespaceRequest, WebViewId,
}; };
use base::{Epoch, generic_channel};
#[cfg(feature = "bluetooth")] #[cfg(feature = "bluetooth")]
use bluetooth_traits::BluetoothRequest; use bluetooth_traits::BluetoothRequest;
use canvas::canvas_paint_thread::CanvasPaintThread; use canvas::canvas_paint_thread::CanvasPaintThread;
@ -288,11 +289,11 @@ pub struct Constellation<STF, SWF> {
/// An IPC channel for script threads to send messages to the constellation. /// An IPC channel for script threads to send messages to the constellation.
/// This is the script threads' view of `script_receiver`. /// This is the script threads' view of `script_receiver`.
script_sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, script_sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>,
/// A channel for the constellation to receive messages from script threads. /// A channel for the constellation to receive messages from script threads.
/// This is the constellation's view of `script_sender`. /// This is the constellation's view of `script_sender`.
script_receiver: Receiver<Result<(PipelineId, ScriptToConstellationMessage), IpcError>>, script_receiver: generic_channel::RoutedReceiver<(PipelineId, ScriptToConstellationMessage)>,
/// A handle to register components for hang monitoring. /// A handle to register components for hang monitoring.
/// None when in multiprocess mode. /// None when in multiprocess mode.
@ -605,11 +606,8 @@ where
.name("Constellation".to_owned()) .name("Constellation".to_owned())
.spawn(move || { .spawn(move || {
let (script_ipc_sender, script_ipc_receiver) = let (script_ipc_sender, script_ipc_receiver) =
ipc::channel().expect("ipc channel failure"); generic_channel::channel().expect("ipc channel failure");
let script_receiver = let script_receiver = script_ipc_receiver.route_preserving_errors();
route_ipc_receiver_to_new_crossbeam_receiver_preserving_errors(
script_ipc_receiver,
);
let (namespace_ipc_sender, namespace_ipc_receiver) = let (namespace_ipc_sender, namespace_ipc_receiver) =
ipc::channel().expect("ipc channel failure"); ipc::channel().expect("ipc channel failure");
@ -1243,7 +1241,7 @@ where
let request = match request { let request = match request {
Ok(request) => request, Ok(request) => request,
Err(err) => return error!("Deserialization failed ({}).", err), Err(err) => return error!("Deserialization failed ({err:?})."),
}; };
match request { match request {

View file

@ -4,6 +4,7 @@
use std::sync::Arc; use std::sync::Arc;
use base::generic_channel::GenericSender;
use base::id::PipelineId; use base::id::PipelineId;
use constellation_traits::{ScriptToConstellationChan, ScriptToConstellationMessage}; use constellation_traits::{ScriptToConstellationChan, ScriptToConstellationMessage};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
@ -199,7 +200,7 @@ pub(crate) struct WorkletGlobalScopeInit {
/// Channel to devtools /// Channel to devtools
pub(crate) devtools_chan: Option<IpcSender<ScriptToDevtoolsControlMsg>>, pub(crate) devtools_chan: Option<IpcSender<ScriptToDevtoolsControlMsg>>,
/// Messages to send to constellation /// Messages to send to constellation
pub(crate) to_constellation_sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, pub(crate) to_constellation_sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>,
/// The image cache /// The image cache
pub(crate) image_cache: Arc<dyn ImageCache>, pub(crate) image_cache: Arc<dyn ImageCache>,
/// Identity manager for WebGPU resources /// Identity manager for WebGPU resources

View file

@ -340,7 +340,7 @@ pub(crate) struct ScriptThreadSenders {
/// particular pipelines. /// particular pipelines.
#[no_trace] #[no_trace]
pub(crate) pipeline_to_constellation_sender: pub(crate) pipeline_to_constellation_sender:
IpcSender<(PipelineId, ScriptToConstellationMessage)>, GenericSender<(PipelineId, ScriptToConstellationMessage)>,
/// The shared [`IpcSender`] which is sent to the `ImageCache` when requesting an image. The /// The shared [`IpcSender`] which is sent to the `ImageCache` when requesting an image. The
/// messages on this channel are routed to crossbeam [`Sender`] on the router thread, which /// messages on this channel are routed to crossbeam [`Sender`] on the router thread, which

View file

@ -33,6 +33,7 @@ use std::rc::{Rc, Weak};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use base::generic_channel;
pub use base::id::WebViewId; pub use base::id::WebViewId;
use base::id::{PipelineNamespace, PipelineNamespaceId}; use base::id::{PipelineNamespace, PipelineNamespaceId};
#[cfg(feature = "bluetooth")] #[cfg(feature = "bluetooth")]
@ -82,7 +83,6 @@ use gaol::sandbox::{ChildSandbox, ChildSandboxMethods};
pub use gleam::gl; pub use gleam::gl;
use gleam::gl::RENDERER; use gleam::gl::RENDERER;
use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use javascript_evaluator::JavaScriptEvaluator; use javascript_evaluator::JavaScriptEvaluator;
pub use keyboard_types::{ pub use keyboard_types::{
Code, CompositionEvent, CompositionState, Key, KeyState, Location, Modifiers, NamedKey, Code, CompositionEvent, CompositionState, Key, KeyState, Location, Modifiers, NamedKey,
@ -543,7 +543,7 @@ impl Servo {
let mut compositor = self.compositor.borrow_mut(); let mut compositor = self.compositor.borrow_mut();
let mut messages = Vec::new(); let mut messages = Vec::new();
while let Ok(message) = compositor.receiver().try_recv() { while let Ok(message) = compositor.receiver().try_recv() {
messages.push(message); messages.push(message.expect("IPC serialization error"));
} }
compositor.handle_messages(messages); compositor.handle_messages(messages);
} }
@ -1094,28 +1094,23 @@ fn create_embedder_channel(
fn create_compositor_channel( fn create_compositor_channel(
event_loop_waker: Box<dyn EventLoopWaker>, event_loop_waker: Box<dyn EventLoopWaker>,
) -> (CompositorProxy, Receiver<CompositorMsg>) { ) -> (
let (sender, receiver) = unbounded(); CompositorProxy,
generic_channel::RoutedReceiver<CompositorMsg>,
) {
let routed_channel =
generic_channel::routed_channel_with_local_sender().expect("Create channel failure");
let (compositor_ipc_sender, compositor_ipc_receiver) = let cross_process_compositor_api =
ipc::channel().expect("ipc channel failure"); CrossProcessCompositorApi(routed_channel.generic_sender.clone());
let cross_process_compositor_api = CrossProcessCompositorApi(compositor_ipc_sender); let compositor_proxy = CompositorProxy::new(
let compositor_proxy = CompositorProxy { routed_channel.local_sender,
sender,
cross_process_compositor_api, cross_process_compositor_api,
event_loop_waker, event_loop_waker,
};
let compositor_proxy_clone = compositor_proxy.clone();
ROUTER.add_typed_route(
compositor_ipc_receiver,
Box::new(move |message| {
compositor_proxy_clone.send(message.expect("Could not convert Compositor message"));
}),
); );
(compositor_proxy, receiver) (compositor_proxy, routed_channel.local_receiver)
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]

View file

@ -395,6 +395,18 @@ where
} }
} }
/// A GenericChannel, which was routed via the Router.
pub struct GenericRoutedChannel<T: Serialize + for<'de> Deserialize<'de>> {
/// A GenericSender of the channel, i.e. it may be sent to other processes in multiprocess mode.
/// Connected to `local_receiver` via the [ROUTER].
pub generic_sender: GenericSender<T>,
/// A sender that directly sends to the local_receiver. Can only be used in the same process
/// as the `local_receiver`.
pub local_sender: crossbeam_channel::Sender<Result<T, ipc_channel::Error>>,
/// The receiving end of the channel. Only usable in the current process.
pub local_receiver: RoutedReceiver<T>,
}
/// Private helper function to create a crossbeam based channel. /// Private helper function to create a crossbeam based channel.
/// ///
/// Do NOT make this function public! /// Do NOT make this function public!
@ -435,6 +447,34 @@ where
} }
} }
/// Returns a [GenericRoutedChannel], where the receiver is usable in the current process,
/// and sending is possible both in remote and local process, via the generic_sender and the
/// local_sender.
pub fn routed_channel_with_local_sender<T>() -> Option<GenericRoutedChannel<T>>
where
T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
{
let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
let generic_sender = if opts::get().multiprocess || opts::get().force_ipc {
let (ipc_sender, ipc_receiver) = ipc_channel::ipc::channel().ok()?;
let crossbeam_sender_clone = crossbeam_sender.clone();
ROUTER.add_typed_route(
ipc_receiver,
Box::new(move |message| {
let _ = crossbeam_sender_clone.send(message);
}),
);
GenericSender(GenericSenderVariants::Ipc(ipc_sender))
} else {
GenericSender(GenericSenderVariants::Crossbeam(crossbeam_sender.clone()))
};
Some(GenericRoutedChannel {
generic_sender,
local_sender: crossbeam_sender,
local_receiver: crossbeam_receiver,
})
}
#[cfg(test)] #[cfg(test)]
mod single_process_channel_tests { mod single_process_channel_tests {
//! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers //! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers

View file

@ -23,6 +23,7 @@ pub mod viewport_description;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use base::generic_channel::GenericSender;
use bitflags::bitflags; use bitflags::bitflags;
use display_list::CompositorDisplayListInfo; use display_list::CompositorDisplayListInfo;
use embedder_traits::ScreenGeometry; use embedder_traits::ScreenGeometry;
@ -44,7 +45,12 @@ use crate::viewport_description::ViewportDescription;
/// Sends messages to the compositor. /// Sends messages to the compositor.
#[derive(Clone)] #[derive(Clone)]
pub struct CompositorProxy { pub struct CompositorProxy {
pub sender: Sender<CompositorMsg>, /// A sender optimised for sending in the local process.
/// The type is a Result to match the API of ipc_channel after routing,
/// which contains a Result to propagate deserialization errors to the
/// recipient.
/// The field is private to hide the inner `Result` type.
sender: Sender<Result<CompositorMsg, ipc_channel::Error>>,
/// Access to [`Self::sender`] that is possible to send across an IPC /// Access to [`Self::sender`] that is possible to send across an IPC
/// channel. These messages are routed via the router thread to /// channel. These messages are routed via the router thread to
/// [`Self::sender`]. /// [`Self::sender`].
@ -52,6 +58,20 @@ pub struct CompositorProxy {
pub event_loop_waker: Box<dyn EventLoopWaker>, pub event_loop_waker: Box<dyn EventLoopWaker>,
} }
impl CompositorProxy {
pub fn new(
local_process_sender: Sender<Result<CompositorMsg, ipc_channel::Error>>,
cross_process_compositor_api: CrossProcessCompositorApi,
event_loop_waker: Box<dyn EventLoopWaker>,
) -> Self {
Self {
sender: local_process_sender,
cross_process_compositor_api,
event_loop_waker,
}
}
}
impl OpaqueSender<CompositorMsg> for CompositorProxy { impl OpaqueSender<CompositorMsg> for CompositorProxy {
fn send(&self, message: CompositorMsg) { fn send(&self, message: CompositorMsg) {
CompositorProxy::send(self, message) CompositorProxy::send(self, message)
@ -60,7 +80,7 @@ impl OpaqueSender<CompositorMsg> for CompositorProxy {
impl CompositorProxy { impl CompositorProxy {
pub fn send(&self, msg: CompositorMsg) { pub fn send(&self, msg: CompositorMsg) {
if let Err(err) = self.sender.send(msg) { if let Err(err) = self.sender.send(Ok(msg)) {
warn!("Failed to send response ({:?}).", err); warn!("Failed to send response ({:?}).", err);
} }
self.event_loop_waker.wake(); self.event_loop_waker.wake();
@ -170,18 +190,18 @@ pub struct CompositionPipeline {
/// A mechanism to send messages from ScriptThread to the parent process' WebRender instance. /// A mechanism to send messages from ScriptThread to the parent process' WebRender instance.
#[derive(Clone, Deserialize, MallocSizeOf, Serialize)] #[derive(Clone, Deserialize, MallocSizeOf, Serialize)]
pub struct CrossProcessCompositorApi(pub IpcSender<CompositorMsg>); pub struct CrossProcessCompositorApi(pub GenericSender<CompositorMsg>);
impl CrossProcessCompositorApi { impl CrossProcessCompositorApi {
/// Create a new [`CrossProcessCompositorApi`] struct that does not have a listener on the other /// Create a new [`CrossProcessCompositorApi`] struct that does not have a listener on the other
/// end to use for unit testing. /// end to use for unit testing.
pub fn dummy() -> Self { pub fn dummy() -> Self {
let (sender, _) = ipc::channel().unwrap(); let (sender, _) = base::generic_channel::channel().unwrap();
Self(sender) Self(sender)
} }
/// Get the sender for this proxy. /// Get the sender for this proxy.
pub fn sender(&self) -> &IpcSender<CompositorMsg> { pub fn sender(&self) -> &GenericSender<CompositorMsg> {
&self.0 &self.0
} }

View file

@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::fmt; use std::fmt;
use base::Epoch; use base::Epoch;
use base::generic_channel::{GenericSender, SendResult};
use base::id::{ use base::id::{
BroadcastChannelRouterId, BrowsingContextId, HistoryStateId, MessagePortId, BroadcastChannelRouterId, BrowsingContextId, HistoryStateId, MessagePortId,
MessagePortRouterId, PipelineId, ServiceWorkerId, ServiceWorkerRegistrationId, WebViewId, MessagePortRouterId, PipelineId, ServiceWorkerId, ServiceWorkerRegistrationId, WebViewId,
@ -21,7 +22,6 @@ use embedder_traits::{
}; };
use euclid::default::Size2D as UntypedSize2D; use euclid::default::Size2D as UntypedSize2D;
use http::{HeaderMap, Method}; use http::{HeaderMap, Method};
use ipc_channel::Error as IpcError;
use ipc_channel::ipc::{IpcReceiver, IpcSender}; use ipc_channel::ipc::{IpcReceiver, IpcSender};
use malloc_size_of_derive::MallocSizeOf; use malloc_size_of_derive::MallocSizeOf;
use net_traits::policy_container::PolicyContainer; use net_traits::policy_container::PolicyContainer;
@ -46,14 +46,14 @@ use crate::{
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)] #[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct ScriptToConstellationChan { pub struct ScriptToConstellationChan {
/// Sender for communicating with constellation thread. /// Sender for communicating with constellation thread.
pub sender: IpcSender<(PipelineId, ScriptToConstellationMessage)>, pub sender: GenericSender<(PipelineId, ScriptToConstellationMessage)>,
/// Used to identify the origin of the message. /// Used to identify the origin of the message.
pub pipeline_id: PipelineId, pub pipeline_id: PipelineId,
} }
impl ScriptToConstellationChan { impl ScriptToConstellationChan {
/// Send ScriptMsg and attach the pipeline_id to the message. /// Send ScriptMsg and attach the pipeline_id to the message.
pub fn send(&self, msg: ScriptToConstellationMessage) -> Result<(), IpcError> { pub fn send(&self, msg: ScriptToConstellationMessage) -> SendResult {
self.sender.send((self.pipeline_id, msg)) self.sender.send((self.pipeline_id, msg))
} }
} }