Add generic cross process callback mechanism (#38973)

One commonly encountered mechanism in servo is using ipc channels
together with the router, to register a custom callback to run in the
current process, when receiving a reply.
The new `GenericCallback` abstracts over this, and allows executing an
arbitrary callback in the process of the `GenericCallback` creator. In
multiprocess mode, this internally uses ipc channels and follows the
existing pattern.
In single process mode, we execute the callback directly, which avoids
one call to the router.
Executing the callback still incurs synchronization, since we need to
support cloning the abstraction, and the callback closure may be
`FnMut`. Future work could provide more optimized abstractions for
callbacks that don't have these requirements.

This PR allows applying #38782 again, which was previously reverted in
#38940 due to the lack of custom callback support.

See also the module documentation in `generic_channel/callback.rs`.

Testing: This PR adds unit tests. Also passes the manual testcase from
#38939

Part of #38912

---------

Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
Signed-off-by: Jonathan Schwender <55576758+jschwe@users.noreply.github.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
This commit is contained in:
Jonathan Schwender 2025-08-30 18:58:49 +02:00 committed by GitHub
parent 6565d982bd
commit e5f9d81058
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 366 additions and 20 deletions

View file

@ -33,7 +33,7 @@ use std::rc::{Rc, Weak};
use std::sync::{Arc, Mutex};
use std::thread;
use base::generic_channel::RoutedReceiver;
use base::generic_channel::{GenericCallback, RoutedReceiver};
pub use base::id::WebViewId;
use base::id::{PipelineNamespace, PipelineNamespaceId};
#[cfg(feature = "bluetooth")]
@ -83,7 +83,6 @@ use gaol::sandbox::{ChildSandbox, ChildSandboxMethods};
pub use gleam::gl;
use gleam::gl::RENDERER;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use javascript_evaluator::JavaScriptEvaluator;
pub use keyboard_types::{
Code, CompositionEvent, CompositionState, Key, KeyState, Location, Modifiers, NamedKey,
@ -1103,25 +1102,25 @@ fn create_compositor_channel(
event_loop_waker: Box<dyn EventLoopWaker>,
) -> (CompositorProxy, RoutedReceiver<CompositorMsg>) {
let (sender, receiver) = unbounded();
let sender_clone = sender.clone();
let event_loop_waker_clone = event_loop_waker.clone();
// This callback is equivalent to `CompositorProxy::send`
let result_callback = move |msg: Result<CompositorMsg, ipc_channel::Error>| {
if let Err(err) = sender_clone.send(msg) {
warn!("Failed to send response ({:?}).", err);
}
event_loop_waker_clone.wake();
};
let (compositor_ipc_sender, compositor_ipc_receiver) =
ipc::channel().expect("ipc channel failure");
let cross_process_compositor_api = CrossProcessCompositorApi::new(compositor_ipc_sender);
let generic_callback =
GenericCallback::new(result_callback).expect("Failed to create callback");
let cross_process_compositor_api = CrossProcessCompositorApi::new(generic_callback);
let compositor_proxy = CompositorProxy {
sender,
cross_process_compositor_api,
event_loop_waker,
};
let compositor_proxy_clone = compositor_proxy.clone();
ROUTER.add_typed_route(
compositor_ipc_receiver,
Box::new(move |message| {
compositor_proxy_clone.route_msg(message);
}),
);
(compositor_proxy, receiver)
}

View file

@ -15,6 +15,9 @@ use serde::de::VariantAccess;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use servo_config::opts;
mod callback;
pub use callback::GenericCallback;
/// Abstraction of the ability to send a particular type of message cross-process.
/// This can be used to ease the use of GenericSender sub-fields.
pub trait GenericSend<T>

View file

@ -0,0 +1,344 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
//! # Generic Callbacks
//!
//! When sending cross-process messages, we sometimes want to run custom callbacks when the
//! recipient has finished processing. The callback should run in the sender's address space, and
//! could be something like enqueuing a task.
//! In Multi-process mode we can implement this by providing an `IpcSender` to the recipient,
//! which the recipient can use to send some data back to the senders process.
//! To avoid blocking the sender, we can pass the callback to the ROUTER, which runs the callback
//! when receiving the Ipc message.
//! The callback will be run on every reply message from the recipient. `IpcSender`s are also
//! `Clone`able, so the Router will sequentialise callbacks.
//!
//! ## Callback scenario visualization
//!
//! The following visualization showcases how Ipc and the router thread are currently used
//! to run callbacks asynchronously on the sender process. The recipient may keep the
//! ReplySender alive and send an arbitrary amount of messages / replies.
//!
//! ```none
//! Process A | Process B
//! |
//! +---------+ IPC: SendMessage(ReplySender) | +-------------+ clone +-------------+
//! | Sender |-------------------------------------------> | Recipient | ------> | ReplySender |
//! +---------+ | +-------------+ +-------------+
//! | | | |
//! | RegisterCallback A +---------+ | Send Reply 1 | Send Reply 2 |
//! + ------------------> | Router | <--------------------------+-----------------------+
//! +---------+ |
//! | A(reply1) |
//! | A(reply2) |
//! | ... |
//! v |
//! |
//! ```
//!
//!
//! ## Optimizing single-process mode.
//!
//! In Single-process mode, there is no need for the Recipient to send an IpcReply,
//! since they are in the same address space and could just execute the callback directly.
//! Since we want to create an abstraction over such callbacks, we need to consider constraints
//! that the existing multiprocess Ipc solution imposes on us:
//!
//! - Support for `FnMut` callbacks (internal mutable state + multiple calls)
//! - The abstraction should be `Clone`able
//!
//! These constraints motivate the [GenericCallback] type, which supports `FnMut` callbacks
//! and is clonable. This requires wrapping the callback with `Arc<Mutex<>>`, which also adds
//! synchronization, which could be something that existing callbacks rely on.
//!
//! ### Future work
//!
//! - Further abstractions for callbacks with fewer constraints, e.g. callbacks
//! which don't need to be cloned by the recipient, or non-mutable callbacks.
//! - A tracing option to measure callback runtime and identify callbacks which misbehave (block)
//! for a long time.
use std::fmt;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use ipc_channel::ErrorKind;
use ipc_channel::ipc::IpcSender;
use ipc_channel::router::ROUTER;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use serde::de::VariantAccess;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use servo_config::opts;
use crate::generic_channel::{SendError, SendResult};
/// The callback type of our messages.
///
/// This is equivalent to [TypedRouterHandler][ipc_channel::router::TypedRouterHandler],
/// except that this type is not wrapped in a Box.
/// The callback will be wrapped in either a Box or an Arc, depending on if it is run on
/// the router, or passed to the recipient.
pub type MsgCallback<T> = dyn FnMut(Result<T, ipc_channel::Error>) + Send;
/// A mechanism to run a callback in the process this callback was constructed in.
///
/// The GenericCallback can be sent cross-process (in multi-process mode). In this case
/// the callback will be executed on the [ROUTER] thread.
/// In single-process mode the callback will be executed directly.
pub struct GenericCallback<T>(GenericCallbackVariants<T>)
where
T: Serialize + Send + 'static;
enum GenericCallbackVariants<T>
where
T: Serialize + Send + 'static,
{
CrossProcess(IpcSender<T>),
InProcess(Arc<Mutex<MsgCallback<T>>>),
}
impl<T> Clone for GenericCallback<T>
where
T: Serialize + Send + 'static,
{
fn clone(&self) -> Self {
let variant = match &self.0 {
GenericCallbackVariants::CrossProcess(sender) => {
GenericCallbackVariants::CrossProcess((*sender).clone())
},
GenericCallbackVariants::InProcess(callback) => {
GenericCallbackVariants::InProcess(callback.clone())
},
};
GenericCallback(variant)
}
}
impl<T> MallocSizeOf for GenericCallback<T>
where
T: Serialize + Send + 'static,
{
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
0
}
}
impl<T> GenericCallback<T>
where
T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
{
/// Creates a new GenericCallback.
///
/// The callback should not do any heavy work and not block.
pub fn new<F: FnMut(Result<T, ipc_channel::Error>) + Send + 'static>(
callback: F,
) -> Result<Self, ipc_channel::Error> {
let generic_callback = if opts::get().multiprocess || opts::get().force_ipc {
let (ipc_sender, ipc_receiver) = ipc_channel::ipc::channel()?;
ROUTER.add_typed_route(ipc_receiver, Box::new(callback));
GenericCallback(GenericCallbackVariants::CrossProcess(ipc_sender))
} else {
let callback = Arc::new(Mutex::new(callback));
GenericCallback(GenericCallbackVariants::InProcess(callback))
};
Ok(generic_callback)
}
/// Send `value` to the callback.
///
/// Note that a return value of `Ok()` simply means that value was sent successfully
/// to the callback. The callback itself does not return any value.
/// The caller may not assume that the callback is executed synchronously.
pub fn send(&self, value: T) -> SendResult {
match &self.0 {
GenericCallbackVariants::CrossProcess(sender) => {
sender.send(value).map_err(|error| match *error {
ErrorKind::Io(_) => SendError::Disconnected,
serialization_error => {
SendError::SerializationError(serialization_error.to_string())
},
})
},
GenericCallbackVariants::InProcess(callback) => {
let mut cb = callback.lock().expect("poisoned");
(*cb)(Ok(value));
Ok(())
},
}
}
}
impl<T> Serialize for GenericCallback<T>
where
T: Serialize + Send + 'static,
{
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match &self.0 {
GenericCallbackVariants::CrossProcess(sender) => {
s.serialize_newtype_variant("GenericCallback", 0, "CrossProcess", sender)
},
// The only reason we need / want serialization in single-process mode is to support
// sending GenericCallbacks over existing IPC channels. This allows us to
// incrementally port IPC channels to the GenericChannel, without needing to follow a
// top-to-bottom approach.
// Long-term we can remove this branch in the code again and replace it with
// unreachable, since likely all IPC channels would be GenericChannels.
GenericCallbackVariants::InProcess(wrapped_callback) => {
if opts::get().multiprocess {
return Err(serde::ser::Error::custom(
"InProcess callback can't be serialized in multiprocess mode",
));
}
// Due to the signature of `serialize` we need to clone the Arc to get an owned
// pointer we can leak.
// We additionally need to Box to get a thin pointer.
let cloned_callback = Box::new(wrapped_callback.clone());
let sender_clone_addr = Box::leak(cloned_callback) as *mut Arc<_> as usize;
s.serialize_newtype_variant("GenericCallback", 1, "InProcess", &sender_clone_addr)
},
}
}
}
struct GenericCallbackVisitor<T> {
marker: PhantomData<T>,
}
impl<'de, T> serde::de::Visitor<'de> for GenericCallbackVisitor<T>
where
T: Serialize + Deserialize<'de> + Send + 'static,
{
type Value = GenericCallback<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericCallback variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericCallbackVariantNames {
CrossProcess,
InProcess,
}
let (variant_name, variant_data): (GenericCallbackVariantNames, _) = data.variant()?;
match variant_name {
GenericCallbackVariantNames::CrossProcess => variant_data
.newtype_variant::<IpcSender<T>>()
.map(|sender| GenericCallback(GenericCallbackVariants::CrossProcess(sender))),
GenericCallbackVariantNames::InProcess => {
if opts::get().multiprocess {
return Err(serde::de::Error::custom(
"InProcess callback found in multiprocess mode",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut Arc<Mutex<_>>;
// SAFETY: We know we are in the same address space as the sender, so we can safely
// reconstruct the Arc, that we previously leaked with `into_raw` during
// serialization.
// Attention: Code reviewers should carefully compare the deserialization here
// with the serialization above.
#[allow(unsafe_code)]
let callback = unsafe { Box::from_raw(ptr) };
Ok(GenericCallback(GenericCallbackVariants::InProcess(
*callback,
)))
},
}
}
}
impl<'a, T> Deserialize<'a> for GenericCallback<T>
where
T: Serialize + Deserialize<'a> + Send + 'static,
{
fn deserialize<D>(d: D) -> Result<GenericCallback<T>, D::Error>
where
D: Deserializer<'a>,
{
d.deserialize_enum(
"GenericCallback",
&["CrossProcess", "InProcess"],
GenericCallbackVisitor {
marker: PhantomData,
},
)
}
}
impl<T> fmt::Debug for GenericCallback<T>
where
T: Serialize + Send + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "GenericCallback(..)")
}
}
#[cfg(test)]
mod single_process_callback_test {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::generic_channel::GenericCallback;
#[test]
fn generic_callback() {
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
let callback = move |msg: Result<usize, ipc_channel::Error>| {
number_clone.store(msg.unwrap(), Ordering::SeqCst)
};
let generic_callback = GenericCallback::new(callback).unwrap();
std::thread::scope(|s| {
s.spawn(move || generic_callback.send(42));
});
assert_eq!(number.load(Ordering::SeqCst), 42);
}
#[test]
fn generic_callback_via_generic_sender() {
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
let callback = move |msg: Result<usize, ipc_channel::Error>| {
number_clone.store(msg.unwrap(), Ordering::SeqCst)
};
let generic_callback = GenericCallback::new(callback).unwrap();
let (tx, rx) = crate::generic_channel::channel().unwrap();
tx.send(generic_callback).unwrap();
std::thread::scope(|s| {
s.spawn(move || {
let callback = rx.recv().unwrap();
callback.send(42).unwrap();
});
});
assert_eq!(number.load(Ordering::SeqCst), 42);
}
#[test]
fn generic_callback_via_ipc_sender() {
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
let callback = move |msg: Result<usize, ipc_channel::Error>| {
number_clone.store(msg.unwrap(), Ordering::SeqCst)
};
let generic_callback = GenericCallback::new(callback).unwrap();
let (tx, rx) = ipc_channel::ipc::channel().unwrap();
tx.send(generic_callback).unwrap();
std::thread::scope(|s| {
s.spawn(move || {
let callback = rx.recv().unwrap();
callback.send(42).unwrap();
});
});
assert_eq!(number.load(Ordering::SeqCst), 42);
}
}

