From a063fdde7380570a4b4b7bf9998470a47f6ea4c3 Mon Sep 17 00:00:00 2001
From: Patrick Walton
Date: Thu, 11 Jun 2015 15:07:16 -0700
Subject: [PATCH] util: Reduce memory barriers and integer divisions in the
 work queue.

Improves scaling on multicore ARM devices.
---
 components/util/deque/mod.rs | 21 ++++++-------
 components/util/workqueue.rs | 60 ++++++++++++++++++++++++------------
 2 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/components/util/deque/mod.rs b/components/util/deque/mod.rs
index fc79d9f76ee..5200bd2e48c 100644
--- a/components/util/deque/mod.rs
+++ b/components/util/deque/mod.rs
@@ -45,8 +45,7 @@
 // NB: the "buffer pool" strategy is not done for speed, but rather for
 // correctness. For more info, see the comment on `swap_buffer`
 
-// FIXME: all atomic operations in this module use a SeqCst ordering. That is
-// probably overkill
+// FIXME: more than likely, more atomic operations than necessary use SeqCst
 
 pub use self::Stolen::{Empty, Abort, Data};
 
@@ -58,7 +57,7 @@
 use std::ptr;
 use std::sync::Mutex;
 use std::sync::atomic::{AtomicIsize, AtomicPtr};
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
 
 // Once the queue is less than 1/K full, then it will be downsized. Note that
 // the deque requires that this number be less than 2.
@@ -240,8 +239,8 @@ impl Deque {
     }
 
     unsafe fn push(&self, data: T) {
-        let mut b = self.bottom.load(SeqCst);
-        let t = self.top.load(SeqCst);
+        let mut b = self.bottom.load(Relaxed);
+        let t = self.top.load(Relaxed);
         let mut a = self.array.load(SeqCst);
         let size = b - t;
         if size >= (*a).size() - 1 {
@@ -256,11 +255,11 @@ impl Deque {
     }
 
     unsafe fn pop(&self) -> Option<T> {
-        let b = self.bottom.load(SeqCst);
+        let b = self.bottom.load(Relaxed);
         let a = self.array.load(SeqCst);
         let b = b - 1;
         self.bottom.store(b, SeqCst);
-        let t = self.top.load(SeqCst);
+        let t = self.top.load(Relaxed);
         let size = b - t;
         if size < 0 {
             self.bottom.store(t, SeqCst);
@@ -282,12 +281,12 @@ impl Deque {
     }
 
     unsafe fn steal(&self) -> Stolen<T> {
-        let t = self.top.load(SeqCst);
-        let old = self.array.load(SeqCst);
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
+        let t = self.top.load(Relaxed);
+        let old = self.array.load(Relaxed);
+        let b = self.bottom.load(Relaxed);
         let size = b - t;
         if size <= 0 { return Empty }
+        let a = self.array.load(SeqCst);
         if size % (*a).size() == 0 {
             if a == old && t == self.top.load(SeqCst) {
                 return Empty
diff --git a/components/util/workqueue.rs b/components/util/workqueue.rs
index c368e56e930..2a76adb51eb 100644
--- a/components/util/workqueue.rs
+++ b/components/util/workqueue.rs
@@ -76,14 +76,26 @@ struct WorkerThread {
 
 unsafe impl Send for WorkerThread {}
 
-static SPIN_COUNT: u32 = 128;
-static SPINS_UNTIL_BACKOFF: u32 = 100;
-static BACKOFF_INCREMENT_IN_US: u32 = 5;
+const SPINS_UNTIL_BACKOFF: u32 = 128;
+const BACKOFF_INCREMENT_IN_US: u32 = 5;
+const BACKOFFS_UNTIL_CONTROL_CHECK: u32 = 6;
+
+fn next_power_of_two(mut v: u32) -> u32 {
+    v -= 1;
+    v |= v >> 1;
+    v |= v >> 2;
+    v |= v >> 4;
+    v |= v >> 8;
+    v |= v >> 16;
+    v += 1;
+    v
+}
 
 impl WorkerThread {
     /// The main logic. This function starts up the worker and listens for
     /// messages.
     fn start(&mut self) {
+        let deque_index_mask = next_power_of_two(self.other_deques.len() as u32) - 1;
         loop {
             // Wait for a start message.
             let (mut deque, ref_count, queue_data) = match self.port.recv().unwrap() {
@@ -110,8 +122,16 @@ impl WorkerThread {
                         let mut i = 0;
                         let mut should_continue = true;
                         loop {
-                            let victim = (self.rng.next_u32() as usize) % self.other_deques.len();
-                            match self.other_deques[victim].steal() {
+                            // Don't just use `rand % len` because that's slow on ARM.
+                            let mut victim;
+                            loop {
+                                victim = self.rng.next_u32() & deque_index_mask;
+                                if (victim as usize) < self.other_deques.len() {
+                                    break
+                                }
+                            }
+
+                            match self.other_deques[victim as usize].steal() {
                                 Empty | Abort => {
                                     // Continue.
                                 }
@@ -123,23 +143,23 @@ impl WorkerThread {
                             }
 
                             if i > SPINS_UNTIL_BACKOFF {
+                                if back_off_sleep >= BACKOFF_INCREMENT_IN_US *
+                                        BACKOFFS_UNTIL_CONTROL_CHECK {
+                                    match self.port.try_recv() {
+                                        Ok(WorkerMsg::Stop) => {
+                                            should_continue = false;
+                                            break
+                                        }
+                                        Ok(WorkerMsg::Exit) => return,
+                                        Ok(_) => panic!("unexpected message"),
+                                        _ => {}
+                                    }
+                                }
+
                                 unsafe {
                                     usleep(back_off_sleep as u32);
                                 }
                                 back_off_sleep += BACKOFF_INCREMENT_IN_US;
-                            }
-
-                            if i == SPIN_COUNT {
-                                match self.port.try_recv() {
-                                    Ok(WorkerMsg::Stop) => {
-                                        should_continue = false;
-                                        break
-                                    }
-                                    Ok(WorkerMsg::Exit) => return,
-                                    Ok(_) => panic!("unexpected message"),
-                                    _ => {}
-                                }
-                                i = 0
                             } else {
                                 i += 1
                             }
@@ -164,7 +184,7 @@ impl WorkerThread {
                 // The work is done. Now decrement the count of outstanding work items. If this was
                 // the last work unit in the queue, then send a message on the channel.
                 unsafe {
-                    if (*ref_count).fetch_sub(1, Ordering::SeqCst) == 1 {
+                    if (*ref_count).fetch_sub(1, Ordering::Release) == 1 {
                         self.chan.send(SupervisorMsg::Finished).unwrap()
                     }
                 }
@@ -189,7 +209,7 @@ impl<'a, QueueData: 'static, WorkData: Send + 'static> WorkerProxy<'a, QueueData
     #[inline]
     pub fn push(&mut self, work_unit: WorkUnit<QueueData, WorkData>) {
         unsafe {
-            drop((*self.ref_count).fetch_add(1, Ordering::SeqCst));
+            drop((*self.ref_count).fetch_add(1, Ordering::Relaxed));
        }
        self.worker.push(work_unit);
    }
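
A note on the victim-selection change (an illustration, not part of the patch): `rand % len` costs an integer division per steal attempt, which is comparatively slow on many ARM cores, so the patch rounds the number of peer deques up to a power of two, masks the RNG output, and retries the rare out-of-range draws. A minimal standalone sketch of that scheme follows; `pick_victim` and the toy xorshift generator are hypothetical names used only for this demo.

fn next_power_of_two(mut v: u32) -> u32 {
    v -= 1;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    v + 1
}

/// Picks a uniform index in `0..len` with a mask plus rejection, avoiding `%`.
fn pick_victim<R: FnMut() -> u32>(len: usize, mut rng: R) -> usize {
    // Mask covers the smallest power of two >= len; out-of-range draws are retried.
    let mask = next_power_of_two(len as u32) - 1;
    loop {
        let candidate = rng() & mask;
        if (candidate as usize) < len {
            return candidate as usize;
        }
    }
}

fn main() {
    // Toy xorshift32 generator, standing in for the work queue's RNG.
    let mut state = 0x1234_5678u32;
    let mut xorshift = || {
        state ^= state << 13;
        state ^= state >> 17;
        state ^= state << 5;
        state
    };
    let mut counts = [0usize; 5];
    for _ in 0..10_000 {
        counts[pick_victim(5, &mut xorshift)] += 1;
    }
    println!("{:?}", counts); // roughly uniform across the 5 "deques"
}

With five deques the mask is 7, so at most three of every eight draws are rejected and the expected number of RNG calls per steal attempt stays below two.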
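A note on the ref-count orderings (illustration only): incrementing the outstanding-work count publishes nothing by itself, so Relaxed is enough, while the final decrement uses Release so that, paired with an Acquire on the side that observes the count reach zero, everything the workers did is visible before "Finished" is acted on; this is the usual discipline Arc follows for its strong count. A minimal sketch under those assumptions, with a hypothetical `worker` helper and an mpsc channel standing in for the supervisor channel:

use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread;

// Retires one work unit; the worker that retires the last one reports completion.
fn worker(outstanding: Arc<AtomicUsize>, done: Sender<()>) {
    // ... do the actual work here ...
    if outstanding.fetch_sub(1, Ordering::Release) == 1 {
        // Pair the Release decrements with an Acquire so prior work is visible.
        fence(Ordering::Acquire);
        done.send(()).unwrap();
    }
}

fn main() {
    let outstanding = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = channel();
    // Queue four units up front; the increment publishes nothing by itself,
    // so Relaxed is enough.
    outstanding.fetch_add(4, Ordering::Relaxed);
    for _ in 0..4 {
        let (counter, done) = (outstanding.clone(), tx.clone());
        thread::spawn(move || worker(counter, done));
    }
    rx.recv().unwrap();
    println!("all outstanding work retired");
}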
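A note on the reworked idle loop (illustration only): the spin counter now only decides when backoff starts, and the control channel is polled once the accumulated backoff exceeds BACKOFF_INCREMENT_IN_US * BACKOFFS_UNTIL_CONTROL_CHECK rather than on a separate fixed spin count. A rough sketch of that shape, assuming a hypothetical `Control` message type and using std::thread::sleep in place of libc's usleep:

use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

const SPINS_UNTIL_BACKOFF: u32 = 128;
const BACKOFF_INCREMENT_IN_US: u64 = 5;
const BACKOFFS_UNTIL_CONTROL_CHECK: u64 = 6;

#[allow(dead_code)]
enum Control { Stop, Exit }

#[allow(dead_code)]
enum Idle { GotWork, Stop, Exit }

/// Spins on `try_steal`, then sleeps with a linearly growing backoff; the
/// control channel is only consulted once the backoff passes the threshold.
fn idle(port: &Receiver<Control>, mut try_steal: impl FnMut() -> bool) -> Idle {
    let mut spins = 0u32;
    let mut back_off_us = 0u64;
    loop {
        if try_steal() {
            return Idle::GotWork;
        }
        if spins > SPINS_UNTIL_BACKOFF {
            if back_off_us >= BACKOFF_INCREMENT_IN_US * BACKOFFS_UNTIL_CONTROL_CHECK {
                match port.try_recv() {
                    Ok(Control::Stop) => return Idle::Stop,
                    Ok(Control::Exit) => return Idle::Exit,
                    Err(_) => {}
                }
            }
            thread::sleep(Duration::from_micros(back_off_us));
            back_off_us += BACKOFF_INCREMENT_IN_US;
        } else {
            spins += 1;
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    // No work ever arrives in this demo, so ask the worker to exit up front;
    // it spins, backs off a few times, then notices the message.
    tx.send(Control::Exit).unwrap();
    match idle(&rx, || false) {
        Idle::Exit => println!("worker exited after backing off"),
        _ => unreachable!(),
    }
}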