Convert image_cache_task to pipes

This commit is contained in:
Brian Anderson 2012-10-12 16:55:55 -07:00
parent b9e2571a35
commit af5985b6e2
7 changed files with 127 additions and 120 deletions

View file

@ -66,7 +66,7 @@ fn ContentTask<S: Compositor Send Copy>(layout_task: LayoutTask,
resource_task: ResourceTask, resource_task: ResourceTask,
img_cache_task: ImageCacheTask) -> ContentTask { img_cache_task: ImageCacheTask) -> ContentTask {
do task().sched_mode(SingleThreaded).spawn_listener::<ControlMsg> |from_master| { do task().sched_mode(SingleThreaded).spawn_listener::<ControlMsg> |from_master| {
let content = Content(layout_task, from_master, resource_task, img_cache_task); let content = Content(layout_task, from_master, resource_task, img_cache_task.clone());
compositor.add_event_listener(content.event_port.chan()); compositor.add_event_listener(content.event_port.chan());
content.start(); content.start();
} }
@ -164,7 +164,7 @@ impl Content {
let result = html::hubbub_html_parser::parse_html(self.scope, let result = html::hubbub_html_parser::parse_html(self.scope,
copy url, copy url,
self.resource_task, self.resource_task,
self.image_cache_task); self.image_cache_task.clone());
let root = result.root; let root = result.root;
let css_rules = result.style_port.recv(); let css_rules = result.style_port.recv();

View file

@ -13,7 +13,15 @@ use resource::image_cache_task;
use image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient}; use image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient};
use pipes::{Port, Chan}; use pipes::{Port, Chan};
pub struct Engine<C:Compositor Send Copy> { pub type EngineTask = comm::Chan<Msg>;
pub enum Msg {
LoadURLMsg(Url),
ExitMsg(Chan<()>)
}
struct Engine<C:Compositor Send Copy> {
request_port: comm::Port<Msg>,
compositor: C, compositor: C,
render_task: Renderer, render_task: Renderer,
resource_task: ResourceTask, resource_task: ResourceTask,
@ -22,29 +30,30 @@ pub struct Engine<C:Compositor Send Copy> {
content_task: ContentTask content_task: ContentTask
} }
pub fn Engine<C:Compositor Send Copy>(compositor: C, fn Engine<C:Compositor Send Copy>(compositor: C,
resource_task: ResourceTask, resource_task: ResourceTask,
image_cache_task: ImageCacheTask) -> Engine<C> { image_cache_task: ImageCacheTask) -> EngineTask {
let render_task = RenderTask(compositor); do spawn_listener::<Msg> |request| {
let layout_task = LayoutTask(render_task, image_cache_task); let render_task = RenderTask(compositor);
let content_task = ContentTask(layout_task, compositor, resource_task, image_cache_task); let layout_task = LayoutTask(render_task, image_cache_task.clone());
let content_task = ContentTask(layout_task, compositor, resource_task, image_cache_task.clone());
Engine { Engine {
compositor: compositor, request_port: request,
render_task: render_task, compositor: compositor,
resource_task: resource_task, render_task: render_task,
image_cache_task: image_cache_task, resource_task: resource_task,
layout_task: layout_task, image_cache_task: image_cache_task,
content_task: content_task layout_task: layout_task,
content_task: content_task
}.run();
} }
} }
impl<C: Compositor Copy Send> Engine<C> { impl<C: Compositor Copy Send> Engine<C> {
fn start() -> comm::Chan<Msg> { fn run() {
do spawn_listener::<Msg> |request| { while self.handle_request(self.request_port.recv()) {
while self.handle_request(request.recv()) { // Go on...
// Go on...
}
} }
} }
@ -80,8 +89,3 @@ impl<C: Compositor Copy Send> Engine<C> {
} }
} }
pub enum Msg {
LoadURLMsg(Url),
ExitMsg(Chan<()>)
}

View file

