ensure clean shutdown of all threads running JS

This commit is contained in:
Gregory Terzian 2020-06-24 15:07:48 +08:00
parent 0b61cfc3ae
commit 44ebca72da
25 changed files with 565 additions and 232 deletions

View file

@ -3,14 +3,17 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::sampler::{NativeStack, Sampler};
use crossbeam_channel::{after, unbounded, Receiver, Sender};
use crossbeam_channel::{after, never, unbounded, Receiver, Sender};
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use msg::constellation_msg::MonitoredComponentId;
use msg::constellation_msg::{
BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorRegister,
BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorExitSignal,
BackgroundHangMonitorRegister,
};
use msg::constellation_msg::{
BackgroundHangMonitorControlMsg, HangAlert, HangAnnotation, HangMonitorAlert,
};
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert, SamplerControlMsg};
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::thread;
@ -19,23 +22,32 @@ use std::time::{Duration, Instant};
#[derive(Clone)]
pub struct HangMonitorRegister {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
monitoring_enabled: bool,
}
impl HangMonitorRegister {
/// Start a new hang monitor worker, and return a handle to register components for monitoring.
pub fn init(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
monitoring_enabled: bool,
) -> Box<dyn BackgroundHangMonitorRegister> {
let (sender, port) = unbounded();
let _ = thread::Builder::new().spawn(move || {
let mut monitor =
BackgroundHangMonitorWorker::new(constellation_chan, control_port, port);
let mut monitor = BackgroundHangMonitorWorker::new(
constellation_chan,
control_port,
port,
monitoring_enabled,
);
while monitor.run() {
// Monitoring until all senders have been dropped...
}
});
Box::new(HangMonitorRegister { sender })
Box::new(HangMonitorRegister {
sender,
monitoring_enabled,
})
}
}
@ -48,8 +60,13 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
component_id: MonitoredComponentId,
transient_hang_timeout: Duration,
permanent_hang_timeout: Duration,
exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>,
) -> Box<dyn BackgroundHangMonitor> {
let bhm_chan = BackgroundHangMonitorChan::new(self.sender.clone(), component_id);
let bhm_chan = BackgroundHangMonitorChan::new(
self.sender.clone(),
component_id,
self.monitoring_enabled,
);
#[cfg(all(
target_os = "windows",
@ -71,6 +88,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
thread::current().name().map(str::to_owned),
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
));
Box::new(bhm_chan)
}
@ -85,7 +103,13 @@ impl BackgroundHangMonitorClone for HangMonitorRegister {
/// Messages sent from monitored components to the monitor.
pub enum MonitoredComponentMsg {
/// Register component for monitoring,
Register(Box<dyn Sampler>, Option<String>, Duration, Duration),
Register(
Box<dyn Sampler>,
Option<String>,
Duration,
Duration,
Option<Box<dyn BackgroundHangMonitorExitSignal>>,
),
/// Unregister component for monitoring.
Unregister,
/// Notify start of new activity for a given component,
@ -101,17 +125,20 @@ pub struct BackgroundHangMonitorChan {
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
component_id: MonitoredComponentId,
disconnected: Cell<bool>,
monitoring_enabled: bool,
}
impl BackgroundHangMonitorChan {
pub fn new(
sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
component_id: MonitoredComponentId,
monitoring_enabled: bool,
) -> Self {
BackgroundHangMonitorChan {
sender,
component_id: component_id,
disconnected: Default::default(),
monitoring_enabled,
}
}
@ -128,12 +155,16 @@ impl BackgroundHangMonitorChan {
impl BackgroundHangMonitor for BackgroundHangMonitorChan {
fn notify_activity(&self, annotation: HangAnnotation) {
let msg = MonitoredComponentMsg::NotifyActivity(annotation);
self.send(msg);
if self.monitoring_enabled {
let msg = MonitoredComponentMsg::NotifyActivity(annotation);
self.send(msg);
}
}
fn notify_wait(&self) {
let msg = MonitoredComponentMsg::NotifyWait;
self.send(msg);
if self.monitoring_enabled {
let msg = MonitoredComponentMsg::NotifyWait;
self.send(msg);
}
}
fn unregister(&self) {
let msg = MonitoredComponentMsg::Unregister;
@ -150,6 +181,7 @@ struct MonitoredComponent {
sent_transient_alert: bool,
sent_permanent_alert: bool,
is_waiting: bool,
exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>,
}
struct Sample(MonitoredComponentId, Instant, NativeStack);
@ -159,20 +191,22 @@ pub struct BackgroundHangMonitorWorker {
monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>,
constellation_chan: IpcSender<HangMonitorAlert>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
control_port: Receiver<SamplerControlMsg>,
control_port: Receiver<BackgroundHangMonitorControlMsg>,
sampling_duration: Option<Duration>,
sampling_max_duration: Option<Duration>,
last_sample: Instant,
creation: Instant,
sampling_baseline: Instant,
samples: VecDeque<Sample>,
monitoring_enabled: bool,
}
impl BackgroundHangMonitorWorker {
pub fn new(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<SamplerControlMsg>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
monitoring_enabled: bool,
) -> Self {
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port);
Self {
@ -187,6 +221,7 @@ impl BackgroundHangMonitorWorker {
sampling_baseline: Instant::now(),
creation: Instant::now(),
samples: Default::default(),
monitoring_enabled,
}
}
@ -232,13 +267,19 @@ impl BackgroundHangMonitorWorker {
}
pub fn run(&mut self) -> bool {
let timeout = if let Some(duration) = self.sampling_duration {
duration
let tick = if let Some(duration) = self.sampling_duration {
let duration = duration
.checked_sub(Instant::now() - self.last_sample)
.unwrap_or_else(|| Duration::from_millis(0))
.unwrap_or_else(|| Duration::from_millis(0));
after(duration)
} else {
Duration::from_millis(100)
if self.monitoring_enabled {
after(Duration::from_millis(100))
} else {
never()
}
};
let received = select! {
recv(self.port) -> event => {
match event {
@ -249,24 +290,38 @@ impl BackgroundHangMonitorWorker {
},
recv(self.control_port) -> event => {
match event {
Ok(SamplerControlMsg::Enable(rate, max_duration)) => {
Ok(BackgroundHangMonitorControlMsg::EnableSampler(rate, max_duration)) => {
println!("Enabling profiler.");
self.sampling_duration = Some(rate);
self.sampling_max_duration = Some(max_duration);
self.sampling_baseline = Instant::now();
None
}
Ok(SamplerControlMsg::Disable) => {
},
Ok(BackgroundHangMonitorControlMsg::DisableSampler) => {
println!("Disabling profiler.");
self.finish_sampled_profile();
self.sampling_duration = None;
None
}
return true;
},
Ok(BackgroundHangMonitorControlMsg::Exit(sender)) => {
for component in self.monitored_components.values() {
if let Some(signal) = component.exit_signal.as_ref() {
signal.signal_to_exit();
}
}
// Confirm exit with to the constellation.
let _ = sender.send(());
// Also exit the BHM.
return false;
},
Err(_) => return false,
}
}
recv(after(timeout)) -> _ => None,
recv(tick) -> _ => None,
};
if let Some(msg) = received {
self.handle_msg(msg);
while let Ok(another_msg) = self.port.try_recv() {
@ -297,6 +352,7 @@ impl BackgroundHangMonitorWorker {
name,
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
),
) => {
let component = MonitoredComponent {
@ -308,6 +364,7 @@ impl BackgroundHangMonitorWorker {
sent_transient_alert: false,
sent_permanent_alert: false,
is_waiting: true,
exit_signal,
};
if let Some(name) = name {
self.component_names.insert(component_id.clone(), name);

View file

@ -9,8 +9,13 @@ use background_hang_monitor::HangMonitorRegister;
use ipc_channel::ipc;
use msg::constellation_msg::ScriptHangAnnotation;
use msg::constellation_msg::TEST_PIPELINE_ID;
use msg::constellation_msg::{HangAlert, HangAnnotation, HangMonitorAlert};
use msg::constellation_msg::{
BackgroundHangMonitorControlMsg, BackgroundHangMonitorExitSignal, HangAlert, HangAnnotation,
HangMonitorAlert,
};
use msg::constellation_msg::{MonitoredComponentId, MonitoredComponentType};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
@ -27,12 +32,16 @@ fn test_hang_monitoring() {
ipc::channel().expect("ipc channel failure");
let (_sampler_sender, sampler_receiver) = ipc::channel().expect("ipc channel failure");
let background_hang_monitor_register =
HangMonitorRegister::init(background_hang_monitor_ipc_sender.clone(), sampler_receiver);
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
sampler_receiver,
true,
);
let background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
None,
);
// Start an activity.
@ -125,12 +134,16 @@ fn test_hang_monitoring_unregister() {
ipc::channel().expect("ipc channel failure");
let (_sampler_sender, sampler_receiver) = ipc::channel().expect("ipc channel failure");
let background_hang_monitor_register =
HangMonitorRegister::init(background_hang_monitor_ipc_sender.clone(), sampler_receiver);
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
sampler_receiver,
true,
);
let background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
None,
);
// Start an activity.
@ -146,3 +159,50 @@ fn test_hang_monitoring_unregister() {
// No new alert yet
assert!(background_hang_monitor_receiver.try_recv().is_err());
}
#[test]
fn test_hang_monitoring_exit_signal() {
let _lock = SERIAL.lock().unwrap();
let (background_hang_monitor_ipc_sender, _background_hang_monitor_receiver) =
ipc::channel().expect("ipc channel failure");
let (control_sender, control_receiver) = ipc::channel().expect("ipc channel failure");
struct BHMExitSignal {
closing: Arc<AtomicBool>,
}
impl BackgroundHangMonitorExitSignal for BHMExitSignal {
fn signal_to_exit(&self) {
self.closing.store(true, Ordering::SeqCst);
}
}
let closing = Arc::new(AtomicBool::new(false));
let signal = BHMExitSignal {
closing: closing.clone(),
};
// Init a worker, without active monitoring.
let background_hang_monitor_register = HangMonitorRegister::init(
background_hang_monitor_ipc_sender.clone(),
control_receiver,
false,
);
let _background_hang_monitor = background_hang_monitor_register.register_component(
MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
Duration::from_millis(10),
Duration::from_millis(1000),
Some(Box::new(signal)),
);
let (exit_sender, exit_receiver) = ipc::channel().expect("Failed to create IPC channel!");
// Send the exit message.
let _ = control_sender.send(BackgroundHangMonitorControlMsg::Exit(exit_sender));
// Assert we receive a confirmation back.
assert!(exit_receiver.recv().is_ok());
// Assert we get the exit signal.
while !closing.load(Ordering::SeqCst) {}
}