/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ //! # Generic Callbacks //! //! When sending cross-process messages, we sometimes want to run custom callbacks when the //! recipient has finished processing. The callback should run in the sender's address space, and //! could be something like enqueuing a task. //! In Multi-process mode we can implement this by providing an `IpcSender` to the recipient, //! which the recipient can use to send some data back to the senders process. //! To avoid blocking the sender, we can pass the callback to the ROUTER, which runs the callback //! when receiving the Ipc message. //! The callback will be run on every reply message from the recipient. `IpcSender`s are also //! `Clone`able, so the Router will sequentialise callbacks. //! //! ## Callback scenario visualization //! //! The following visualization showcases how Ipc and the router thread are currently used //! to run callbacks asynchronously on the sender process. The recipient may keep the //! ReplySender alive and send an arbitrary amount of messages / replies. //! //! ```none //! Process A | Process B //! | //! +---------+ IPC: SendMessage(ReplySender) | +-------------+ clone +-------------+ //! | Sender |-------------------------------------------> | Recipient | ------> | ReplySender | //! +---------+ | +-------------+ +-------------+ //! | | | | //! | RegisterCallback A +---------+ | Send Reply 1 | Send Reply 2 | //! + ------------------> | Router | <--------------------------+-----------------------+ //! +---------+ | //! | A(reply1) | //! | A(reply2) | //! | ... | //! v | //! | //! ``` //! //! //! ## Optimizing single-process mode. //! //! In Single-process mode, there is no need for the Recipient to send an IpcReply, //! since they are in the same address space and could just execute the callback directly. //! Since we want to create an abstraction over such callbacks, we need to consider constraints //! that the existing multiprocess Ipc solution imposes on us: //! //! - Support for `FnMut` callbacks (internal mutable state + multiple calls) //! - The abstraction should be `Clone`able //! //! These constraints motivate the [GenericCallback] type, which supports `FnMut` callbacks //! and is clonable. This requires wrapping the callback with `Arc>`, which also adds //! synchronization, which could be something that existing callbacks rely on. //! //! ### Future work //! //! - Further abstractions for callbacks with fewer constraints, e.g. callbacks //! which don't need to be cloned by the recipient, or non-mutable callbacks. //! - A tracing option to measure callback runtime and identify callbacks which misbehave (block) //! for a long time. use std::fmt; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; use ipc_channel::ErrorKind; use ipc_channel::ipc::IpcSender; use ipc_channel::router::ROUTER; use malloc_size_of::{MallocSizeOf, MallocSizeOfOps}; use serde::de::VariantAccess; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use servo_config::opts; use crate::generic_channel::{SendError, SendResult}; /// The callback type of our messages. /// /// This is equivalent to [TypedRouterHandler][ipc_channel::router::TypedRouterHandler], /// except that this type is not wrapped in a Box. /// The callback will be wrapped in either a Box or an Arc, depending on if it is run on /// the router, or passed to the recipient. pub type MsgCallback = dyn FnMut(Result) + Send; /// A mechanism to run a callback in the process this callback was constructed in. /// /// The GenericCallback can be sent cross-process (in multi-process mode). In this case /// the callback will be executed on the [ROUTER] thread. /// In single-process mode the callback will be executed directly. pub struct GenericCallback(GenericCallbackVariants) where T: Serialize + Send + 'static; enum GenericCallbackVariants where T: Serialize + Send + 'static, { CrossProcess(IpcSender), InProcess(Arc>>), } impl Clone for GenericCallback where T: Serialize + Send + 'static, { fn clone(&self) -> Self { let variant = match &self.0 { GenericCallbackVariants::CrossProcess(sender) => { GenericCallbackVariants::CrossProcess((*sender).clone()) }, GenericCallbackVariants::InProcess(callback) => { GenericCallbackVariants::InProcess(callback.clone()) }, }; GenericCallback(variant) } } impl MallocSizeOf for GenericCallback where T: Serialize + Send + 'static, { fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 } } impl GenericCallback where T: for<'de> Deserialize<'de> + Serialize + Send + 'static, { /// Creates a new GenericCallback. /// /// The callback should not do any heavy work and not block. pub fn new) + Send + 'static>( callback: F, ) -> Result { let generic_callback = if opts::get().multiprocess || opts::get().force_ipc { let (ipc_sender, ipc_receiver) = ipc_channel::ipc::channel()?; ROUTER.add_typed_route(ipc_receiver, Box::new(callback)); GenericCallback(GenericCallbackVariants::CrossProcess(ipc_sender)) } else { let callback = Arc::new(Mutex::new(callback)); GenericCallback(GenericCallbackVariants::InProcess(callback)) }; Ok(generic_callback) } /// Send `value` to the callback. /// /// Note that a return value of `Ok()` simply means that value was sent successfully /// to the callback. The callback itself does not return any value. /// The caller may not assume that the callback is executed synchronously. pub fn send(&self, value: T) -> SendResult { match &self.0 { GenericCallbackVariants::CrossProcess(sender) => { sender.send(value).map_err(|error| match *error { ErrorKind::Io(_) => SendError::Disconnected, serialization_error => { SendError::SerializationError(serialization_error.to_string()) }, }) }, GenericCallbackVariants::InProcess(callback) => { let mut cb = callback.lock().expect("poisoned"); (*cb)(Ok(value)); Ok(()) }, } } } impl Serialize for GenericCallback where T: Serialize + Send + 'static, { fn serialize(&self, s: S) -> Result { match &self.0 { GenericCallbackVariants::CrossProcess(sender) => { s.serialize_newtype_variant("GenericCallback", 0, "CrossProcess", sender) }, // The only reason we need / want serialization in single-process mode is to support // sending GenericCallbacks over existing IPC channels. This allows us to // incrementally port IPC channels to the GenericChannel, without needing to follow a // top-to-bottom approach. // Long-term we can remove this branch in the code again and replace it with // unreachable, since likely all IPC channels would be GenericChannels. GenericCallbackVariants::InProcess(wrapped_callback) => { if opts::get().multiprocess { return Err(serde::ser::Error::custom( "InProcess callback can't be serialized in multiprocess mode", )); } // Due to the signature of `serialize` we need to clone the Arc to get an owned // pointer we can leak. // We additionally need to Box to get a thin pointer. let cloned_callback = Box::new(wrapped_callback.clone()); let sender_clone_addr = Box::leak(cloned_callback) as *mut Arc<_> as usize; s.serialize_newtype_variant("GenericCallback", 1, "InProcess", &sender_clone_addr) }, } } } struct GenericCallbackVisitor { marker: PhantomData, } impl<'de, T> serde::de::Visitor<'de> for GenericCallbackVisitor where T: Serialize + Deserialize<'de> + Send + 'static, { type Value = GenericCallback; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { formatter.write_str("a GenericCallback variant") } fn visit_enum(self, data: A) -> Result where A: serde::de::EnumAccess<'de>, { #[derive(Deserialize)] enum GenericCallbackVariantNames { CrossProcess, InProcess, } let (variant_name, variant_data): (GenericCallbackVariantNames, _) = data.variant()?; match variant_name { GenericCallbackVariantNames::CrossProcess => variant_data .newtype_variant::>() .map(|sender| GenericCallback(GenericCallbackVariants::CrossProcess(sender))), GenericCallbackVariantNames::InProcess => { if opts::get().multiprocess { return Err(serde::de::Error::custom( "InProcess callback found in multiprocess mode", )); } let addr = variant_data.newtype_variant::()?; let ptr = addr as *mut Arc>; // SAFETY: We know we are in the same address space as the sender, so we can safely // reconstruct the Arc, that we previously leaked with `into_raw` during // serialization. // Attention: Code reviewers should carefully compare the deserialization here // with the serialization above. #[allow(unsafe_code)] let callback = unsafe { Box::from_raw(ptr) }; Ok(GenericCallback(GenericCallbackVariants::InProcess( *callback, ))) }, } } } impl<'a, T> Deserialize<'a> for GenericCallback where T: Serialize + Deserialize<'a> + Send + 'static, { fn deserialize(d: D) -> Result, D::Error> where D: Deserializer<'a>, { d.deserialize_enum( "GenericCallback", &["CrossProcess", "InProcess"], GenericCallbackVisitor { marker: PhantomData, }, ) } } impl fmt::Debug for GenericCallback where T: Serialize + Send + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "GenericCallback(..)") } } #[cfg(test)] mod single_process_callback_test { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::generic_channel::GenericCallback; #[test] fn generic_callback() { let number = Arc::new(AtomicUsize::new(0)); let number_clone = number.clone(); let callback = move |msg: Result| { number_clone.store(msg.unwrap(), Ordering::SeqCst) }; let generic_callback = GenericCallback::new(callback).unwrap(); std::thread::scope(|s| { s.spawn(move || generic_callback.send(42)); }); assert_eq!(number.load(Ordering::SeqCst), 42); } #[test] fn generic_callback_via_generic_sender() { let number = Arc::new(AtomicUsize::new(0)); let number_clone = number.clone(); let callback = move |msg: Result| { number_clone.store(msg.unwrap(), Ordering::SeqCst) }; let generic_callback = GenericCallback::new(callback).unwrap(); let (tx, rx) = crate::generic_channel::channel().unwrap(); tx.send(generic_callback).unwrap(); std::thread::scope(|s| { s.spawn(move || { let callback = rx.recv().unwrap(); callback.send(42).unwrap(); }); }); assert_eq!(number.load(Ordering::SeqCst), 42); } #[test] fn generic_callback_via_ipc_sender() { let number = Arc::new(AtomicUsize::new(0)); let number_clone = number.clone(); let callback = move |msg: Result| { number_clone.store(msg.unwrap(), Ordering::SeqCst) }; let generic_callback = GenericCallback::new(callback).unwrap(); let (tx, rx) = ipc_channel::ipc::channel().unwrap(); tx.send(generic_callback).unwrap(); std::thread::scope(|s| { s.spawn(move || { let callback = rx.recv().unwrap(); callback.send(42).unwrap(); }); }); assert_eq!(number.load(Ordering::SeqCst), 42); } }