@ -78,17 +78,17 @@ impl ImageHolder {
None => fail ~"expected to have a url" None => fail ~"expected to have a url"
}; };
let response_port = Port(); let (response_chan, response_port) = pipes::stream();
self.image_cache_task.send(image_cache_task::GetImage(copy url, response_port.chan())); self.image_cache_task.send(image_cache_task::GetImage(copy url, response_chan));
self.image = match response_port.recv() { self.image = match response_port.recv() {
image_cache_task::ImageReady(image) => Some(clone(&image)), image_cache_task::ImageReady(image) => Some(clone(&image)),
image_cache_task::ImageNotReady => { image_cache_task::ImageNotReady => {
// Need to reflow when the image is available // Need to reflow when the image is available
let image_cache_task = self.image_cache_task; let image_cache_task = self.image_cache_task.clone();
let reflow = copy self.reflow_cb; let reflow = copy self.reflow_cb;
do task::spawn |copy url, move reflow| { do task::spawn |copy url, move reflow| {
let response_port = Port(); let (response_chan, response_port) = pipes::stream();
image_cache_task.send(image_cache_task::WaitForImage(copy url, response_port.chan())); image_cache_task.send(image_cache_task::WaitForImage(copy url, response_chan));
match response_port.recv() { match response_port.recv() {
image_cache_task::ImageReady(*) => reflow(), image_cache_task::ImageReady(*) => reflow(),
image_cache_task::ImageNotReady => fail /*not possible*/, image_cache_task::ImageNotReady => fail /*not possible*/,

View file

@ -224,7 +224,7 @@ impl LayoutTreeBuilder {
// an ICE (mozilla/rust issue #3601) // an ICE (mozilla/rust issue #3601)
if d.image.is_some() { if d.image.is_some() {
let holder = ImageHolder({copy *d.image.get_ref()}, let holder = ImageHolder({copy *d.image.get_ref()},
layout_ctx.image_cache, layout_ctx.image_cache.clone(),
copy layout_ctx.reflow_cb); copy layout_ctx.reflow_cb);
@ImageBox(RenderBoxData(node, ctx, self.next_box_id()), holder) @ImageBox(RenderBoxData(node, ctx, self.next_box_id()), holder)

View file

@ -53,7 +53,7 @@ pub enum Msg {
fn LayoutTask(render_task: RenderTask, fn LayoutTask(render_task: RenderTask,
img_cache_task: ImageCacheTask) -> LayoutTask { img_cache_task: ImageCacheTask) -> LayoutTask {
do spawn_listener::<Msg> |from_content| { do spawn_listener::<Msg> |from_content| {
Layout(render_task, img_cache_task, from_content).start(); Layout(render_task, img_cache_task.clone(), from_content).start();
} }
} }
@ -137,7 +137,7 @@ impl Layout {
au::from_px(window_size.height as int)); au::from_px(window_size.height as int));
let layout_ctx = LayoutContext { let layout_ctx = LayoutContext {
image_cache: self.image_cache_task, image_cache: self.image_cache_task.clone(),
font_cache: self.font_cache, font_cache: self.font_cache,
doc_url: doc_url, doc_url: doc_url,
reflow_cb: || to_content.send(ReflowEvent), reflow_cb: || to_content.send(ReflowEvent),

View file

@ -2,7 +2,7 @@ use core::util::replace;
use image::base::{Image, load_from_memory, test_image_bin}; use image::base::{Image, load_from_memory, test_image_bin};
use std::net::url::Url; use std::net::url::Url;
use util::url::{make_url, UrlMap, url_map}; use util::url::{make_url, UrlMap, url_map};
use comm::{Chan, Port}; use pipes::{stream, SharedChan, Chan, Port};
use task::{spawn, spawn_listener}; use task::{spawn, spawn_listener};
use resource::resource_task; use resource::resource_task;
use resource_task::ResourceTask; use resource_task::ResourceTask;
@ -74,7 +74,7 @@ impl ImageResponseMsg: cmp::Eq {
} }
} }
pub type ImageCacheTask = Chan<Msg>; pub type ImageCacheTask = SharedChan<Msg>;
type DecoderFactory = ~fn() -> ~fn(&[u8]) -> Option<Image>; type DecoderFactory = ~fn() -> ~fn(&[u8]) -> Option<Image>;
@ -87,24 +87,37 @@ pub fn ImageCacheTask_(resource_task: ResourceTask, decoder_factory: DecoderFact
// version of which contains an uncopyable type which rust will currently // version of which contains an uncopyable type which rust will currently
// copy unsoundly // copy unsoundly
let decoder_factory_cell = Cell(move decoder_factory); let decoder_factory_cell = Cell(move decoder_factory);
do spawn_listener |from_client, move decoder_factory_cell| {
let (chan, port) = stream();
let chan = SharedChan(move chan);
let port_cell = Cell(move port);
let chan_cell = Cell(chan.clone());
do spawn |move port_cell, move chan_cell, move decoder_factory_cell| {
ImageCache { ImageCache {
resource_task: resource_task, resource_task: resource_task,
decoder_factory: decoder_factory_cell.take(), decoder_factory: decoder_factory_cell.take(),
from_client: from_client, port: port_cell.take(),
chan: chan_cell.take(),
state_map: url_map(), state_map: url_map(),
wait_map: url_map(), wait_map: url_map(),
need_exit: None need_exit: None
}.run(); }.run();
} }
chan
} }
fn SyncImageCacheTask(resource_task: ResourceTask) -> ImageCacheTask { fn SyncImageCacheTask(resource_task: ResourceTask) -> ImageCacheTask {
do spawn_listener |from_client: Port<Msg>| { let (chan, port) = stream();
let port_cell = Cell(move port);
do spawn |move port_cell| {
let port = port_cell.take();
let inner_cache = ImageCacheTask(resource_task); let inner_cache = ImageCacheTask(resource_task);
loop { loop {
let msg = from_client.recv(); let msg: Msg = port.recv();
match move msg { match move msg {
GetImage(move url, move response) => inner_cache.send(WaitForImage(url, response)), GetImage(move url, move response) => inner_cache.send(WaitForImage(url, response)),
@ -116,6 +129,8 @@ fn SyncImageCacheTask(resource_task: ResourceTask) -> ImageCacheTask {
} }
} }
} }
return SharedChan(chan);
} }
struct ImageCache { struct ImageCache {
@ -124,7 +139,9 @@ struct ImageCache {
/// Creates image decoders /// Creates image decoders
decoder_factory: DecoderFactory, decoder_factory: DecoderFactory,
/// The port on which we'll receive client requests /// The port on which we'll receive client requests
from_client: Port<Msg>, port: Port<Msg>,
/// A copy of the shared chan to give to child tasks
chan: SharedChan<Msg>,
/// The state of processsing an image for a URL /// The state of processsing an image for a URL
state_map: UrlMap<ImageState>, state_map: UrlMap<ImageState>,
/// List of clients waiting on a WaitForImage response /// List of clients waiting on a WaitForImage response
@ -154,7 +171,7 @@ impl ImageCache {
let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[]; let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[];
loop { loop {
let msg = self.from_client.recv(); let msg = self.port.recv();
for msg_handlers.each |handler| { (*handler)(&msg) } for msg_handlers.each |handler| { (*handler)(&msg) }
@ -219,7 +236,7 @@ impl ImageCache {
priv fn prefetch(url: Url) { priv fn prefetch(url: Url) {
match self.get_state(copy url) { match self.get_state(copy url) {
Init => { Init => {
let to_cache = self.from_client.chan(); let to_cache = self.chan.clone();
let resource_task = self.resource_task; let resource_task = self.resource_task;
let url_cell = Cell(copy url); let url_cell = Cell(copy url);
@ -298,7 +315,7 @@ impl ImageCache {
assert !data_cell.is_empty(); assert !data_cell.is_empty();
let data = data_cell.take(); let data = data_cell.take();
let to_cache = self.from_client.chan(); let to_cache = self.chan.clone();
let url_cell = Cell(copy url); let url_cell = Cell(copy url);
let decode = self.decoder_factory(); let decode = self.decoder_factory();
@ -433,9 +450,9 @@ trait ImageCacheTaskClient {
impl ImageCacheTask: ImageCacheTaskClient { impl ImageCacheTask: ImageCacheTaskClient {
fn exit() { fn exit() {
let response = Port(); let (response_chan, response_port) = stream();
self.send(Exit(response.chan())); self.send(Exit(response_chan));
response.recv(); response_port.recv();
} }
} }
@ -466,14 +483,10 @@ fn default_decoder_factory() -> ~fn(&[u8]) -> Option<Image> {
} }
#[cfg(test)] #[cfg(test)]
fn mock_resource_task(on_load: ~fn(resource: Chan<resource_task::ProgressMsg>)) -> ResourceTask { fn mock_resource_task(on_load: ~fn(resource: comm::Chan<resource_task::ProgressMsg>)) -> ResourceTask {
do spawn_listener |from_client, move on_load| { do spawn_listener |port: comm::Port<resource_task::ControlMsg>, move on_load| {
// infer me
let from_client: Port<resource_task::ControlMsg> = from_client;
loop { loop {
match from_client.recv() { match port.recv() {
resource_task::Load(_, response) => { resource_task::Load(_, response) => {
on_load(response); on_load(response);
} }
@ -504,9 +517,9 @@ fn should_fail_if_unprefetched_image_is_requested() {
let image_cache_task = ImageCacheTask(mock_resource_task); let image_cache_task = ImageCacheTask(mock_resource_task);
let url = make_url(~"file", None); let url = make_url(~"file", None);
let request = Port(); let (chan, port) = stream();
image_cache_task.send(GetImage(url, request.chan())); image_cache_task.send(GetImage(url, chan));
request.recv(); port.recv();
} }
#[test] #[test]
@ -556,8 +569,8 @@ fn should_fail_if_requesting_image_before_requesting_decode() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
// no decode message // no decode message
let response_port = Port(); let (chan, port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, chan));
image_cache_task.exit(); image_cache_task.exit();
mock_resource_task.send(resource_task::Exit); mock_resource_task.send(resource_task::Exit);
@ -602,8 +615,8 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url)); image_cache_task.send(Decode(copy url));
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
assert response_port.recv() == ImageNotReady; assert response_port.recv() == ImageNotReady;
wait_chan.send(()); wait_chan.send(());
image_cache_task.exit(); image_cache_task.exit();
@ -637,8 +650,8 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
wait_for_image_chan.recv(); wait_for_image_chan.recv();
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageReady(_) => (), ImageReady(_) => (),
_ => fail _ => fail
@ -676,8 +689,8 @@ fn should_return_decoded_image_data_for_multiple_requests() {
wait_for_image.recv(); wait_for_image.recv();
for iter::repeat(2) { for iter::repeat(2) {
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(copy url, response_port.chan())); image_cache_task.send(GetImage(copy url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageReady(_) => (), ImageReady(_) => (),
_ => fail _ => fail
@ -697,22 +710,18 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
let resource_task_exited = Port(); let resource_task_exited = Port();
let resource_task_exited_chan = resource_task_exited.chan(); let resource_task_exited_chan = resource_task_exited.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |port: comm::Port<resource_task::ControlMsg>| {
// infer me
let from_client: Port<resource_task::ControlMsg> = from_client;
loop { loop {
match from_client.recv() { match port.recv() {
resource_task::Load(_, response) => { resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::Ok(()))); response.send(resource_task::Done(result::Ok(())));
image_bin_sent_chan.send(()); image_bin_sent_chan.send(());
} }
resource_task::Exit => { resource_task::Exit => {
resource_task_exited_chan.send(()); resource_task_exited_chan.send(());
break break
} }
} }
} }
}; };
@ -746,22 +755,18 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
let resource_task_exited = Port(); let resource_task_exited = Port();
let resource_task_exited_chan = resource_task_exited.chan(); let resource_task_exited_chan = resource_task_exited.chan();
let mock_resource_task = do spawn_listener |from_client| { let mock_resource_task = do spawn_listener |port: comm::Port<resource_task::ControlMsg>| {
// infer me
let from_client: Port<resource_task::ControlMsg> = from_client;
loop { loop {
match from_client.recv() { match port.recv() {
resource_task::Load(_, response) => { resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin())); response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::Err(()))); response.send(resource_task::Done(result::Err(())));
image_bin_sent_chan.send(()); image_bin_sent_chan.send(());
} }
resource_task::Exit => { resource_task::Exit => {
resource_task_exited_chan.send(()); resource_task_exited_chan.send(());
break break
} }
} }
} }
}; };
@ -816,8 +821,8 @@ fn should_return_failed_if_image_bin_cannot_be_fetched() {
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv(); wait_for_prefetech.recv();
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageFailed => (), ImageFailed => (),
_ => fail _ => fail
@ -855,16 +860,16 @@ fn should_return_failed_for_multiple_get_image_requests_if_image_bin_cannot_be_f
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
wait_for_prefetech.recv(); wait_for_prefetech.recv();
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(copy url, response_port.chan())); image_cache_task.send(GetImage(copy url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageFailed => (), ImageFailed => (),
_ => fail _ => fail
} }
// And ask again, we should get the same response // And ask again, we should get the same response
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageFailed => (), ImageFailed => (),
_ => fail _ => fail
@ -914,8 +919,8 @@ fn should_return_not_ready_if_image_is_still_decoding() {
wait_for_prefetech.recv(); wait_for_prefetech.recv();
// Make the request // Make the request
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageNotReady => (), ImageNotReady => (),
@ -958,8 +963,8 @@ fn should_return_failed_if_image_decode_fails() {
wait_for_decode.recv(); wait_for_decode.recv();
// Make the request // Make the request
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageFailed => (), ImageFailed => (),
@ -997,8 +1002,8 @@ fn should_return_image_on_wait_if_image_is_already_loaded() {
// Wait until our mock resource task has sent the image to the image cache // Wait until our mock resource task has sent the image to the image cache
wait_for_decode.recv(); wait_for_decode.recv();
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(WaitForImage(url, response_port.chan())); image_cache_task.send(WaitForImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageReady(*) => (), ImageReady(*) => (),
_ => fail _ => fail
@ -1025,8 +1030,8 @@ fn should_return_image_on_wait_if_image_is_not_yet_loaded() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url)); image_cache_task.send(Decode(copy url));
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(WaitForImage(url, response_port.chan())); image_cache_task.send(WaitForImage(url, response_chan));
wait_chan.send(()); wait_chan.send(());
@ -1056,8 +1061,8 @@ fn should_return_image_failed_on_wait_if_image_fails_to_load() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url)); image_cache_task.send(Decode(copy url));
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(WaitForImage(url, response_port.chan())); image_cache_task.send(WaitForImage(url, response_chan));
wait_chan.send(()); wait_chan.send(());
@ -1083,8 +1088,8 @@ fn sync_cache_should_wait_for_images() {
image_cache_task.send(Prefetch(copy url)); image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Decode(copy url)); image_cache_task.send(Decode(copy url));
let response_port = Port(); let (response_chan, response_port) = stream();
image_cache_task.send(GetImage(url, response_port.chan())); image_cache_task.send(GetImage(url, response_chan));
match response_port.recv() { match response_port.recv() {
ImageReady(_) => (), ImageReady(_) => (),
_ => fail _ => fail

View file

@ -42,13 +42,12 @@ fn run_pipeline_screen(urls: &[~str]) {
// Create a servo instance // Create a servo instance
let resource_task = ResourceTask(); let resource_task = ResourceTask();
let image_cache_task = ImageCacheTask(resource_task); let image_cache_task = ImageCacheTask(resource_task);
let engine = Engine(osmain, resource_task, image_cache_task); let engine_task = Engine(osmain, resource_task, image_cache_task);
let engine_chan = engine.start();
for urls.each |filename| { for urls.each |filename| {
let url = make_url(copy *filename, None); let url = make_url(copy *filename, None);
#debug["master: Sending url `%s`", url.to_str()]; #debug["master: Sending url `%s`", url.to_str()];
engine_chan.send(LoadURLMsg(url)); engine_task.send(LoadURLMsg(url));
#debug["master: Waiting for keypress"]; #debug["master: Waiting for keypress"];
match keypress_from_osmain.try_recv() { match keypress_from_osmain.try_recv() {
@ -60,7 +59,7 @@ fn run_pipeline_screen(urls: &[~str]) {
// Shut everything down // Shut everything down
#debug["master: Shut down"]; #debug["master: Shut down"];
let (exit_chan, exit_response_from_engine) = pipes::stream(); let (exit_chan, exit_response_from_engine) = pipes::stream();
engine_chan.send(engine::ExitMsg(exit_chan)); engine_task.send(engine::ExitMsg(exit_chan));
exit_response_from_engine.recv(); exit_response_from_engine.recv();
osmain.send(osmain::Exit); osmain.send(osmain::Exit);
@ -81,8 +80,7 @@ fn run_pipeline_png(url: ~str, outfile: &str) {
// fulfilled before the first paint. // fulfilled before the first paint.
let image_cache_task = SyncImageCacheTask(resource_task); let image_cache_task = SyncImageCacheTask(resource_task);
let engine_task = Engine(compositor, resource_task, image_cache_task); let engine_task = Engine(compositor, resource_task, image_cache_task);
let engine_chan = engine_task.start(); engine_task.send(LoadURLMsg(make_url(copy url, None)));
engine_chan.send(LoadURLMsg(make_url(copy url, None)));
match buffered_file_writer(&Path(outfile)) { match buffered_file_writer(&Path(outfile)) {
Ok(writer) => writer.write(pngdata_from_compositor.recv()), Ok(writer) => writer.write(pngdata_from_compositor.recv()),
@ -90,7 +88,7 @@ fn run_pipeline_png(url: ~str, outfile: &str) {
} }
let (exit_chan, exit_response_from_engine) = pipes::stream(); let (exit_chan, exit_response_from_engine) = pipes::stream();
engine_chan.send(engine::ExitMsg(exit_chan)); engine_task.send(engine::ExitMsg(exit_chan));
exit_response_from_engine.recv(); exit_response_from_engine.recv();
compositor.send(png_compositor::Exit); compositor.send(png_compositor::Exit);
}) })