mirror of
https://github.com/servo/servo.git
synced 2025-08-04 05:00:08 +01:00
Replace the one-thread-per-timeout model by a two-thread model of timeouts.
This commit is contained in:
parent
983612751b
commit
e6ebd7f11d
1 changed files with 78 additions and 180 deletions
|
@ -2,86 +2,20 @@
|
||||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||||
|
|
||||||
use euclid::length::Length;
|
|
||||||
use ipc_channel::ipc::{self, IpcSender};
|
use ipc_channel::ipc::{self, IpcSender};
|
||||||
use ipc_channel::router::ROUTER;
|
|
||||||
use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns};
|
|
||||||
use script_traits::{TimerEvent, TimerEventRequest};
|
use script_traits::{TimerEvent, TimerEventRequest};
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::cmp::{self, Ord};
|
use std::cmp::{self, Ord};
|
||||||
use std::collections::BinaryHeap;
|
use std::collections::BinaryHeap;
|
||||||
use std::sync::Arc;
|
use std::sync::mpsc;
|
||||||
use std::sync::atomic::{self, AtomicBool};
|
use std::sync::mpsc::TryRecvError::{Disconnected, Empty};
|
||||||
use std::sync::mpsc::{channel, Receiver, Select};
|
use std::thread;
|
||||||
use std::thread::{self, spawn, Thread};
|
use std::time::{Duration, Instant};
|
||||||
use std::time::Duration;
|
|
||||||
use util::thread::spawn_named;
|
|
||||||
|
|
||||||
/// A quick hack to work around the removal of [`std::old_io::timer::Timer`](
|
pub struct TimerScheduler;
|
||||||
/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html )
|
|
||||||
struct CancelableOneshotTimer {
|
|
||||||
thread: Thread,
|
|
||||||
canceled: Arc<AtomicBool>,
|
|
||||||
port: Receiver<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CancelableOneshotTimer {
|
|
||||||
fn new(duration: MsDuration) -> CancelableOneshotTimer {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
let canceled = Arc::new(AtomicBool::new(false));
|
|
||||||
let canceled_clone = canceled.clone();
|
|
||||||
|
|
||||||
let thread = spawn(move || {
|
|
||||||
let due_time = precise_time_ms() + duration;
|
|
||||||
|
|
||||||
let mut park_time = duration;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
thread::park_timeout(Duration::from_millis(park_time.get()));
|
|
||||||
|
|
||||||
if canceled_clone.load(atomic::Ordering::Relaxed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// park_timeout_ms does not guarantee parking for the
|
|
||||||
// given amout. We might have woken up early.
|
|
||||||
let current_time = precise_time_ms();
|
|
||||||
if current_time >= due_time {
|
|
||||||
let _ = tx.send(());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
park_time = due_time - current_time;
|
|
||||||
}
|
|
||||||
}).thread().clone();
|
|
||||||
|
|
||||||
CancelableOneshotTimer {
|
|
||||||
thread: thread,
|
|
||||||
canceled: canceled,
|
|
||||||
port: rx,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn port(&self) -> &Receiver<()> {
|
|
||||||
&self.port
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cancel(&self) {
|
|
||||||
self.canceled.store(true, atomic::Ordering::Relaxed);
|
|
||||||
self.thread.unpark();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TimerScheduler {
|
|
||||||
port: Receiver<TimerEventRequest>,
|
|
||||||
|
|
||||||
scheduled_events: RefCell<BinaryHeap<ScheduledEvent>>,
|
|
||||||
|
|
||||||
timer: RefCell<Option<CancelableOneshotTimer>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ScheduledEvent {
|
struct ScheduledEvent {
|
||||||
request: TimerEventRequest,
|
request: TimerEventRequest,
|
||||||
for_time: NsDuration,
|
for_time: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ord for ScheduledEvent {
|
impl Ord for ScheduledEvent {
|
||||||
|
@ -103,119 +37,83 @@ impl PartialEq for ScheduledEvent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Task {
|
|
||||||
HandleRequest(TimerEventRequest),
|
|
||||||
DispatchDueEvents,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TimerScheduler {
|
impl TimerScheduler {
|
||||||
pub fn start() -> IpcSender<TimerEventRequest> {
|
pub fn start() -> IpcSender<TimerEventRequest> {
|
||||||
let (chan, port) = ipc::channel().unwrap();
|
let (req_ipc_sender, req_ipc_receiver) = ipc::channel().unwrap();
|
||||||
|
let (req_sender, req_receiver) = mpsc::sync_channel(1);
|
||||||
|
|
||||||
let timer_scheduler = TimerScheduler {
|
// We could do this much more directly with recv_timeout
|
||||||
port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port),
|
// (https://github.com/rust-lang/rfcs/issues/962).
|
||||||
|
|
||||||
scheduled_events: RefCell::new(BinaryHeap::new()),
|
// util::thread doesn't give us access to the JoinHandle, which we need for park/unpark,
|
||||||
|
// so we use the builder directly.
|
||||||
timer: RefCell::new(None),
|
let timeout_thread = thread::Builder::new()
|
||||||
};
|
.name(String::from("TimerScheduler"))
|
||||||
|
.spawn(move || {
|
||||||
spawn_named("TimerScheduler".to_owned(), move || {
|
// We maintain a priority queue of future events, sorted by due time.
|
||||||
timer_scheduler.run_event_loop();
|
let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new();
|
||||||
});
|
loop {
|
||||||
|
let now = Instant::now();
|
||||||
chan
|
// Dispatch any events whose due time is past
|
||||||
|
loop {
|
||||||
|
match scheduled_events.peek() {
|
||||||
|
// Dispatch the event if its due time is past
|
||||||
|
Some(event) if event.for_time <= now => {
|
||||||
|
let TimerEventRequest(ref sender, source, id, _) = event.request;
|
||||||
|
let _ = sender.send(TimerEvent(source, id));
|
||||||
|
},
|
||||||
|
// Otherwise, we're done dispatching events
|
||||||
|
_ => break,
|
||||||
}
|
}
|
||||||
|
// Remove the event from the priority queue
|
||||||
fn run_event_loop(&self) {
|
// (Note this only executes when the first event has been dispatched
|
||||||
while let Some(thread) = self.receive_next_task() {
|
scheduled_events.pop();
|
||||||
match thread {
|
}
|
||||||
Task::HandleRequest(request) => self.handle_request(request),
|
// Look to see if there are any incoming events
|
||||||
Task::DispatchDueEvents => self.dispatch_due_events(),
|
match req_receiver.try_recv() {
|
||||||
|
// If there is an event, add it to the priority queue
|
||||||
|
Ok(req) => {
|
||||||
|
let TimerEventRequest(_, _, _, delay) = req;
|
||||||
|
let schedule = Instant::now() + Duration::from_millis(delay.get());
|
||||||
|
let event = ScheduledEvent { request: req, for_time: schedule };
|
||||||
|
scheduled_events.push(event);
|
||||||
|
},
|
||||||
|
// If there is no incoming event, park the thread,
|
||||||
|
// it will either be unparked when a new event arrives,
|
||||||
|
// or by a timeout.
|
||||||
|
Err(Empty) => match scheduled_events.peek() {
|
||||||
|
None => thread::park(),
|
||||||
|
Some(event) => thread::park_timeout(event.for_time - now),
|
||||||
|
},
|
||||||
|
// If the channel is closed, we are done.
|
||||||
|
Err(Disconnected) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// This thread can terminate if the req_ipc_sender is dropped.
|
||||||
|
warn!("TimerScheduler thread terminated.");
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.thread()
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
// A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread,
|
||||||
|
// and unparks the timeout thread each time. Note that if unpark is called while the timeout
|
||||||
|
// thread isn't parked, this causes the next call to thread::park by the timeout thread
|
||||||
|
// not to block. This means that the timeout thread won't park when there is a request
|
||||||
|
// waiting in the MPSC channel buffer.
|
||||||
|
thread::Builder::new()
|
||||||
|
.name(String::from("TimerProxy"))
|
||||||
|
.spawn(move || {
|
||||||
|
while let Ok(req) = req_ipc_receiver.recv() {
|
||||||
|
req_sender.send(req).unwrap();
|
||||||
|
timeout_thread.unpark();
|
||||||
}
|
}
|
||||||
|
// This thread can terminate if the req_ipc_sender is dropped.
|
||||||
|
warn!("TimerProxy thread terminated.");
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
#[allow(unsafe_code)]
|
// Return the IPC sender
|
||||||
fn receive_next_task(&self) -> Option<Task> {
|
req_ipc_sender
|
||||||
let port = &self.port;
|
|
||||||
let timer = self.timer.borrow();
|
|
||||||
let timer_port = timer.as_ref().map(|timer| timer.port());
|
|
||||||
|
|
||||||
if let Some(ref timer_port) = timer_port {
|
|
||||||
let sel = Select::new();
|
|
||||||
let mut scheduler_handle = sel.handle(port);
|
|
||||||
let mut timer_handle = sel.handle(timer_port);
|
|
||||||
|
|
||||||
unsafe {
|
|
||||||
scheduler_handle.add();
|
|
||||||
timer_handle.add();
|
|
||||||
}
|
|
||||||
|
|
||||||
let ret = sel.wait();
|
|
||||||
if ret == scheduler_handle.id() {
|
|
||||||
port.recv().ok().map(Task::HandleRequest)
|
|
||||||
} else if ret == timer_handle.id() {
|
|
||||||
timer_port.recv().ok().map(|_| Task::DispatchDueEvents)
|
|
||||||
} else {
|
|
||||||
panic!("unexpected select result!")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
port.recv().ok().map(Task::HandleRequest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_request(&self, request: TimerEventRequest) {
|
|
||||||
let TimerEventRequest(_, _, _, duration_ms) = request;
|
|
||||||
let duration_ns = Length::new(duration_ms.get() * 1000 * 1000);
|
|
||||||
let schedule_for = precise_time_ns() + duration_ns;
|
|
||||||
|
|
||||||
let previously_earliest = self.scheduled_events.borrow().peek()
|
|
||||||
.map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time);
|
|
||||||
|
|
||||||
self.scheduled_events.borrow_mut().push(ScheduledEvent {
|
|
||||||
request: request,
|
|
||||||
for_time: schedule_for,
|
|
||||||
});
|
|
||||||
|
|
||||||
if schedule_for < previously_earliest {
|
|
||||||
self.start_timer_for_next_event();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn dispatch_due_events(&self) {
|
|
||||||
let now = precise_time_ns();
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut events = self.scheduled_events.borrow_mut();
|
|
||||||
|
|
||||||
while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now {
|
|
||||||
let event = events.pop().unwrap();
|
|
||||||
let TimerEventRequest(chan, source, id, _) = event.request;
|
|
||||||
|
|
||||||
let _ = chan.send(TimerEvent(source, id));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.start_timer_for_next_event();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_timer_for_next_event(&self) {
|
|
||||||
let events = self.scheduled_events.borrow();
|
|
||||||
let next_event = events.peek();
|
|
||||||
|
|
||||||
let mut timer = self.timer.borrow_mut();
|
|
||||||
|
|
||||||
if let Some(ref mut timer) = *timer {
|
|
||||||
timer.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
*timer = next_event.map(|next_event| {
|
|
||||||
let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get());
|
|
||||||
// Round up, we'd rather be late than early…
|
|
||||||
let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000));
|
|
||||||
|
|
||||||
CancelableOneshotTimer::new(delay_ms)
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue