mirror of
https://github.com/servo/servo.git
synced 2025-07-22 23:03:42 +01:00
Refactored image cache task - details below.
* Simpler image cache API for clients to use. * Significantly fewer threads. * One thread for image cache task (multiplexes commands, decoder threads and async resource requests). * 4 threads for decoder worker tasks. * Removed ReflowEvent hacks in script and layout tasks. * Image elements pass a Trusted<T> to image cache, which is used to dirty nodes via script task. Previous use of Untrusted addresses was unsafe. * Image requests such as background-image on layout / paint threads trigger repaint only rather than full reflow. * Add reflow batching for when multiple images load quickly. * Reduces the number of paints loading wikipedia from ~95 to ~35. * Reasonably simple to add proper prefetch support in a follow up PR. * Async loaded images always construct Image fragments now, instead of generic. * Image fragments support the image not being present. * Simpler implementation of synchronous image loading for reftests. * Removed image holder. * image.onload support. * image NaturalWidth and NaturalHeight support. * Updated WPT expectations.
This commit is contained in:
parent
e278e5b9a2
commit
d8aef7208e
33 changed files with 2785 additions and 1679 deletions
|
@ -2,408 +2,361 @@
|
|||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use net_traits::ResourceTask;
|
||||
use collections::borrow::ToOwned;
|
||||
use net_traits::image::base::{Image, load_from_memory};
|
||||
use net_traits::image_cache_task::{ImageResponseMsg, ImageCacheTask, Msg};
|
||||
use net_traits::image_cache_task::{load_image_data};
|
||||
use profile::time::{self, profile};
|
||||
use url::Url;
|
||||
|
||||
use std::borrow::ToOwned;
|
||||
use net_traits::image_cache_task::{ImageState, ImageCacheTask, ImageCacheChan, ImageCacheCommand, ImageCacheResult};
|
||||
use net_traits::load_whole_resource;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||
use std::mem::replace;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{channel, Sender, Receiver, Select};
|
||||
use util::resource_files::resources_dir_path;
|
||||
use util::task::spawn_named;
|
||||
use util::taskpool::TaskPool;
|
||||
use url::Url;
|
||||
use net_traits::{AsyncResponseTarget, ControlMsg, LoadData, ResponseAction, ResourceTask, LoadConsumer};
|
||||
use net_traits::image_cache_task::ImageResponder;
|
||||
|
||||
pub trait ImageCacheTaskFactory {
|
||||
fn new(resource_task: ResourceTask, task_pool: TaskPool,
|
||||
time_profiler_chan: time::ProfilerChan,
|
||||
load_placeholder: LoadPlaceholder) -> Self;
|
||||
fn new_sync(resource_task: ResourceTask, task_pool: TaskPool,
|
||||
time_profiler_chan: time::ProfilerChan,
|
||||
load_placeholder: LoadPlaceholder) -> Self;
|
||||
///
|
||||
/// TODO(gw): Remaining work on image cache:
|
||||
/// * Make use of the prefetch support in various parts of the code.
|
||||
/// * Experiment with changing the image 'key' from being a url to a resource id
|
||||
/// This might be a decent performance win and/or memory saving in some cases (esp. data urls)
|
||||
/// * Profile time in GetImageIfAvailable - might be worth caching these results per paint / layout task.
|
||||
///
|
||||
|
||||
/// Represents an image that is either being loaded
|
||||
/// by the resource task, or decoded by a worker thread.
|
||||
struct PendingLoad {
|
||||
bytes: Vec<u8>,
|
||||
result: Option<Result<(), String>>,
|
||||
listeners: Vec<ImageListener>,
|
||||
}
|
||||
|
||||
impl ImageCacheTaskFactory for ImageCacheTask {
|
||||
fn new(resource_task: ResourceTask, task_pool: TaskPool,
|
||||
time_profiler_chan: time::ProfilerChan,
|
||||
load_placeholder: LoadPlaceholder) -> ImageCacheTask {
|
||||
let (chan, port) = channel();
|
||||
let chan_clone = chan.clone();
|
||||
|
||||
spawn_named("ImageCacheTask".to_owned(), move || {
|
||||
let mut cache = ImageCache {
|
||||
resource_task: resource_task,
|
||||
port: port,
|
||||
chan: chan_clone,
|
||||
state_map: HashMap::new(),
|
||||
wait_map: HashMap::new(),
|
||||
need_exit: None,
|
||||
task_pool: task_pool,
|
||||
time_profiler_chan: time_profiler_chan,
|
||||
placeholder_data: Arc::new(vec!()),
|
||||
};
|
||||
cache.run(load_placeholder);
|
||||
});
|
||||
|
||||
ImageCacheTask {
|
||||
chan: chan,
|
||||
impl PendingLoad {
|
||||
fn new() -> PendingLoad {
|
||||
PendingLoad {
|
||||
bytes: vec!(),
|
||||
result: None,
|
||||
listeners: vec!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_sync(resource_task: ResourceTask, task_pool: TaskPool,
|
||||
time_profiler_chan: time::ProfilerChan,
|
||||
load_placeholder: LoadPlaceholder) -> ImageCacheTask {
|
||||
let (chan, port) = channel();
|
||||
fn add_listener(&mut self, listener: ImageListener) {
|
||||
self.listeners.push(listener);
|
||||
}
|
||||
}
|
||||
|
||||
spawn_named("ImageCacheTask (sync)".to_owned(), move || {
|
||||
let inner_cache: ImageCacheTask = ImageCacheTaskFactory::new(resource_task, task_pool,
|
||||
time_profiler_chan, load_placeholder);
|
||||
/// Represents an image that has completed loading.
|
||||
/// Images that fail to load (due to network or decode
|
||||
/// failure) are still stored here, so that they aren't
|
||||
/// fetched again.
|
||||
struct CompletedLoad {
|
||||
image: Option<Arc<Image>>,
|
||||
}
|
||||
|
||||
loop {
|
||||
let msg: Msg = port.recv().unwrap();
|
||||
|
||||
match msg {
|
||||
Msg::GetImage(url, response) => {
|
||||
inner_cache.send(Msg::WaitForImage(url, response));
|
||||
}
|
||||
Msg::Exit(response) => {
|
||||
inner_cache.send(Msg::Exit(response));
|
||||
break;
|
||||
}
|
||||
msg => inner_cache.send(msg)
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ImageCacheTask {
|
||||
chan: chan,
|
||||
impl CompletedLoad {
|
||||
fn new(image: Option<Arc<Image>>) -> CompletedLoad {
|
||||
CompletedLoad {
|
||||
image: image,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores information to notify a client when the state
|
||||
/// of an image changes.
|
||||
struct ImageListener {
|
||||
sender: ImageCacheChan,
|
||||
responder: Option<Box<ImageResponder>>,
|
||||
}
|
||||
|
||||
impl ImageListener {
|
||||
fn new(sender: ImageCacheChan, responder: Option<Box<ImageResponder>>) -> ImageListener {
|
||||
ImageListener {
|
||||
sender: sender,
|
||||
responder: responder,
|
||||
}
|
||||
}
|
||||
|
||||
fn notify(self, image: Option<Arc<Image>>) {
|
||||
let ImageCacheChan(ref sender) = self.sender;
|
||||
let msg = ImageCacheResult {
|
||||
responder: self.responder,
|
||||
image: image,
|
||||
};
|
||||
sender.send(msg).ok();
|
||||
}
|
||||
}
|
||||
|
||||
struct ResourceLoadInfo {
|
||||
action: ResponseAction,
|
||||
url: Url,
|
||||
}
|
||||
|
||||
struct ResourceListener {
|
||||
url: Url,
|
||||
sender: Sender<ResourceLoadInfo>,
|
||||
}
|
||||
|
||||
impl AsyncResponseTarget for ResourceListener {
|
||||
fn invoke_with_listener(&self, action: ResponseAction) {
|
||||
self.sender.send(ResourceLoadInfo {
|
||||
action: action,
|
||||
url: self.url.clone(),
|
||||
}).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of the image cache
|
||||
struct ImageCache {
|
||||
/// A handle to the resource task for fetching the image binaries
|
||||
resource_task: ResourceTask,
|
||||
/// The port on which we'll receive client requests
|
||||
port: Receiver<Msg>,
|
||||
/// A copy of the shared chan to give to child tasks
|
||||
chan: Sender<Msg>,
|
||||
/// The state of processing an image for a URL
|
||||
state_map: HashMap<Url, ImageState>,
|
||||
/// List of clients waiting on a WaitForImage response
|
||||
wait_map: HashMap<Url, Arc<Mutex<Vec<Sender<ImageResponseMsg>>>>>,
|
||||
need_exit: Option<Sender<()>>,
|
||||
// Receive commands from clients
|
||||
cmd_receiver: Receiver<ImageCacheCommand>,
|
||||
|
||||
// Receive notifications from the resource task
|
||||
progress_receiver: Receiver<ResourceLoadInfo>,
|
||||
progress_sender: Sender<ResourceLoadInfo>,
|
||||
|
||||
// Receive notifications from the decoder thread pool
|
||||
decoder_receiver: Receiver<DecoderMsg>,
|
||||
decoder_sender: Sender<DecoderMsg>,
|
||||
|
||||
// Worker threads for decoding images.
|
||||
task_pool: TaskPool,
|
||||
time_profiler_chan: time::ProfilerChan,
|
||||
// Default image used when loading fails.
|
||||
placeholder_data: Arc<Vec<u8>>,
|
||||
|
||||
// Resource task handle
|
||||
resource_task: ResourceTask,
|
||||
|
||||
// Images that are loading over network, or decoding.
|
||||
pending_loads: HashMap<Url, PendingLoad>,
|
||||
|
||||
// Images that have finished loading (successful or not)
|
||||
completed_loads: HashMap<Url, CompletedLoad>,
|
||||
|
||||
// The placeholder image used when an image fails to load
|
||||
placeholder_image: Option<Arc<Image>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum ImageState {
|
||||
Init,
|
||||
Prefetching(AfterPrefetch),
|
||||
Prefetched(Vec<u8>),
|
||||
Decoding,
|
||||
Decoded(Arc<Image>),
|
||||
Failed
|
||||
/// Message that the decoder worker threads send to main image cache task.
|
||||
struct DecoderMsg {
|
||||
url: Url,
|
||||
image: Option<Image>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum AfterPrefetch {
|
||||
DoDecode,
|
||||
DoNotDecode
|
||||
}
|
||||
|
||||
pub enum LoadPlaceholder {
|
||||
Preload,
|
||||
Ignore
|
||||
/// The types of messages that the main image cache task receives.
|
||||
enum SelectResult {
|
||||
Command(ImageCacheCommand),
|
||||
Progress(ResourceLoadInfo),
|
||||
Decoder(DecoderMsg),
|
||||
}
|
||||
|
||||
impl ImageCache {
|
||||
// Used to preload the default placeholder.
|
||||
fn init(&mut self) {
|
||||
let mut placeholder_url = resources_dir_path();
|
||||
placeholder_url.push("rippy.jpg");
|
||||
let image = load_image_data(Url::from_file_path(&*placeholder_url).unwrap(), self.resource_task.clone(), &self.placeholder_data);
|
||||
|
||||
match image {
|
||||
Err(..) => debug!("image_cache_task: failed loading the placeholder."),
|
||||
Ok(image_data) => self.placeholder_data = Arc::new(image_data),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&mut self, load_placeholder: LoadPlaceholder) {
|
||||
// We have to load the placeholder before running.
|
||||
match load_placeholder {
|
||||
LoadPlaceholder::Preload => self.init(),
|
||||
LoadPlaceholder::Ignore => debug!("image_cache_task: use old behavior."),
|
||||
}
|
||||
|
||||
let mut store_chan: Option<Sender<()>> = None;
|
||||
let mut store_prefetched_chan: Option<Sender<()>> = None;
|
||||
fn run(&mut self) {
|
||||
let mut exit_sender: Option<Sender<()>> = None;
|
||||
|
||||
loop {
|
||||
let msg = self.port.recv().unwrap();
|
||||
let result = {
|
||||
let sel = Select::new();
|
||||
|
||||
match msg {
|
||||
Msg::Prefetch(url) => self.prefetch(url),
|
||||
Msg::StorePrefetchedImageData(url, data) => {
|
||||
store_prefetched_chan.map(|chan| {
|
||||
chan.send(()).unwrap();
|
||||
});
|
||||
store_prefetched_chan = None;
|
||||
let mut cmd_handle = sel.handle(&self.cmd_receiver);
|
||||
let mut progress_handle = sel.handle(&self.progress_receiver);
|
||||
let mut decoder_handle = sel.handle(&self.decoder_receiver);
|
||||
|
||||
self.store_prefetched_image_data(url, data);
|
||||
}
|
||||
Msg::Decode(url) => self.decode(url),
|
||||
Msg::StoreImage(url, image) => {
|
||||
store_chan.map(|chan| {
|
||||
chan.send(()).unwrap();
|
||||
});
|
||||
store_chan = None;
|
||||
|
||||
self.store_image(url, image)
|
||||
}
|
||||
Msg::GetImage(url, response) => self.get_image(url, response),
|
||||
Msg::WaitForImage(url, response) => {
|
||||
self.wait_for_image(url, response)
|
||||
}
|
||||
Msg::WaitForStore(chan) => store_chan = Some(chan),
|
||||
Msg::WaitForStorePrefetched(chan) => store_prefetched_chan = Some(chan),
|
||||
Msg::Exit(response) => {
|
||||
assert!(self.need_exit.is_none());
|
||||
self.need_exit = Some(response);
|
||||
}
|
||||
}
|
||||
|
||||
let need_exit = replace(&mut self.need_exit, None);
|
||||
|
||||
match need_exit {
|
||||
Some(response) => {
|
||||
// Wait until we have no outstanding requests and subtasks
|
||||
// before exiting
|
||||
let mut can_exit = true;
|
||||
for (_, state) in self.state_map.iter() {
|
||||
match *state {
|
||||
ImageState::Prefetching(..) => can_exit = false,
|
||||
ImageState::Decoding => can_exit = false,
|
||||
|
||||
ImageState::Init | ImageState::Prefetched(..) |
|
||||
ImageState::Decoded(..) | ImageState::Failed => ()
|
||||
}
|
||||
unsafe {
|
||||
cmd_handle.add();
|
||||
progress_handle.add();
|
||||
decoder_handle.add();
|
||||
}
|
||||
|
||||
if can_exit {
|
||||
response.send(()).unwrap();
|
||||
break;
|
||||
let ret = sel.wait();
|
||||
|
||||
if ret == cmd_handle.id() {
|
||||
SelectResult::Command(self.cmd_receiver.recv().unwrap())
|
||||
} else if ret == decoder_handle.id() {
|
||||
SelectResult::Decoder(self.decoder_receiver.recv().unwrap())
|
||||
} else {
|
||||
self.need_exit = Some(response);
|
||||
SelectResult::Progress(self.progress_receiver.recv().unwrap())
|
||||
}
|
||||
}
|
||||
None => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
fn get_state(&self, url: &Url) -> ImageState {
|
||||
match self.state_map.get(url) {
|
||||
Some(state) => state.clone(),
|
||||
None => ImageState::Init
|
||||
}
|
||||
}
|
||||
|
||||
fn set_state(&mut self, url: Url, state: ImageState) {
|
||||
self.state_map.insert(url, state);
|
||||
}
|
||||
|
||||
fn prefetch(&mut self, url: Url) {
|
||||
match self.get_state(&url) {
|
||||
ImageState::Init => {
|
||||
let to_cache = self.chan.clone();
|
||||
let resource_task = self.resource_task.clone();
|
||||
let url_clone = url.clone();
|
||||
let placeholder = self.placeholder_data.clone();
|
||||
spawn_named("ImageCacheTask (prefetch)".to_owned(), move || {
|
||||
let url = url_clone;
|
||||
debug!("image_cache_task: started fetch for {}", url.serialize());
|
||||
|
||||
let image = load_image_data(url.clone(), resource_task.clone(), &placeholder);
|
||||
to_cache.send(Msg::StorePrefetchedImageData(url.clone(), image)).unwrap();
|
||||
debug!("image_cache_task: ended fetch for {}", url.serialize());
|
||||
});
|
||||
|
||||
self.set_state(url, ImageState::Prefetching(AfterPrefetch::DoNotDecode));
|
||||
}
|
||||
|
||||
ImageState::Prefetching(..) | ImageState::Prefetched(..) |
|
||||
ImageState::Decoding | ImageState::Decoded(..) | ImageState::Failed => {
|
||||
// We've already begun working on this image
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn store_prefetched_image_data(&mut self, url: Url, data: Result<Vec<u8>, ()>) {
|
||||
match self.get_state(&url) {
|
||||
ImageState::Prefetching(next_step) => {
|
||||
match data {
|
||||
Ok(data) => {
|
||||
self.set_state(url.clone(), ImageState::Prefetched(data));
|
||||
match next_step {
|
||||
AfterPrefetch::DoDecode => self.decode(url),
|
||||
_ => ()
|
||||
match result {
|
||||
SelectResult::Command(cmd) => {
|
||||
exit_sender = self.handle_cmd(cmd);
|
||||
}
|
||||
}
|
||||
Err(..) => {
|
||||
self.set_state(url.clone(), ImageState::Failed);
|
||||
self.purge_waiters(url, || ImageResponseMsg::ImageFailed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ImageState::Init
|
||||
| ImageState::Prefetched(..)
|
||||
| ImageState::Decoding
|
||||
| ImageState::Decoded(..)
|
||||
| ImageState::Failed => {
|
||||
panic!("wrong state for storing prefetched image")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decode(&mut self, url: Url) {
|
||||
match self.get_state(&url) {
|
||||
ImageState::Init => panic!("decoding image before prefetch"),
|
||||
|
||||
ImageState::Prefetching(AfterPrefetch::DoNotDecode) => {
|
||||
// We don't have the data yet, queue up the decode
|
||||
self.set_state(url, ImageState::Prefetching(AfterPrefetch::DoDecode))
|
||||
}
|
||||
|
||||
ImageState::Prefetching(AfterPrefetch::DoDecode) => {
|
||||
// We don't have the data yet, but the decode request is queued up
|
||||
}
|
||||
|
||||
ImageState::Prefetched(data) => {
|
||||
let to_cache = self.chan.clone();
|
||||
let url_clone = url.clone();
|
||||
let time_profiler_chan = self.time_profiler_chan.clone();
|
||||
|
||||
self.task_pool.execute(move || {
|
||||
let url = url_clone;
|
||||
debug!("image_cache_task: started image decode for {}", url.serialize());
|
||||
let image = profile(time::ProfilerCategory::ImageDecoding,
|
||||
None, time_profiler_chan, || {
|
||||
load_from_memory(&data)
|
||||
});
|
||||
|
||||
let image = image.map(Arc::new);
|
||||
to_cache.send(Msg::StoreImage(url.clone(), image)).unwrap();
|
||||
debug!("image_cache_task: ended image decode for {}", url.serialize());
|
||||
});
|
||||
|
||||
self.set_state(url, ImageState::Decoding);
|
||||
}
|
||||
|
||||
ImageState::Decoding | ImageState::Decoded(..) | ImageState::Failed => {
|
||||
// We've already begun decoding
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn store_image(&mut self, url: Url, image: Option<Arc<Image>>) {
|
||||
|
||||
match self.get_state(&url) {
|
||||
ImageState::Decoding => {
|
||||
match image {
|
||||
Some(image) => {
|
||||
self.set_state(url.clone(), ImageState::Decoded(image.clone()));
|
||||
self.purge_waiters(url, || ImageResponseMsg::ImageReady(image.clone()) );
|
||||
}
|
||||
None => {
|
||||
self.set_state(url.clone(), ImageState::Failed);
|
||||
self.purge_waiters(url, || ImageResponseMsg::ImageFailed );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ImageState::Init
|
||||
| ImageState::Prefetching(..)
|
||||
| ImageState::Prefetched(..)
|
||||
| ImageState::Decoded(..)
|
||||
| ImageState::Failed => {
|
||||
panic!("incorrect state in store_image")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn purge_waiters<F>(&mut self, url: Url, f: F) where F: Fn() -> ImageResponseMsg {
|
||||
match self.wait_map.remove(&url) {
|
||||
Some(waiters) => {
|
||||
let items = waiters.lock().unwrap();
|
||||
for response in items.iter() {
|
||||
response.send(f()).unwrap();
|
||||
SelectResult::Progress(msg) => {
|
||||
self.handle_progress(msg);
|
||||
}
|
||||
SelectResult::Decoder(msg) => {
|
||||
self.handle_decoder(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// Can only exit when all pending loads are complete.
|
||||
if let Some(ref exit_sender) = exit_sender {
|
||||
if self.pending_loads.len() == 0 {
|
||||
exit_sender.send(()).unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => ()
|
||||
}
|
||||
}
|
||||
|
||||
fn get_image(&self, url: Url, response: Sender<ImageResponseMsg>) {
|
||||
match self.get_state(&url) {
|
||||
ImageState::Init => panic!("request for image before prefetch"),
|
||||
ImageState::Prefetching(AfterPrefetch::DoDecode) => response.send(ImageResponseMsg::ImageNotReady).unwrap(),
|
||||
ImageState::Prefetching(AfterPrefetch::DoNotDecode) | ImageState::Prefetched(..) => panic!("request for image before decode"),
|
||||
ImageState::Decoding => response.send(ImageResponseMsg::ImageNotReady).unwrap(),
|
||||
ImageState::Decoded(image) => response.send(ImageResponseMsg::ImageReady(image)).unwrap(),
|
||||
ImageState::Failed => response.send(ImageResponseMsg::ImageFailed).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_image(&mut self, url: Url, response: Sender<ImageResponseMsg>) {
|
||||
match self.get_state(&url) {
|
||||
ImageState::Init => panic!("request for image before prefetch"),
|
||||
|
||||
ImageState::Prefetching(AfterPrefetch::DoNotDecode) | ImageState::Prefetched(..) => panic!("request for image before decode"),
|
||||
|
||||
ImageState::Prefetching(AfterPrefetch::DoDecode) | ImageState::Decoding => {
|
||||
// We don't have this image yet
|
||||
match self.wait_map.entry(url) {
|
||||
Occupied(mut entry) => {
|
||||
entry.get_mut().lock().unwrap().push(response);
|
||||
// Handle a request from a client
|
||||
fn handle_cmd(&mut self, cmd: ImageCacheCommand) -> Option<Sender<()>> {
|
||||
match cmd {
|
||||
ImageCacheCommand::Exit(sender) => {
|
||||
return Some(sender);
|
||||
}
|
||||
ImageCacheCommand::RequestImage(url, result_chan, responder) => {
|
||||
self.request_image(url, result_chan, responder);
|
||||
}
|
||||
ImageCacheCommand::GetImageIfAvailable(url, consumer) => {
|
||||
let result = match self.completed_loads.get(&url) {
|
||||
Some(completed_load) => {
|
||||
completed_load.image.clone().ok_or(ImageState::LoadError)
|
||||
}
|
||||
Vacant(entry) => {
|
||||
entry.insert(Arc::new(Mutex::new(vec!(response))));
|
||||
None => {
|
||||
let pending_load = self.pending_loads.get(&url);
|
||||
Err(pending_load.map_or(ImageState::NotRequested, |_| ImageState::Pending))
|
||||
}
|
||||
};
|
||||
consumer.send(result).unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
// Handle progress messages from the resource task
|
||||
fn handle_progress(&mut self, msg: ResourceLoadInfo) {
|
||||
match msg.action {
|
||||
ResponseAction::HeadersAvailable(_) => {}
|
||||
ResponseAction::DataAvailable(data) => {
|
||||
let pending_load = self.pending_loads.get_mut(&msg.url).unwrap();
|
||||
pending_load.bytes.push_all(data.as_slice());
|
||||
}
|
||||
ResponseAction::ResponseComplete(result) => {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
let pending_load = self.pending_loads.get_mut(&msg.url).unwrap();
|
||||
pending_load.result = Some(result);
|
||||
|
||||
let bytes = mem::replace(&mut pending_load.bytes, vec!());
|
||||
let url = msg.url.clone();
|
||||
let sender = self.decoder_sender.clone();
|
||||
|
||||
self.task_pool.execute(move || {
|
||||
let image = load_from_memory(bytes.as_slice());
|
||||
let msg = DecoderMsg {
|
||||
url: url,
|
||||
image: image
|
||||
};
|
||||
sender.send(msg).unwrap();
|
||||
});
|
||||
}
|
||||
Err(_) => {
|
||||
let placeholder_image = self.placeholder_image.clone();
|
||||
self.complete_load(msg.url, placeholder_image);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ImageState::Decoded(image) => {
|
||||
response.send(ImageResponseMsg::ImageReady(image)).unwrap();
|
||||
// Handle a message from one of the decoder worker threads
|
||||
fn handle_decoder(&mut self, msg: DecoderMsg) {
|
||||
let image = msg.image.map(Arc::new);
|
||||
self.complete_load(msg.url, image);
|
||||
}
|
||||
|
||||
// Change state of a url from pending -> loaded.
|
||||
fn complete_load(&mut self, url: Url, image: Option<Arc<Image>>) {
|
||||
let pending_load = self.pending_loads.remove(&url).unwrap();
|
||||
|
||||
let completed_load = CompletedLoad::new(image.clone());
|
||||
self.completed_loads.insert(url, completed_load);
|
||||
|
||||
for listener in pending_load.listeners.into_iter() {
|
||||
listener.notify(image.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Request an image from the cache
|
||||
fn request_image(&mut self, url: Url, result_chan: ImageCacheChan, responder: Option<Box<ImageResponder>>) {
|
||||
let image_listener = ImageListener::new(result_chan, responder);
|
||||
|
||||
// Check if already completed
|
||||
match self.completed_loads.get(&url) {
|
||||
Some(completed_load) => {
|
||||
// It's already completed, return a notify straight away
|
||||
image_listener.notify(completed_load.image.clone());
|
||||
}
|
||||
None => {
|
||||
// Check if the load is already pending
|
||||
match self.pending_loads.entry(url.clone()) {
|
||||
Occupied(mut e) => {
|
||||
// It's pending, so add the listener for state changes
|
||||
let pending_load = e.get_mut();
|
||||
pending_load.add_listener(image_listener);
|
||||
}
|
||||
Vacant(e) => {
|
||||
// A new load request! Add the pending load and request
|
||||
// it from the resource task.
|
||||
let mut pending_load = PendingLoad::new();
|
||||
pending_load.add_listener(image_listener);
|
||||
e.insert(pending_load);
|
||||
|
||||
ImageState::Failed => {
|
||||
response.send(ImageResponseMsg::ImageFailed).unwrap();
|
||||
let load_data = LoadData::new(url.clone());
|
||||
let listener = box ResourceListener {
|
||||
url: url,
|
||||
sender: self.progress_sender.clone(),
|
||||
};
|
||||
self.resource_task.send(ControlMsg::Load(load_data, LoadConsumer::Listener(listener))).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_listener<F, A>(f: F) -> Sender<A>
|
||||
where F: FnOnce(Receiver<A>) + Send + 'static,
|
||||
A: Send + 'static
|
||||
{
|
||||
let (setup_chan, setup_port) = channel();
|
||||
/// Create a new image cache.
|
||||
pub fn new_image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
|
||||
let (cmd_sender, cmd_receiver) = channel();
|
||||
let (progress_sender, progress_receiver) = channel();
|
||||
let (decoder_sender, decoder_receiver) = channel();
|
||||
|
||||
spawn_named("ImageCacheTask (listener)".to_owned(), move || {
|
||||
let (chan, port) = channel();
|
||||
setup_chan.send(chan).unwrap();
|
||||
f(port);
|
||||
spawn_named("ImageCacheThread".to_owned(), move || {
|
||||
|
||||
// Preload the placeholder image, used when images fail to load.
|
||||
let mut placeholder_url = resources_dir_path();
|
||||
// TODO (Savago): replace for a prettier one.
|
||||
placeholder_url.push("rippy.jpg");
|
||||
let url = Url::from_file_path(&*placeholder_url).unwrap();
|
||||
let placeholder_image = match load_whole_resource(&resource_task, url) {
|
||||
Err(..) => {
|
||||
debug!("image_cache_task: failed loading the placeholder.");
|
||||
None
|
||||
}
|
||||
Ok((_, image_data)) => {
|
||||
Some(Arc::new(load_from_memory(&image_data).unwrap()))
|
||||
}
|
||||
};
|
||||
|
||||
let mut cache = ImageCache {
|
||||
cmd_receiver: cmd_receiver,
|
||||
progress_sender: progress_sender,
|
||||
progress_receiver: progress_receiver,
|
||||
decoder_sender: decoder_sender,
|
||||
decoder_receiver: decoder_receiver,
|
||||
task_pool: TaskPool::new(4),
|
||||
pending_loads: HashMap::new(),
|
||||
completed_loads: HashMap::new(),
|
||||
resource_task: resource_task,
|
||||
placeholder_image: placeholder_image,
|
||||
};
|
||||
|
||||
cache.run();
|
||||
});
|
||||
setup_port.recv().unwrap()
|
||||
|
||||
ImageCacheTask::new(cmd_sender)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue