From ea5d786506c32766a42414be24bee1ff3b26b408 Mon Sep 17 00:00:00 2001 From: Jonathan Schwender <55576758+jschwe@users.noreply.github.com> Date: Mon, 25 Aug 2025 06:19:41 +0200 Subject: [PATCH] generic_channel: Preserve IPC errors (#38854) We should not be using `route_ipc_receiver_to_new_crossbeam_receiver` or similar methods, that `unwrap()` on the ROUTER thread if they encounter IPC errors. Instead, we now propagate the error to the crossbeam receiver. In the GenericChannel::Crossbeam case this means, that we need to use a `Result` as the data type, even though the Result variant is always okay, so that the receiver type is the same regardless of `IPC` or not. This is required, so we have the same channel type, and can pass the inner crossbeam channel into e.g. `select!`, without having to wrap or re-implement select. This also means, that as we switch towards GenericChannel, we will gradually improve our error handling and eventually remove the existing panics on IPC errors. These changes were extracted out of https://github.com/servo/servo/pull/38782 Testing: Covered by existing tests. No new panics were introduced. Signed-off-by: Jonathan Schwender --- Cargo.lock | 1 + components/script/messaging.rs | 14 ++- components/script/script_thread.rs | 2 +- components/shared/base/generic_channel.rs | 132 ++++++++++++++++++---- components/webgl/Cargo.toml | 1 + components/webgl/webgl_thread.rs | 7 +- 6 files changed, 131 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 346deea4a61..6aa5820b871 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9717,6 +9717,7 @@ dependencies = [ name = "webgl" version = "0.0.1" dependencies = [ + "base", "bitflags 2.9.3", "byteorder", "canvas_traits", diff --git a/components/script/messaging.rs b/components/script/messaging.rs index a0b21cfbe27..fb295a0f0d3 100644 --- a/components/script/messaging.rs +++ b/components/script/messaging.rs @@ -8,7 +8,7 @@ use std::cell::RefCell; use std::option::Option; use std::result::Result; -use base::generic_channel::GenericSender; +use base::generic_channel::{GenericSender, RoutedReceiver}; use base::id::PipelineId; #[cfg(feature = "bluetooth")] use bluetooth_traits::BluetoothRequest; @@ -371,7 +371,7 @@ pub(crate) struct ScriptThreadSenders { pub(crate) struct ScriptThreadReceivers { /// A [`Receiver`] that receives messages from the constellation. #[no_trace] - pub(crate) constellation_receiver: Receiver, + pub(crate) constellation_receiver: RoutedReceiver, /// The [`Receiver`] which receives incoming messages from the `ImageCache`. #[no_trace] @@ -405,7 +405,7 @@ impl ScriptThreadReceivers { .expect("Spurious wake-up of the event-loop, task-queue has no tasks available"); MixedMessage::FromScript(event) }, - recv(self.constellation_receiver) -> msg => MixedMessage::FromConstellation(msg.unwrap()), + recv(self.constellation_receiver) -> msg => MixedMessage::FromConstellation(msg.unwrap().unwrap()), recv(self.devtools_server_receiver) -> msg => MixedMessage::FromDevtools(msg.unwrap()), recv(self.image_cache_receiver) -> msg => MixedMessage::FromImageCache(msg.unwrap()), recv(timer_scheduler.wait_channel()) -> _ => MixedMessage::TimerFired, @@ -438,6 +438,14 @@ impl ScriptThreadReceivers { task_queue: &TaskQueue, ) -> Option { if let Ok(message) = self.constellation_receiver.try_recv() { + let message = message + .inspect_err(|e| { + log::warn!( + "ScriptThreadReceivers IPC error on constellation_receiver: {:?}", + e + ); + }) + .ok()?; return MixedMessage::FromConstellation(message).into(); } if let Ok(message) = task_queue.take_tasks_and_recv() { diff --git a/components/script/script_thread.rs b/components/script/script_thread.rs index 877f537d4a5..afcd816eb1c 100644 --- a/components/script/script_thread.rs +++ b/components/script/script_thread.rs @@ -870,7 +870,7 @@ impl ScriptThread { JS_AddInterruptCallback(cx, Some(interrupt_callback)); } - let constellation_receiver = state.constellation_receiver.into_inner(); + let constellation_receiver = state.constellation_receiver.route_preserving_errors(); // Ask the router to proxy IPC messages from the devtools to us. let devtools_server_sender = state.devtools_server_sender; diff --git a/components/shared/base/generic_channel.rs b/components/shared/base/generic_channel.rs index e599656e419..cd6852fdb81 100644 --- a/components/shared/base/generic_channel.rs +++ b/components/shared/base/generic_channel.rs @@ -5,7 +5,9 @@ //! Enum wrappers to be able to select different channel implementations at runtime. use std::fmt; +use std::fmt::Display; +use ipc_channel::ipc::IpcError; use ipc_channel::router::ROUTER; use malloc_size_of::{MallocSizeOf, MallocSizeOfOps}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -16,7 +18,14 @@ static GENERIC_CHANNEL_USAGE_ERROR_PANIC_MSG: &str = "May not send a crossbeam c pub enum GenericSender { Ipc(ipc_channel::ipc::IpcSender), - Crossbeam(crossbeam_channel::Sender), + /// A crossbeam-channel. To keep the API in sync with the Ipc variant when using a Router, + /// which propagates the IPC error, the inner type is a Result. + /// In the IPC case, the Router deserializes the message, which can fail, and sends + /// the result to a crossbeam receiver. + /// The crossbeam channel does not involve serializing, so we can't have this error, + /// but replicating the API allows us to have one channel type as the receiver + /// after routing the receiver . + Crossbeam(crossbeam_channel::Sender>), } impl Serialize for GenericSender { @@ -60,8 +69,12 @@ impl GenericSender { #[inline] pub fn send(&self, msg: T) -> SendResult { match *self { - GenericSender::Ipc(ref sender) => sender.send(msg).map_err(|_| SendError), - GenericSender::Crossbeam(ref sender) => sender.send(msg).map_err(|_| SendError), + GenericSender::Ipc(ref sender) => sender + .send(msg) + .map_err(|e| SendError::SerializationError(format!("{e}"))), + GenericSender::Crossbeam(ref sender) => { + sender.send(Ok(msg)).map_err(|_| SendError::Disconnected) + }, } } } @@ -73,19 +86,80 @@ impl MallocSizeOf for GenericSender { } #[derive(Debug)] -pub struct SendError; +pub enum SendError { + Disconnected, + SerializationError(String), +} + +impl Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{self:?}") + } +} + pub type SendResult = Result<(), SendError>; #[derive(Debug)] -pub struct ReceiveError; +pub enum ReceiveError { + DeserializationFailed(String), + /// Io Error. May occur when using IPC. + Io(std::io::Error), + /// The channel was closed. + Disconnected, +} + +impl From for ReceiveError { + fn from(e: IpcError) -> Self { + match e { + IpcError::Disconnected => ReceiveError::Disconnected, + IpcError::Bincode(reason) => ReceiveError::DeserializationFailed(reason.to_string()), + IpcError::Io(reason) => ReceiveError::Io(reason), + } + } +} + +impl From for ReceiveError { + fn from(_: crossbeam_channel::RecvError) -> Self { + ReceiveError::Disconnected + } +} + +pub enum TryReceiveError { + Empty, + ReceiveError(ReceiveError), +} + +impl From for TryReceiveError { + fn from(e: ipc_channel::ipc::TryRecvError) -> Self { + match e { + ipc_channel::ipc::TryRecvError::Empty => TryReceiveError::Empty, + ipc_channel::ipc::TryRecvError::IpcError(inner) => { + TryReceiveError::ReceiveError(inner.into()) + }, + } + } +} + +impl From for TryReceiveError { + fn from(e: crossbeam_channel::TryRecvError) -> Self { + match e { + crossbeam_channel::TryRecvError::Empty => TryReceiveError::Empty, + crossbeam_channel::TryRecvError::Disconnected => { + TryReceiveError::ReceiveError(ReceiveError::Disconnected) + }, + } + } +} + pub type ReceiveResult = Result; +pub type TryReceiveResult = Result; pub enum GenericReceiver where T: for<'de> Deserialize<'de> + Serialize, { Ipc(ipc_channel::ipc::IpcReceiver), - Crossbeam(crossbeam_channel::Receiver), + Crossbeam(RoutedReceiver), } impl GenericReceiver @@ -95,29 +169,48 @@ where #[inline] pub fn recv(&self) -> ReceiveResult { match *self { - GenericReceiver::Ipc(ref receiver) => receiver.recv().map_err(|_| ReceiveError), - GenericReceiver::Crossbeam(ref receiver) => receiver.recv().map_err(|_| ReceiveError), - } - } - - #[inline] - pub fn try_recv(&self) -> ReceiveResult { - match *self { - GenericReceiver::Ipc(ref receiver) => receiver.try_recv().map_err(|_| ReceiveError), + GenericReceiver::Ipc(ref receiver) => Ok(receiver.recv()?), GenericReceiver::Crossbeam(ref receiver) => { - receiver.try_recv().map_err(|_| ReceiveError) + // `recv()` returns an error if the channel is disconnected + let msg = receiver.recv()?; + // `msg` must be `ok` because the corresponding [`GenericSender::Crossbeam`] will + // unconditionally send an `Ok(T)` + Ok(msg.expect("Infallible")) }, } } #[inline] - pub fn into_inner(self) -> crossbeam_channel::Receiver + pub fn try_recv(&self) -> TryReceiveResult { + match *self { + GenericReceiver::Ipc(ref receiver) => Ok(receiver.try_recv()?), + GenericReceiver::Crossbeam(ref receiver) => { + let msg = receiver.try_recv()?; + Ok(msg.expect("Infallible")) + }, + } + } + + /// Route to a crossbeam receiver, preserving any errors. + /// + /// For `Crossbeam` receivers this is a no-op, while for `Ipc` receivers + /// this creates a route. + #[inline] + pub fn route_preserving_errors(self) -> RoutedReceiver where T: Send + 'static, { match self { - GenericReceiver::Ipc(receiver) => { - ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(receiver) + GenericReceiver::Ipc(ipc_receiver) => { + let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded(); + let crossbeam_sender_clone = crossbeam_sender.clone(); + ROUTER.add_typed_route( + ipc_receiver, + Box::new(move |message| { + let _ = crossbeam_sender_clone.send(message); + }), + ); + crossbeam_receiver }, GenericReceiver::Crossbeam(receiver) => receiver, } @@ -165,3 +258,4 @@ where Some((GenericSender::Crossbeam(tx), GenericReceiver::Crossbeam(rx))) } } +pub type RoutedReceiver = crossbeam_channel::Receiver>; diff --git a/components/webgl/Cargo.toml b/components/webgl/Cargo.toml index 46fe7381959..28b23baaa32 100644 --- a/components/webgl/Cargo.toml +++ b/components/webgl/Cargo.toml @@ -16,6 +16,7 @@ webgl_backtrace = ["canvas_traits/webgl_backtrace"] webxr = ["dep:webxr", "dep:webxr-api"] [dependencies] +base = { path = "../shared/base" } bitflags = { workspace = true } byteorder = { workspace = true } canvas_traits = { workspace = true } diff --git a/components/webgl/webgl_thread.rs b/components/webgl/webgl_thread.rs index 8503e6b47c2..223adcac4a8 100644 --- a/components/webgl/webgl_thread.rs +++ b/components/webgl/webgl_thread.rs @@ -8,6 +8,7 @@ use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::{slice, thread}; +use base::generic_channel::RoutedReceiver; use bitflags::bitflags; use byteorder::{ByteOrder, NativeEndian, WriteBytesExt}; use canvas_traits::webgl; @@ -217,7 +218,7 @@ pub(crate) struct WebGLThread { /// We use it to get an unique ID for new WebGLContexts. external_images: Arc>, /// The receiver that will be used for processing WebGL messages. - receiver: crossbeam_channel::Receiver, + receiver: RoutedReceiver, /// The receiver that should be used to send WebGL messages for processing. sender: WebGLSender, /// The swap chains used by webrender @@ -275,7 +276,7 @@ impl WebGLThread { bound_context_id: None, external_images, sender, - receiver: receiver.into_inner(), + receiver: receiver.route_preserving_errors(), webrender_swap_chains, api_type, #[cfg(feature = "webxr")] @@ -297,7 +298,7 @@ impl WebGLThread { fn process(&mut self) { let webgl_chan = WebGLChan(self.sender.clone()); - while let Ok(msg) = self.receiver.recv() { + while let Ok(Ok(msg)) = self.receiver.recv() { let exit = self.handle_msg(msg, &webgl_chan); if exit { break;