Modify ImageCacheTask to have a required decoding step

Instead of considering a fetched image ready and potentially blocking
while waiting for it to decode, instead require the decode to be requested. If
a image is not yet decoded the image cache will immediately return
ImageNotReady
This commit is contained in:
Brian Anderson 2012-08-16 18:58:00 -07:00
parent 160baa61be
commit 84c607da47
2 changed files with 233 additions and 72 deletions

View file

@ -94,7 +94,12 @@ class ImageHolder {
self.image_cache_task = image_cache_task;
// Tell the image cache we're going to be interested in this url
image_cache_task.send(image_cache_task::Prefetch(move url));
// FIXME: These two messages must be sent to prep an image for use
// but they are intended to be spread out in time. Ideally prefetch
// should be done as early as possible and decode only once we
// are sure that the image will be used.
image_cache_task.send(image_cache_task::Prefetch(copy url));
image_cache_task.send(image_cache_task::Decode(move url));
}
// This function should not be called by two tasks at the same time

View file

@ -1,4 +1,4 @@
export Msg, Prefetch, GetImage, Exit;
export Msg, Prefetch, Decode, GetImage, Exit;
export ImageResponseMsg, ImageReady, ImageNotReady;
export ImageCacheTask;
export image_cache_task;
@ -19,18 +19,21 @@ import to_str::ToStr;
enum Msg {
/// Tell the cache that we may need a particular image soon. Must be posted
/// before GetImage
/// before Decode
Prefetch(url),
/// Used be the prefetch tasks to post back image binaries
/*priv*/ StorePrefetchedImageData(url, result<Cell<~[u8]>, ()>),
/// Request an Image object for a URL
GetImage(url, Chan<ImageResponseMsg>),
/// Tell the cache to decode an image. Must be posted before GetImage
Decode(url),
/// Used by the decoder tasks to post decoded images back to the cache
/*priv*/ StoreImage(url, arc<~Image>),
/// Request an Image object for a URL
GetImage(url, Chan<ImageResponseMsg>),
/// For testing
/*priv*/ OnMsg(fn~(msg: &Msg)),
@ -45,10 +48,21 @@ enum ImageResponseMsg {
type ImageCacheTask = Chan<Msg>;
type DecoderFactory = ~fn() -> ~fn(~[u8]) -> ~Image;
fn image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
do spawn_listener |from_client| {
image_cache_task_(resource_task, default_decoder_factory)
}
fn image_cache_task_(resource_task: ResourceTask, +decoder_factory: DecoderFactory) -> ImageCacheTask {
// FIXME: Doing some dancing to avoid copying decoder_factory, our test
// version of which contains an uncopyable type which rust will currently
// copy unsoundly
let decoder_factory_cell = Cell(move decoder_factory);
do spawn_listener |from_client, move decoder_factory_cell| {
ImageCache {
resource_task: resource_task,
decoder_factory: decoder_factory_cell.take(),
from_client: from_client,
state_map: url_map(),
need_exit: none
@ -59,6 +73,8 @@ fn image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
struct ImageCache {
/// A handle to the resource task for fetching the image binaries
resource_task: ResourceTask;
/// Creates image decoders
decoder_factory: DecoderFactory;
/// The port on which we'll receive client requests
from_client: Port<Msg>;
/// The state of processsing an image for a URL
@ -68,15 +84,16 @@ struct ImageCache {
enum ImageState {
Init,
Prefetching,
Prefetching(AfterPrefetch),
Prefetched(@Cell<~[u8]>),
Decoding(@FutureData),
Decoding,
Decoded(@arc<~Image>),
Failed
}
struct FutureData {
mut waiters: ~[Chan<ImageResponseMsg>];
enum AfterPrefetch {
DoDecode,
DoNotDecode
}
#[allow(non_implicitly_copyable_typarams)]
@ -97,8 +114,9 @@ impl ImageCache {
match msg {
Prefetch(url) => self.prefetch(copy url),
StorePrefetchedImageData(url, data) => self.store_prefetched_image_data(copy url, &data),
GetImage(url, response) => self.get_image(copy url, response),
Decode(url) => self.decode(copy url),
StoreImage(url, image) => self.store_image(copy url, &image),
GetImage(url, response) => self.get_image(copy url, response),
OnMsg(handler) => msg_handlers += [copy handler],
Exit(response) => {
assert self.need_exit.is_none();
@ -113,8 +131,8 @@ impl ImageCache {
let mut can_exit = true;
for self.state_map.each_value |state| {
match state {
Prefetching => can_exit = false,
Decoding(*) => can_exit = false,
Prefetching(*) => can_exit = false,
Decoding => can_exit = false,
Init
| Prefetched(*)
@ -166,12 +184,12 @@ impl ImageCache {
#debug("image_cache_task: ended fetch for %s", (copy url).to_str());
}
self.set_state(url, Prefetching);
self.set_state(url, Prefetching(DoNotDecode));
}
Prefetching
Prefetching(*)
| Prefetched(*)
| Decoding(*)
| Decoding
| Decoded(*)
| Failed => {
// We've already begun working on this image
@ -181,21 +199,25 @@ impl ImageCache {
/*priv*/ fn store_prefetched_image_data(+url: url, data: &result<Cell<~[u8]>, ()>) {
match self.get_state(copy url) {
Prefetching => {
Prefetching(next_step) => {
match *data {
ok(data_cell) => {
let data = data_cell.take();
self.set_state(url, Prefetched(@Cell(data)));
self.set_state(copy url, Prefetched(@Cell(data)));
}
err(*) => {
self.set_state(url, Failed);
self.set_state(copy url, Failed);
}
}
if next_step == DoDecode {
self.decode(url)
}
}
Init
| Prefetched(*)
| Decoding(*)
| Decoding
| Decoded(*)
| Failed => {
fail ~"wrong state for storing prefetched image"
@ -203,43 +225,61 @@ impl ImageCache {
}
}
/*priv*/ fn decode(+url: url) {
match self.get_state(copy url) {
Init => fail ~"Decoding image before prefetch",
Prefetching(DoNotDecode) => {
// We don't have the data yet, queue up the decode
self.set_state(url, Prefetching(DoDecode))
}
Prefetching(DoDecode) => {
// We don't have the data yet, but the decode request is queued up
}
Prefetched(data_cell) => {
assert !data_cell.is_empty();
let data = data_cell.take();
let to_cache = self.from_client.chan();
let url_cell = Cell(copy url);
let decode = self.decoder_factory();
do spawn |move url_cell, move decode| {
let url = url_cell.take();
#debug("image_cache_task: started image decode for %s", url.to_str());
let image = arc(decode(data));
to_cache.send(StoreImage(copy url, clone_arc(&image)));
#debug("image_cache_task: ended image decode for %s", url.to_str());
}
self.set_state(url, Decoding);
}
Decoding
| Decoded(*)
| Failed => {
// We've already begun decoding
}
}
}
/*priv*/ fn get_image(+url: url, response: Chan<ImageResponseMsg>) {
match self.get_state(copy url) {
Init => fail ~"Request for image before prefetch",
Prefetching => {
Prefetching(DoDecode) => {
response.send(ImageNotReady);
}
Prefetched(data_cell) => {
assert !data_cell.is_empty();
Prefetching(DoNotDecode)
| Prefetched(*) => fail ~"request for image before decode",
let data = data_cell.take();
let to_cache = self.from_client.chan();
let url_cell = Cell(copy url);
do spawn |move url_cell| {
let url = url_cell.take();
#debug("image_cache_task: started image decode for %s", url.to_str());
let image = arc(~load_from_memory(data));
// Send the image to the original requester
response.send(ImageReady(clone_arc(&image)));
to_cache.send(StoreImage(copy url, clone_arc(&image)));
#debug("image_cache_task: ended image decode for %s", url.to_str());
}
let future_data = @FutureData {
waiters: ~[]
};
self.set_state(url, Decoding(future_data));
}
Decoding(future_data) => {
// We've started decoding this image but haven't recieved it back yet.
// Put this client on the wait list
vec::push(future_data.waiters, response);
Decoding => {
response.send(ImageNotReady)
}
Decoded(image) => {
@ -255,22 +295,12 @@ impl ImageCache {
/*priv*/ fn store_image(+url: url, image: &arc<~Image>) {
match self.get_state(copy url) {
Decoding(future_data) => {
let mut waiters = ~[];
waiters <-> future_data.waiters;
// Send the image to all those who requested it while
// it was being decoded
for waiters.each |waiter| {
waiter.send(ImageReady(clone_arc(image)))
}
Decoding => {
self.set_state(url, Decoded(@clone_arc(image)));
}
Init
| Prefetching
| Prefetching(*)
| Prefetched(*)
| Decoded(*)
| Failed => {
@ -317,6 +347,10 @@ fn load_image_data(+url: url, resource_task: ResourceTask) -> result<~[u8], ()>
}
}
fn default_decoder_factory() -> ~fn(~[u8]) -> ~Image {
fn~(data: ~[u8]) -> ~Image { ~load_from_memory(data) }
}
#[test]
fn should_exit_on_request() {
@ -385,6 +419,53 @@ fn should_request_url_from_resource_task_on_prefetch() {
mock_resource_task.send(resource_task::Exit);
}
#[test]
#[should_fail]
fn should_fail_if_requesting_decode_of_an_unprefetched_image() {
let mock_resource_task = do spawn_listener |_from_client| {
};
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
image_cache_task.send(Decode(url));
image_cache_task.exit();
}
#[test]
#[should_fail]
fn should_fail_if_requesting_image_before_requesting_decode() {
let mock_resource_task = do spawn_listener |from_client| {
// infer me
let from_client: Port<resource_task::ControlMsg> = from_client;
loop {
match from_client.recv() {
resource_task::Load(url, response) => {
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break
}
}
};
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
image_cache_task.send(Prefetch(copy url));
// no decode message
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
#[test]
fn should_not_request_url_from_resource_task_on_multiple_prefetches() {
let url_requested = port();
@ -433,6 +514,7 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
// Don't send the data until after the client requests
// the image
wait_port.recv();
response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break,
@ -444,6 +526,7 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
let url = make_url(~"file", none);
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
assert response_port.recv() == ImageNotReady;
@ -474,20 +557,21 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
let wait_for_image = port();
let wait_for_image_chan = wait_for_image.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
StoreImage(*) => wait_for_image_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech_chan.recv();
wait_for_image_chan.recv();
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
@ -522,20 +606,21 @@ fn should_return_decoded_image_data_for_multiple_requests() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
let wait_for_image = port();
let wait_for_image_chan = wait_for_image.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
StoreImage(*) => wait_for_image_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv();
wait_for_image.recv();
for iter::repeat(2) {
let response_port = port();
@ -582,21 +667,22 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
let wait_for_image = port();
let wait_for_image_chan = wait_for_image.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
StoreImage(*) => wait_for_image_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech.recv();
wait_for_image.recv();
let response_port = port();
image_cache_task.send(GetImage(copy url, response_port.chan()));
@ -654,6 +740,7 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
let url = make_url(~"file", none);
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
@ -663,6 +750,7 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
response_port.recv();
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
@ -712,6 +800,7 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv();
@ -761,6 +850,7 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv();
@ -783,3 +873,69 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
#[test]
fn should_return_not_ready_if_image_is_still_decoding() {
let (wait_to_decode_chan, wait_to_decode_port) = pipes::stream();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
let from_client: Port<resource_task::ControlMsg> = from_client;
loop {
match from_client.recv() {
resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break
}
}
};
let wait_to_decode_port_cell = Cell(wait_to_decode_port);
let decoder_factory = fn~(move wait_to_decode_port_cell) -> ~fn(~[u8]) -> ~Image {
let wait_to_decode_port = wait_to_decode_port_cell.take();
fn~(data: ~[u8], move wait_to_decode_port) -> ~Image {
// Don't decode until after the client requests the image
wait_to_decode_port.recv();
~load_from_memory(data)
}
};
let image_cache_task = image_cache_task_(mock_resource_task, decoder_factory);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url));
// Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv();
// Make the request
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
match response_port.recv() {
ImageNotReady => (),
_ => fail
}
// Now decode
wait_to_decode_chan.send(());
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}