mirror of
https://github.com/servo/servo.git
synced 2025-07-24 15:50:21 +01:00
update timer scheduler to use crossbeam
This commit is contained in:
parent
32eb858a6a
commit
c893c8955d
4 changed files with 73 additions and 107 deletions
|
@ -110,7 +110,7 @@ use canvas_traits::canvas::CanvasMsg;
|
||||||
use compositing::compositor_thread::CompositorProxy;
|
use compositing::compositor_thread::CompositorProxy;
|
||||||
use compositing::compositor_thread::Msg as ToCompositorMsg;
|
use compositing::compositor_thread::Msg as ToCompositorMsg;
|
||||||
use compositing::SendableFrameTree;
|
use compositing::SendableFrameTree;
|
||||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
use crossbeam_channel::{after, never, unbounded, Receiver, Sender};
|
||||||
use devtools_traits::{ChromeToDevtoolsControlMsg, DevtoolsControlMsg};
|
use devtools_traits::{ChromeToDevtoolsControlMsg, DevtoolsControlMsg};
|
||||||
use embedder_traits::{Cursor, EmbedderMsg, EmbedderProxy, EventLoopWaker};
|
use embedder_traits::{Cursor, EmbedderMsg, EmbedderProxy, EventLoopWaker};
|
||||||
use euclid::{default::Size2D as UntypedSize2D, Scale, Size2D};
|
use euclid::{default::Size2D as UntypedSize2D, Scale, Size2D};
|
||||||
|
@ -329,10 +329,15 @@ pub struct Constellation<Message, LTF, STF> {
|
||||||
/// memory profiler thread.
|
/// memory profiler thread.
|
||||||
mem_profiler_chan: mem::ProfilerChan,
|
mem_profiler_chan: mem::ProfilerChan,
|
||||||
|
|
||||||
/// A channel for the constellation to send messages to the
|
/// A channel for a pipeline to schedule timer events.
|
||||||
/// timer thread.
|
|
||||||
scheduler_chan: IpcSender<TimerSchedulerMsg>,
|
scheduler_chan: IpcSender<TimerSchedulerMsg>,
|
||||||
|
|
||||||
|
/// The receiver to which the IPC requests from scheduler_chan will be forwarded.
|
||||||
|
scheduler_receiver: Receiver<Result<TimerSchedulerMsg, IpcError>>,
|
||||||
|
|
||||||
|
/// The logic and data behing scheduling timer events.
|
||||||
|
timer_scheduler: TimerScheduler,
|
||||||
|
|
||||||
/// A single WebRender document the constellation operates on.
|
/// A single WebRender document the constellation operates on.
|
||||||
webrender_document: webrender_api::DocumentId,
|
webrender_document: webrender_api::DocumentId,
|
||||||
|
|
||||||
|
@ -686,6 +691,12 @@ where
|
||||||
ipc_namespace_receiver,
|
ipc_namespace_receiver,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (scheduler_chan, ipc_scheduler_receiver) =
|
||||||
|
ipc::channel().expect("ipc channel failure");
|
||||||
|
let scheduler_receiver = route_ipc_receiver_to_new_mpsc_receiver_preserving_errors(
|
||||||
|
ipc_scheduler_receiver,
|
||||||
|
);
|
||||||
|
|
||||||
let (background_hang_monitor_sender, ipc_bhm_receiver) =
|
let (background_hang_monitor_sender, ipc_bhm_receiver) =
|
||||||
ipc::channel().expect("ipc channel failure");
|
ipc::channel().expect("ipc channel failure");
|
||||||
let background_hang_monitor_receiver =
|
let background_hang_monitor_receiver =
|
||||||
|
@ -765,7 +776,9 @@ where
|
||||||
},
|
},
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
webdriver: WebDriverData::new(),
|
webdriver: WebDriverData::new(),
|
||||||
scheduler_chan: TimerScheduler::start(),
|
timer_scheduler: TimerScheduler::new(),
|
||||||
|
scheduler_chan,
|
||||||
|
scheduler_receiver,
|
||||||
document_states: HashMap::new(),
|
document_states: HashMap::new(),
|
||||||
webrender_document: state.webrender_document,
|
webrender_document: state.webrender_document,
|
||||||
webrender_api_sender: state.webrender_api_sender,
|
webrender_api_sender: state.webrender_api_sender,
|
||||||
|
@ -1179,8 +1192,16 @@ where
|
||||||
Layout(FromLayoutMsg),
|
Layout(FromLayoutMsg),
|
||||||
NetworkListener((PipelineId, FetchResponseMsg)),
|
NetworkListener((PipelineId, FetchResponseMsg)),
|
||||||
FromSWManager(SWManagerMsg),
|
FromSWManager(SWManagerMsg),
|
||||||
|
Timer(TimerSchedulerMsg),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A timeout corresponding to the earliest scheduled timer event, if any.
|
||||||
|
let scheduler_timeout = self
|
||||||
|
.timer_scheduler
|
||||||
|
.check_timers()
|
||||||
|
.map(|timeout| after(timeout))
|
||||||
|
.unwrap_or(never());
|
||||||
|
|
||||||
// Get one incoming request.
|
// Get one incoming request.
|
||||||
// This is one of the few places where the compositor is
|
// This is one of the few places where the compositor is
|
||||||
// allowed to panic. If one of the receiver.recv() calls
|
// allowed to panic. If one of the receiver.recv() calls
|
||||||
|
@ -1216,6 +1237,14 @@ where
|
||||||
recv(self.swmanager_receiver) -> msg => {
|
recv(self.swmanager_receiver) -> msg => {
|
||||||
msg.expect("Unexpected panic channel panic in constellation").map(Request::FromSWManager)
|
msg.expect("Unexpected panic channel panic in constellation").map(Request::FromSWManager)
|
||||||
}
|
}
|
||||||
|
recv(self.scheduler_receiver) -> msg => {
|
||||||
|
msg.expect("Unexpected panic channel panic in constellation").map(Request::Timer)
|
||||||
|
}
|
||||||
|
recv(scheduler_timeout) -> _ => {
|
||||||
|
// Note: by returning, we go back to the top,
|
||||||
|
// where check_timers will be called.
|
||||||
|
return;
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let request = match request {
|
let request = match request {
|
||||||
|
@ -1243,6 +1272,9 @@ where
|
||||||
Request::FromSWManager(message) => {
|
Request::FromSWManager(message) => {
|
||||||
self.handle_request_from_swmanager(message);
|
self.handle_request_from_swmanager(message);
|
||||||
},
|
},
|
||||||
|
Request::Timer(message) => {
|
||||||
|
self.timer_scheduler.handle_timer_request(message);
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1859,11 +1891,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Exiting timer scheduler.");
|
|
||||||
if let Err(e) = self.scheduler_chan.send(TimerSchedulerMsg::Exit) {
|
|
||||||
warn!("Exit timer scheduler failed ({})", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Exiting font cache thread.");
|
debug!("Exiting font cache thread.");
|
||||||
self.font_cache_thread.exit();
|
self.font_cache_thread.exit();
|
||||||
|
|
||||||
|
|
|
@ -2,15 +2,12 @@
|
||||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||||
|
|
||||||
use crossbeam_channel::{self, TryRecvError};
|
|
||||||
use ipc_channel::ipc::{self, IpcSender};
|
|
||||||
use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg};
|
use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg};
|
||||||
use std::cmp::{self, Ord};
|
use std::cmp::{self, Ord};
|
||||||
use std::collections::BinaryHeap;
|
use std::collections::BinaryHeap;
|
||||||
use std::thread;
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
pub struct TimerScheduler;
|
pub struct TimerScheduler(BinaryHeap<ScheduledEvent>);
|
||||||
|
|
||||||
struct ScheduledEvent {
|
struct ScheduledEvent {
|
||||||
request: TimerEventRequest,
|
request: TimerEventRequest,
|
||||||
|
@ -37,93 +34,40 @@ impl PartialEq for ScheduledEvent {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TimerScheduler {
|
impl TimerScheduler {
|
||||||
pub fn start() -> IpcSender<TimerSchedulerMsg> {
|
pub fn new() -> Self {
|
||||||
let (req_ipc_sender, req_ipc_receiver) = ipc::channel().expect("Channel creation failed.");
|
TimerScheduler(BinaryHeap::<ScheduledEvent>::new())
|
||||||
let (req_sender, req_receiver) = crossbeam_channel::bounded(1);
|
}
|
||||||
|
|
||||||
// We could do this much more directly with recv_timeout
|
/// Dispatch any events whose due time is past,
|
||||||
// (https://github.com/rust-lang/rfcs/issues/962).
|
/// and return a timeout corresponding to the earliest scheduled event, if any.
|
||||||
|
pub fn check_timers(&mut self) -> Option<Duration> {
|
||||||
|
let now = Instant::now();
|
||||||
|
loop {
|
||||||
|
match self.0.peek() {
|
||||||
|
// Dispatch the event if its due time is past
|
||||||
|
Some(event) if event.for_time <= now => {
|
||||||
|
let TimerEventRequest(ref sender, source, id, _) = event.request;
|
||||||
|
let _ = sender.send(TimerEvent(source, id));
|
||||||
|
},
|
||||||
|
// Do not schedule a timeout.
|
||||||
|
None => return None,
|
||||||
|
// Schedule a timeout for the earliest event.
|
||||||
|
Some(event) => return Some(event.for_time - now),
|
||||||
|
}
|
||||||
|
// Remove the event from the priority queue
|
||||||
|
// (Note this only executes when the first event has been dispatched).
|
||||||
|
self.0.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// util::thread doesn't give us access to the JoinHandle, which we need for park/unpark,
|
/// Handle an incoming timer request.
|
||||||
// so we use the builder directly.
|
pub fn handle_timer_request(&mut self, request: TimerSchedulerMsg) {
|
||||||
let timeout_thread = thread::Builder::new()
|
let TimerEventRequest(_, _, _, delay) = request.0;
|
||||||
.name(String::from("TimerScheduler"))
|
let schedule = Instant::now() + Duration::from_millis(delay.get());
|
||||||
.spawn(move || {
|
let event = ScheduledEvent {
|
||||||
// We maintain a priority queue of future events, sorted by due time.
|
request: request.0,
|
||||||
let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new();
|
for_time: schedule,
|
||||||
loop {
|
};
|
||||||
let now = Instant::now();
|
self.0.push(event);
|
||||||
// Dispatch any events whose due time is past
|
|
||||||
loop {
|
|
||||||
match scheduled_events.peek() {
|
|
||||||
// Dispatch the event if its due time is past
|
|
||||||
Some(event) if event.for_time <= now => {
|
|
||||||
let TimerEventRequest(ref sender, source, id, _) = event.request;
|
|
||||||
let _ = sender.send(TimerEvent(source, id));
|
|
||||||
},
|
|
||||||
// Otherwise, we're done dispatching events
|
|
||||||
_ => break,
|
|
||||||
}
|
|
||||||
// Remove the event from the priority queue
|
|
||||||
// (Note this only executes when the first event has been dispatched
|
|
||||||
scheduled_events.pop();
|
|
||||||
}
|
|
||||||
// Look to see if there are any incoming events
|
|
||||||
match req_receiver.try_recv() {
|
|
||||||
// If there is an event, add it to the priority queue
|
|
||||||
Ok(TimerSchedulerMsg::Request(req)) => {
|
|
||||||
let TimerEventRequest(_, _, _, delay) = req;
|
|
||||||
let schedule = Instant::now() + Duration::from_millis(delay.get());
|
|
||||||
let event = ScheduledEvent {
|
|
||||||
request: req,
|
|
||||||
for_time: schedule,
|
|
||||||
};
|
|
||||||
scheduled_events.push(event);
|
|
||||||
},
|
|
||||||
// If there is no incoming event, park the thread,
|
|
||||||
// it will either be unparked when a new event arrives,
|
|
||||||
// or by a timeout.
|
|
||||||
Err(TryRecvError::Empty) => match scheduled_events.peek() {
|
|
||||||
None => thread::park(),
|
|
||||||
Some(event) => thread::park_timeout(event.for_time - now),
|
|
||||||
},
|
|
||||||
// If the channel is closed or we are shutting down, we are done.
|
|
||||||
Ok(TimerSchedulerMsg::Exit) | Err(TryRecvError::Disconnected) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// This thread can terminate if the req_ipc_sender is dropped.
|
|
||||||
warn!("TimerScheduler thread terminated.");
|
|
||||||
})
|
|
||||||
.expect("Thread creation failed.")
|
|
||||||
.thread()
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
// A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread,
|
|
||||||
// and unparks the timeout thread each time. Note that if unpark is called while the timeout
|
|
||||||
// thread isn't parked, this causes the next call to thread::park by the timeout thread
|
|
||||||
// not to block. This means that the timeout thread won't park when there is a request
|
|
||||||
// waiting in the MPSC channel buffer.
|
|
||||||
thread::Builder::new()
|
|
||||||
.name(String::from("TimerProxy"))
|
|
||||||
.spawn(move || {
|
|
||||||
while let Ok(req) = req_ipc_receiver.recv() {
|
|
||||||
let mut shutting_down = false;
|
|
||||||
match req {
|
|
||||||
TimerSchedulerMsg::Exit => shutting_down = true,
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
let _ = req_sender.send(req);
|
|
||||||
timeout_thread.unpark();
|
|
||||||
if shutting_down {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// This thread can terminate if the req_ipc_sender is dropped.
|
|
||||||
warn!("TimerProxy thread terminated.");
|
|
||||||
})
|
|
||||||
.expect("Thread creation failed.");
|
|
||||||
|
|
||||||
// Return the IPC sender
|
|
||||||
req_ipc_sender
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,7 +285,7 @@ impl OneshotTimers {
|
||||||
delay,
|
delay,
|
||||||
);
|
);
|
||||||
self.scheduler_chan
|
self.scheduler_chan
|
||||||
.send(TimerSchedulerMsg::Request(request))
|
.send(TimerSchedulerMsg(request))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -556,14 +556,9 @@ pub struct TimerEventRequest(
|
||||||
pub MsDuration,
|
pub MsDuration,
|
||||||
);
|
);
|
||||||
|
|
||||||
/// Type of messages that can be sent to the timer scheduler.
|
/// The message used to send a request to the timer scheduler.
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub enum TimerSchedulerMsg {
|
pub struct TimerSchedulerMsg(pub TimerEventRequest);
|
||||||
/// Message to schedule a new timer event.
|
|
||||||
Request(TimerEventRequest),
|
|
||||||
/// Message to exit the timer scheduler.
|
|
||||||
Exit,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Notifies the script thread to fire due timers.
|
/// Notifies the script thread to fire due timers.
|
||||||
/// `TimerSource` must be `FromWindow` when dispatched to `ScriptThread` and
|
/// `TimerSource` must be `FromWindow` when dispatched to `ScriptThread` and
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue