diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock index 39b651e0912..9d50849fc3e 100644 --- a/components/servo/Cargo.lock +++ b/components/servo/Cargo.lock @@ -373,6 +373,14 @@ dependencies = [ "unreachable 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "deque" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "devtools" version = "0.0.1" @@ -2043,6 +2051,7 @@ dependencies = [ "app_units 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "cssparser 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "euclid 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/util/Cargo.toml b/components/util/Cargo.toml index b6f8fd15640..5388654a8b0 100644 --- a/components/util/Cargo.toml +++ b/components/util/Cargo.toml @@ -30,6 +30,7 @@ git = "https://github.com/servo/ipc-channel" app_units = {version = "0.2.1", features = ["plugins"]} bitflags = "0.3" cssparser = {version = "0.5.3", features = ["heap_size", "serde-serialization"]} +deque = "0.3.1" euclid = {version = "0.6.2", features = ["unstable", "plugins"]} getopts = "0.2.11" heapsize = "0.3.0" diff --git a/components/util/deque/mod.rs b/components/util/deque/mod.rs deleted file mode 100644 index 7ee26c9f2bf..00000000000 --- a/components/util/deque/mod.rs +++ /dev/null @@ -1,406 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A (mostly) lock-free concurrent work-stealing deque -//! -//! This module contains an implementation of the Chase-Lev work stealing deque -//! described in "Dynamic Circular Work-Stealing Deque". The implementation is -//! heavily based on the pseudocode found in the paper. -//! -//! This implementation does not want to have the restriction of a garbage -//! collector for reclamation of buffers, and instead it uses a shared pool of -//! buffers. This shared pool is required for correctness in this -//! implementation. -//! -//! The only lock-synchronized portions of this deque are the buffer allocation -//! and deallocation portions. Otherwise all operations are lock-free. -//! -//! # Example -//! -//! use util::deque::BufferPool; -//! -//! let mut pool = BufferPool::new(); -//! let (mut worker, mut stealer) = pool.deque(); -//! -//! // Only the worker may push/pop -//! worker.push(1i); -//! worker.pop(); -//! -//! // Stealers take data from the other end of the deque -//! worker.push(1i); -//! stealer.steal(); -//! -//! // Stealers can be cloned to have many stealers stealing in parallel -//! worker.push(1i); -//! let mut stealer2 = stealer.clone(); -//! stealer2.steal(); - -// NB: the "buffer pool" strategy is not done for speed, but rather for -// correctness. For more info, see the comment on `swap_buffer` - -// FIXME: more than likely, more atomic operations than necessary use SeqCst - -use std::marker; -use std::mem::{align_of, forget, size_of, transmute}; -use std::ptr; -use alloc::heap::{allocate, deallocate}; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::atomic::{AtomicIsize, AtomicPtr}; -pub use self::Stolen::{Abort, Data, Empty}; - -// Once the queue is less than 1/K full, then it will be downsized. Note that -// the deque requires that this number be less than 2. -static K: isize = 4; - -// Minimum number of bits that a buffer size should be. No buffer will resize to -// under this value, and all deques will initially contain a buffer of this -// size. -// -// The size in question is 1 << MIN_BITS -static MIN_BITS: usize = 7; - -struct Deque { - bottom: AtomicIsize, - top: AtomicIsize, - array: AtomicPtr>, - pool: BufferPool, -} - -unsafe impl Send for Deque {} - -/// Worker half of the work-stealing deque. This worker has exclusive access to -/// one side of the deque, and uses `push` and `pop` method to manipulate it. -/// -/// There may only be one worker per deque. -pub struct Worker { - deque: Arc>, -} - -impl !marker::Sync for Worker {} - -/// The stealing half of the work-stealing deque. Stealers have access to the -/// opposite end of the deque from the worker, and they only have access to the -/// `steal` method. -pub struct Stealer { - deque: Arc>, -} - -impl !marker::Sync for Stealer {} - -/// When stealing some data, this is an enumeration of the possible outcomes. -#[derive(PartialEq, Debug)] -pub enum Stolen { - /// The deque was empty at the time of stealing - Empty, - /// The stealer lost the race for stealing data, and a retry may return more - /// data. - Abort, - /// The stealer has successfully stolen some data. - Data(T), -} - -/// The allocation pool for buffers used by work-stealing deques. Right now this -/// structure is used for reclamation of memory after it is no longer in use by -/// deques. -/// -/// This data structure is protected by a mutex, but it is rarely used. Deques -/// will only use this structure when allocating a new buffer or deallocating a -/// previous one. -pub struct BufferPool { - // FIXME: This entire file was copied from std::sync::deque before it was removed, - // I converted `Exclusive` to `Mutex` here, but that might not be ideal - pool: Arc>>>>, -} - -/// An internal buffer used by the chase-lev deque. This structure is actually -/// implemented as a circular buffer, and is used as the intermediate storage of -/// the data in the deque. -/// -/// This type is implemented with *T instead of Vec for two reasons: -/// -/// 1. There is nothing safe about using this buffer. This easily allows the -/// same value to be read twice in to rust, and there is nothing to -/// prevent this. The usage by the deque must ensure that one of the -/// values is forgotten. Furthermore, we only ever want to manually run -/// destructors for values in this buffer (on drop) because the bounds -/// are defined by the deque it's owned by. -/// -/// 2. We can certainly avoid bounds checks using *T instead of Vec, although -/// LLVM is probably pretty good at doing this already. -struct Buffer { - storage: *const T, - log_size: usize, -} - -unsafe impl Send for Buffer { } - -impl BufferPool { - /// Allocates a new buffer pool which in turn can be used to allocate new - /// deques. - pub fn new() -> BufferPool { - BufferPool { pool: Arc::new(Mutex::new(Vec::new())) } - } - - /// Allocates a new work-stealing deque which will send/receiving memory to - /// and from this buffer pool. - pub fn deque(&self) -> (Worker, Stealer) { - let a = Arc::new(Deque::new(self.clone())); - let b = a.clone(); - (Worker { deque: a }, Stealer { deque: b }) - } - - fn alloc(&mut self, bits: usize) -> Box> { - unsafe { - let mut pool = self.pool.lock().unwrap(); - match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i), - None => box Buffer::new(bits) - } - } - } -} - -impl BufferPool { - fn free(&self, buf: Box>) { - let mut pool = self.pool.lock().unwrap(); - match pool.iter().position(|v| v.size() > buf.size()) { - Some(i) => pool.insert(i, buf), - None => pool.push(buf), - } - } -} - -impl Clone for BufferPool { - fn clone(&self) -> BufferPool { BufferPool { pool: self.pool.clone() } } -} - -impl Worker { - /// Pushes data onto the front of this work queue. - pub fn push(&self, t: T) { - unsafe { self.deque.push(t) } - } - /// Pops data off the front of the work queue, returning `None` on an empty - /// queue. - pub fn pop(&self) -> Option { - unsafe { self.deque.pop() } - } - - /// Gets access to the buffer pool that this worker is attached to. This can - /// be used to create more deques which share the same buffer pool as this - /// deque. - pub fn pool(&self) -> &BufferPool { - &self.deque.pool - } -} - -impl Stealer { - /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&self) -> Stolen { - unsafe { self.deque.steal() } - } - - /// Gets access to the buffer pool that this stealer is attached to. This - /// can be used to create more deques which share the same buffer pool as - /// this deque. - pub fn pool(&self) -> &BufferPool { - &self.deque.pool - } -} - -impl Clone for Stealer { - fn clone(&self) -> Stealer { - Stealer { deque: self.deque.clone() } - } -} - -// Almost all of this code can be found directly in the paper so I'm not -// personally going to heavily comment what's going on here. - -impl Deque { - fn new(mut pool: BufferPool) -> Deque { - let buf = pool.alloc(MIN_BITS); - Deque { - bottom: AtomicIsize::new(0), - top: AtomicIsize::new(0), - array: AtomicPtr::new(unsafe { transmute(buf) }), - pool: pool, - } - } - - unsafe fn push(&self, data: T) { - let mut b = self.bottom.load(Relaxed); - let t = self.top.load(Relaxed); - let mut a = self.array.load(SeqCst); - let size = b - t; - if size >= (*a).size() - 1 { - // You won't find this code in the chase-lev deque paper. This is - // alluded to in a small footnote, however. We always free a buffer - // when growing in order to prevent leaks. - a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); - b = self.bottom.load(SeqCst); - } - (*a).put(b, data); - self.bottom.store(b + 1, SeqCst); - } - - unsafe fn pop(&self) -> Option { - let b = self.bottom.load(Relaxed); - let a = self.array.load(SeqCst); - let b = b - 1; - self.bottom.store(b, SeqCst); - let t = self.top.load(Relaxed); - let size = b - t; - if size < 0 { - self.bottom.store(t, SeqCst); - return None; - } - let data = (*a).get(b); - if size > 0 { - self.maybe_shrink(b, t); - return Some(data); - } - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - self.bottom.store(t + 1, SeqCst); - Some(data) - } else { - self.bottom.store(t + 1, SeqCst); - forget(data); // someone else stole this value - None - } - } - - unsafe fn steal(&self) -> Stolen { - let t = self.top.load(Relaxed); - let old = self.array.load(Relaxed); - let b = self.bottom.load(Relaxed); - let size = b - t; - if size <= 0 { return Empty } - let a = self.array.load(SeqCst); - if size % (*a).size() == 0 { - if a == old && t == self.top.load(SeqCst) { - return Empty - } - return Abort - } - let data = (*a).get(t); - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - Data(data) - } else { - forget(data); // someone else stole this value - Abort - } - } - - unsafe fn maybe_shrink(&self, b: isize, t: isize) { - let a = self.array.load(SeqCst); - if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { - self.swap_buffer(b, a, (*a).resize(b, t, -1)); - } - } - - // Helper routine not mentioned in the paper which is used in growing and - // shrinking buffers to swap in a new buffer into place. As a bit of a - // recap, the whole point that we need a buffer pool rather than just - // calling malloc/free directly is that stealers can continue using buffers - // after this method has called 'free' on it. The continued usage is simply - // a read followed by a forget, but we must make sure that the memory can - // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&self, b: isize, old: *mut Buffer, - buf: Buffer) -> *mut Buffer { - let newbuf: *mut Buffer = transmute(box buf); - self.array.store(newbuf, SeqCst); - let ss = (*newbuf).size(); - self.bottom.store(b + ss, SeqCst); - let t = self.top.load(SeqCst); - if self.top.compare_and_swap(t, t + ss, SeqCst) != t { - self.bottom.store(b, SeqCst); - } - self.pool.free(transmute(old)); - newbuf - } -} - - -impl Drop for Deque { - fn drop(&mut self) { - let t = self.top.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - // Free whatever is leftover in the dequeue, and then move the buffer - // back into the pool. - for i in t..b { - let _: T = unsafe { (*a).get(i) }; - } - self.pool.free(unsafe { transmute(a) }); - } -} - -#[inline] -fn buffer_alloc_size(log_size: usize) -> usize { - (1 << log_size) * size_of::() -} - -impl Buffer { - unsafe fn new(log_size: usize) -> Buffer { - let size = buffer_alloc_size::(log_size); - let buffer = allocate(size, align_of::()); - if buffer.is_null() { ::alloc::oom() } - Buffer { - storage: buffer as *const T, - log_size: log_size, - } - } - - fn size(&self) -> isize { 1 << self.log_size } - - // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly - fn mask(&self) -> isize { (1 << self.log_size) - 1 } - - unsafe fn elem(&self, i: isize) -> *const T { - self.storage.offset(i & self.mask()) - } - - // This does not protect against loading duplicate values of the same cell, - // nor does this clear out the contents contained within. Hence, this is a - // very unsafe method which the caller needs to treat specially in case a - // race is lost. - unsafe fn get(&self, i: isize) -> T { - ptr::read(self.elem(i)) - } - - // Unsafe because this unsafely overwrites possibly uninitialized or - // initialized data. - unsafe fn put(&self, i: isize, t: T) { - ptr::write(self.elem(i) as *mut T, t); - } - - // Again, unsafe because this has incredibly dubious ownership violations. - // It is assumed that this buffer is immediately dropped. - unsafe fn resize(&self, b: isize, t: isize, delta: isize) -> Buffer { - // NB: not entirely obvious, but thanks to 2's complement, - // casting delta to usize and then adding gives the desired - // effect. - let buf = Buffer::new(self.log_size.wrapping_add(delta as usize)); - for i in t..b { - buf.put(i, self.get(i)); - } - buf - } -} - -impl Drop for Buffer { - fn drop(&mut self) { - // It is assumed that all buffers are empty on drop. - let size = buffer_alloc_size::(self.log_size); - unsafe { deallocate(self.storage as *mut u8, size, align_of::()) } - } -} diff --git a/components/util/lib.rs b/components/util/lib.rs index 3a039c58da8..8444768a415 100644 --- a/components/util/lib.rs +++ b/components/util/lib.rs @@ -8,8 +8,6 @@ #![feature(custom_derive)] #![cfg_attr(feature = "non-geckolib", feature(decode_utf16))] #![feature(fnbox)] -#![feature(heap_api)] -#![feature(oom)] #![feature(optin_builtin_traits)] #![feature(plugin)] #![feature(reflect_marker)] @@ -25,6 +23,7 @@ extern crate app_units; extern crate bitflags; #[macro_use] extern crate cssparser; +extern crate deque; extern crate euclid; extern crate getopts; extern crate heapsize; @@ -51,7 +50,6 @@ use std::sync::Arc; pub mod cache; pub mod debug_utils; -pub mod deque; pub mod geometry; pub mod ipc; pub mod linked_list; diff --git a/components/util/workqueue.rs b/components/util/workqueue.rs index c46039b6068..3b97ab13e8a 100644 --- a/components/util/workqueue.rs +++ b/components/util/workqueue.rs @@ -10,7 +10,7 @@ #[cfg(windows)] extern crate kernel32; -use deque::{Abort, BufferPool, Data, Empty, Stealer, Worker}; +use deque::{self, Abort, Data, Empty, Stealer, Worker}; #[cfg(not(windows))] use libc::usleep; use rand::{Rng, XorShiftRng, weak_rng}; @@ -25,7 +25,7 @@ use thread_state; /// /// - `QueueData`: global custom data for the entire work queue. /// - `WorkData`: custom data specific to each unit of work. -pub struct WorkUnit { +pub struct WorkUnit { /// The function to execute. pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy), /// Arbitrary data. @@ -33,7 +33,7 @@ pub struct WorkUnit { } /// Messages from the supervisor to the worker. -enum WorkerMsg { +enum WorkerMsg { /// Tells the worker to start work. Start(Worker>, *mut AtomicUsize, *const QueueData), /// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`. @@ -44,19 +44,19 @@ enum WorkerMsg { Exit, } -unsafe impl Send for WorkerMsg {} +unsafe impl Send for WorkerMsg {} /// Messages to the supervisor. -enum SupervisorMsg { +enum SupervisorMsg { Finished, HeapSizeOfTLS(usize), ReturnDeque(usize, Worker>), } -unsafe impl Send for SupervisorMsg {} +unsafe impl Send for SupervisorMsg {} /// Information that the supervisor thread keeps about the worker threads. -struct WorkerInfo { +struct WorkerInfo { /// The communication channel to the workers. chan: Sender>, /// The worker end of the deque, if we have it. @@ -66,7 +66,7 @@ struct WorkerInfo { } /// Information specific to each worker thread that the thread keeps. -struct WorkerThread { +struct WorkerThread { /// The index of this worker. index: usize, /// The communication port from the supervisor. @@ -79,7 +79,7 @@ struct WorkerThread { rng: XorShiftRng, } -unsafe impl Send for WorkerThread {} +unsafe impl Send for WorkerThread {} const SPINS_UNTIL_BACKOFF: u32 = 128; const BACKOFF_INCREMENT_IN_US: u32 = 5; @@ -208,7 +208,7 @@ impl WorkerThread { } /// A handle to the work queue that individual work units have. -pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a> { +pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> { worker: &'a mut Worker>, ref_count: *mut AtomicUsize, queue_data: &'a QueueData, @@ -239,7 +239,7 @@ impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData } /// A work queue on which units of work can be submitted. -pub struct WorkQueue { +pub struct WorkQueue { /// Information about each of the workers. workers: Vec>, /// A port on which deques can be received from the workers. @@ -259,8 +259,7 @@ impl WorkQueue { let (mut infos, mut threads) = (vec!(), vec!()); for i in 0..thread_count { let (worker_chan, worker_port) = channel(); - let pool = BufferPool::new(); - let (worker, thief) = pool.deque(); + let (worker, thief) = deque::new(); infos.push(WorkerInfo { chan: worker_chan, deque: Some(worker),