fix(bhm): deliver exit signal reliably (for real)

> However, as it turns out, `crossbeam-channel`'s channels don't drop
> remaining messages until all associated senders *and* receivers are
> dropped. This means the exit signal won't be delivered as long as
> there's at least one `HangMonitorRegister` or
> `BackgroundHangMonitorChan` maintaining a copy of the sender. To work
> around this and guarantee a rapid delivery of the exit signal, the
> sender is wrapped in `Arc`, and only the worker thread maintains a
> strong reference, thus ensuring both the sender and receiver are
> dropped as soon as the worker thread exits.
This commit is contained in:
yvt 2021-06-20 23:29:16 +09:00
parent 18c79cafac
commit 602c02edd2

View file

@ -16,12 +16,14 @@ use msg::constellation_msg::{
}; };
use std::cell::Cell; use std::cell::Cell;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Weak};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(Clone)] #[derive(Clone)]
pub struct HangMonitorRegister { pub struct HangMonitorRegister {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
tether: Sender<Never>,
monitoring_enabled: bool, monitoring_enabled: bool,
} }
@ -32,27 +34,37 @@ impl HangMonitorRegister {
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
monitoring_enabled: bool, monitoring_enabled: bool,
) -> Box<dyn BackgroundHangMonitorRegister> { ) -> Box<dyn BackgroundHangMonitorRegister> {
// Create a channel to pass messages of type `MonitoredComponentMsg`.
// See the discussion in `<HangMonitorRegister as
// BackgroundHangMonitorRegister>::register_component` for why we wrap
// the sender with `Arc` and why `HangMonitorRegister` only maintains
// a weak reference to it.
let (sender, port) = unbounded(); let (sender, port) = unbounded();
let sender = Arc::new(sender);
let sender_weak = Arc::downgrade(&sender);
// Create a "tether" channel, whose sole purpose is to keep the worker
// thread alive. The worker thread will terminates when all copies of
// `tether` are dropped.
let (tether, tether_port) = unbounded();
let _ = thread::Builder::new() let _ = thread::Builder::new()
.spawn(move || { .spawn(move || {
let mut monitor = BackgroundHangMonitorWorker::new( let mut monitor = BackgroundHangMonitorWorker::new(
constellation_chan, constellation_chan,
control_port, control_port,
port, (sender, port),
tether_port,
monitoring_enabled, monitoring_enabled,
); );
while monitor.run() { while monitor.run() {
// Monitoring until all senders have been dropped... // Monitoring until all senders have been dropped...
} }
// Suppress `signal_to_exit`
for (_, mut component) in monitor.monitored_components.drain() {
component.exit_signal.release();
}
}) })
.expect("Couldn't start BHM worker."); .expect("Couldn't start BHM worker.");
Box::new(HangMonitorRegister { Box::new(HangMonitorRegister {
sender, sender: sender_weak,
tether,
monitoring_enabled, monitoring_enabled,
}) })
} }
@ -71,6 +83,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
) -> Box<dyn BackgroundHangMonitor> { ) -> Box<dyn BackgroundHangMonitor> {
let bhm_chan = BackgroundHangMonitorChan::new( let bhm_chan = BackgroundHangMonitorChan::new(
self.sender.clone(), self.sender.clone(),
self.tether.clone(),
component_id, component_id,
self.monitoring_enabled, self.monitoring_enabled,
); );
@ -90,21 +103,54 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
#[cfg(any(target_os = "android", target_arch = "arm", target_arch = "aarch64"))] #[cfg(any(target_os = "android", target_arch = "arm", target_arch = "aarch64"))]
let sampler = crate::sampler::DummySampler::new(); let sampler = crate::sampler::DummySampler::new();
// There's a race condition between the reception of // When a component is registered, and there's an exit request that
// `BackgroundHangMonitorControlMsg::Exit` and // reached BHM, we want an exit signal to be delivered to the
// `MonitoredComponentMsg::Register`. When the worker receives `Exit`, // component's exit signal handler eventually. However, there's a race
// it stops receiving messages, and any remaining messages (including // condition between the reception of `BackgroundHangMonitorControlMsg::
// the `MonitoredComponentMsg::Register` we sent) in the channel are // Exit` and `MonitoredComponentMsg::Register` that needs to handled
// dropped. Wrapping `exit_signal` with this RAII wrapper ensures the // carefully. When the worker receives an `Exit` message, it stops
// exit signal is delivered even in such cases. // processing messages, and any further `Register` messages sent to the
// worker thread are ignored. If the submissions of `Exit` and
// `Register` messages are far apart enough, the channel is closed by
// the time the client attempts to send a `Register` message, and
// therefore the client can figure out by `Sender::send`'s return value
// that it must deliver an exit signal. However, if these message
// submissions are close enough, the `Register` message is still sent,
// but the worker thread might exit before it sees the message, leaving
// the message unprocessed and the exit signal unsent.
//
// To fix this, we wrap the exit signal handler in an RAII wrapper of
// type `SignalToExitOnDrop` to automatically send a signal when it's
// dropped. This way, we can make sure the exit signal is sent even if
// the message couldn't reach the worker thread and be processed.
//
// However, as it turns out, `crossbeam-channel`'s channels don't drop
// remaining messages until all associated senders *and* receivers are
// dropped. This means the exit signal won't be delivered as long as
// there's at least one `HangMonitorRegister` or
// `BackgroundHangMonitorChan` maintaining a copy of the sender. To work
// around this and guarantee a rapid delivery of the exit signal, the
// sender is wrapped in `Arc`, and only the worker thread maintains a
// strong reference, thus ensuring both the sender and receiver are
// dropped as soon as the worker thread exits.
let exit_signal = SignalToExitOnDrop(exit_signal); let exit_signal = SignalToExitOnDrop(exit_signal);
// If the tether is dropped after this call, the worker thread might
// exit before processing the `Register` message because there's no
// implicit ordering guarantee between two channels. If this happens,
// an exit signal will be sent despite we haven't received a
// corresponding exit request. To enforce the correct ordering and
// prevent a false exit signal from being sent, we include a copy of
// `self.tether` in the `Register` message.
let tether = self.tether.clone();
bhm_chan.send(MonitoredComponentMsg::Register( bhm_chan.send(MonitoredComponentMsg::Register(
sampler, sampler,
thread::current().name().map(str::to_owned), thread::current().name().map(str::to_owned),
transient_hang_timeout, transient_hang_timeout,
permanent_hang_timeout, permanent_hang_timeout,
exit_signal, exit_signal,
tether,
)); ));
Box::new(bhm_chan) Box::new(bhm_chan)
} }
@ -125,6 +171,7 @@ enum MonitoredComponentMsg {
Duration, Duration,
Duration, Duration,
SignalToExitOnDrop, SignalToExitOnDrop,
Sender<Never>,
), ),
/// Unregister component for monitoring. /// Unregister component for monitoring.
Unregister, Unregister,
@ -134,11 +181,15 @@ enum MonitoredComponentMsg {
NotifyWait, NotifyWait,
} }
/// Stable equivalent to the `!` type
enum Never {}
/// A wrapper around a sender to the monitor, /// A wrapper around a sender to the monitor,
/// which will send the Id of the monitored component along with each message, /// which will send the Id of the monitored component along with each message,
/// and keep track of whether the monitor is still listening on the other end. /// and keep track of whether the monitor is still listening on the other end.
struct BackgroundHangMonitorChan { struct BackgroundHangMonitorChan {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
_tether: Sender<Never>,
component_id: MonitoredComponentId, component_id: MonitoredComponentId,
disconnected: Cell<bool>, disconnected: Cell<bool>,
monitoring_enabled: bool, monitoring_enabled: bool,
@ -146,12 +197,14 @@ struct BackgroundHangMonitorChan {
impl BackgroundHangMonitorChan { impl BackgroundHangMonitorChan {
fn new( fn new(
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
tether: Sender<Never>,
component_id: MonitoredComponentId, component_id: MonitoredComponentId,
monitoring_enabled: bool, monitoring_enabled: bool,
) -> Self { ) -> Self {
BackgroundHangMonitorChan { BackgroundHangMonitorChan {
sender, sender,
_tether: tether,
component_id: component_id, component_id: component_id,
disconnected: Default::default(), disconnected: Default::default(),
monitoring_enabled, monitoring_enabled,
@ -162,7 +215,17 @@ impl BackgroundHangMonitorChan {
if self.disconnected.get() { if self.disconnected.get() {
return; return;
} }
if let Err(_) = self.sender.send((self.component_id.clone(), msg)) {
// The worker thread owns both the receiver *and* the only strong
// reference to the sender. An `upgrade` failure means the latter is
// gone, and a `send` failure means the former is gone. They are dropped
// simultaneously, but we might observe an intermediate state.
if self
.sender
.upgrade()
.and_then(|sender| sender.send((self.component_id.clone(), msg)).ok())
.is_none()
{
warn!("BackgroundHangMonitor has gone away"); warn!("BackgroundHangMonitor has gone away");
self.disconnected.set(true); self.disconnected.set(true);
} }
@ -234,6 +297,8 @@ struct BackgroundHangMonitorWorker {
monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>, monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>,
constellation_chan: IpcSender<HangMonitorAlert>, constellation_chan: IpcSender<HangMonitorAlert>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
_port_sender: Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
tether_port: Receiver<Never>,
control_port: Receiver<BackgroundHangMonitorControlMsg>, control_port: Receiver<BackgroundHangMonitorControlMsg>,
sampling_duration: Option<Duration>, sampling_duration: Option<Duration>,
sampling_max_duration: Option<Duration>, sampling_max_duration: Option<Duration>,
@ -248,7 +313,11 @@ impl BackgroundHangMonitorWorker {
fn new( fn new(
constellation_chan: IpcSender<HangMonitorAlert>, constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, (port_sender, port): (
Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
),
tether_port: Receiver<Never>,
monitoring_enabled: bool, monitoring_enabled: bool,
) -> Self { ) -> Self {
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port); let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port);
@ -257,6 +326,8 @@ impl BackgroundHangMonitorWorker {
monitored_components: Default::default(), monitored_components: Default::default(),
constellation_chan, constellation_chan,
port, port,
_port_sender: port_sender,
tether_port,
control_port, control_port,
sampling_duration: None, sampling_duration: None,
sampling_max_duration: None, sampling_max_duration: None,
@ -325,11 +396,24 @@ impl BackgroundHangMonitorWorker {
let received = select! { let received = select! {
recv(self.port) -> event => { recv(self.port) -> event => {
// Since we own the `Arc<Sender<_>>`, the channel never
// gets disconnected.
Some(event.unwrap())
},
recv(self.tether_port) -> event => {
// This arm can only reached by a tether disconnection
match event { match event {
Ok(msg) => Some(msg), Ok(x) => match x {}
// Our sender has been dropped, quit. Err(_) => {}
Err(_) => return false,
} }
// All associated `HangMonitorRegister` and
// `BackgroundHangMonitorChan` have been dropped. Suppress
// `signal_to_exit` and exit the BHM.
for component in self.monitored_components.values_mut() {
component.exit_signal.release();
}
return false;
}, },
recv(self.control_port) -> event => { recv(self.control_port) -> event => {
match event { match event {
@ -394,6 +478,7 @@ impl BackgroundHangMonitorWorker {
transient_hang_timeout, transient_hang_timeout,
permanent_hang_timeout, permanent_hang_timeout,
exit_signal, exit_signal,
_tether,
), ),
) => { ) => {
let component = MonitoredComponent { let component = MonitoredComponent {