Auto merge of #13041 - emilio:fallible-threadpool, r=bholley

style: Make WorkQueue creation fallible.

<!-- Please describe your changes on the following line: -->

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `__` with appropriate data: -->
- [ ] `./mach build -d` does not report any errors
- [x] `./mach test-tidy` does not report any errors
- [ ] These changes fix #__ (github issue number if applicable).

<!-- Either: -->
- [x] These changes do not require tests.

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->

Fixes bug 1290205 in bugzilla.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/13041)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2016-08-26 01:46:16 -05:00 committed by GitHub
commit 911bc94848
4 changed files with 43 additions and 15 deletions

View file

@ -399,7 +399,7 @@ impl LayoutThread {
MediaType::Screen,
opts::get().initial_window_size.to_f32() * ScaleFactor::new(1.0));
let parallel_traversal = if layout_threads != 1 {
Some(WorkQueue::new("LayoutWorker", thread_state::LAYOUT, layout_threads))
WorkQueue::new("LayoutWorker", thread_state::LAYOUT, layout_threads).ok()
} else {
None
};

View file

@ -18,8 +18,8 @@ use deque::{self, Abort, Data, Empty, Stealer, Worker};
use rand::{Rng, XorShiftRng, weak_rng};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use thread_state;
use util::thread::spawn_named;
/// A unit of work.
///
@ -244,10 +244,11 @@ impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> {
/// it.
pub fn new(thread_name: &'static str,
state: thread_state::ThreadState,
thread_count: usize) -> WorkQueue<QueueData, WorkData> {
thread_count: usize) -> Result<WorkQueue<QueueData, WorkData>, ()> {
// Set up data structures.
let (supervisor_chan, supervisor_port) = channel();
let (mut infos, mut threads) = (vec!(), vec!());
let mut infos = Vec::with_capacity(thread_count);
let mut threads = Vec::with_capacity(thread_count);
for i in 0..thread_count {
let (worker_chan, worker_port) = channel();
let (worker, thief) = deque::new();
@ -276,21 +277,42 @@ impl<QueueData: Sync, WorkData: Send> WorkQueue<QueueData, WorkData> {
}
// Spawn threads.
let mut thread_handles = vec![];
for (i, thread) in threads.into_iter().enumerate() {
spawn_named(
format!("{} worker {}/{}", thread_name, i + 1, thread_count),
move || {
let handle = thread::Builder::new()
.name(format!("{} worker {}/{}", thread_name, i + 1, thread_count))
.spawn(move || {
thread_state::initialize(state | thread_state::IN_WORKER);
let mut thread = thread;
thread.start()
})
});
match handle {
Ok(handle) => {
thread_handles.push(handle);
}
Err(err) => {
warn!("Failed spawning thread: {:?}", err);
break;
}
}
}
WorkQueue {
if thread_handles.len() != thread_count {
// At least one worker thread failed to be created, just close the
// rest of them, and return an error.
for (i, handle) in thread_handles.into_iter().enumerate() {
let _ = infos[i].chan.send(WorkerMsg::Exit);
let _ = handle.join();
}
return Err(());
}
Ok(WorkQueue {
workers: infos,
port: supervisor_port,
work_count: 0,
}
})
}
/// Enqueues a block into the work queue.