Run all task spawning through util, to allow for easy hooking.

During debugging, I found it useful to hook all task creation in a
central location, and util::task was the perfect place for it.

r? @pcwalton (or maybe someone else, I'm kinda sending you a bunch of
reviews today because I don't know who better to give them to)
This commit is contained in:
Clark Gaebel 2014-10-28 09:53:45 -07:00
parent 9e94ecf99c
commit 6df1cc8e4c
14 changed files with 48 additions and 36 deletions

View file

@ -15,6 +15,11 @@ pub fn spawn_named<S: IntoMaybeOwned<'static>>(name: S, f: proc():Send) {
builder.spawn(f);
}
pub fn spawn_named_native<S: IntoMaybeOwned<'static>>(name: S, f: proc():Send) {
let builder = task::TaskBuilder::new().named(name).native();
builder.spawn(f);
}
/// Arrange to send a particular message to a channel if the task fails.
pub fn spawn_named_with_send_on_failure<T: Send>(name: &'static str,
state: task_state::TaskState,

View file

@ -15,6 +15,7 @@
// The only difference is that a normal channel is used instead of a sync_channel.
//
use task::spawn_named;
use std::sync::{Arc, Mutex};
pub struct TaskPool {
@ -28,9 +29,11 @@ impl TaskPool {
let state = Arc::new(Mutex::new(rx));
for _ in range(0, tasks) {
for i in range(0, tasks) {
let state = state.clone();
spawn(proc() worker(&*state));
spawn_named(
format!("TaskPoolWorker {}/{}", i+1, tasks),
proc() worker(&*state));
}
return TaskPool { tx: tx };
@ -50,4 +53,3 @@ impl TaskPool {
self.tx.send(job);
}
}

View file

@ -7,16 +7,15 @@
//! Data associated with queues is simply a pair of unsigned integers. It is expected that a
//! higher-level API on top of this could allow safe fork-join parallelism.
use task::spawn_named_native;
use task_state;
use native::task::NativeTaskBuilder;
use libc::funcs::posix88::unistd::usleep;
use rand::{Rng, XorShiftRng};
use std::mem;
use std::rand::weak_rng;
use std::sync::atomics::{AtomicUint, SeqCst};
use std::sync::deque::{Abort, BufferPool, Data, Empty, Stealer, Worker};
use std::task::TaskBuilder;
use libc::funcs::posix88::unistd::usleep;
/// A unit of work.
///
@ -247,12 +246,15 @@ impl<QueueData: Send, WorkData: Send> WorkQueue<QueueData, WorkData> {
}
// Spawn threads.
for thread in threads.into_iter() {
TaskBuilder::new().named(task_name).native().spawn(proc() {
task_state::initialize(state | task_state::InWorker);
let mut thread = thread;
thread.start()
})
for (i, thread) in threads.into_iter().enumerate() {
spawn_named_native(
format!("{} worker {}/{}", task_name, i+1, thread_count),
proc() {
task_state::initialize(state | task_state::InWorker);
let mut thread = thread;
thread.start()
})
}
WorkQueue {