diff --git a/src/servo/content/content_task.rs b/src/servo/content/content_task.rs index d1ec2fa8fcc..8b2681624c1 100644 --- a/src/servo/content/content_task.rs +++ b/src/servo/content/content_task.rs @@ -66,7 +66,7 @@ fn ContentTask(layout_task: LayoutTask, resource_task: ResourceTask, img_cache_task: ImageCacheTask) -> ContentTask { do task().sched_mode(SingleThreaded).spawn_listener:: |from_master| { - let content = Content(layout_task, from_master, resource_task, img_cache_task); + let content = Content(layout_task, from_master, resource_task, img_cache_task.clone()); compositor.add_event_listener(content.event_port.chan()); content.start(); } @@ -164,7 +164,7 @@ impl Content { let result = html::hubbub_html_parser::parse_html(self.scope, copy url, self.resource_task, - self.image_cache_task); + self.image_cache_task.clone()); let root = result.root; let css_rules = result.style_port.recv(); diff --git a/src/servo/engine.rs b/src/servo/engine.rs index ace4fdd900b..6ec411ac55f 100644 --- a/src/servo/engine.rs +++ b/src/servo/engine.rs @@ -13,7 +13,15 @@ use resource::image_cache_task; use image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient}; use pipes::{Port, Chan}; -pub struct Engine { +pub type EngineTask = comm::Chan; + +pub enum Msg { + LoadURLMsg(Url), + ExitMsg(Chan<()>) +} + +struct Engine { + request_port: comm::Port, compositor: C, render_task: Renderer, resource_task: ResourceTask, @@ -22,29 +30,30 @@ pub struct Engine { content_task: ContentTask } -pub fn Engine(compositor: C, - resource_task: ResourceTask, - image_cache_task: ImageCacheTask) -> Engine { - let render_task = RenderTask(compositor); - let layout_task = LayoutTask(render_task, image_cache_task); - let content_task = ContentTask(layout_task, compositor, resource_task, image_cache_task); +fn Engine(compositor: C, + resource_task: ResourceTask, + image_cache_task: ImageCacheTask) -> EngineTask { + do spawn_listener:: |request| { + let render_task = RenderTask(compositor); + let layout_task = LayoutTask(render_task, image_cache_task.clone()); + let content_task = ContentTask(layout_task, compositor, resource_task, image_cache_task.clone()); - Engine { - compositor: compositor, - render_task: render_task, - resource_task: resource_task, - image_cache_task: image_cache_task, - layout_task: layout_task, - content_task: content_task + Engine { + request_port: request, + compositor: compositor, + render_task: render_task, + resource_task: resource_task, + image_cache_task: image_cache_task, + layout_task: layout_task, + content_task: content_task + }.run(); } } impl Engine { - fn start() -> comm::Chan { - do spawn_listener:: |request| { - while self.handle_request(request.recv()) { - // Go on... - } + fn run() { + while self.handle_request(self.request_port.recv()) { + // Go on... } } @@ -80,8 +89,3 @@ impl Engine { } } -pub enum Msg { - LoadURLMsg(Url), - ExitMsg(Chan<()>) -} - diff --git a/src/servo/image/holder.rs b/src/servo/image/holder.rs index d24a6f2d992..02d413870cf 100644 --- a/src/servo/image/holder.rs +++ b/src/servo/image/holder.rs @@ -78,17 +78,17 @@ impl ImageHolder { None => fail ~"expected to have a url" }; - let response_port = Port(); - self.image_cache_task.send(image_cache_task::GetImage(copy url, response_port.chan())); + let (response_chan, response_port) = pipes::stream(); + self.image_cache_task.send(image_cache_task::GetImage(copy url, response_chan)); self.image = match response_port.recv() { image_cache_task::ImageReady(image) => Some(clone(&image)), image_cache_task::ImageNotReady => { // Need to reflow when the image is available - let image_cache_task = self.image_cache_task; + let image_cache_task = self.image_cache_task.clone(); let reflow = copy self.reflow_cb; do task::spawn |copy url, move reflow| { - let response_port = Port(); - image_cache_task.send(image_cache_task::WaitForImage(copy url, response_port.chan())); + let (response_chan, response_port) = pipes::stream(); + image_cache_task.send(image_cache_task::WaitForImage(copy url, response_chan)); match response_port.recv() { image_cache_task::ImageReady(*) => reflow(), image_cache_task::ImageNotReady => fail /*not possible*/, diff --git a/src/servo/layout/box_builder.rs b/src/servo/layout/box_builder.rs index ecf75ba1308..1b52c55adac 100644 --- a/src/servo/layout/box_builder.rs +++ b/src/servo/layout/box_builder.rs @@ -224,7 +224,7 @@ impl LayoutTreeBuilder { // an ICE (mozilla/rust issue #3601) if d.image.is_some() { let holder = ImageHolder({copy *d.image.get_ref()}, - layout_ctx.image_cache, + layout_ctx.image_cache.clone(), copy layout_ctx.reflow_cb); @ImageBox(RenderBoxData(node, ctx, self.next_box_id()), holder) diff --git a/src/servo/layout/layout_task.rs b/src/servo/layout/layout_task.rs index 3141a6466b7..e1b80154857 100644 --- a/src/servo/layout/layout_task.rs +++ b/src/servo/layout/layout_task.rs @@ -53,7 +53,7 @@ pub enum Msg { fn LayoutTask(render_task: RenderTask, img_cache_task: ImageCacheTask) -> LayoutTask { do spawn_listener:: |from_content| { - Layout(render_task, img_cache_task, from_content).start(); + Layout(render_task, img_cache_task.clone(), from_content).start(); } } @@ -137,7 +137,7 @@ impl Layout { au::from_px(window_size.height as int)); let layout_ctx = LayoutContext { - image_cache: self.image_cache_task, + image_cache: self.image_cache_task.clone(), font_cache: self.font_cache, doc_url: doc_url, reflow_cb: || to_content.send(ReflowEvent), diff --git a/src/servo/resource/image_cache_task.rs b/src/servo/resource/image_cache_task.rs index 771950839c0..1363c48c2b6 100644 --- a/src/servo/resource/image_cache_task.rs +++ b/src/servo/resource/image_cache_task.rs @@ -2,7 +2,7 @@ use core::util::replace; use image::base::{Image, load_from_memory, test_image_bin}; use std::net::url::Url; use util::url::{make_url, UrlMap, url_map}; -use comm::{Chan, Port}; +use pipes::{stream, SharedChan, Chan, Port}; use task::{spawn, spawn_listener}; use resource::resource_task; use resource_task::ResourceTask; @@ -74,7 +74,7 @@ impl ImageResponseMsg: cmp::Eq { } } -pub type ImageCacheTask = Chan; +pub type ImageCacheTask = SharedChan; type DecoderFactory = ~fn() -> ~fn(&[u8]) -> Option; @@ -87,24 +87,37 @@ pub fn ImageCacheTask_(resource_task: ResourceTask, decoder_factory: DecoderFact // version of which contains an uncopyable type which rust will currently // copy unsoundly let decoder_factory_cell = Cell(move decoder_factory); - do spawn_listener |from_client, move decoder_factory_cell| { + + let (chan, port) = stream(); + let chan = SharedChan(move chan); + let port_cell = Cell(move port); + let chan_cell = Cell(chan.clone()); + + do spawn |move port_cell, move chan_cell, move decoder_factory_cell| { ImageCache { resource_task: resource_task, decoder_factory: decoder_factory_cell.take(), - from_client: from_client, + port: port_cell.take(), + chan: chan_cell.take(), state_map: url_map(), wait_map: url_map(), need_exit: None }.run(); } + + chan } fn SyncImageCacheTask(resource_task: ResourceTask) -> ImageCacheTask { - do spawn_listener |from_client: Port| { + let (chan, port) = stream(); + let port_cell = Cell(move port); + + do spawn |move port_cell| { + let port = port_cell.take(); let inner_cache = ImageCacheTask(resource_task); loop { - let msg = from_client.recv(); + let msg: Msg = port.recv(); match move msg { GetImage(move url, move response) => inner_cache.send(WaitForImage(url, response)), @@ -116,6 +129,8 @@ fn SyncImageCacheTask(resource_task: ResourceTask) -> ImageCacheTask { } } } + + return SharedChan(chan); } struct ImageCache { @@ -124,7 +139,9 @@ struct ImageCache { /// Creates image decoders decoder_factory: DecoderFactory, /// The port on which we'll receive client requests - from_client: Port, + port: Port, + /// A copy of the shared chan to give to child tasks + chan: SharedChan, /// The state of processsing an image for a URL state_map: UrlMap, /// List of clients waiting on a WaitForImage response @@ -154,7 +171,7 @@ impl ImageCache { let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[]; loop { - let msg = self.from_client.recv(); + let msg = self.port.recv(); for msg_handlers.each |handler| { (*handler)(&msg) } @@ -219,7 +236,7 @@ impl ImageCache { priv fn prefetch(url: Url) { match self.get_state(copy url) { Init => { - let to_cache = self.from_client.chan(); + let to_cache = self.chan.clone(); let resource_task = self.resource_task; let url_cell = Cell(copy url); @@ -298,7 +315,7 @@ impl ImageCache { assert !data_cell.is_empty(); let data = data_cell.take(); - let to_cache = self.from_client.chan(); + let to_cache = self.chan.clone(); let url_cell = Cell(copy url); let decode = self.decoder_factory(); @@ -433,9 +450,9 @@ trait ImageCacheTaskClient { impl ImageCacheTask: ImageCacheTaskClient { fn exit() { - let response = Port(); - self.send(Exit(response.chan())); - response.recv(); + let (response_chan, response_port) = stream(); + self.send(Exit(response_chan)); + response_port.recv(); } } @@ -466,14 +483,10 @@ fn default_decoder_factory() -> ~fn(&[u8]) -> Option { } #[cfg(test)] -fn mock_resource_task(on_load: ~fn(resource: Chan)) -> ResourceTask { - do spawn_listener |from_client, move on_load| { - - // infer me - let from_client: Port = from_client; - +fn mock_resource_task(on_load: ~fn(resource: comm::Chan)) -> ResourceTask { + do spawn_listener |port: comm::Port, move on_load| { loop { - match from_client.recv() { + match port.recv() { resource_task::Load(_, response) => { on_load(response); } @@ -504,9 +517,9 @@ fn should_fail_if_unprefetched_image_is_requested() { let image_cache_task = ImageCacheTask(mock_resource_task); let url = make_url(~"file", None); - let request = Port(); - image_cache_task.send(GetImage(url, request.chan())); - request.recv(); + let (chan, port) = stream(); + image_cache_task.send(GetImage(url, chan)); + port.recv(); } #[test] @@ -556,8 +569,8 @@ fn should_fail_if_requesting_image_before_requesting_decode() { image_cache_task.send(Prefetch(copy url)); // no decode message - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (chan, port) = stream(); + image_cache_task.send(GetImage(url, chan)); image_cache_task.exit(); mock_resource_task.send(resource_task::Exit); @@ -602,8 +615,8 @@ fn should_return_image_not_ready_if_data_has_not_arrived() { image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Decode(copy url)); - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); assert response_port.recv() == ImageNotReady; wait_chan.send(()); image_cache_task.exit(); @@ -637,8 +650,8 @@ fn should_return_decoded_image_data_if_data_has_arrived() { // Wait until our mock resource task has sent the image to the image cache wait_for_image_chan.recv(); - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageReady(_) => (), _ => fail @@ -676,8 +689,8 @@ fn should_return_decoded_image_data_for_multiple_requests() { wait_for_image.recv(); for iter::repeat(2) { - let response_port = Port(); - image_cache_task.send(GetImage(copy url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(copy url, response_chan)); match response_port.recv() { ImageReady(_) => (), _ => fail @@ -697,22 +710,18 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() { let resource_task_exited = Port(); let resource_task_exited_chan = resource_task_exited.chan(); - let mock_resource_task = do spawn_listener |from_client| { - - // infer me - let from_client: Port = from_client; - + let mock_resource_task = do spawn_listener |port: comm::Port| { loop { - match from_client.recv() { - resource_task::Load(_, response) => { - response.send(resource_task::Payload(test_image_bin())); - response.send(resource_task::Done(result::Ok(()))); - image_bin_sent_chan.send(()); - } - resource_task::Exit => { - resource_task_exited_chan.send(()); - break - } + match port.recv() { + resource_task::Load(_, response) => { + response.send(resource_task::Payload(test_image_bin())); + response.send(resource_task::Done(result::Ok(()))); + image_bin_sent_chan.send(()); + } + resource_task::Exit => { + resource_task_exited_chan.send(()); + break + } } } }; @@ -746,22 +755,18 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() { let resource_task_exited = Port(); let resource_task_exited_chan = resource_task_exited.chan(); - let mock_resource_task = do spawn_listener |from_client| { - - // infer me - let from_client: Port = from_client; - + let mock_resource_task = do spawn_listener |port: comm::Port| { loop { - match from_client.recv() { - resource_task::Load(_, response) => { - response.send(resource_task::Payload(test_image_bin())); - response.send(resource_task::Done(result::Err(()))); - image_bin_sent_chan.send(()); - } - resource_task::Exit => { - resource_task_exited_chan.send(()); - break - } + match port.recv() { + resource_task::Load(_, response) => { + response.send(resource_task::Payload(test_image_bin())); + response.send(resource_task::Done(result::Err(()))); + image_bin_sent_chan.send(()); + } + resource_task::Exit => { + resource_task_exited_chan.send(()); + break + } } } }; @@ -816,8 +821,8 @@ fn should_return_failed_if_image_bin_cannot_be_fetched() { // Wait until our mock resource task has sent the image to the image cache wait_for_prefetech.recv(); - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageFailed => (), _ => fail @@ -855,16 +860,16 @@ fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_f // Wait until our mock resource task has sent the image to the image cache wait_for_prefetech.recv(); - let response_port = Port(); - image_cache_task.send(GetImage(copy url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(copy url, response_chan)); match response_port.recv() { ImageFailed => (), _ => fail } // And ask again, we should get the same response - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageFailed => (), _ => fail @@ -914,8 +919,8 @@ fn should_return_not_ready_if_image_is_still_decoding() { wait_for_prefetech.recv(); // Make the request - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageNotReady => (), @@ -958,8 +963,8 @@ fn should_return_failed_if_image_decode_fails() { wait_for_decode.recv(); // Make the request - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageFailed => (), @@ -997,8 +1002,8 @@ fn should_return_image_on_wait_if_image_is_already_loaded() { // Wait until our mock resource task has sent the image to the image cache wait_for_decode.recv(); - let response_port = Port(); - image_cache_task.send(WaitForImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(WaitForImage(url, response_chan)); match response_port.recv() { ImageReady(*) => (), _ => fail @@ -1025,8 +1030,8 @@ fn should_return_image_on_wait_if_image_is_not_yet_loaded() { image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Decode(copy url)); - let response_port = Port(); - image_cache_task.send(WaitForImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(WaitForImage(url, response_chan)); wait_chan.send(()); @@ -1056,8 +1061,8 @@ fn should_return_image_failed_on_wait_if_image_fails_to_load() { image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Decode(copy url)); - let response_port = Port(); - image_cache_task.send(WaitForImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(WaitForImage(url, response_chan)); wait_chan.send(()); @@ -1083,8 +1088,8 @@ fn sync_cache_should_wait_for_images() { image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Decode(copy url)); - let response_port = Port(); - image_cache_task.send(GetImage(url, response_port.chan())); + let (response_chan, response_port) = stream(); + image_cache_task.send(GetImage(url, response_chan)); match response_port.recv() { ImageReady(_) => (), _ => fail diff --git a/src/servo/servo.rs b/src/servo/servo.rs index b41cb6822d1..c7c94520f68 100644 --- a/src/servo/servo.rs +++ b/src/servo/servo.rs @@ -42,13 +42,12 @@ fn run_pipeline_screen(urls: &[~str]) { // Create a servo instance let resource_task = ResourceTask(); let image_cache_task = ImageCacheTask(resource_task); - let engine = Engine(osmain, resource_task, image_cache_task); - let engine_chan = engine.start(); + let engine_task = Engine(osmain, resource_task, image_cache_task); for urls.each |filename| { let url = make_url(copy *filename, None); #debug["master: Sending url `%s`", url.to_str()]; - engine_chan.send(LoadURLMsg(url)); + engine_task.send(LoadURLMsg(url)); #debug["master: Waiting for keypress"]; match keypress_from_osmain.try_recv() { @@ -60,7 +59,7 @@ fn run_pipeline_screen(urls: &[~str]) { // Shut everything down #debug["master: Shut down"]; let (exit_chan, exit_response_from_engine) = pipes::stream(); - engine_chan.send(engine::ExitMsg(exit_chan)); + engine_task.send(engine::ExitMsg(exit_chan)); exit_response_from_engine.recv(); osmain.send(osmain::Exit); @@ -81,8 +80,7 @@ fn run_pipeline_png(url: ~str, outfile: &str) { // fulfilled before the first paint. let image_cache_task = SyncImageCacheTask(resource_task); let engine_task = Engine(compositor, resource_task, image_cache_task); - let engine_chan = engine_task.start(); - engine_chan.send(LoadURLMsg(make_url(copy url, None))); + engine_task.send(LoadURLMsg(make_url(copy url, None))); match buffered_file_writer(&Path(outfile)) { Ok(writer) => writer.write(pngdata_from_compositor.recv()), @@ -90,7 +88,7 @@ fn run_pipeline_png(url: ~str, outfile: &str) { } let (exit_chan, exit_response_from_engine) = pipes::stream(); - engine_chan.send(engine::ExitMsg(exit_chan)); + engine_task.send(engine::ExitMsg(exit_chan)); exit_response_from_engine.recv(); compositor.send(png_compositor::Exit); })