Make the workqueue's type parameter names less terrible

This commit is contained in:
Brendan Zabarauskas 2014-05-28 15:32:27 -07:00
parent 69a3d100ad
commit 0761107daa

View file

@ -18,63 +18,62 @@ use std::task::TaskOpts;
/// A unit of work. /// A unit of work.
/// ///
/// The type parameter `QUD` stands for "queue user data" and represents global custom data for the /// # Type parameters
/// entire work queue, and the type parameter `WUD` stands for "work user data" and represents ///
/// custom data specific to each unit of work. /// - `QueueData`: global custom data for the entire work queue.
pub struct WorkUnit<QUD,WUD> { /// - `WorkData`: custom data specific to each unit of work.
pub struct WorkUnit<QueueData, WorkData> {
/// The function to execute. /// The function to execute.
pub fun: extern "Rust" fn(WUD, &mut WorkerProxy<QUD,WUD>), pub fun: extern "Rust" fn(WorkData, &mut WorkerProxy<QueueData, WorkData>),
/// Arbitrary data. /// Arbitrary data.
pub data: WUD, pub data: WorkData,
} }
/// Messages from the supervisor to the worker. /// Messages from the supervisor to the worker.
enum WorkerMsg<QUD,WUD> { enum WorkerMsg<QueueData, WorkData> {
/// Tells the worker to start work. /// Tells the worker to start work.
StartMsg(Worker<WorkUnit<QUD,WUD>>, *mut AtomicUint, *QUD), StartMsg(Worker<WorkUnit<QueueData, WorkData>>, *mut AtomicUint, *QueueData),
/// Tells the worker to stop. It can be restarted again with a `StartMsg`. /// Tells the worker to stop. It can be restarted again with a `StartMsg`.
StopMsg, StopMsg,
/// Tells the worker thread to terminate. /// Tells the worker thread to terminate.
ExitMsg, ExitMsg,
} }
/// Messages to the supervisor. /// Messages to the supervisor.
enum SupervisorMsg<QUD,WUD> { enum SupervisorMsg<QueueData, WorkData> {
FinishedMsg, FinishedMsg,
ReturnDequeMsg(uint, Worker<WorkUnit<QUD,WUD>>), ReturnDequeMsg(uint, Worker<WorkUnit<QueueData, WorkData>>),
} }
/// Information that the supervisor thread keeps about the worker threads. /// Information that the supervisor thread keeps about the worker threads.
struct WorkerInfo<QUD,WUD> { struct WorkerInfo<QueueData, WorkData> {
/// The communication channel to the workers. /// The communication channel to the workers.
chan: Sender<WorkerMsg<QUD,WUD>>, chan: Sender<WorkerMsg<QueueData, WorkData>>,
/// The buffer pool for this deque. /// The buffer pool for this deque.
pool: BufferPool<WorkUnit<QUD,WUD>>, pool: BufferPool<WorkUnit<QueueData, WorkData>>,
/// The worker end of the deque, if we have it. /// The worker end of the deque, if we have it.
deque: Option<Worker<WorkUnit<QUD,WUD>>>, deque: Option<Worker<WorkUnit<QueueData, WorkData>>>,
/// The thief end of the work-stealing deque. /// The thief end of the work-stealing deque.
thief: Stealer<WorkUnit<QUD,WUD>>, thief: Stealer<WorkUnit<QueueData, WorkData>>,
} }
/// Information specific to each worker thread that the thread keeps. /// Information specific to each worker thread that the thread keeps.
struct WorkerThread<QUD,WUD> { struct WorkerThread<QueueData, WorkData> {
/// The index of this worker. /// The index of this worker.
index: uint, index: uint,
/// The communication port from the supervisor. /// The communication port from the supervisor.
port: Receiver<WorkerMsg<QUD,WUD>>, port: Receiver<WorkerMsg<QueueData, WorkData>>,
/// The communication channel on which messages are sent to the supervisor. /// The communication channel on which messages are sent to the supervisor.
chan: Sender<SupervisorMsg<QUD,WUD>>, chan: Sender<SupervisorMsg<QueueData, WorkData>>,
/// The thief end of the work-stealing deque for all other workers. /// The thief end of the work-stealing deque for all other workers.
other_deques: Vec<Stealer<WorkUnit<QUD,WUD>>>, other_deques: Vec<Stealer<WorkUnit<QueueData, WorkData>>>,
/// The random number generator for this worker. /// The random number generator for this worker.
rng: XorShiftRng, rng: XorShiftRng,
} }
static SPIN_COUNT: uint = 1000; static SPIN_COUNT: uint = 1000;
impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> { impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
/// The main logic. This function starts up the worker and listens for /// The main logic. This function starts up the worker and listens for
/// messages. /// messages.
fn start(&mut self) { fn start(&mut self) {
@ -160,16 +159,16 @@ impl<QUD:Send,WUD:Send> WorkerThread<QUD,WUD> {
} }
/// A handle to the work queue that individual work units have. /// A handle to the work queue that individual work units have.
pub struct WorkerProxy<'a,QUD,WUD> { pub struct WorkerProxy<'a, QueueData, WorkData> {
worker: &'a mut Worker<WorkUnit<QUD,WUD>>, worker: &'a mut Worker<WorkUnit<QueueData, WorkData>>,
ref_count: *mut AtomicUint, ref_count: *mut AtomicUint,
queue_data: *QUD, queue_data: *QueueData,
} }
impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> { impl<'a, QueueData, WorkData: Send> WorkerProxy<'a, QueueData, WorkData> {
/// Enqueues a block into the work queue. /// Enqueues a block into the work queue.
#[inline] #[inline]
pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
unsafe { unsafe {
drop((*self.ref_count).fetch_add(1, SeqCst)); drop((*self.ref_count).fetch_add(1, SeqCst));
} }
@ -178,7 +177,7 @@ impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
/// Retrieves the queue user data. /// Retrieves the queue user data.
#[inline] #[inline]
pub fn user_data<'a>(&'a self) -> &'a QUD { pub fn user_data<'a>(&'a self) -> &'a QueueData {
unsafe { unsafe {
cast::transmute(self.queue_data) cast::transmute(self.queue_data)
} }
@ -186,21 +185,21 @@ impl<'a,QUD,WUD:Send> WorkerProxy<'a,QUD,WUD> {
} }
/// A work queue on which units of work can be submitted. /// A work queue on which units of work can be submitted.
pub struct WorkQueue<QUD,WUD> { pub struct WorkQueue<QueueData, WorkData> {
/// Information about each of the workers. /// Information about each of the workers.
workers: Vec<WorkerInfo<QUD,WUD>>, workers: Vec<WorkerInfo<QueueData, WorkData>>,
/// A port on which deques can be received from the workers. /// A port on which deques can be received from the workers.
port: Receiver<SupervisorMsg<QUD,WUD>>, port: Receiver<SupervisorMsg<QueueData, WorkData>>,
/// The amount of work that has been enqueued. /// The amount of work that has been enqueued.
work_count: uint, work_count: uint,
/// Arbitrary user data. /// Arbitrary user data.
pub data: QUD, pub data: QueueData,
} }
impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> { impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
/// Creates a new work queue and spawns all the threads associated with /// Creates a new work queue and spawns all the threads associated with
/// it. /// it.
pub fn new(task_name: &'static str, thread_count: uint, user_data: QUD) -> WorkQueue<QUD,WUD> { pub fn new(task_name: &'static str, thread_count: uint, user_data: QueueData) -> WorkQueue<QueueData, WorkData> {
// Set up data structures. // Set up data structures.
let (supervisor_chan, supervisor_port) = channel(); let (supervisor_chan, supervisor_port) = channel();
let (mut infos, mut threads) = (vec!(), vec!()); let (mut infos, mut threads) = (vec!(), vec!());
@ -253,7 +252,7 @@ impl<QUD:Send,WUD:Send> WorkQueue<QUD,WUD> {
/// Enqueues a block into the work queue. /// Enqueues a block into the work queue.
#[inline] #[inline]
pub fn push(&mut self, work_unit: WorkUnit<QUD,WUD>) { pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
match self.workers.get_mut(0).deque { match self.workers.get_mut(0).deque {
None => { None => {
fail!("tried to push a block but we don't have the deque?!") fail!("tried to push a block but we don't have the deque?!")