Switch to external deque crate

This commit is contained in:
Greg Morenz 2016-02-17 11:05:37 -05:00
parent ab381cf951
commit 21e5d0d046
5 changed files with 23 additions and 422 deletions

View file

@ -10,7 +10,7 @@
#[cfg(windows)]
extern crate kernel32;
use deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
use deque::{self, Abort, Data, Empty, Stealer, Worker};
#[cfg(not(windows))]
use libc::usleep;
use rand::{Rng, XorShiftRng, weak_rng};
@ -25,7 +25,7 @@ use thread_state;
///
/// - `QueueData`: global custom data for the entire work queue.
/// - `WorkData`: custom data specific to each unit of work.
pub struct WorkUnit<QueueData, WorkData> {
pub struct WorkUnit<QueueData, WorkData: Send> {
/// The function to execute.
pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
/// Arbitrary data.
@ -33,7 +33,7 @@ pub struct WorkUnit<QueueData, WorkData> {
}
/// Messages from the supervisor to the worker.
enum WorkerMsg<QueueData: 'static, WorkData: 'static> {
enum WorkerMsg<QueueData: 'static, WorkData: 'static + Send> {
/// Tells the worker to start work.
Start(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUsize, *const QueueData),
/// Tells the worker to stop. It can be restarted again with a `WorkerMsg::Start`.
@ -44,19 +44,19 @@ enum WorkerMsg<QueueData: 'static, WorkData: 'static> {
Exit,
}
unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerMsg<QueueData, WorkData> {}
unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerMsg<QueueData, WorkData> {}
/// Messages to the supervisor.
enum SupervisorMsg<QueueData: 'static, WorkData: 'static> {
enum SupervisorMsg<QueueData: 'static, WorkData: 'static + Send> {
Finished,
HeapSizeOfTLS(usize),
ReturnDeque(usize, Worker<WorkUnit<QueueData, WorkData>>),
}
unsafe impl<QueueData: 'static, WorkData: 'static> Send for SupervisorMsg<QueueData, WorkData> {}
unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for SupervisorMsg<QueueData, WorkData> {}
/// Information that the supervisor thread keeps about the worker threads.
struct WorkerInfo<QueueData: 'static, WorkData: 'static> {
struct WorkerInfo<QueueData: 'static, WorkData: 'static + Send> {
/// The communication channel to the workers.
chan: Sender<WorkerMsg<QueueData, WorkData>>,
/// The worker end of the deque, if we have it.
@ -66,7 +66,7 @@ struct WorkerInfo<QueueData: 'static, WorkData: 'static> {
}
/// Information specific to each worker thread that the thread keeps.
struct WorkerThread<QueueData: 'static, WorkData: 'static> {
struct WorkerThread<QueueData: 'static, WorkData: 'static + Send> {
/// The index of this worker.
index: usize,
/// The communication port from the supervisor.
@ -79,7 +79,7 @@ struct WorkerThread<QueueData: 'static, WorkData: 'static> {
rng: XorShiftRng,
}
unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerThread<QueueData, WorkData> {}
unsafe impl<QueueData: 'static, WorkData: 'static + Send> Send for WorkerThread<QueueData, WorkData> {}
const SPINS_UNTIL_BACKOFF: u32 = 128;
const BACKOFF_INCREMENT_IN_US: u32 = 5;
@ -208,7 +208,7 @@ impl<QueueData: Sync, WorkData: Send> WorkerThread<QueueData, WorkData> {
}
/// A handle to the work queue that individual work units have.
pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a> {
pub struct WorkerProxy<'a, QueueData: 'a, WorkData: 'a + Send> {
worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
ref_count: *mut AtomicUsize,
queue_data: &'a QueueData,
@ -239,7 +239,7 @@ impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData
}
/// A work queue on which units of work can be submitted.
pub struct WorkQueue<QueueData: 'static, WorkData: 'static> {
pub struct WorkQueue<QueueData: 'static, WorkData: 'static + Send> {
/// Information about each of the workers.
workers: Vec<WorkerInfo<QueueData, WorkData>>,
/// A port on which deques can be received from the workers.
@ -259,8 +259,7 @@ impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> {
let (mut infos, mut threads) = (vec!(), vec!());
for i in 0..thread_count {
let (worker_chan, worker_port) = channel();
let pool = BufferPool::new();
let (worker, thief) = pool.deque();
let (worker, thief) = deque::new();
infos.push(WorkerInfo {
chan: worker_chan,
deque: Some(worker),