servo/components/net/image_cache_task.rs
Ralph Giles 135bcacee4 Use the correct .png extension for the image placeholder.
This was changed to the mozilla broken image resource
without changing the filename, even though the file
type changed.
2015-11-13 11:45:09 -08:00

493 lines
17 KiB
Rust

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use net_traits::image::base::{Image, load_from_memory};
use net_traits::image_cache_task::ImageResponder;
use net_traits::image_cache_task::{ImageCacheChan, ImageCacheCommand, ImageCacheTask, ImageState};
use net_traits::image_cache_task::{ImageCacheResult, ImageResponse, UsePlaceholder};
use net_traits::{AsyncResponseTarget, ControlMsg, LoadConsumer, LoadData, ResourceTask, ResponseAction};
use std::borrow::ToOwned;
use std::collections::HashMap;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::fs::File;
use std::io::Read;
use std::mem;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, Select, Sender, channel};
use url::Url;
use util::resource_files::resources_dir_path;
use util::task::spawn_named;
use util::taskpool::TaskPool;
///
/// TODO(gw): Remaining work on image cache:
/// * Make use of the prefetch support in various parts of the code.
/// * Profile time in GetImageIfAvailable - might be worth caching these results per paint / layout task.
///
/// MAYBE(Yoric):
/// * For faster lookups, it might be useful to store the LoadKey in the DOM once we have performed a first load.
/// Represents an image that is either being loaded
/// by the resource task, or decoded by a worker thread.
struct PendingLoad {
// The bytes loaded so far. Reset to an empty vector once loading
// is complete and the buffer has been transmitted to the decoder.
bytes: Vec<u8>,
// Once loading is complete, the result of the operation.
result: Option<Result<(), String>>,
listeners: Vec<ImageListener>,
// The url being loaded. Do not forget that this may be several Mb
// if we are loading a data: url.
url: Arc<Url>
}
impl PendingLoad {
fn new(url: Arc<Url>) -> PendingLoad {
PendingLoad {
bytes: vec!(),
result: None,
listeners: vec!(),
url: url,
}
}
fn add_listener(&mut self, listener: ImageListener) {
self.listeners.push(listener);
}
}
// Represents all the currently pending loads/decodings. For
// performance reasons, loads are indexed by a dedicated load key.
struct AllPendingLoads {
// The loads, indexed by a load key. Used during most operations,
// for performance reasons.
loads: HashMap<LoadKey, PendingLoad>,
// Get a load key from its url. Used ony when starting and
// finishing a load or when adding a new listener.
url_to_load_key: HashMap<Arc<Url>, LoadKey>,
// A counter used to generate instances of LoadKey
keygen: LoadKeyGenerator,
}
// Result of accessing a cache.
#[derive(Eq, PartialEq)]
enum CacheResult {
Hit, // The value was in the cache.
Miss, // The value was not in the cache and needed to be regenerated.
}
impl AllPendingLoads {
fn new() -> AllPendingLoads {
AllPendingLoads {
loads: HashMap::new(),
url_to_load_key: HashMap::new(),
keygen: LoadKeyGenerator::new(),
}
}
// `true` if there is no currently pending load, `false` otherwise.
fn is_empty(&self) -> bool {
assert!(self.loads.is_empty() == self.url_to_load_key.is_empty());
self.loads.is_empty()
}
// get a PendingLoad from its LoadKey. Prefer this to `get_by_url`,
// for performance reasons.
fn get_by_key_mut(&mut self, key: &LoadKey) -> Option<&mut PendingLoad> {
self.loads.get_mut(key)
}
// get a PendingLoad from its url. When possible, prefer `get_by_key_mut`.
fn get_by_url(&self, url: &Url) -> Option<&PendingLoad> {
self.url_to_load_key.get(url).
and_then(|load_key|
self.loads.get(load_key)
)
}
fn remove(&mut self, key: &LoadKey) -> Option<PendingLoad> {
self.loads.remove(key).
and_then(|pending_load| {
self.url_to_load_key.remove(&pending_load.url).unwrap();
Some(pending_load)
})
}
fn get_cached(&mut self, url: Arc<Url>) -> (CacheResult, LoadKey, &mut PendingLoad) {
match self.url_to_load_key.entry(url.clone()) {
Occupied(url_entry) => {
let load_key = url_entry.get();
(CacheResult::Hit, *load_key, self.loads.get_mut(load_key).unwrap())
}
Vacant(url_entry) => {
let load_key = self.keygen.next();
url_entry.insert(load_key);
let pending_load = PendingLoad::new(url);
match self.loads.entry(load_key) {
Occupied(_) => unreachable!(),
Vacant(load_entry) => {
let mut_load = load_entry.insert(pending_load);
(CacheResult::Miss, load_key, mut_load)
}
}
}
}
}
}
/// Represents an image that has completed loading.
/// Images that fail to load (due to network or decode
/// failure) are still stored here, so that they aren't
/// fetched again.
struct CompletedLoad {
image_response: ImageResponse,
}
impl CompletedLoad {
fn new(image_response: ImageResponse) -> CompletedLoad {
CompletedLoad {
image_response: image_response,
}
}
}
/// Stores information to notify a client when the state
/// of an image changes.
struct ImageListener {
sender: ImageCacheChan,
responder: Option<ImageResponder>,
}
// A key used to communicate during loading.
#[derive(Eq, Hash, PartialEq, Clone, Copy)]
struct LoadKey(u64);
struct LoadKeyGenerator {
counter: u64
}
impl LoadKeyGenerator {
fn new() -> LoadKeyGenerator {
LoadKeyGenerator {
counter: 0
}
}
fn next(&mut self) -> LoadKey {
self.counter += 1;
LoadKey(self.counter)
}
}
impl ImageListener {
fn new(sender: ImageCacheChan, responder: Option<ImageResponder>) -> ImageListener {
ImageListener {
sender: sender,
responder: responder,
}
}
fn notify(self, image_response: ImageResponse) {
let ImageCacheChan(ref sender) = self.sender;
let msg = ImageCacheResult {
responder: self.responder,
image_response: image_response,
};
sender.send(msg).ok();
}
}
struct ResourceLoadInfo {
action: ResponseAction,
key: LoadKey,
}
/// Implementation of the image cache
struct ImageCache {
// Receive commands from clients
cmd_receiver: Receiver<ImageCacheCommand>,
// Receive notifications from the resource task
progress_receiver: Receiver<ResourceLoadInfo>,
progress_sender: Sender<ResourceLoadInfo>,
// Receive notifications from the decoder thread pool
decoder_receiver: Receiver<DecoderMsg>,
decoder_sender: Sender<DecoderMsg>,
// Worker threads for decoding images.
task_pool: TaskPool,
// Resource task handle
resource_task: ResourceTask,
// Images that are loading over network, or decoding.
pending_loads: AllPendingLoads,
// Images that have finished loading (successful or not)
completed_loads: HashMap<Arc<Url>, CompletedLoad>,
// The placeholder image used when an image fails to load
placeholder_image: Option<Arc<Image>>,
}
/// Message that the decoder worker threads send to main image cache task.
struct DecoderMsg {
key: LoadKey,
image: Option<Image>,
}
/// The types of messages that the main image cache task receives.
enum SelectResult {
Command(ImageCacheCommand),
Progress(ResourceLoadInfo),
Decoder(DecoderMsg),
}
impl ImageCache {
fn run(&mut self) {
let mut exit_sender: Option<IpcSender<()>> = None;
loop {
let result = {
let sel = Select::new();
let mut cmd_handle = sel.handle(&self.cmd_receiver);
let mut progress_handle = sel.handle(&self.progress_receiver);
let mut decoder_handle = sel.handle(&self.decoder_receiver);
unsafe {
cmd_handle.add();
progress_handle.add();
decoder_handle.add();
}
let ret = sel.wait();
if ret == cmd_handle.id() {
SelectResult::Command(self.cmd_receiver.recv().unwrap())
} else if ret == decoder_handle.id() {
SelectResult::Decoder(self.decoder_receiver.recv().unwrap())
} else {
SelectResult::Progress(self.progress_receiver.recv().unwrap())
}
};
match result {
SelectResult::Command(cmd) => {
exit_sender = self.handle_cmd(cmd);
}
SelectResult::Progress(msg) => {
self.handle_progress(msg);
}
SelectResult::Decoder(msg) => {
self.handle_decoder(msg);
}
}
// Can only exit when all pending loads are complete.
if let Some(ref exit_sender) = exit_sender {
if self.pending_loads.is_empty() {
exit_sender.send(()).unwrap();
break;
}
}
}
}
// Handle a request from a client
fn handle_cmd(&mut self, cmd: ImageCacheCommand) -> Option<IpcSender<()>> {
match cmd {
ImageCacheCommand::Exit(sender) => {
return Some(sender);
}
ImageCacheCommand::RequestImage(url, result_chan, responder) => {
self.request_image(url, result_chan, responder);
}
ImageCacheCommand::GetImageIfAvailable(url, use_placeholder, consumer) => {
let result = match self.completed_loads.get(&url) {
Some(completed_load) => {
match (completed_load.image_response.clone(), use_placeholder) {
(ImageResponse::Loaded(image), _) |
(ImageResponse::PlaceholderLoaded(image), UsePlaceholder::Yes) => {
Ok(image)
}
(ImageResponse::PlaceholderLoaded(_), UsePlaceholder::No) |
(ImageResponse::None, _) => {
Err(ImageState::LoadError)
}
}
}
None => {
self.pending_loads.get_by_url(&url).
map_or(Err(ImageState::NotRequested), |_| Err(ImageState::Pending))
}
};
consumer.send(result).unwrap();
}
};
None
}
// Handle progress messages from the resource task
fn handle_progress(&mut self, msg: ResourceLoadInfo) {
match (msg.action, msg.key) {
(ResponseAction::HeadersAvailable(_), _) => {}
(ResponseAction::DataAvailable(data), _) => {
let pending_load = self.pending_loads.get_by_key_mut(&msg.key).unwrap();
pending_load.bytes.push_all(&data);
}
(ResponseAction::ResponseComplete(result), key) => {
match result {
Ok(()) => {
let pending_load = self.pending_loads.get_by_key_mut(&msg.key).unwrap();
pending_load.result = Some(result);
let bytes = mem::replace(&mut pending_load.bytes, vec!());
let sender = self.decoder_sender.clone();
self.task_pool.execute(move || {
let image = load_from_memory(&bytes);
let msg = DecoderMsg {
key: key,
image: image
};
sender.send(msg).unwrap();
});
}
Err(_) => {
match self.placeholder_image.clone() {
Some(placeholder_image) => {
self.complete_load(msg.key, ImageResponse::PlaceholderLoaded(
placeholder_image))
}
None => self.complete_load(msg.key, ImageResponse::None),
}
}
}
}
}
}
// Handle a message from one of the decoder worker threads
fn handle_decoder(&mut self, msg: DecoderMsg) {
let image = match msg.image {
None => ImageResponse::None,
Some(image) => ImageResponse::Loaded(Arc::new(image)),
};
self.complete_load(msg.key, image);
}
// Change state of a url from pending -> loaded.
fn complete_load(&mut self, key: LoadKey, image_response: ImageResponse) {
let pending_load = self.pending_loads.remove(&key).unwrap();
let completed_load = CompletedLoad::new(image_response.clone());
self.completed_loads.insert(pending_load.url, completed_load);
for listener in pending_load.listeners {
listener.notify(image_response.clone());
}
}
// Request an image from the cache. If the image hasn't been
// loaded/decoded yet, it will be loaded/decoded in the
// background.
fn request_image(&mut self,
url: Url,
result_chan: ImageCacheChan,
responder: Option<ImageResponder>) {
let image_listener = ImageListener::new(result_chan, responder);
// Let's avoid copying url everywhere.
let ref_url = Arc::new(url);
// Check if already completed
match self.completed_loads.get(&ref_url) {
Some(completed_load) => {
// It's already completed, return a notify straight away
image_listener.notify(completed_load.image_response.clone());
}
None => {
// Check if the load is already pending
let (cache_result, load_key, mut pending_load) = self.pending_loads.get_cached(ref_url.clone());
pending_load.add_listener(image_listener);
match cache_result {
CacheResult::Miss => {
// A new load request! Request the load from
// the resource task.
let load_data = LoadData::new((*ref_url).clone(), None);
let (action_sender, action_receiver) = ipc::channel().unwrap();
let response_target = AsyncResponseTarget {
sender: action_sender,
};
let msg = ControlMsg::Load(load_data,
LoadConsumer::Listener(response_target),
None);
let progress_sender = self.progress_sender.clone();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
let action: ResponseAction = message.to().unwrap();
progress_sender.send(ResourceLoadInfo {
action: action,
key: load_key,
}).unwrap();
});
self.resource_task.send(msg).unwrap();
}
CacheResult::Hit => {
// Request is already on its way.
}
}
}
}
}
}
/// Create a new image cache.
pub fn new_image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
let (ipc_command_sender, ipc_command_receiver) = ipc::channel().unwrap();
let (progress_sender, progress_receiver) = channel();
let (decoder_sender, decoder_receiver) = channel();
spawn_named("ImageCacheThread".to_owned(), move || {
// Preload the placeholder image, used when images fail to load.
let mut placeholder_path = resources_dir_path();
placeholder_path.push("rippy.png");
let mut image_data = vec![];
let result = File::open(&placeholder_path).and_then(|mut file| {
file.read_to_end(&mut image_data)
});
let placeholder_image = result.ok().map(|_| {
Arc::new(load_from_memory(&image_data).unwrap())
});
// Ask the router to proxy messages received over IPC to us.
let cmd_receiver = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_command_receiver);
let mut cache = ImageCache {
cmd_receiver: cmd_receiver,
progress_sender: progress_sender,
progress_receiver: progress_receiver,
decoder_sender: decoder_sender,
decoder_receiver: decoder_receiver,
task_pool: TaskPool::new(4),
pending_loads: AllPendingLoads::new(),
completed_loads: HashMap::new(),
resource_task: resource_task,
placeholder_image: placeholder_image,
};
cache.run();
});
ImageCacheTask::new(ipc_command_sender)
}