From 602c02edd29029ab16e988e4bb5e2cddc1c1da5b Mon Sep 17 00:00:00 2001 From: yvt Date: Sun, 20 Jun 2021 23:29:16 +0900 Subject: [PATCH] fix(bhm): deliver exit signal reliably (for real) > However, as it turns out, `crossbeam-channel`'s channels don't drop > remaining messages until all associated senders *and* receivers are > dropped. This means the exit signal won't be delivered as long as > there's at least one `HangMonitorRegister` or > `BackgroundHangMonitorChan` maintaining a copy of the sender. To work > around this and guarantee a rapid delivery of the exit signal, the > sender is wrapped in `Arc`, and only the worker thread maintains a > strong reference, thus ensuring both the sender and receiver are > dropped as soon as the worker thread exits. --- .../background_hang_monitor.rs | 129 +++++++++++++++--- 1 file changed, 107 insertions(+), 22 deletions(-) diff --git a/components/background_hang_monitor/background_hang_monitor.rs b/components/background_hang_monitor/background_hang_monitor.rs index 7d06f61f633..ab1e4e5940c 100644 --- a/components/background_hang_monitor/background_hang_monitor.rs +++ b/components/background_hang_monitor/background_hang_monitor.rs @@ -16,12 +16,14 @@ use msg::constellation_msg::{ }; use std::cell::Cell; use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Weak}; use std::thread; use std::time::{Duration, Instant}; #[derive(Clone)] pub struct HangMonitorRegister { - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether: Sender<Never>, monitoring_enabled: bool, } @@ -32,27 +34,37 @@ impl HangMonitorRegister { control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, monitoring_enabled: bool, ) -> Box<dyn BackgroundHangMonitorRegister> { + // Create a channel to pass messages of type `MonitoredComponentMsg`. + // See the discussion in `<HangMonitorRegister as BackgroundHangMonitorRegister>::register_component` for why we wrap + // the sender with `Arc` and why `HangMonitorRegister` only maintains + // a weak reference to it.
let (sender, port) = unbounded(); + let sender = Arc::new(sender); + let sender_weak = Arc::downgrade(&sender); + + // Create a "tether" channel, whose sole purpose is to keep the worker + thread alive. The worker thread will terminate when all copies of + `tether` are dropped. + let (tether, tether_port) = unbounded(); + let _ = thread::Builder::new() .spawn(move || { let mut monitor = BackgroundHangMonitorWorker::new( constellation_chan, control_port, - port, + (sender, port), + tether_port, monitoring_enabled, ); while monitor.run() { // Monitoring until all senders have been dropped... } - - // Suppress `signal_to_exit` - for (_, mut component) in monitor.monitored_components.drain() { - component.exit_signal.release(); - } }) .expect("Couldn't start BHM worker."); Box::new(HangMonitorRegister { - sender, + sender: sender_weak, + tether, monitoring_enabled, }) } } @@ -71,6 +83,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister { ) -> Box<dyn BackgroundHangMonitor> { let bhm_chan = BackgroundHangMonitorChan::new( self.sender.clone(), + self.tether.clone(), component_id, self.monitoring_enabled, ); @@ -90,21 +103,54 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister { #[cfg(any(target_os = "android", target_arch = "arm", target_arch = "aarch64"))] let sampler = crate::sampler::DummySampler::new(); - // There's a race condition between the reception of - // `BackgroundHangMonitorControlMsg::Exit` and - // `MonitoredComponentMsg::Register`. When the worker receives `Exit`, - // it stops receiving messages, and any remaining messages (including - // the `MonitoredComponentMsg::Register` we sent) in the channel are - // dropped. Wrapping `exit_signal` with this RAII wrapper ensures the - // exit signal is delivered even in such cases. + // When a component is registered, and there's an exit request that + // reached BHM, we want an exit signal to be delivered to the + // component's exit signal handler eventually.
However, there's a race + condition between the reception of `BackgroundHangMonitorControlMsg:: + Exit` and `MonitoredComponentMsg::Register` that needs to be handled + carefully. When the worker receives an `Exit` message, it stops + processing messages, and any further `Register` messages sent to the + worker thread are ignored. If the submissions of `Exit` and + `Register` messages are far apart enough, the channel is closed by + the time the client attempts to send a `Register` message, and + therefore the client can figure out by `Sender::send`'s return value + that it must deliver an exit signal. However, if these message + submissions are close enough, the `Register` message is still sent, + but the worker thread might exit before it sees the message, leaving + the message unprocessed and the exit signal unsent. + // + // To fix this, we wrap the exit signal handler in an RAII wrapper of + type `SignalToExitOnDrop` to automatically send a signal when it's + dropped. This way, we can make sure the exit signal is sent even if + the message couldn't reach the worker thread and be processed. + // + // However, as it turns out, `crossbeam-channel`'s channels don't drop + remaining messages until all associated senders *and* receivers are + dropped. This means the exit signal won't be delivered as long as + there's at least one `HangMonitorRegister` or + `BackgroundHangMonitorChan` maintaining a copy of the sender. To work + around this and guarantee a rapid delivery of the exit signal, the + sender is wrapped in `Arc`, and only the worker thread maintains a + strong reference, thus ensuring both the sender and receiver are + dropped as soon as the worker thread exits.
let exit_signal = SignalToExitOnDrop(exit_signal); + // If the tether is dropped after this call, the worker thread might + exit before processing the `Register` message because there's no + implicit ordering guarantee between two channels. If this happens, + an exit signal will be sent even though we haven't received a + corresponding exit request. To enforce the correct ordering and + prevent a false exit signal from being sent, we include a copy of + `self.tether` in the `Register` message. + let tether = self.tether.clone(); + bhm_chan.send(MonitoredComponentMsg::Register( sampler, thread::current().name().map(str::to_owned), transient_hang_timeout, permanent_hang_timeout, exit_signal, + tether, )); Box::new(bhm_chan) } @@ -125,6 +171,7 @@ enum MonitoredComponentMsg { Duration, Duration, SignalToExitOnDrop, + Sender<Never>, ), /// Unregister component for monitoring. Unregister, @@ -134,11 +181,15 @@ enum MonitoredComponentMsg { NotifyWait, } +/// Stable equivalent to the `!` type +enum Never {} + /// A wrapper around a sender to the monitor, /// which will send the Id of the monitored component along with each message, /// and keep track of whether the monitor is still listening on the other end.
struct BackgroundHangMonitorChan { - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + _tether: Sender<Never>, component_id: MonitoredComponentId, disconnected: Cell<bool>, monitoring_enabled: bool, } @@ -146,12 +197,14 @@ struct BackgroundHangMonitorChan { impl BackgroundHangMonitorChan { fn new( - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether: Sender<Never>, component_id: MonitoredComponentId, monitoring_enabled: bool, ) -> Self { BackgroundHangMonitorChan { sender, + _tether: tether, component_id: component_id, disconnected: Default::default(), monitoring_enabled, } } @@ -162,7 +215,17 @@ impl BackgroundHangMonitorChan { if self.disconnected.get() { return; } - if let Err(_) = self.sender.send((self.component_id.clone(), msg)) { + + // The worker thread owns both the receiver *and* the only strong + // reference to the sender. An `upgrade` failure means the latter is + // gone, and a `send` failure means the former is gone. They are dropped + // simultaneously, but we might observe an intermediate state.
+ if self .sender .upgrade() .and_then(|sender| sender.send((self.component_id.clone(), msg)).ok()) .is_none() + { warn!("BackgroundHangMonitor has gone away"); self.disconnected.set(true); } } @@ -234,6 +297,8 @@ struct BackgroundHangMonitorWorker { monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>, constellation_chan: IpcSender<HangMonitorAlert>, port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + _port_sender: Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether_port: Receiver<Never>, control_port: Receiver<BackgroundHangMonitorControlMsg>, sampling_duration: Option<Duration>, sampling_max_duration: Option<Duration>, @@ -248,7 +313,11 @@ impl BackgroundHangMonitorWorker { fn new( constellation_chan: IpcSender<HangMonitorAlert>, control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, - port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + (port_sender, port): ( + Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + ), + tether_port: Receiver<Never>, monitoring_enabled: bool, ) -> Self { let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port); @@ -257,6 +326,8 @@ impl BackgroundHangMonitorWorker { monitored_components: Default::default(), constellation_chan, port, + _port_sender: port_sender, + tether_port, control_port, sampling_duration: None, sampling_max_duration: None, @@ -325,11 +396,24 @@ impl BackgroundHangMonitorWorker { let received = select! { recv(self.port) -> event => { + // Since we own the `Arc<Sender<_>>`, the channel never + gets disconnected. + Some(event.unwrap()) }, recv(self.tether_port) -> event => { + // This arm can only be reached by a tether disconnection match event { - Ok(msg) => Some(msg), - // Our sender has been dropped, quit. - Err(_) => return false, + Ok(x) => match x {} + Err(_) => {} } + + // All associated `HangMonitorRegister` and + // `BackgroundHangMonitorChan` have been dropped. Suppress + // `signal_to_exit` and exit the BHM.
+ for component in self.monitored_components.values_mut() { + component.exit_signal.release(); + } + return false; }, recv(self.control_port) -> event => { match event { @@ -394,6 +478,7 @@ impl BackgroundHangMonitorWorker { transient_hang_timeout, permanent_hang_timeout, exit_signal, + _tether, ), ) => { let component = MonitoredComponent {