servo/components/shared/base/generic_channel.rs
Jonathan Schwender ce8b05738c
script: Migrate swmanager to GenericChannel (#39076)
Migrate `ServiceWorkerMsg` and `SWManagerMsg` to GenericChannel

Testing: Covered by service worker wpt tests
Part of #38912

Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
2025-09-01 19:23:16 +00:00

563 lines
19 KiB
Rust

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
//! Enum wrappers to be able to select different channel implementations at runtime.
use std::fmt;
use std::fmt::Display;
use std::marker::PhantomData;
use ipc_channel::ipc::IpcError;
use ipc_channel::router::ROUTER;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use serde::de::VariantAccess;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use servo_config::opts;
mod callback;
pub use callback::GenericCallback;
/// Abstraction of the ability to send a particular type of message cross-process.
/// This can be used to ease the use of GenericSender sub-fields.
pub trait GenericSend<T>
where
T: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
/// send message T
fn send(&self, _: T) -> SendResult;
/// get underlying sender
fn sender(&self) -> GenericSender<T>;
}
/// A GenericSender that sends messages to a [GenericReceiver].
///
/// The sender supports sending messages cross-process, if servo is run in multiprocess mode.
pub struct GenericSender<T: Serialize>(GenericSenderVariants<T>);
/// The actual GenericSender variant.
///
/// This enum is private, so that outside code can't construct a GenericSender itself.
/// This ensures that users can't construct a crossbeam variant in multiprocess mode.
enum GenericSenderVariants<T: Serialize> {
Ipc(ipc_channel::ipc::IpcSender<T>),
/// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
/// which propagates the IPC error, the inner type is a Result.
/// In the IPC case, the Router deserializes the message, which can fail, and sends
/// the result to a crossbeam receiver.
/// The crossbeam channel does not involve serializing, so we can't have this error,
/// but replicating the API allows us to have one channel type as the receiver
/// after routing the receiver .
Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
}
impl<T: Serialize> Serialize for GenericSender<T> {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match &self.0 {
GenericSenderVariants::Ipc(sender) => {
s.serialize_newtype_variant("GenericSender", 0, "Ipc", sender)
},
// All GenericSenders will be IPC channels in multi-process mode, so sending a
// GenericChannel over existing IPC channels is no problem and won't fail.
// In single-process mode, we can also send GenericSenders over other GenericSenders
// just fine, since no serialization is required.
// The only reason we need / want serialization is to support sending GenericSenders
// over existing IPC channels **in single process mode**. This allows us to
// incrementally port channels to the GenericChannel, without needing to follow a
// top-to-bottom approach.
// Long-term we can remove this branch in the code again and replace it with
// unreachable, since likely all IPC channels would be GenericChannels.
GenericSenderVariants::Crossbeam(sender) => {
if opts::get().multiprocess {
return Err(serde::ser::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
} // We know everything is in one address-space, so we can "serialize" the sender by
// sending a leaked Box pointer.
let sender_clone_addr = Box::leak(Box::new(sender.clone())) as *mut _ as usize;
s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &sender_clone_addr)
},
}
}
}
struct GenericSenderVisitor<T> {
marker: PhantomData<T>,
}
impl<'de, T: Serialize + Deserialize<'de>> serde::de::Visitor<'de> for GenericSenderVisitor<T> {
type Value = GenericSender<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericSender variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericSenderVariantNames {
Ipc,
Crossbeam,
}
let (variant_name, variant_data): (GenericSenderVariantNames, _) = data.variant()?;
match variant_name {
GenericSenderVariantNames::Ipc => variant_data
.newtype_variant::<ipc_channel::ipc::IpcSender<T>>()
.map(|sender| GenericSender(GenericSenderVariants::Ipc(sender))),
GenericSenderVariantNames::Crossbeam => {
if opts::get().multiprocess {
return Err(serde::de::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::Error>>;
// SAFETY: We know we are in the same address space as the sender, so we can safely
// reconstruct the Box.
#[allow(unsafe_code)]
let sender = unsafe { Box::from_raw(ptr) };
Ok(GenericSender(GenericSenderVariants::Crossbeam(*sender)))
},
}
}
}
impl<'a, T: Serialize + Deserialize<'a>> Deserialize<'a> for GenericSender<T> {
fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
where
D: Deserializer<'a>,
{
d.deserialize_enum(
"GenericSender",
&["Ipc", "Crossbeam"],
GenericSenderVisitor {
marker: PhantomData,
},
)
}
}
impl<T> Clone for GenericSender<T>
where
T: Serialize,
{
fn clone(&self) -> Self {
match self.0 {
GenericSenderVariants::Ipc(ref chan) => {
GenericSender(GenericSenderVariants::Ipc(chan.clone()))
},
GenericSenderVariants::Crossbeam(ref chan) => {
GenericSender(GenericSenderVariants::Crossbeam(chan.clone()))
},
}
}
}
impl<T: Serialize> fmt::Debug for GenericSender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Sender(..)")
}
}
impl<T: Serialize> GenericSender<T> {
#[inline]
pub fn send(&self, msg: T) -> SendResult {
match self.0 {
GenericSenderVariants::Ipc(ref sender) => sender
.send(msg)
.map_err(|e| SendError::SerializationError(format!("{e}"))),
GenericSenderVariants::Crossbeam(ref sender) => {
sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
},
}
}
}
impl<T: Serialize> MallocSizeOf for GenericSender<T> {
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
0
}
}
#[derive(Debug)]
pub enum SendError {
Disconnected,
SerializationError(String),
}
impl Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{self:?}")
}
}
pub type SendResult = Result<(), SendError>;
#[derive(Debug)]
pub enum ReceiveError {
DeserializationFailed(String),
/// Io Error. May occur when using IPC.
Io(std::io::Error),
/// The channel was closed.
Disconnected,
}
impl From<IpcError> for ReceiveError {
fn from(e: IpcError) -> Self {
match e {
IpcError::Disconnected => ReceiveError::Disconnected,
IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
IpcError::Io(reason) => ReceiveError::Io(reason),
}
}
}
impl From<crossbeam_channel::RecvError> for ReceiveError {
fn from(_: crossbeam_channel::RecvError) -> Self {
ReceiveError::Disconnected
}
}
pub enum TryReceiveError {
Empty,
ReceiveError(ReceiveError),
}
impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
match e {
ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
ipc_channel::ipc::TryRecvError::IpcError(inner) => {
TryReceiveError::ReceiveError(inner.into())
},
}
}
}
impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
fn from(e: crossbeam_channel::TryRecvError) -> Self {
match e {
crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
crossbeam_channel::TryRecvError::Disconnected => {
TryReceiveError::ReceiveError(ReceiveError::Disconnected)
},
}
}
}
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
pub type ReceiveResult<T> = Result<T, ReceiveError>;
pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
pub type RoutedReceiverReceiveResult<T> =
Result<Result<T, ipc_channel::Error>, crossbeam_channel::RecvError>;
pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
match receive_result {
Ok(Ok(msg)) => Ok(msg),
Err(_crossbeam_recv_err) => Err(ReceiveError::Disconnected),
Ok(Err(ipc_err)) => Err(ReceiveError::DeserializationFailed(ipc_err.to_string())),
}
}
pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
where
T: for<'de> Deserialize<'de> + Serialize;
enum GenericReceiverVariants<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
Ipc(ipc_channel::ipc::IpcReceiver<T>),
Crossbeam(RoutedReceiver<T>),
}
impl<T> GenericReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
#[inline]
pub fn recv(&self) -> ReceiveResult<T> {
match self.0 {
GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.recv()?),
GenericReceiverVariants::Crossbeam(ref receiver) => {
// `recv()` returns an error if the channel is disconnected
let msg = receiver.recv()?;
// `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
// unconditionally send an `Ok(T)`
Ok(msg.expect("Infallible"))
},
}
}
#[inline]
pub fn try_recv(&self) -> TryReceiveResult<T> {
match self.0 {
GenericReceiverVariants::Ipc(ref receiver) => Ok(receiver.try_recv()?),
GenericReceiverVariants::Crossbeam(ref receiver) => {
let msg = receiver.try_recv()?;
Ok(msg.expect("Infallible"))
},
}
}
/// Route to a crossbeam receiver, preserving any errors.
///
/// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
/// this creates a route.
#[inline]
pub fn route_preserving_errors(self) -> RoutedReceiver<T>
where
T: Send + 'static,
{
match self.0 {
GenericReceiverVariants::Ipc(ipc_receiver) => {
let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
let crossbeam_sender_clone = crossbeam_sender.clone();
ROUTER.add_typed_route(
ipc_receiver,
Box::new(move |message| {
let _ = crossbeam_sender_clone.send(message);
}),
);
crossbeam_receiver
},
GenericReceiverVariants::Crossbeam(receiver) => receiver,
}
}
}
impl<T> Serialize for GenericReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match &self.0 {
GenericReceiverVariants::Ipc(receiver) => {
s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", receiver)
},
GenericReceiverVariants::Crossbeam(receiver) => {
if opts::get().multiprocess {
return Err(serde::ser::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
} // We know everything is in one address-space, so we can "serialize" the receiver by
// sending a leaked Box pointer.
let receiver_clone_addr = Box::leak(Box::new(receiver.clone())) as *mut _ as usize;
s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &receiver_clone_addr)
},
}
}
}
struct GenericReceiverVisitor<T> {
marker: PhantomData<T>,
}
impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
where
T: for<'a> Deserialize<'a> + Serialize,
{
type Value = GenericReceiver<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericReceiver variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericReceiverVariantNames {
Ipc,
Crossbeam,
}
let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
match variant_name {
GenericReceiverVariantNames::Ipc => variant_data
.newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
.map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
GenericReceiverVariantNames::Crossbeam => {
if opts::get().multiprocess {
return Err(serde::de::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut RoutedReceiver<T>;
// SAFETY: We know we are in the same address space as the sender, so we can safely
// reconstruct the Box.
#[allow(unsafe_code)]
let receiver = unsafe { Box::from_raw(ptr) };
Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
*receiver,
)))
},
}
}
}
impl<'a, T> Deserialize<'a> for GenericReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
where
D: Deserializer<'a>,
{
d.deserialize_enum(
"GenericReceiver",
&["Ipc", "Crossbeam"],
GenericReceiverVisitor {
marker: PhantomData,
},
)
}
}
/// Private helper function to create a crossbeam based channel.
///
/// Do NOT make this function public!
fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
where
T: Serialize + for<'de> serde::Deserialize<'de>,
{
let (tx, rx) = crossbeam_channel::unbounded();
(
GenericSender(GenericSenderVariants::Crossbeam(tx)),
GenericReceiver(GenericReceiverVariants::Crossbeam(rx)),
)
}
fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
where
T: Serialize + for<'de> serde::Deserialize<'de>,
{
ipc_channel::ipc::channel().map(|(tx, rx)| {
(
GenericSender(GenericSenderVariants::Ipc(tx)),
GenericReceiver(GenericReceiverVariants::Ipc(rx)),
)
})
}
/// Creates a Servo channel that can select different channel implementations based on multiprocess
/// mode or not. If the scenario doesn't require message to pass process boundary, a simple
/// crossbeam channel is preferred.
pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
where
T: for<'de> Deserialize<'de> + Serialize,
{
if servo_config::opts::get().multiprocess || servo_config::opts::get().force_ipc {
new_generic_channel_ipc().ok()
} else {
Some(new_generic_channel_crossbeam())
}
}
#[cfg(test)]
mod single_process_channel_tests {
//! These unit-tests test that ipc_channel and crossbeam_channel Senders and Receivers
//! can be sent over each other without problems in single-process mode.
//! In multiprocess mode we exclusively use `ipc_channel` anyway, which is ensured due
//! to `channel()` being the only way to construct `GenericSender` and Receiver pairs.
use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
#[test]
fn generic_crossbeam_can_send() {
let (tx, rx) = new_generic_channel_crossbeam();
tx.send(5).expect("Send failed");
let val = rx.recv().expect("Receive failed");
assert_eq!(val, 5);
}
#[test]
fn generic_crossbeam_ping_pong() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn generic_ipc_ping_pong() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_crossbeam_sender_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_generic_ipc_channel_over_crossbeam() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
#[test]
fn send_crossbeam_receiver_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(rx2).expect("Send failed");
tx2.send(42).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let another_receiver = rx.recv().expect("Receive failed");
let res = another_receiver.recv().expect("Receive failed");
assert_eq!(res, 42);
});
});
}
}