Introduce a basic shared task pool, and use it for image decoding.

This commit is contained in:
Glenn Watson 2014-10-20 13:34:14 +10:00
parent a983debaf1
commit 26636474a9
4 changed files with 84 additions and 23 deletions

View file

@ -6,6 +6,7 @@ use image::base::{Image, load_from_memory};
use resource_task;
use resource_task::{LoadData, ResourceTask};
use servo_util::taskpool::TaskPool;
use std::comm::{channel, Receiver, Sender};
use std::collections::hashmap::HashMap;
use std::mem::replace;
@ -79,7 +80,7 @@ impl<E, S: Encoder<E>> Encodable<S, E> for ImageCacheTask {
type DecoderFactory = fn() -> (proc(&[u8]) : 'static -> Option<Image>);
impl ImageCacheTask {
pub fn new(resource_task: ResourceTask) -> ImageCacheTask {
pub fn new(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();
let chan_clone = chan.clone();
@ -90,7 +91,8 @@ impl ImageCacheTask {
chan: chan_clone,
state_map: HashMap::new(),
wait_map: HashMap::new(),
need_exit: None
need_exit: None,
task_pool: task_pool,
};
cache.run();
});
@ -100,11 +102,11 @@ impl ImageCacheTask {
}
}
pub fn new_sync(resource_task: ResourceTask) -> ImageCacheTask {
pub fn new_sync(resource_task: ResourceTask, task_pool: TaskPool) -> ImageCacheTask {
let (chan, port) = channel();
spawn(proc() {
let inner_cache = ImageCacheTask::new(resource_task);
let inner_cache = ImageCacheTask::new(resource_task, task_pool);
loop {
let msg: Msg = port.recv();
@ -140,6 +142,7 @@ struct ImageCache {
/// List of clients waiting on a WaitForImage response
wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>,
need_exit: Option<Sender<()>>,
task_pool: TaskPool,
}
#[deriving(Clone)]
@ -314,7 +317,7 @@ impl ImageCache {
let to_cache = self.chan.clone();
let url_clone = url.clone();
spawn(proc() {
self.task_pool.execute(proc() {
let url = url_clone;
debug!("image_cache_task: started image decode for {:s}", url.serialize());
let image = load_from_memory(data.as_slice());
@ -493,6 +496,7 @@ mod tests {
use resource_task;
use resource_task::{ResourceTask, Metadata, start_sending};
use image::base::test_image_bin;
use servo_util::taskpool::TaskPool;
use std::comm;
use url::Url;
@ -581,7 +585,7 @@ mod tests {
fn should_exit_on_request() {
let mock_resource_task = mock_resource_task(box DoesNothing);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
@ -592,7 +596,7 @@ mod tests {
fn should_fail_if_unprefetched_image_is_requested() {
let mock_resource_task = mock_resource_task(box DoesNothing);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let (chan, port) = channel();
@ -606,7 +610,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url));
@ -621,7 +625,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box JustSendOK { url_requested_chan: url_requested_chan});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -641,7 +645,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImage{wait_port: wait_port});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -658,7 +662,7 @@ mod tests {
fn should_return_decoded_image_data_if_data_has_arrived() {
let mock_resource_task = mock_resource_task(box SendTestImage);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@ -684,7 +688,7 @@ mod tests {
fn should_return_decoded_image_data_for_multiple_requests() {
let mock_resource_task = mock_resource_task(box SendTestImage);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@ -732,7 +736,7 @@ mod tests {
}
});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -779,7 +783,7 @@ mod tests {
}
});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -808,7 +812,7 @@ mod tests {
fn should_return_failed_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store_prefetched();
@ -834,7 +838,7 @@ mod tests {
fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() {
let mock_resource_task = mock_resource_task(box SendTestImageErr);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store_prefetched();
@ -868,7 +872,7 @@ mod tests {
fn should_return_failed_if_image_decode_fails() {
let mock_resource_task = mock_resource_task(box SendBogusImage);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@ -896,7 +900,7 @@ mod tests {
fn should_return_image_on_wait_if_image_is_already_loaded() {
let mock_resource_task = mock_resource_task(box SendTestImage);
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
let join_port = image_cache_task.wait_for_store();
@ -924,7 +928,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImage {wait_port: wait_port});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -950,7 +954,7 @@ mod tests {
let mock_resource_task = mock_resource_task(box WaitSendTestImageErr{wait_port: wait_port});
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));
@ -974,7 +978,7 @@ mod tests {
fn sync_cache_should_wait_for_images() {
let mock_resource_task = mock_resource_task(box SendTestImage);
let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone());
let image_cache_task = ImageCacheTask::new_sync(mock_resource_task.clone(), TaskPool::new(4));
let url = Url::parse("file:///").unwrap();
image_cache_task.send(Prefetch(url.clone()));

View file

@ -50,6 +50,7 @@ pub mod str;
pub mod task;
pub mod tid;
pub mod time;
pub mod taskpool;
pub mod vec;
pub mod workqueue;

View file

@ -0,0 +1,53 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
//! A load-balancing task pool.
//!
//! This differs in implementation from std::sync::TaskPool in that each job is
//! up for grabs by any of the child tasks in the pool.
//!
//
// This is based on the cargo task pool.
// https://github.com/rust-lang/cargo/blob/master/src/cargo/util/pool.rs
//
// The only difference is that a normal channel is used instead of a sync_channel.
//
use std::sync::{Arc, Mutex};
pub struct TaskPool {
tx: Sender<proc():Send>,
}
impl TaskPool {
pub fn new(tasks: uint) -> TaskPool {
assert!(tasks > 0);
let (tx, rx) = channel();
let state = Arc::new(Mutex::new(rx));
for _ in range(0, tasks) {
let state = state.clone();
spawn(proc() worker(&*state));
}
return TaskPool { tx: tx };
fn worker(rx: &Mutex<Receiver<proc():Send>>) {
loop {
let job = rx.lock().recv_opt();
match job {
Ok(job) => job(),
Err(..) => break,
}
}
}
}
pub fn execute(&self, job: proc():Send) {
self.tx.send(job);
}
}

View file

@ -51,6 +51,8 @@ use servo_util::time::TimeProfiler;
use servo_util::memory::MemoryProfiler;
#[cfg(not(test))]
use servo_util::opts;
#[cfg(not(test))]
use servo_util::taskpool::TaskPool;
#[cfg(not(test))]
use green::GreenTaskBuilder;
@ -79,6 +81,7 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)
let opts_clone = opts.clone();
let time_profiler_chan_clone = time_profiler_chan.clone();
let shared_task_pool = TaskPool::new(8);
let (result_chan, result_port) = channel();
TaskBuilder::new()
@ -91,9 +94,9 @@ pub fn run<Window: WindowMethods>(opts: opts::Opts, window: Option<Rc<Window>>)
// image load or we risk emitting an output file missing the
// image.
let image_cache_task = if opts.output_file.is_some() {
ImageCacheTask::new_sync(resource_task.clone())
ImageCacheTask::new_sync(resource_task.clone(), shared_task_pool)
} else {
ImageCacheTask::new(resource_task.clone())
ImageCacheTask::new(resource_task.clone(), shared_task_pool)
};
let font_cache_task = FontCacheTask::new(resource_task.clone());
let constellation_chan = Constellation::<layout::layout_task::LayoutTask,