View file

@ -24,7 +24,7 @@ pub mod viewport_description;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use base::generic_channel::{self, GenericSender};
use base::generic_channel::{self, GenericCallback, GenericSender};
use bitflags::bitflags;
use display_list::CompositorDisplayListInfo;
use embedder_traits::ScreenGeometry;
@ -188,19 +188,19 @@ pub struct CompositionPipeline {
/// A mechanism to send messages from ScriptThread to the parent process' WebRender instance.
#[derive(Clone, Deserialize, MallocSizeOf, Serialize)]
pub struct CrossProcessCompositorApi(IpcSender<CompositorMsg>);
pub struct CrossProcessCompositorApi(GenericCallback<CompositorMsg>);
impl CrossProcessCompositorApi {
/// Create a new [`CrossProcessCompositorApi`] struct.
pub fn new(sender: IpcSender<CompositorMsg>) -> Self {
CrossProcessCompositorApi(sender)
pub fn new(callback: GenericCallback<CompositorMsg>) -> Self {
CrossProcessCompositorApi(callback)
}
/// Create a new [`CrossProcessCompositorApi`] struct that does not have a listener on the other
/// end to use for unit testing.
pub fn dummy() -> Self {
let (sender, _) = ipc::channel().unwrap();
Self(sender)
let callback = GenericCallback::new(|_msg| ()).unwrap();
Self(callback)
}
/// Inform WebRender of the existence of this pipeline.