mirror of
https://github.com/servo/servo.git
synced 2025-09-27 23:30:08 +01:00
generic_channel: Preserve IPC errors (#38854)
We should not be using `route_ipc_receiver_to_new_crossbeam_receiver` or similar methods, that `unwrap()` on the ROUTER thread if they encounter IPC errors. Instead, we now propagate the error to the crossbeam receiver. In the GenericChannel::Crossbeam case this means, that we need to use a `Result<T>` as the data type, even though the Result variant is always okay, so that the receiver type is the same regardless of `IPC` or not. This is required, so we have the same channel type, and can pass the inner crossbeam channel into e.g. `select!`, without having to wrap or re-implement select. This also means, that as we switch towards GenericChannel, we will gradually improve our error handling and eventually remove the existing panics on IPC errors. These changes were extracted out of https://github.com/servo/servo/pull/38782 Testing: Covered by existing tests. No new panics were introduced. Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
This commit is contained in:
parent
2b7186893f
commit
ea5d786506
6 changed files with 131 additions and 26 deletions
|
@ -5,7 +5,9 @@
|
|||
//! Enum wrappers to be able to select different channel implementations at runtime.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
|
||||
use ipc_channel::ipc::IpcError;
|
||||
use ipc_channel::router::ROUTER;
|
||||
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
@ -16,7 +18,14 @@ static GENERIC_CHANNEL_USAGE_ERROR_PANIC_MSG: &str = "May not send a crossbeam c
|
|||
|
||||
pub enum GenericSender<T: Serialize> {
|
||||
Ipc(ipc_channel::ipc::IpcSender<T>),
|
||||
Crossbeam(crossbeam_channel::Sender<T>),
|
||||
/// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router,
|
||||
/// which propagates the IPC error, the inner type is a Result.
|
||||
/// In the IPC case, the Router deserializes the message, which can fail, and sends
|
||||
/// the result to a crossbeam receiver.
|
||||
/// The crossbeam channel does not involve serializing, so we can't have this error,
|
||||
/// but replicating the API allows us to have one channel type as the receiver
|
||||
/// after routing the receiver .
|
||||
Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::Error>>),
|
||||
}
|
||||
|
||||
impl<T: Serialize> Serialize for GenericSender<T> {
|
||||
|
@ -60,8 +69,12 @@ impl<T: Serialize> GenericSender<T> {
|
|||
#[inline]
|
||||
pub fn send(&self, msg: T) -> SendResult {
|
||||
match *self {
|
||||
GenericSender::Ipc(ref sender) => sender.send(msg).map_err(|_| SendError),
|
||||
GenericSender::Crossbeam(ref sender) => sender.send(msg).map_err(|_| SendError),
|
||||
GenericSender::Ipc(ref sender) => sender
|
||||
.send(msg)
|
||||
.map_err(|e| SendError::SerializationError(format!("{e}"))),
|
||||
GenericSender::Crossbeam(ref sender) => {
|
||||
sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -73,19 +86,80 @@ impl<T: Serialize> MallocSizeOf for GenericSender<T> {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SendError;
|
||||
pub enum SendError {
|
||||
Disconnected,
|
||||
SerializationError(String),
|
||||
}
|
||||
|
||||
impl Display for SendError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
pub type SendResult = Result<(), SendError>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReceiveError;
|
||||
pub enum ReceiveError {
|
||||
DeserializationFailed(String),
|
||||
/// Io Error. May occur when using IPC.
|
||||
Io(std::io::Error),
|
||||
/// The channel was closed.
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
impl From<IpcError> for ReceiveError {
|
||||
fn from(e: IpcError) -> Self {
|
||||
match e {
|
||||
IpcError::Disconnected => ReceiveError::Disconnected,
|
||||
IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()),
|
||||
IpcError::Io(reason) => ReceiveError::Io(reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crossbeam_channel::RecvError> for ReceiveError {
|
||||
fn from(_: crossbeam_channel::RecvError) -> Self {
|
||||
ReceiveError::Disconnected
|
||||
}
|
||||
}
|
||||
|
||||
pub enum TryReceiveError {
|
||||
Empty,
|
||||
ReceiveError(ReceiveError),
|
||||
}
|
||||
|
||||
impl From<ipc_channel::ipc::TryRecvError> for TryReceiveError {
|
||||
fn from(e: ipc_channel::ipc::TryRecvError) -> Self {
|
||||
match e {
|
||||
ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty,
|
||||
ipc_channel::ipc::TryRecvError::IpcError(inner) => {
|
||||
TryReceiveError::ReceiveError(inner.into())
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
|
||||
fn from(e: crossbeam_channel::TryRecvError) -> Self {
|
||||
match e {
|
||||
crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty,
|
||||
crossbeam_channel::TryRecvError::Disconnected => {
|
||||
TryReceiveError::ReceiveError(ReceiveError::Disconnected)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type ReceiveResult<T> = Result<T, ReceiveError>;
|
||||
pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
|
||||
|
||||
pub enum GenericReceiver<T>
|
||||
where
|
||||
T: for<'de> Deserialize<'de> + Serialize,
|
||||
{
|
||||
Ipc(ipc_channel::ipc::IpcReceiver<T>),
|
||||
Crossbeam(crossbeam_channel::Receiver<T>),
|
||||
Crossbeam(RoutedReceiver<T>),
|
||||
}
|
||||
|
||||
impl<T> GenericReceiver<T>
|
||||
|
@ -95,29 +169,48 @@ where
|
|||
#[inline]
|
||||
pub fn recv(&self) -> ReceiveResult<T> {
|
||||
match *self {
|
||||
GenericReceiver::Ipc(ref receiver) => receiver.recv().map_err(|_| ReceiveError),
|
||||
GenericReceiver::Crossbeam(ref receiver) => receiver.recv().map_err(|_| ReceiveError),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn try_recv(&self) -> ReceiveResult<T> {
|
||||
match *self {
|
||||
GenericReceiver::Ipc(ref receiver) => receiver.try_recv().map_err(|_| ReceiveError),
|
||||
GenericReceiver::Ipc(ref receiver) => Ok(receiver.recv()?),
|
||||
GenericReceiver::Crossbeam(ref receiver) => {
|
||||
receiver.try_recv().map_err(|_| ReceiveError)
|
||||
// `recv()` returns an error if the channel is disconnected
|
||||
let msg = receiver.recv()?;
|
||||
// `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will
|
||||
// unconditionally send an `Ok(T)`
|
||||
Ok(msg.expect("Infallible"))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn into_inner(self) -> crossbeam_channel::Receiver<T>
|
||||
pub fn try_recv(&self) -> TryReceiveResult<T> {
|
||||
match *self {
|
||||
GenericReceiver::Ipc(ref receiver) => Ok(receiver.try_recv()?),
|
||||
GenericReceiver::Crossbeam(ref receiver) => {
|
||||
let msg = receiver.try_recv()?;
|
||||
Ok(msg.expect("Infallible"))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Route to a crossbeam receiver, preserving any errors.
|
||||
///
|
||||
/// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers
|
||||
/// this creates a route.
|
||||
#[inline]
|
||||
pub fn route_preserving_errors(self) -> RoutedReceiver<T>
|
||||
where
|
||||
T: Send + 'static,
|
||||
{
|
||||
match self {
|
||||
GenericReceiver::Ipc(receiver) => {
|
||||
ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(receiver)
|
||||
GenericReceiver::Ipc(ipc_receiver) => {
|
||||
let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
|
||||
let crossbeam_sender_clone = crossbeam_sender.clone();
|
||||
ROUTER.add_typed_route(
|
||||
ipc_receiver,
|
||||
Box::new(move |message| {
|
||||
let _ = crossbeam_sender_clone.send(message);
|
||||
}),
|
||||
);
|
||||
crossbeam_receiver
|
||||
},
|
||||
GenericReceiver::Crossbeam(receiver) => receiver,
|
||||
}
|
||||
|
@ -165,3 +258,4 @@ where
|
|||
Some((GenericSender::Crossbeam(tx), GenericReceiver::Crossbeam(rx)))
|
||||
}
|
||||
}
|
||||
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::Error>>;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue