From fe4b1c92dd37ff7abe0d05721145cfa2c7baa55f Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 12 Aug 2012 21:41:53 -0700 Subject: [PATCH] Perform the image prefetch in a different task Having the prefetch task send back the results immediately will provide an appropriate event to signel clients that want to know when images become available. --- src/servo/engine.rs | 4 +- src/servo/resource/image_cache_task.rs | 341 ++++++++++++++++++------- 2 files changed, 248 insertions(+), 97 deletions(-) diff --git a/src/servo/engine.rs b/src/servo/engine.rs index 8e1110fdadb..28ca5e5d051 100644 --- a/src/servo/engine.rs +++ b/src/servo/engine.rs @@ -8,7 +8,7 @@ import resource::resource_task; import resource::resource_task::{ResourceTask}; import std::net::url::url; import resource::image_cache_task; -import image_cache_task::{ImageCacheTask, image_cache_task}; +import image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient}; import pipes::{port, chan}; @@ -67,7 +67,7 @@ class Engine { self.renderer.send(renderer::ExitMsg(response_chan)); response_port.recv(); - self.image_cache_task.send(image_cache_task::Exit); + self.image_cache_task.exit(); self.resource_task.send(resource_task::Exit); sender.send(()); diff --git a/src/servo/resource/image_cache_task.rs b/src/servo/resource/image_cache_task.rs index ba51fa5786a..a9b0792a090 100644 --- a/src/servo/resource/image_cache_task.rs +++ b/src/servo/resource/image_cache_task.rs @@ -2,6 +2,7 @@ export Msg, Prefetch, GetImage, Exit; export ImageResponseMsg, ImageReady, ImageNotReady; export ImageCacheTask; export image_cache_task; +export ImageCacheTaskClient; import image::base::{Image, load_from_memory, test_image_bin}; import std::net::url::url; @@ -13,16 +14,28 @@ import resource_task::ResourceTask; import std::arc::arc; import clone_arc = std::arc::clone; import std::cell::Cell; +import result::{result, ok, err}; +import to_str::to_str; enum Msg { /// Tell the cache that we may need a particular image soon. Must be posted /// before GetImage Prefetch(url), + + /// Used be the prefetch tasks to post back image binaries + /*priv*/ StorePrefetchedImageData(url, result, ()>), + /// Request an Image object for a URL GetImage(url, chan), + /// Used by the decoder tasks to post decoded images back to the cache - StoreImage(url, arc<~Image>), - Exit + /*priv*/ StoreImage(url, arc<~Image>), + + /// For testing + /*priv*/ OnMsg(fn~(msg: &Msg)), + + /// Clients must wait for a response before shutting down the ResourceTask + Exit(chan<()>) } enum ImageResponseMsg { @@ -37,7 +50,8 @@ fn image_cache_task(resource_task: ResourceTask) -> ImageCacheTask { ImageCache { resource_task: resource_task, from_client: from_client, - state_map: url_map() + state_map: url_map(), + need_exit: none }.run(); } } @@ -49,21 +63,18 @@ struct ImageCache { from_client: port; /// The state of processsing an image for a URL state_map: UrlMap; + mut need_exit: option>; } enum ImageState { Init, - Prefetching(@PrefetchData), + Prefetching, + Prefetched(@Cell<~[u8]>), Decoding(@FutureData), Decoded(@arc<~Image>), Failed } -struct PrefetchData { - response_port: port; - mut data: ~[u8]; -} - struct FutureData { mut waiters: ~[chan]; } @@ -73,13 +84,51 @@ impl ImageCache { fn run() { + let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[]; + loop { + let msg = self.from_client.recv(); + + for msg_handlers.each |handler| { handler(&msg) } + + #debug("image_cache_task: received: %?", msg); + // FIXME: Need to move out the urls - match self.from_client.recv() { + match msg { Prefetch(url) => self.prefetch(copy url), + StorePrefetchedImageData(url, data) => self.store_prefetched_image_data(copy url, &data), GetImage(url, response) => self.get_image(copy url, response), StoreImage(url, image) => self.store_image(copy url, &image), - Exit => break + OnMsg(handler) => msg_handlers += [copy handler], + Exit(response) => { + assert self.need_exit.is_none(); + self.need_exit = some(response); + } + } + + match copy self.need_exit { + some(response) => { + // Wait until we have no outstanding requests and subtasks + // before exiting + let mut can_exit = true; + for self.state_map.each_value |state| { + match state { + Prefetching => can_exit = false, + Decoding(*) => can_exit = false, + + Init + | Prefetched(*) + | Decoded(*) + | Failed => () + } + } + + if can_exit { + response.send(()); + break; + } + } + none => () } } } @@ -98,18 +147,30 @@ impl ImageCache { /*priv*/ fn prefetch(+url: url) { match self.get_state(copy url) { Init => { - let response_port = port(); - self.resource_task.send(resource_task::Load(copy url, response_port.chan())); + let to_cache = self.from_client.chan(); + let resource_task = self.resource_task; + let url_cell = Cell(copy url); - let prefetch_data = @PrefetchData { - response_port: response_port, - data: ~[] - }; + do spawn |move url_cell| { + let url = url_cell.take(); + #debug("image_cache_task: started fetch for %s", url.to_str()); - self.set_state(url, Prefetching(prefetch_data)); + let image = load_image_data(copy url, resource_task); + + let result = if image.is_ok() { + ok(Cell(result::unwrap(image))) + } else { + err(()) + }; + to_cache.send(StorePrefetchedImageData(copy url, result)); + #debug("image_cache_task: ended fetch for %s", (copy url).to_str()); + } + + self.set_state(url, Prefetching); } - Prefetching(*) + Prefetching + | Prefetched(*) | Decoding(*) | Decoded(*) | Failed => { @@ -118,58 +179,61 @@ impl ImageCache { } } + /*priv*/ fn store_prefetched_image_data(+url: url, data: &result, ()>) { + match self.get_state(copy url) { + Prefetching => { + match *data { + ok(data_cell) => { + let data = data_cell.take(); + self.set_state(url, Prefetched(@Cell(data))); + } + err(*) => { + self.set_state(url, Failed); + } + } + } + + Init + | Prefetched(*) + | Decoding(*) + | Decoded(*) + | Failed => { + fail ~"wrong state for storing prefetched image" + } + } + } + /*priv*/ fn get_image(+url: url, response: chan) { match self.get_state(copy url) { Init => fail ~"Request for image before prefetch", - Prefetching(prefetch_data) => { + Prefetching => { + response.send(ImageNotReady); + } - let mut image_sent = false; + Prefetched(data_cell) => { + assert !data_cell.is_empty(); - while prefetch_data.response_port.peek() { - match prefetch_data.response_port.recv() { - resource_task::Payload(data) => { - prefetch_data.data += data; - } - resource_task::Done(result::ok(*)) => { - // We've got the entire image binary - let mut data = ~[]; - data <-> prefetch_data.data; - let data <- data; // freeze for capture + let data = data_cell.take(); + let to_cache = self.from_client.chan(); + let url_cell = Cell(copy url); - let to_cache = self.from_client.chan(); - - let url_cell = Cell(copy url); - do spawn |move url_cell| { - let image = arc(~load_from_memory(data)); - // Send the image to the original requester - response.send(ImageReady(clone_arc(&image))); - to_cache.send(StoreImage(url_cell.take(), clone_arc(&image))); - } - - let future_data = @FutureData { - waiters: ~[] - }; - - self.set_state(url, Decoding(future_data)); - - image_sent = true; - break; - } - resource_task::Done(result::err(*)) => { - // There was an error loading the image binary. Put it - // in the error map so we remember the error for future - // requests. - self.set_state(url, Failed); - break; - } - } + do spawn |move url_cell| { + let url = url_cell.take(); + #debug("image_cache_task: started image decode for %s", url.to_str()); + let image = arc(~load_from_memory(data)); + // Send the image to the original requester + response.send(ImageReady(clone_arc(&image))); + to_cache.send(StoreImage(copy url, clone_arc(&image))); + #debug("image_cache_task: ended image decode for %s", url.to_str()); } - if !image_sent { - response.send(ImageNotReady); - } + let future_data = @FutureData { + waiters: ~[] + }; + + self.set_state(url, Decoding(future_data)); } Decoding(future_data) => { @@ -206,7 +270,8 @@ impl ImageCache { } Init - | Prefetching(*) + | Prefetching + | Prefetched(*) | Decoded(*) | Failed => { fail ~"incorrect state in store_image" @@ -216,6 +281,42 @@ impl ImageCache { } } + +trait ImageCacheTaskClient { + fn exit(); +} + +impl ImageCacheTask: ImageCacheTaskClient { + + fn exit() { + let response = port(); + self.send(Exit(response.chan())); + response.recv(); + } + +} + +fn load_image_data(+url: url, resource_task: ResourceTask) -> result<~[u8], ()> { + let response_port = port(); + resource_task.send(resource_task::Load(url, response_port.chan())); + + let mut image_data = ~[]; + + loop { + match response_port.recv() { + resource_task::Payload(data) => { + image_data += data; + } + resource_task::Done(result::ok(*)) => { + return ok(image_data); + } + resource_task::Done(result::err(*)) => { + return err(()); + } + } + } +} + #[test] fn should_exit_on_request() { @@ -235,7 +336,7 @@ fn should_exit_on_request() { let image_cache_task = image_cache_task(mock_resource_task); let _url = make_url(~"file", none); - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } @@ -266,7 +367,10 @@ fn should_request_url_from_resource_task_on_prefetch() { loop { match from_client.recv() { - resource_task::Load(url, _) => url_requested_chan.send(()), + resource_task::Load(url, response) => { + url_requested_chan.send(()); + response.send(resource_task::Done(result::ok(()))); + } resource_task::Exit => break } } @@ -277,7 +381,7 @@ fn should_request_url_from_resource_task_on_prefetch() { image_cache_task.send(Prefetch(url)); url_requested.recv(); - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } @@ -293,7 +397,10 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() { loop { match from_client.recv() { - resource_task::Load(url, _) => url_requested_chan.send(()), + resource_task::Load(url, response) => { + url_requested_chan.send(()); + response.send(resource_task::Done(result::ok(()))); + } resource_task::Exit => break } } @@ -305,13 +412,16 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() { image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(url)); url_requested.recv(); - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); assert !url_requested.peek() } #[test] fn should_return_image_not_ready_if_data_has_not_arrived() { + + let (wait_chan, wait_port) = pipes::stream(); + let mock_resource_task = do spawn_listener |from_client| { // infer me @@ -319,8 +429,13 @@ fn should_return_image_not_ready_if_data_has_not_arrived() { loop { match from_client.recv() { + resource_task::Load(url, response) => { + // Don't send the data until after the client requests + // the image + wait_port.recv(); + response.send(resource_task::Done(result::ok(()))); + } resource_task::Exit => break, - _ => () } } }; @@ -332,16 +447,14 @@ fn should_return_image_not_ready_if_data_has_not_arrived() { let response_port = port(); image_cache_task.send(GetImage(url, response_port.chan())); assert response_port.recv() == ImageNotReady; - image_cache_task.send(Exit); + wait_chan.send(()); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } #[test] fn should_return_decoded_image_data_if_data_has_arrived() { - let image_bin_sent = port(); - let image_bin_sent_chan = image_bin_sent.chan(); - let mock_resource_task = do spawn_listener |from_client| { // infer me @@ -352,7 +465,6 @@ fn should_return_decoded_image_data_if_data_has_arrived() { resource_task::Load(_, response) => { response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Done(result::ok(()))); - image_bin_sent_chan.send(()); } resource_task::Exit => break } @@ -362,10 +474,20 @@ fn should_return_decoded_image_data_if_data_has_arrived() { let image_cache_task = image_cache_task(mock_resource_task); let url = make_url(~"file", none); + let wait_for_prefetech = port(); + let wait_for_prefetech_chan = wait_for_prefetech.chan(); + + image_cache_task.send(OnMsg(|msg| { + match *msg { + StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()), + _ => () + } + })); + image_cache_task.send(Prefetch(copy url)); // Wait until our mock resource task has sent the image to the image cache - image_bin_sent.recv(); + wait_for_prefetech_chan.recv(); let response_port = port(); image_cache_task.send(GetImage(url, response_port.chan())); @@ -374,16 +496,13 @@ fn should_return_decoded_image_data_if_data_has_arrived() { _ => fail } - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } #[test] fn should_return_decoded_image_data_for_multiple_requests() { - let image_bin_sent = port(); - let image_bin_sent_chan = image_bin_sent.chan(); - let mock_resource_task = do spawn_listener |from_client| { // infer me @@ -394,7 +513,6 @@ fn should_return_decoded_image_data_for_multiple_requests() { resource_task::Load(_, response) => { response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Done(result::ok(()))); - image_bin_sent_chan.send(()); } resource_task::Exit => break } @@ -404,10 +522,20 @@ fn should_return_decoded_image_data_for_multiple_requests() { let image_cache_task = image_cache_task(mock_resource_task); let url = make_url(~"file", none); + let wait_for_prefetech = port(); + let wait_for_prefetech_chan = wait_for_prefetech.chan(); + + image_cache_task.send(OnMsg(|msg| { + match *msg { + StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()), + _ => () + } + })); + image_cache_task.send(Prefetch(copy url)); // Wait until our mock resource task has sent the image to the image cache - image_bin_sent.recv(); + wait_for_prefetech.recv(); for iter::repeat(2) { let response_port = port(); @@ -418,7 +546,7 @@ fn should_return_decoded_image_data_for_multiple_requests() { } } - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } @@ -454,10 +582,21 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() { let image_cache_task = image_cache_task(mock_resource_task); let url = make_url(~"file", none); + let wait_for_prefetech = port(); + let wait_for_prefetech_chan = wait_for_prefetech.chan(); + + image_cache_task.send(OnMsg(|msg| { + match *msg { + StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()), + _ => () + } + })); + image_cache_task.send(Prefetch(copy url)); // Wait until our mock resource task has sent the image to the image cache image_bin_sent.recv(); + wait_for_prefetech.recv(); let response_port = port(); image_cache_task.send(GetImage(copy url, response_port.chan())); @@ -472,7 +611,7 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() { image_cache_task.send(GetImage(url, response_port.chan())); response_port.recv(); - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); resource_task_exited.recv(); @@ -529,7 +668,7 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() { image_cache_task.send(GetImage(url, response_port.chan())); response_port.recv(); - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); resource_task_exited.recv(); @@ -542,9 +681,6 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() { #[test] fn should_return_not_ready_if_image_bin_cannot_be_fetched() { - let image_bin_sent = port(); - let image_bin_sent_chan = image_bin_sent.chan(); - let mock_resource_task = do spawn_listener |from_client| { // infer me @@ -556,7 +692,6 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() { response.send(resource_task::Payload(test_image_bin())); // ERROR fetching image response.send(resource_task::Done(result::err(()))); - image_bin_sent_chan.send(()); } resource_task::Exit => break } @@ -566,10 +701,20 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() { let image_cache_task = image_cache_task(mock_resource_task); let url = make_url(~"file", none); + let wait_for_prefetech = port(); + let wait_for_prefetech_chan = wait_for_prefetech.chan(); + + image_cache_task.send(OnMsg(|msg| { + match *msg { + StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()), + _ => () + } + })); + image_cache_task.send(Prefetch(copy url)); // Wait until our mock resource task has sent the image to the image cache - image_bin_sent.recv(); + wait_for_prefetech.recv(); let response_port = port(); image_cache_task.send(GetImage(url, response_port.chan())); @@ -578,16 +723,13 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() { _ => fail } - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); } #[test] fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() { - let image_bin_sent = port(); - let image_bin_sent_chan = image_bin_sent.chan(); - let mock_resource_task = do spawn_listener |from_client| { // infer me @@ -599,7 +741,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b response.send(resource_task::Payload(test_image_bin())); // ERROR fetching image response.send(resource_task::Done(result::err(()))); - image_bin_sent_chan.send(()); } resource_task::Exit => break } @@ -609,10 +750,20 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b let image_cache_task = image_cache_task(mock_resource_task); let url = make_url(~"file", none); + let wait_for_prefetech = port(); + let wait_for_prefetech_chan = wait_for_prefetech.chan(); + + image_cache_task.send(OnMsg(|msg| { + match *msg { + StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()), + _ => () + } + })); + image_cache_task.send(Prefetch(copy url)); // Wait until our mock resource task has sent the image to the image cache - image_bin_sent.recv(); + wait_for_prefetech.recv(); let response_port = port(); image_cache_task.send(GetImage(copy url, response_port.chan())); @@ -629,6 +780,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b _ => fail } - image_cache_task.send(Exit); + image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); }