util: Reduce memory barriers and integer divisions in the work queue.

Improves scaling on multicore ARM devices.
This commit is contained in:
Patrick Walton 2015-06-11 15:07:16 -07:00
parent d87af8ac52
commit a063fdde73
2 changed files with 50 additions and 31 deletions

View file

@ -76,14 +76,26 @@ struct WorkerThread<QueueData: 'static, WorkData: 'static> {
unsafe impl<QueueData: 'static, WorkData: 'static> Send for WorkerThread<QueueData, WorkData> {}
static SPIN_COUNT: u32 = 128;
static SPINS_UNTIL_BACKOFF: u32 = 100;
static BACKOFF_INCREMENT_IN_US: u32 = 5;
const SPINS_UNTIL_BACKOFF: u32 = 128;
const BACKOFF_INCREMENT_IN_US: u32 = 5;
const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6;
fn next_power_of_two(mut v: u32) -> u32 {
v -= 1;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v += 1;
v
}
impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
/// The main logic. This function starts up the worker and listens for
/// messages.
fn start(&mut self) {
let deque_index_mask = next_power_of_two(self.other_deques.len() as u32) - 1;
loop {
// Wait for a start message.
let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() {
@ -110,8 +122,16 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
let mut i = 0;
let mut should_continue = true;
loop {
let victim = (self.rng.next_u32() as usize) % self.other_deques.len();
match self.other_deques[victim].steal() {
// Don't just use `rand % len` because that's slow on ARM.
let mut victim;
loop {
victim = self.rng.next_u32() & deque_index_mask;
if (victim as usize) < self.other_deques.len() {
break
}
}
match self.other_deques[victim as usize].steal() {
Empty | Abort => {
// Continue.
}
@ -123,23 +143,23 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
}
if i > SPINS_UNTIL_BACKOFF {
if back_off_sleep >= BACKOFF_INCREMENT_IN_US *
BACKOFFS_UNTIL_CONTROL_CHECK {
match self.port.try_recv() {
Ok(WorkerMsg::Stop) => {
should_continue = false;
break
}
Ok(WorkerMsg::Exit) => return,
Ok(_) => panic!("unexpected message"),
_ => {}
}
}
unsafe {
usleep(back_off_sleep as u32);
}
back_off_sleep += BACKOFF_INCREMENT_IN_US;
}
if i == SPIN_COUNT {
match self.port.try_recv() {
Ok(WorkerMsg::Stop) => {
should_continue = false;
break
}
Ok(WorkerMsg::Exit) => return,
Ok(_) => panic!("unexpected message"),
_ => {}
}
i = 0
} else {
i += 1
@ -164,7 +184,7 @@ impl<QueueData: Send, WorkData: Send> WorkerThread<QueueData, WorkData> {
// The work is done. Now decrement the count of outstanding work items. If this was
// the last work unit in the queue, then send a message on the channel.
unsafe {
if (*ref_count).fetch_sub(1, Ordering::SeqCst) == 1 {
if (*ref_count).fetch_sub(1, Ordering::Release) == 1 {
self.chan.send(SupervisorMsg::Finished).unwrap()
}
}
@ -189,7 +209,7 @@ impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData
#[inline]
pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
unsafe {
drop((*self.ref_count).fetch_add(1, Ordering::SeqCst));
drop((*self.ref_count).fetch_add(1, Ordering::Relaxed));
}
self.worker.push(work_unit);
}