Perform the image prefetch in a different task

Having the prefetch task send back the results immediately will
provide an appropriate event to signel clients that want to know
when images become available.
This commit is contained in:
Brian Anderson 2012-08-12 21:41:53 -07:00
parent fa8fd4d243
commit fe4b1c92dd
2 changed files with 248 additions and 97 deletions

View file

@ -8,7 +8,7 @@ import resource::resource_task;
import resource::resource_task::{ResourceTask}; import resource::resource_task::{ResourceTask};
import std::net::url::url; import std::net::url::url;
import resource::image_cache_task; import resource::image_cache_task;
import image_cache_task::{ImageCacheTask, image_cache_task}; import image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient};
import pipes::{port, chan}; import pipes::{port, chan};
@ -67,7 +67,7 @@ class Engine<S:Sink send copy> {
self.renderer.send(renderer::ExitMsg(response_chan)); self.renderer.send(renderer::ExitMsg(response_chan));
response_port.recv(); response_port.recv();
self.image_cache_task.send(image_cache_task::Exit); self.image_cache_task.exit();
self.resource_task.send(resource_task::Exit); self.resource_task.send(resource_task::Exit);
sender.send(()); sender.send(());

View file

@ -2,6 +2,7 @@ export Msg, Prefetch, GetImage, Exit;
export ImageResponseMsg, ImageReady, ImageNotReady; export ImageResponseMsg, ImageReady, ImageNotReady;
export ImageCacheTask; export ImageCacheTask;
export image_cache_task; export image_cache_task;
export ImageCacheTaskClient;
import image::base::{Image, load_from_memory, test_image_bin}; import image::base::{Image, load_from_memory, test_image_bin};
import std::net::url::url; import std::net::url::url;
@ -13,16 +14,28 @@ import resource_task::ResourceTask;
import std::arc::arc; import std::arc::arc;
import clone_arc = std::arc::clone; import clone_arc = std::arc::clone;
import std::cell::Cell; import std::cell::Cell;
import result::{result, ok, err};
import to_str::to_str;
enum Msg { enum Msg {
/// Tell the cache that we may need a particular image soon. Must be posted /// Tell the cache that we may need a particular image soon. Must be posted
/// before GetImage /// before GetImage
Prefetch(url), Prefetch(url),
/// Used be the prefetch tasks to post back image binaries
/*priv*/ StorePrefetchedImageData(url, result<Cell<~[u8]>, ()>),
/// Request an Image object for a URL /// Request an Image object for a URL
GetImage(url, chan<ImageResponseMsg>), GetImage(url, chan<ImageResponseMsg>),
/// Used by the decoder tasks to post decoded images back to the cache /// Used by the decoder tasks to post decoded images back to the cache
StoreImage(url, arc<~Image>), /*priv*/ StoreImage(url, arc<~Image>),
Exit
/// For testing
/*priv*/ OnMsg(fn~(msg: &Msg)),
/// Clients must wait for a response before shutting down the ResourceTask
Exit(chan<()>)
} }
enum ImageResponseMsg { enum ImageResponseMsg {
@ -37,7 +50,8 @@ fn image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
ImageCache { ImageCache {
resource_task: resource_task, resource_task: resource_task,
from_client: from_client, from_client: from_client,
state_map: url_map() state_map: url_map(),
need_exit: none
}.run(); }.run();
} }
} }
@ -49,21 +63,18 @@ struct ImageCache {
from_client: port<Msg>; from_client: port<Msg>;
/// The state of processsing an image for a URL /// The state of processsing an image for a URL
state_map: UrlMap<ImageState>; state_map: UrlMap<ImageState>;
mut need_exit: option<chan<()>>;
} }
enum ImageState { enum ImageState {
Init, Init,
Prefetching(@PrefetchData), Prefetching,
Prefetched(@Cell<~[u8]>),
Decoding(@FutureData), Decoding(@FutureData),
Decoded(@arc<~Image>), Decoded(@arc<~Image>),
Failed Failed
} }
struct PrefetchData {
response_port: port<resource_task::ProgressMsg>;
mut data: ~[u8];
}
struct FutureData { struct FutureData {
mut waiters: ~[chan<ImageResponseMsg>]; mut waiters: ~[chan<ImageResponseMsg>];
} }
@ -73,13 +84,51 @@ impl ImageCache {
fn run() { fn run() {
let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[];
loop { loop {
let msg = self.from_client.recv();
for msg_handlers.each |handler| { handler(&msg) }
#debug("image_cache_task: received: %?", msg);
// FIXME: Need to move out the urls // FIXME: Need to move out the urls
match self.from_client.recv() { match msg {
Prefetch(url) => self.prefetch(copy url), Prefetch(url) => self.prefetch(copy url),
StorePrefetchedImageData(url, data) => self.store_prefetched_image_data(copy url, &data),
GetImage(url, response) => self.get_image(copy url, response), GetImage(url, response) => self.get_image(copy url, response),
StoreImage(url, image) => self.store_image(copy url, &image), StoreImage(url, image) => self.store_image(copy url, &image),
Exit => break OnMsg(handler) => msg_handlers += [copy handler],
Exit(response) => {
assert self.need_exit.is_none();
self.need_exit = some(response);
}
}
match copy self.need_exit {
some(response) => {
// Wait until we have no outstanding requests and subtasks
// before exiting
let mut can_exit = true;
for self.state_map.each_value |state| {
match state {
Prefetching => can_exit = false,
Decoding(*) => can_exit = false,
Init
| Prefetched(*)
| Decoded(*)
| Failed => ()
}
}
if can_exit {
response.send(());
break;
}
}
none => ()
} }
} }
} }
@ -98,18 +147,30 @@ impl ImageCache {
/*priv*/ fn prefetch(+url: url) { /*priv*/ fn prefetch(+url: url) {
match self.get_state(copy url) { match self.get_state(copy url) {
Init => { Init => {
let response_port = port(); let to_cache = self.from_client.chan();
self.resource_task.send(resource_task::Load(copy url, response_port.chan())); let resource_task = self.resource_task;
let url_cell = Cell(copy url);
let prefetch_data = @PrefetchData { do spawn |move url_cell| {
response_port: response_port, let url = url_cell.take();
data: ~[] #debug("image_cache_task: started fetch for %s", url.to_str());
};
self.set_state(url, Prefetching(prefetch_data)); let image = load_image_data(copy url, resource_task);
let result = if image.is_ok() {
ok(Cell(result::unwrap(image)))
} else {
err(())
};
to_cache.send(StorePrefetchedImageData(copy url, result));
#debug("image_cache_task: ended fetch for %s", (copy url).to_str());
}
self.set_state(url, Prefetching);
} }
Prefetching(*) Prefetching
| Prefetched(*)
| Decoding(*) | Decoding(*)
| Decoded(*) | Decoded(*)
| Failed => { | Failed => {
@ -118,58 +179,61 @@ impl ImageCache {
} }
} }
/*priv*/ fn store_prefetched_image_data(+url: url, data: &result<Cell<~[u8]>, ()>) {
match self.get_state(copy url) {
Prefetching => {
match *data {
ok(data_cell) => {
let data = data_cell.take();
self.set_state(url, Prefetched(@Cell(data)));
}
err(*) => {
self.set_state(url, Failed);
}
}
}
Init
| Prefetched(*)
| Decoding(*)
| Decoded(*)
| Failed => {
fail ~"wrong state for storing prefetched image"
}
}
}
/*priv*/ fn get_image(+url: url, response: chan<ImageResponseMsg>) { /*priv*/ fn get_image(+url: url, response: chan<ImageResponseMsg>) {
match self.get_state(copy url) { match self.get_state(copy url) {
Init => fail ~"Request for image before prefetch", Init => fail ~"Request for image before prefetch",
Prefetching(prefetch_data) => { Prefetching => {
response.send(ImageNotReady);
}
let mut image_sent = false; Prefetched(data_cell) => {
assert !data_cell.is_empty();
while prefetch_data.response_port.peek() { let data = data_cell.take();
match prefetch_data.response_port.recv() { let to_cache = self.from_client.chan();
resource_task::Payload(data) => { let url_cell = Cell(copy url);
prefetch_data.data += data;
}
resource_task::Done(result::ok(*)) => {
// We've got the entire image binary
let mut data = ~[];
data <-> prefetch_data.data;
let data <- data; // freeze for capture
let to_cache = self.from_client.chan(); do spawn |move url_cell| {
let url = url_cell.take();
let url_cell = Cell(copy url); #debug("image_cache_task: started image decode for %s", url.to_str());
do spawn |move url_cell| { let image = arc(~load_from_memory(data));
let image = arc(~load_from_memory(data)); // Send the image to the original requester
// Send the image to the original requester response.send(ImageReady(clone_arc(&image)));
response.send(ImageReady(clone_arc(&image))); to_cache.send(StoreImage(copy url, clone_arc(&image)));
to_cache.send(StoreImage(url_cell.take(), clone_arc(&image))); #debug("image_cache_task: ended image decode for %s", url.to_str());
}
let future_data = @FutureData {
waiters: ~[]
};
self.set_state(url, Decoding(future_data));
image_sent = true;
break;
}
resource_task::Done(result::err(*)) => {
// There was an error loading the image binary. Put it
// in the error map so we remember the error for future
// requests.
self.set_state(url, Failed);
break;
}
}
} }
if !image_sent { let future_data = @FutureData {
response.send(ImageNotReady); waiters: ~[]
} };
self.set_state(url, Decoding(future_data));
} }
Decoding(future_data) => { Decoding(future_data) => {
@ -206,7 +270,8 @@ impl ImageCache {
} }
Init Init
| Prefetching(*) | Prefetching
| Prefetched(*)
| Decoded(*) | Decoded(*)
| Failed => { | Failed => {
fail ~"incorrect state in store_image" fail ~"incorrect state in store_image"
@ -216,6 +281,42 @@ impl ImageCache {
} }
} }
trait ImageCacheTaskClient {
fn exit();
}
impl ImageCacheTask: ImageCacheTaskClient {
fn exit() {
let response = port();
self.send(Exit(response.chan()));
response.recv();
}
}
fn load_image_data(+url: url, resource_task: ResourceTask) -> result<~[u8], ()> {
let response_port = port();
resource_task.send(resource_task::Load(url, response_port.chan()));
let mut image_data = ~[];
loop {
match response_port.recv() {
resource_task::Payload(data) => {
image_data += data;
}
resource_task::Done(result::ok(*)) => {
return ok(image_data);
}
resource_task::Done(result::err(*)) => {
return err(());
}
}
}
}
#[test] #[test]
fn should_exit_on_request() { fn should_exit_on_request() {
@ -235,7 +336,7 @@ fn should_exit_on_request() {
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let _url = make_url(~"file", none); let _url = make_url(~"file", none);
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
@ -266,7 +367,10 @@ fn should_request_url_from_resource_task_on_prefetch() {
loop { loop {
match from_client.recv() { match from_client.recv() {
resource_task::Load(url, _) => url_requested_chan.send(()), resource_task::Load(url, response) => {
url_requested_chan.send(());
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break resource_task::Exit => break
} }
} }
@ -277,7 +381,7 @@ fn should_request_url_from_resource_task_on_prefetch() {
image_cache_task.send(Prefetch(url)); image_cache_task.send(Prefetch(url));
url_requested.recv(); url_requested.recv();
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
@ -293,7 +397,10 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() {
loop { loop {
match from_client.recv() { match from_client.recv() {
resource_task::Load(url, _) => url_requested_chan.send(()), resource_task::Load(url, response) => {
url_requested_chan.send(());
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break resource_task::Exit => break
} }
} }
@ -305,13 +412,16 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Prefetch(url)); image_cache_task.send(Prefetch(url));
url_requested.recv(); url_requested.recv();
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
assert !url_requested.peek() assert !url_requested.peek()
} }
#[test] #[test]
fn should_return_image_not_ready_if_data_has_not_arrived() { fn should_return_image_not_ready_if_data_has_not_arrived() {
let (wait_chan, wait_port) = pipes::stream();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |from_client| {
// infer me // infer me
@ -319,8 +429,13 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
loop { loop {
match from_client.recv() { match from_client.recv() {
resource_task::Load(url, response) => {
// Don't send the data until after the client requests
// the image
wait_port.recv();
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break, resource_task::Exit => break,
_ => ()
} }
} }
}; };
@ -332,16 +447,14 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
let response_port = port(); let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_port.chan()));
assert response_port.recv() == ImageNotReady; assert response_port.recv() == ImageNotReady;
image_cache_task.send(Exit); wait_chan.send(());
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
#[test] #[test]
fn should_return_decoded_image_data_if_data_has_arrived() { fn should_return_decoded_image_data_if_data_has_arrived() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |from_client| {
// infer me // infer me
@ -352,7 +465,6 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
resource_task::Load(_, response) => { resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(()))); response.send(resource_task::Done(result::ok(())));
image_bin_sent_chan.send(());
} }
resource_task::Exit => break resource_task::Exit => break
} }
@ -362,10 +474,20 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none); let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv(); wait_for_prefetech_chan.recv();
let response_port = port(); let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_port.chan()));
@ -374,16 +496,13 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
_ => fail _ => fail
} }
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
#[test] #[test]
fn should_return_decoded_image_data_for_multiple_requests() { fn should_return_decoded_image_data_for_multiple_requests() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |from_client| {
// infer me // infer me
@ -394,7 +513,6 @@ fn should_return_decoded_image_data_for_multiple_requests() {
resource_task::Load(_, response) => { resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(()))); response.send(resource_task::Done(result::ok(())));
image_bin_sent_chan.send(());
} }
resource_task::Exit => break resource_task::Exit => break
} }
@ -404,10 +522,20 @@ fn should_return_decoded_image_data_for_multiple_requests() {
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none); let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv(); wait_for_prefetech.recv();
for iter::repeat(2) { for iter::repeat(2) {
let response_port = port(); let response_port = port();
@ -418,7 +546,7 @@ fn should_return_decoded_image_data_for_multiple_requests() {
} }
} }
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
@ -454,10 +582,21 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none); let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv(); image_bin_sent.recv();
wait_for_prefetech.recv();
let response_port = port(); let response_port = port();
image_cache_task.send(GetImage(copy url, response_port.chan())); image_cache_task.send(GetImage(copy url, response_port.chan()));
@ -472,7 +611,7 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_port.chan()));
response_port.recv(); response_port.recv();
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
resource_task_exited.recv(); resource_task_exited.recv();
@ -529,7 +668,7 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_port.chan()));
response_port.recv(); response_port.recv();
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
resource_task_exited.recv(); resource_task_exited.recv();
@ -542,9 +681,6 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
#[test] #[test]
fn should_return_not_ready_if_image_bin_cannot_be_fetched() { fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |from_client| {
// infer me // infer me
@ -556,7 +692,6 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
// ERROR fetching image // ERROR fetching image
response.send(resource_task::Done(result::err(()))); response.send(resource_task::Done(result::err(())));
image_bin_sent_chan.send(());
} }
resource_task::Exit => break resource_task::Exit => break
} }
@ -566,10 +701,20 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none); let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv(); wait_for_prefetech.recv();
let response_port = port(); let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_port.chan()));
@ -578,16 +723,13 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
_ => fail _ => fail
} }
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }
#[test] #[test]
fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() { fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |from_client| {
// infer me // infer me
@ -599,7 +741,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
// ERROR fetching image // ERROR fetching image
response.send(resource_task::Done(result::err(()))); response.send(resource_task::Done(result::err(())));
image_bin_sent_chan.send(());
} }
resource_task::Exit => break resource_task::Exit => break
} }
@ -609,10 +750,20 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
let image_cache_task = image_cache_task(mock_resource_task); let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none); let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv(); wait_for_prefetech.recv();
let response_port = port(); let response_port = port();
image_cache_task.send(GetImage(copy url, response_port.chan())); image_cache_task.send(GetImage(copy url, response_port.chan()));
@ -629,6 +780,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
_ => fail _ => fail
} }
image_cache_task.send(Exit); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
} }