generic_channel: Support sending GenericChannel::Crossbeam over IPC (#38873)

In Single-process mode, sending crossbeam channels over an ipc channel
is perfectly safe to do (since everything is in the same process) and
allows us to more easily incrementally port channels.

In Multi-process mode, GenericChannels will always be IPC channels, so
we won't hit the "serialize crossbeam channels" branch (since the only
way to construct channels, checks the process mode). This property is
ensured by `channel()` being the only way to construct a `GenericSender`
and Receiver pair. To achieve this, we make the previously `pub` enum
private, and wrap it in a newtype, so that the type can't be constructed
from outside the module.
To be extra safe, we still check if we are in multiprocess mode or not
during (de-)serialization and emit an error.


Testing: Add a new unit-test to ensure sending GenericSender / Receivers
over an ipc_channel works.

---------

Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
This commit is contained in:
Jonathan Schwender 2025-08-25 10:11:09 +02:00 committed by GitHub
parent 11bea7303d
commit 7441944e36
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -6,17 +6,25 @@
use std::fmt; use std::fmt;
use std::fmt::Display; use std::fmt::Display;
use std::marker::PhantomData;
use ipc_channel::ipc::IpcError; use ipc_channel::ipc::IpcError;
use ipc_channel::router::ROUTER; use ipc_channel::router::ROUTER;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps}; use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use serde::de::VariantAccess;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use servo_config::opts;
static GENERIC_CHANNEL_USAGE_ERROR_PANIC_MSG: &str = "May not send a crossbeam channel over an IPC channel. \ /// A GenericSender that sends messages to a [GenericReceiver].
Please also convert the ipc-channel you want to send this GenericReceiver over \ ///
into a GenericChannel."; /// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
pub enum GenericSender<T: Serialize> { /// The actual GenericSender variant.
///
/// This enum is private, so that outside code can't construct a GenericSender itself.
/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
enum GenericSenderVariants<T: Serialize> {
Ipc(ipc_channel::ipc::IpcSender<T>), Ipc(ipc_channel::ipc::IpcSender<T>),
/// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router, /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
/// which propagates the IPC error, the inner type is a Result. /// which propagates the IPC error, the inner type is a Result.
@ -30,20 +38,91 @@ pub enum GenericSender<T: Serialize> {
impl<T: Serialize> Serialize for GenericSender<T> { impl<T: Serialize> Serialize for GenericSender<T> {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> { fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match self { match &self.0 {
GenericSender::Ipc(i) => i.serialize(s), GenericSenderVariants::Ipc(sender) => {
GenericSender::Crossbeam(_) => panic!("{GENERIC_CHANNEL_USAGE_ERROR_PANIC_MSG}"), s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
},
// All GenericSenders will be IPC channels in multi-process mode, so sending a
// GenericChannel over existing IPC channels is no problem and won't fail.
// In single-process mode, we can also send GenericSenders over other GenericSenders
// just fine, since no serialization is required.
// The only reason we need / want serialization is to support sending GenericSenders
// over existing IPC channels **in single process mode**. This allows us to
// incrementally port channels to the GenericChannel, without needing to follow a
// top-to-bottom approach.
// Long-term we can remove this branch in the code again and replace it with
// unreachable, since likely all IPC channels would be GenericChannels.
GenericSenderVariants::Crossbeam(sender) => {
if opts::get().multiprocess {
return Err(serde::ser::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
} // We know everything is in one address-space, so we can "serialize" the sender by
// sending a leaked Box pointer.
let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
},
} }
} }
} }
impl<'a, T: Serialize> Deserialize<'a> for GenericSender<T> { struct GenericSenderVisitor<T> {
marker: PhantomData<T>,
}
impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
type Value = GenericSender<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericSender variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericSenderVariantNames {
Ipc,
Crossbeam,
}
let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
match variant_name {
GenericSenderVariantNames::Ipc => variant_data
.newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
.map(|sender| GenericSender(GenericSenderVariants::Ipc(sender))),
GenericSenderVariantNames::Crossbeam => {
if opts::get().multiprocess {
return Err(serde::de::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
// SAFETY: We know we are in the same address space as the sender, so we can safely
// reconstruct the Box.
#[allow(unsafe_code)]
let sender = unsafe { Box::from_raw(ptr) };
Ok(GenericSender(GenericSenderVariants::Crossbeam(*sender)))
},
}
}
}
impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error> fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
where where
D: Deserializer<'a>, D: Deserializer<'a>,
{ {
// Only ipc_channel will encounter deserialize scenario. d.deserialize_enum(
ipc_channel::ipc::IpcSender::<T>::deserialize(d).map(GenericSender::Ipc) "GenericSender",
&["Ipc", "Crossbeam"],
GenericSenderVisitor {
marker: PhantomData,
},
)
} }
} }
@ -52,9 +131,13 @@ where
T: Serialize, T: Serialize,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
match *self { match self.0 {
GenericSender::Ipc(ref chan) => GenericSender::Ipc(chan.clone()), GenericSenderVariants::Ipc(ref chan) => {
GenericSender::Crossbeam(ref chan) => GenericSender::Crossbeam(chan.clone()), GenericSender(GenericSenderVariants::Ipc(chan.clone()))
},
GenericSenderVariants::Crossbeam(ref chan) => {
GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
},
} }
} }
} }
@ -68,11 +151,11 @@ impl<T: Serialize> fmt::Debug for GenericSender<T> {
impl<T: Serialize> GenericSender<T> { impl<T: Serialize> GenericSender<T> {
#[inline] #[inline]
pub fn send(&self, msg: T) -> SendResult { pub fn send(&self, msg: T) -> SendResult {
match *self { match self.0 {
GenericSender::Ipc(ref sender) => sender GenericSenderVariants::Ipc(ref sender) => sender
.send(msg) .send(msg)
.map_err(|e| SendError::SerializationError(format!("{e}"))), .map_err(|e| SendError::SerializationError(format!("{e}"))),
GenericSender::Crossbeam(ref sender) => { GenericSenderVariants::Crossbeam(ref sender) => {
sender.send(Ok(msg)).map_err(|_| SendError::Disconnected) sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
}, },
} }
@ -151,10 +234,15 @@ impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
} }
} }
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
pub type ReceiveResult<T> = Result<T, ReceiveError>; pub type ReceiveResult<T> = Result<T, ReceiveError>;
pub type TryReceiveResult<T> = Result<T, TryReceiveError>; pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
pub enum GenericReceiver<T> pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
where
T: for<'de> Deserialize<'de> + Serialize;
enum GenericReceiverVariants<T>
where where
T: for<'de> Deserialize<'de> + Serialize, T: for<'de> Deserialize<'de> + Serialize,
{ {
@ -168,9 +256,9 @@ where
{ {
#[inline] #[inline]
pub fn recv(&self) -> ReceiveResult<T> { pub fn recv(&self) -> ReceiveResult<T> {
match *self { match self.0 {
GenericReceiver::Ipc(ref receiver) => Ok(receiver.recv()?), GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
GenericReceiver::Crossbeam(ref receiver) => { GenericReceiverVariants::Crossbeam(ref receiver) => {
// `recv()` returns an error if the channel is disconnected // `recv()` returns an error if the channel is disconnected
let msg = receiver.recv()?; let msg = receiver.recv()?;
// `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
@ -182,9 +270,9 @@ where
#[inline] #[inline]
pub fn try_recv(&self) -> TryReceiveResult<T> { pub fn try_recv(&self) -> TryReceiveResult<T> {
match *self { match self.0 {
GenericReceiver::Ipc(ref receiver) => Ok(receiver.try_recv()?), GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
GenericReceiver::Crossbeam(ref receiver) => { GenericReceiverVariants::Crossbeam(ref receiver) => {
let msg = receiver.try_recv()?; let msg = receiver.try_recv()?;
Ok(msg.expect("Infallible")) Ok(msg.expect("Infallible"))
}, },
@ -200,8 +288,8 @@ where
where where
T: Send + 'static, T: Send + 'static,
{ {
match self { match self.0 {
GenericReceiver::Ipc(ipc_receiver) => { GenericReceiverVariants::Ipc(ipc_receiver) => {
let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded(); let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
let crossbeam_sender_clone = crossbeam_sender.clone(); let crossbeam_sender_clone = crossbeam_sender.clone();
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -212,7 +300,7 @@ where
); );
crossbeam_receiver crossbeam_receiver
}, },
GenericReceiver::Crossbeam(receiver) => receiver, GenericReceiverVariants::Crossbeam(receiver) => receiver,
} }
} }
} }
@ -222,9 +310,69 @@ where
T: for<'de> Deserialize<'de> + Serialize, T: for<'de> Deserialize<'de> + Serialize,
{ {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> { fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match self { match &self.0 {
GenericReceiver::Ipc(receiver) => receiver.serialize(s), GenericReceiverVariants::Ipc(receiver) => {
GenericReceiver::Crossbeam(_) => panic!("{GENERIC_CHANNEL_USAGE_ERROR_PANIC_MSG}"), s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
},
GenericReceiverVariants::Crossbeam(receiver) => {
if opts::get().multiprocess {
return Err(serde::ser::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
} // We know everything is in one address-space, so we can "serialize" the receiver by
// sending a leaked Box pointer.
let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
},
}
}
}
struct GenericReceiverVisitor<T> {
marker: PhantomData<T>,
}
impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
where
T: for<'a> Deserialize<'a> + Serialize,
{
type Value = GenericReceiver<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericReceiver variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericReceiverVariantNames {
Ipc,
Crossbeam,
}
let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
match variant_name {
GenericReceiverVariantNames::Ipc => variant_data
.newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
.map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
GenericReceiverVariantNames::Crossbeam => {
if opts::get().multiprocess {
return Err(serde::de::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut RoutedReceiver<T>;
// SAFETY: We know we are in the same address space as the sender, so we can safely
// reconstruct the Box.
#[allow(unsafe_code)]
let receiver = unsafe { Box::from_raw(ptr) };
Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
*receiver,
)))
},
} }
} }
} }
@ -237,11 +385,42 @@ where
where where
D: Deserializer<'a>, D: Deserializer<'a>,
{ {
// Only ipc_channel will encounter deserialize scenario. d.deserialize_enum(
ipc_channel::ipc::IpcReceiver::<T>::deserialize(d).map(GenericReceiver::Ipc) "GenericReceiver",
&["Ipc", "Crossbeam"],
GenericReceiverVisitor {
marker: PhantomData,
},
)
} }
} }
/// Private helper function to create a crossbeam based channel.
///
/// Do NOT make this function public!
fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
where
T: Serialize + for<'de> serde::Deserialize<'de>,
{
let (tx, rx) = crossbeam_channel::unbounded();
(
GenericSender(GenericSenderVariants::Crossbeam(tx)),
GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
)
}
fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
where
T: Serialize + for<'de> serde::Deserialize<'de>,
{
ipc_channel::ipc::channel().map(|(tx, rx)| {
(
GenericSender(GenericSenderVariants::Ipc(tx)),
GenericReceiver(GenericReceiverVariants::Ipc(rx)),
)
})
}
/// Creates a Servo channel that can select different channel implementations based on multiprocess /// Creates a Servo channel that can select different channel implementations based on multiprocess
/// mode or not. If the scenario doesn't require message to pass process boundary, a simple /// mode or not. If the scenario doesn't require message to pass process boundary, a simple
/// crossbeam channel is preferred. /// crossbeam channel is preferred.
@ -250,12 +429,110 @@ where
T: for<'de> Deserialize<'de> + Serialize, T: for<'de> Deserialize<'de> + Serialize,
{ {
if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc { if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
ipc_channel::ipc::channel() new_generic_channel_ipc().ok()
.map(|(tx, rx)| (GenericSender::Ipc(tx), GenericReceiver::Ipc(rx)))
.ok()
} else { } else {
let (tx, rx) = crossbeam_channel::unbounded(); Some(new_generic_channel_crossbeam())
Some((GenericSender::Crossbeam(tx), GenericReceiver::Crossbeam(rx))) }
}
#[cfg(test)]
mod single_process_channel_tests {
//! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers
//! can be sent over each other without problems in single-process mode.
//! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
//! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
#[test]
fn generic_crossbeam_can_send() {
let (tx, rx) = new_generic_channel_crossbeam();
tx.send(5).expect("Send failed");
let val = rx.recv().expect("Receive failed");
assert_eq!(val, 5);
}
#[test]
fn generic_crossbeam_ping_pong() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn generic_ipc_ping_pong() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_crossbeam_sender_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_generic_ipc_channel_over_crossbeam() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_crossbeam_receiver_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(rx2).expect("Send failed");
tx2.send(42).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let another_receiver = rx.recv().expect("Receive failed");
let res = another_receiver.recv().expect("Receive failed");
assert_eq!(res, 42);
});
});
} }
} }
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;