From 1644436557f8252368cc099d8d952d6e3796c989 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Sun, 25 Jan 2015 10:44:28 -0500 Subject: [PATCH 01/11] Start switching net/ to use abstractions over channels to allow introducing non-channel communication in the future. --- components/net/about_loader.rs | 8 ++-- components/net/data_loader.rs | 9 ++-- components/net/file_loader.rs | 13 +++--- components/net/http_loader.rs | 19 ++++----- components/net/resource_task.rs | 50 +++++++++++++---------- components/net_traits/image_cache_task.rs | 3 +- components/net_traits/lib.rs | 39 +++++++++++++++--- components/script/dom/xmlhttprequest.rs | 17 ++++---- components/script/script_task.rs | 5 +-- components/servo/Cargo.lock | 1 + tests/unit/net/data_loader.rs | 3 +- tests/unit/net/image_cache_task.rs | 14 +++---- tests/unit/net/resource_task.rs | 6 +-- 13 files changed, 108 insertions(+), 79 deletions(-) diff --git a/components/net/about_loader.rs b/components/net/about_loader.rs index cd7d40671fd..e12fc32839c 100644 --- a/components/net/about_loader.rs +++ b/components/net/about_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata}; +use net_traits::{LoadData, Metadata,ResponseSenders}; use net_traits::ProgressMsg::Done; use mime_classifier::MIMEClassifier; use resource_task::start_sending; @@ -18,10 +18,9 @@ use std::borrow::IntoCow; use std::fs::PathExt; use std::sync::Arc; -pub fn factory(mut load_data: LoadData, classifier: Arc) { +pub fn factory(mut load_data: LoadData, start_chan: ResponseSenders, classifier: Arc) { match load_data.url.non_relative_scheme_data().unwrap() { "blank" => { - let start_chan = load_data.consumer; let chan = start_sending(start_chan, Metadata { final_url: load_data.url, content_type: Some(ContentType(Mime(TopLevel::Text, SubLevel::Html, vec![]))), @@ -40,11 +39,10 @@ pub fn factory(mut load_data: LoadData, classifier: Arc) { load_data.url = Url::from_file_path(&*path).unwrap(); } _ => { - let start_chan = load_data.consumer; start_sending(start_chan, Metadata::default(load_data.url)) .send(Done(Err("Unknown about: URL.".to_string()))).unwrap(); return } }; - file_loader::factory(load_data, classifier) + file_loader::factory(load_data, start_chan, classifier) } diff --git a/components/net/data_loader.rs b/components/net/data_loader.rs index 03ec60be3a8..d4146a0f4b1 100644 --- a/components/net/data_loader.rs +++ b/components/net/data_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata}; +use net_traits::{LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; use resource_task::start_sending; @@ -13,16 +13,15 @@ use hyper::mime::Mime; use std::sync::Arc; use url::{percent_decode, SchemeData}; -pub fn factory(load_data: LoadData, _classifier: Arc) { +pub fn factory(load_data: LoadData, senders: ResponseSenders, _classifier: Arc) { // NB: we don't spawn a new task. // Hypothesis: data URLs are too small for parallel base64 etc. to be worth it. // Should be tested at some point. // Left in separate function to allow easy moving to a task, if desired. - load(load_data) + load(load_data, senders) } -pub fn load(load_data: LoadData) { - let start_chan = load_data.consumer; +pub fn load(load_data: LoadData, start_chan: ResponseSenders) { let url = load_data.url; assert!(&*url.scheme == "data"); diff --git a/components/net/file_loader.rs b/components/net/file_loader.rs index 64539c29477..26fda4074c1 100644 --- a/components/net/file_loader.rs +++ b/components/net/file_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata, ProgressMsg}; +use net_traits::{LoadData, Metadata, ProgressMsg, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; use resource_task::{start_sending, start_sending_sniffed}; @@ -44,9 +44,8 @@ fn read_all(reader: &mut File, progress_chan: &Sender) } } -pub fn factory(load_data: LoadData, classifier: Arc) { +pub fn factory(load_data: LoadData, senders: ResponseSenders, classifier: Arc) { let url = load_data.url; - let start_chan = load_data.consumer; assert!(&*url.scheme == "file"); spawn_named("file_loader".to_owned(), move || { let metadata = Metadata::default(url.clone()); @@ -58,24 +57,24 @@ pub fn factory(load_data: LoadData, classifier: Arc) { let res = read_block(reader); let (res, progress_chan) = match res { Ok(ReadStatus::Partial(buf)) => { - let progress_chan = start_sending_sniffed(start_chan, metadata, + let progress_chan = start_sending_sniffed(senders, metadata, classifier, &buf); progress_chan.send(Payload(buf)).unwrap(); (read_all(reader, &progress_chan), progress_chan) } Ok(ReadStatus::EOF) | Err(_) => - (res.map(|_| ()), start_sending(start_chan, metadata)), + (res.map(|_| ()), start_sending(senders, metadata)), }; progress_chan.send(Done(res)).unwrap(); } Err(e) => { - let progress_chan = start_sending(start_chan, metadata); + let progress_chan = start_sending(senders, metadata); progress_chan.send(Done(Err(e.description().to_string()))).unwrap(); } } } Err(_) => { - let progress_chan = start_sending(start_chan, metadata); + let progress_chan = start_sending(senders, metadata); progress_chan.send(Done(Err(url.to_string()))).unwrap(); } } diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index d544c307401..bccfb03b8a4 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{ControlMsg, CookieSource, LoadData, LoadResponse, Metadata}; +use net_traits::{ControlMsg, CookieSource, LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; use resource_task::{start_sending_opt, start_sending_sniffed_opt}; @@ -32,13 +32,13 @@ use url::{Url, UrlParser}; use std::borrow::ToOwned; pub fn factory(cookies_chan: Sender) - -> Box)> + Send> { - box move |(load_data, classifier)| { - spawn_named("http_loader".to_owned(), move || load(load_data, classifier, cookies_chan)) + -> Box)> + Send> { + box move |(load_data, senders, classifier)| { + spawn_named("http_loader".to_owned(), move || load(load_data, senders, classifier, cookies_chan)) } } -fn send_error(url: Url, err: String, start_chan: Sender) { +fn send_error(url: Url, err: String, start_chan: ResponseSenders) { let mut metadata: Metadata = Metadata::default(url); metadata.status = None; @@ -66,13 +66,12 @@ fn read_block(reader: &mut R) -> Result { } } -fn load(mut load_data: LoadData, classifier: Arc, cookies_chan: Sender) { +fn load(mut load_data: LoadData, start_chan: ResponseSenders, classifier: Arc, cookies_chan: Sender) { // FIXME: At the time of writing this FIXME, servo didn't have any central // location for configuration. If you're reading this and such a // repository DOES exist, please update this constant to use it. let max_redirects = 50; let mut iters = 0; - let start_chan = load_data.consumer; let mut url = load_data.url.clone(); let mut redirected_to = HashSet::new(); @@ -140,8 +139,8 @@ reason: \"certificate verify failed\" }]"; ) => { let mut image = resources_dir_path(); image.push("badcert.html"); - let load_data = LoadData::new(Url::from_file_path(&*image).unwrap(), start_chan); - file_loader::factory(load_data, classifier); + let load_data = LoadData::new(Url::from_file_path(&*image).unwrap()); + file_loader::factory(load_data, start_chan, classifier); return; }, Err(e) => { @@ -340,7 +339,7 @@ reason: \"certificate verify failed\" }]"; } fn send_data(reader: &mut R, - start_chan: Sender, + start_chan: ResponseSenders, metadata: Metadata, classifier: Arc) { let (progress_chan, mut chunk) = { diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index 9952190c23b..6f0d6994113 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -12,7 +12,7 @@ use cookie_storage::CookieStorage; use cookie; use mime_classifier::MIMEClassifier; -use net_traits::{ControlMsg, LoadData, LoadResponse}; +use net_traits::{ControlMsg, LoadData, LoadResponse, ResponseSenders, LoadConsumer}; use net_traits::{Metadata, ProgressMsg, ResourceTask}; use net_traits::ProgressMsg::Done; use util::opts; @@ -59,19 +59,19 @@ pub fn global_init() { } /// For use by loaders in responding to a Load message. -pub fn start_sending(start_chan: Sender, metadata: Metadata) -> Sender { +pub fn start_sending(start_chan: ResponseSenders, metadata: Metadata) -> Sender { start_sending_opt(start_chan, metadata).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. -pub fn start_sending_sniffed(start_chan: Sender, metadata: Metadata, +pub fn start_sending_sniffed(start_chan: ResponseSenders, metadata: Metadata, classifier: Arc, partial_body: &Vec) -> Sender { start_sending_sniffed_opt(start_chan, metadata, classifier, partial_body).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. -pub fn start_sending_sniffed_opt(start_chan: Sender, mut metadata: Metadata, +pub fn start_sending_sniffed_opt(start_chan: ResponseSenders, mut metadata: Metadata, classifier: Arc, partial_body: &Vec) -> Result, ()> { if opts::get().sniff_mime_types { @@ -94,15 +94,20 @@ pub fn start_sending_sniffed_opt(start_chan: Sender, mut metadata: } /// For use by loaders in responding to a Load message. -pub fn start_sending_opt(start_chan: Sender, metadata: Metadata) -> Result, ()> { - let (progress_chan, progress_port) = channel(); - let result = start_chan.send(LoadResponse { - metadata: metadata, - progress_port: progress_port, - }); - match result { - Ok(_) => Ok(progress_chan), - Err(_) => Err(()) +pub fn start_sending_opt(start_chan: ResponseSenders, metadata: Metadata) -> Result, ()> { + match start_chan { + ResponseSenders::Channel(start_chan) => { + let (progress_chan, progress_port) = channel(); + let result = start_chan.send(LoadResponse { + metadata: metadata, + progress_port: progress_port, + }); + match result { + Ok(_) => Ok(progress_chan), + Err(_) => Err(()) + } + } + ResponseSenders::Listener(_) => panic!(), } } @@ -176,8 +181,8 @@ impl ResourceManager { fn start(&mut self) { loop { match self.from_client.recv().unwrap() { - ControlMsg::Load(load_data) => { - self.load(load_data) + ControlMsg::Load(load_data, consumer) => { + self.load(load_data, consumer) } ControlMsg::SetCookiesForUrl(request, cookie_list, source) => { let header = Header::parse_header(&[cookie_list.into_bytes()]); @@ -199,7 +204,7 @@ impl ResourceManager { } } - fn load(&mut self, mut load_data: LoadData) { + fn load(&mut self, mut load_data: LoadData, consumer: LoadConsumer) { unsafe { if let Some(host_table) = HOST_TABLE { load_data = replace_hosts(load_data, host_table); @@ -208,13 +213,14 @@ impl ResourceManager { self.user_agent.as_ref().map(|ua| load_data.headers.set(UserAgent(ua.clone()))); - fn from_factory(factory: fn(LoadData, Arc)) - -> Box)> + Send> { - box move |(load_data, classifier)| { - factory(load_data, classifier) + fn from_factory(factory: fn(LoadData, ResponseSenders, Arc)) + -> Box)> + Send> { + box move |(load_data, senders, classifier)| { + factory(load_data, senders, classifier) } } + let senders = ResponseSenders::from_consumer(consumer); let loader = match &*load_data.url.scheme { "file" => from_factory(file_loader::factory), "http" | "https" | "view-source" => http_loader::factory(self.resource_task.clone()), @@ -222,13 +228,13 @@ impl ResourceManager { "about" => from_factory(about_loader::factory), _ => { debug!("resource_task: no loader for scheme {}", load_data.url.scheme); - start_sending(load_data.consumer, Metadata::default(load_data.url)) + start_sending(senders, Metadata::default(load_data.url)) .send(ProgressMsg::Done(Err("no loader for scheme".to_string()))).unwrap(); return } }; debug!("resource_task: loading url: {}", load_data.url.serialize()); - loader.invoke((load_data, self.mime_classifier.clone())); + loader.invoke((load_data, senders, self.mime_classifier.clone())); } } diff --git a/components/net_traits/image_cache_task.rs b/components/net_traits/image_cache_task.rs index 852b4ad9c71..905a7de3611 100644 --- a/components/net_traits/image_cache_task.rs +++ b/components/net_traits/image_cache_task.rs @@ -3,6 +3,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use image::base::Image; +use LoadConsumer::Channel; use {ControlMsg, LoadData, ProgressMsg, ResourceTask}; use url::Url; @@ -84,7 +85,7 @@ impl ImageCacheTaskClient for ImageCacheTask { pub fn load_image_data(url: Url, resource_task: ResourceTask, placeholder: &[u8]) -> Result, ()> { let (response_chan, response_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url.clone(), response_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url.clone()), Channel(response_chan))).unwrap(); let mut image_data = vec!(); diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 8a35fc87762..5914922c275 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -51,11 +51,10 @@ pub struct LoadData { pub preserved_headers: Headers, pub data: Option>, pub cors: Option, - pub consumer: Sender, } impl LoadData { - pub fn new(url: Url, consumer: Sender) -> LoadData { + pub fn new(url: Url) -> LoadData { LoadData { url: url, method: Method::Get, @@ -63,17 +62,31 @@ impl LoadData { preserved_headers: Headers::new(), data: None, cors: None, - consumer: consumer, } } } +pub trait AsyncResponseListener { + fn headers_available(&self, metadata: Metadata); + fn data_available(&self, payload: Vec); + fn response_complete(&self, status: Result<(), String>); +} + +pub trait AsyncResponseTarget { + fn get_listener(&self) -> &AsyncResponseListener; +} + +pub enum LoadConsumer { + Channel(Sender), + Listener(Box), +} + /// Handle to a resource task pub type ResourceTask = Sender; pub enum ControlMsg { /// Request the data associated with a particular URL - Load(LoadData), + Load(LoadData, LoadConsumer), /// Store a set of cookies for a given originating URL SetCookiesForUrl(Url, String, CookieSource), /// Retrieve the stored cookies for a given URL @@ -159,6 +172,20 @@ pub enum CookieSource { NonHTTP, } +pub enum ResponseSenders { + Channel(Sender), + Listener(Box), +} + +impl ResponseSenders { + pub fn from_consumer(consumer: LoadConsumer) -> ResponseSenders { + match consumer { + LoadConsumer::Channel(c) => ResponseSenders::Channel(c), + LoadConsumer::Listener(l) => ResponseSenders::Listener(l), + } + } +} + /// Messages sent in response to a `Load` message #[derive(PartialEq,Debug)] pub enum ProgressMsg { @@ -172,7 +199,7 @@ pub enum ProgressMsg { pub fn load_whole_resource(resource_task: &ResourceTask, url: Url) -> Result<(Metadata, Vec), String> { let (start_chan, start_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(start_chan))).unwrap(); let response = start_port.recv().unwrap(); let mut buf = vec!(); @@ -188,7 +215,7 @@ pub fn load_whole_resource(resource_task: &ResourceTask, url: Url) /// Load a URL asynchronously and iterate over chunks of bytes from the response. pub fn load_bytes_iter(resource_task: &ResourceTask, url: Url) -> (Metadata, ProgressMsgPortIterator) { let (input_chan, input_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url, input_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(input_chan))).unwrap(); let response = input_port.recv().unwrap(); let iter = ProgressMsgPortIterator { progress_port: response.progress_port }; diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 5d0f76e5463..8cf605ece13 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -44,7 +44,7 @@ use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; use net_traits::ProgressMsg::{Payload, Done}; -use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadResponse}; +use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer}; use cors::{allow_cross_origin_request, CORSRequest, RequestMode}; use util::str::DOMString; use util::task::spawn_named; @@ -213,8 +213,8 @@ impl XMLHttpRequest { #[allow(unsafe_code)] fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, mut load_data: LoadData, terminate_receiver: Receiver, - cors_request: Result,()>, gen_id: GenerationId, - start_port: Receiver) -> ErrorResult { + cors_request: Result,()>, gen_id: GenerationId) + -> ErrorResult { fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) { match *fetch_type { @@ -283,7 +283,8 @@ impl XMLHttpRequest { } // Step 10, 13 - resource_task.send(Load(load_data)).unwrap(); + let (start_chan, start_port) = channel(); + resource_task.send(Load(load_data, LoadConsumer::Channel(start_chan))).unwrap(); let progress_port; @@ -579,8 +580,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { let global = self.global.root(); let resource_task = global.r().resource_task(); - let (start_chan, start_port) = channel(); - let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap(), start_chan); + let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap()); load_data.data = extracted; #[inline] @@ -650,7 +650,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { let gen_id = self.generation_id.get(); if self.sync.get() { return XMLHttpRequest::fetch(&mut SyncOrAsync::Sync(self), resource_task, load_data, - terminate_receiver, cors_request, gen_id, start_port); + terminate_receiver, cors_request, gen_id); } else { self.fetch_time.set(time::now().to_timespec().sec); let script_chan = global.r().script_chan(); @@ -665,8 +665,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { load_data, terminate_receiver, cors_request, - gen_id, - start_port); + gen_id); }); let timeout = self.timeout.get(); if timeout > 0 { diff --git a/components/script/script_task.rs b/components/script/script_task.rs index 877289a27fd..b304a15afda 100644 --- a/components/script/script_task.rs +++ b/components/script/script_task.rs @@ -61,7 +61,7 @@ use msg::constellation_msg::{ConstellationChan}; use msg::constellation_msg::{LoadData, PipelineId, SubpageId, MozBrowserEvent, WorkerId}; use msg::constellation_msg::{Failure, WindowSizeData, PipelineExitType}; use msg::constellation_msg::Msg as ConstellationMsg; -use net_traits::{ResourceTask, ControlMsg, LoadResponse}; +use net_traits::{ResourceTask, ControlMsg, LoadResponse, LoadConsumer}; use net_traits::LoadData as NetLoadData; use net_traits::image_cache_task::ImageCacheTask; use net_traits::storage_task::StorageTask; @@ -1331,8 +1331,7 @@ impl ScriptTask { preserved_headers: load_data.headers, data: load_data.data, cors: None, - consumer: input_chan, - })).unwrap(); + }, LoadConsumer::Channel(input_chan))).unwrap(); let load_response = input_port.recv().unwrap(); script_chan.send(ScriptMsg::PageFetchComplete(id, subpage, load_response)).unwrap(); diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock index 153569455a3..15a82748036 100644 --- a/components/servo/Cargo.lock +++ b/components/servo/Cargo.lock @@ -932,6 +932,7 @@ dependencies = [ "cssparser 0.2.0 (git+https://github.com/servo/rust-cssparser)", "geom 0.1.0 (git+https://github.com/servo/rust-geom)", "gfx 0.0.1", + "hyper 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "net 0.0.1", "net_traits 0.0.1", "profile 0.0.1", diff --git a/tests/unit/net/data_loader.rs b/tests/unit/net/data_loader.rs index 482a7462bea..6f6f8b6c02c 100644 --- a/tests/unit/net/data_loader.rs +++ b/tests/unit/net/data_loader.rs @@ -4,6 +4,7 @@ extern crate hyper; +use net_traits::ResponseSenders::Channel; use net_traits::LoadData; use net_traits::ProgressMsg::{Payload, Done}; use self::hyper::header::ContentType; @@ -19,7 +20,7 @@ fn assert_parse(url: &'static str, use net::data_loader::load; let (start_chan, start_port) = channel(); - load(LoadData::new(Url::parse(url).unwrap(), start_chan)); + load(LoadData::new(Url::parse(url).unwrap()), Channel(start_chan)); let response = start_port.recv().unwrap(); assert_eq!(&response.metadata.content_type, &content_type); diff --git a/tests/unit/net/image_cache_task.rs b/tests/unit/net/image_cache_task.rs index c384411fb99..5b6cac13c09 100644 --- a/tests/unit/net/image_cache_task.rs +++ b/tests/unit/net/image_cache_task.rs @@ -7,7 +7,7 @@ use net_traits::image_cache_task::ImageResponseMsg::*; use net_traits::image_cache_task::Msg::*; use net::resource_task::start_sending; -use net_traits::{ControlMsg, Metadata, ProgressMsg, ResourceTask}; +use net_traits::{ControlMsg, Metadata, ProgressMsg, ResourceTask, ResponseSenders}; use net_traits::image_cache_task::{ImageCacheTask, ImageCacheTaskClient, ImageResponseMsg, Msg}; use net_traits::ProgressMsg::{Payload, Done}; use profile::time; @@ -110,8 +110,8 @@ fn mock_resource_task(on_load: Box) -> ResourceT spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); on_load.invoke(chan); } @@ -280,8 +280,8 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() { let mock_resource_task = spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); chan.send(Payload(test_image_bin())); chan.send(Done(Ok(()))); @@ -329,8 +329,8 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() { let mock_resource_task = spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); chan.send(Payload(test_image_bin())); chan.send(Done(Err("".to_string()))); diff --git a/tests/unit/net/resource_task.rs b/tests/unit/net/resource_task.rs index da79322a2ff..d9ef7cde4d3 100644 --- a/tests/unit/net/resource_task.rs +++ b/tests/unit/net/resource_task.rs @@ -3,7 +3,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use net::resource_task::{new_resource_task, parse_hostsfile, replace_hosts}; -use net_traits::{ControlMsg, LoadData}; +use net_traits::{ControlMsg, LoadData, LoadConsumer}; use net_traits::ProgressMsg; use std::borrow::ToOwned; use std::boxed; @@ -23,7 +23,7 @@ fn test_bad_scheme() { let resource_task = new_resource_task(None); let (start_chan, start) = channel(); let url = Url::parse("bogus://whatever").unwrap(); - resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(start_chan))).unwrap(); let response = start.recv().unwrap(); match response.progress_port.recv().unwrap() { ProgressMsg::Done(result) => { assert!(result.is_err()) } @@ -173,7 +173,7 @@ fn test_replace_hosts() { let resource_task = new_resource_task(None); let (start_chan, _) = channel(); let url = Url::parse(&format!("http://foo.bar.com:{}", port)).unwrap(); - resource_task.send(ControlMsg::Load(replace_hosts(LoadData::new(url, start_chan), host_table))).unwrap(); + resource_task.send(ControlMsg::Load(replace_hosts(LoadData::new(url), host_table), LoadConsumer::Channel(start_chan))).unwrap(); match listener.accept() { Ok(..) => assert!(true, "received request"), From 7517aac9e989dd9a9795f809a5d489b224f50868 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Sun, 25 Jan 2015 11:10:49 -0500 Subject: [PATCH 02/11] Completely abstract sending responses over channels vs. listeners. --- components/net/file_loader.rs | 7 +++--- components/net/resource_task.rs | 39 ++++++++++++++++++++++++------ components/net_traits/lib.rs | 20 +++++++++++++-- tests/unit/net/image_cache_task.rs | 16 ++++++------ 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/components/net/file_loader.rs b/components/net/file_loader.rs index 26fda4074c1..2d02d9b4c19 100644 --- a/components/net/file_loader.rs +++ b/components/net/file_loader.rs @@ -2,17 +2,16 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata, ProgressMsg, ResponseSenders}; +use net_traits::{LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; -use resource_task::{start_sending, start_sending_sniffed}; +use resource_task::{start_sending, start_sending_sniffed, ProgressSender}; use std::borrow::ToOwned; use std::fs::File; use std::io::Read; use std::path::PathBuf; use std::sync::Arc; -use std::sync::mpsc::Sender; use util::task::spawn_named; static READ_SIZE: usize = 8192; @@ -34,7 +33,7 @@ fn read_block(reader: &mut File) -> Result { } } -fn read_all(reader: &mut File, progress_chan: &Sender) +fn read_all(reader: &mut File, progress_chan: &ProgressSender) -> Result<(), String> { loop { match try!(read_block(reader)) { diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index 6f0d6994113..0d4bc19a64d 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -13,7 +13,7 @@ use cookie; use mime_classifier::MIMEClassifier; use net_traits::{ControlMsg, LoadData, LoadResponse, ResponseSenders, LoadConsumer}; -use net_traits::{Metadata, ProgressMsg, ResourceTask}; +use net_traits::{Metadata, ProgressMsg, ResourceTask, AsyncResponseTarget, ResponseAction}; use net_traits::ProgressMsg::Done; use util::opts; use util::task::spawn_named; @@ -58,22 +58,44 @@ pub fn global_init() { } } +pub enum ProgressSender { + Channel(Sender), + Listener(Box), +} + +impl ProgressSender { + //XXXjdm return actual error + pub fn send(&self, msg: ProgressMsg) -> Result<(), ()> { + match *self { + ProgressSender::Channel(ref c) => c.send(msg).map_err(|_| ()), + ProgressSender::Listener(ref b) => { + let action = match msg { + ProgressMsg::Payload(buf) => ResponseAction::DataAvailable(buf), + ProgressMsg::Done(status) => ResponseAction::ResponseComplete(status), + }; + b.invoke_with_listener(action); + Ok(()) + } + } + } +} + /// For use by loaders in responding to a Load message. -pub fn start_sending(start_chan: ResponseSenders, metadata: Metadata) -> Sender { +pub fn start_sending(start_chan: ResponseSenders, metadata: Metadata) -> ProgressSender { start_sending_opt(start_chan, metadata).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. pub fn start_sending_sniffed(start_chan: ResponseSenders, metadata: Metadata, classifier: Arc, partial_body: &Vec) - -> Sender { + -> ProgressSender { start_sending_sniffed_opt(start_chan, metadata, classifier, partial_body).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. pub fn start_sending_sniffed_opt(start_chan: ResponseSenders, mut metadata: Metadata, classifier: Arc, partial_body: &Vec) - -> Result, ()> { + -> Result { if opts::get().sniff_mime_types { // TODO: should be calculated in the resource loader, from pull requeset #4094 let nosniff = false; @@ -94,7 +116,7 @@ pub fn start_sending_sniffed_opt(start_chan: ResponseSenders, mut metadata: Meta } /// For use by loaders in responding to a Load message. -pub fn start_sending_opt(start_chan: ResponseSenders, metadata: Metadata) -> Result, ()> { +pub fn start_sending_opt(start_chan: ResponseSenders, metadata: Metadata) -> Result { match start_chan { ResponseSenders::Channel(start_chan) => { let (progress_chan, progress_port) = channel(); @@ -103,11 +125,14 @@ pub fn start_sending_opt(start_chan: ResponseSenders, metadata: Metadata) -> Res progress_port: progress_port, }); match result { - Ok(_) => Ok(progress_chan), + Ok(_) => Ok(ProgressSender::Channel(progress_chan)), Err(_) => Err(()) } } - ResponseSenders::Listener(_) => panic!(), + ResponseSenders::Listener(target) => { + target.invoke_with_listener(ResponseAction::HeadersAvailable(metadata)); + Ok(ProgressSender::Listener(target)) + } } } diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 5914922c275..660e3d05320 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -72,8 +72,24 @@ pub trait AsyncResponseListener { fn response_complete(&self, status: Result<(), String>); } +pub enum ResponseAction { + HeadersAvailable(Metadata), + DataAvailable(Vec), + ResponseComplete(Result<(), String>) +} + +impl ResponseAction { + pub fn process(self, listener: &AsyncResponseListener) { + match self { + ResponseAction::HeadersAvailable(m) => listener.headers_available(m), + ResponseAction::DataAvailable(d) => listener.data_available(d), + ResponseAction::ResponseComplete(r) => listener.response_complete(r), + } + } +} + pub trait AsyncResponseTarget { - fn get_listener(&self) -> &AsyncResponseListener; + fn invoke_with_listener(&self, action: ResponseAction); } pub enum LoadConsumer { @@ -174,7 +190,7 @@ pub enum CookieSource { pub enum ResponseSenders { Channel(Sender), - Listener(Box), + Listener(Box), } impl ResponseSenders { diff --git a/tests/unit/net/image_cache_task.rs b/tests/unit/net/image_cache_task.rs index 5b6cac13c09..a12e9b2b2e0 100644 --- a/tests/unit/net/image_cache_task.rs +++ b/tests/unit/net/image_cache_task.rs @@ -6,7 +6,7 @@ use net::image_cache_task::*; use net_traits::image_cache_task::ImageResponseMsg::*; use net_traits::image_cache_task::Msg::*; -use net::resource_task::start_sending; +use net::resource_task::{start_sending, ProgressSender}; use net_traits::{ControlMsg, Metadata, ProgressMsg, ResourceTask, ResponseSenders}; use net_traits::image_cache_task::{ImageCacheTask, ImageCacheTaskClient, ImageResponseMsg, Msg}; use net_traits::ProgressMsg::{Payload, Done}; @@ -41,7 +41,7 @@ impl ImageCacheTaskHelper for ImageCacheTask { } trait Closure { - fn invoke(&self, _response: Sender) { } + fn invoke(&self, _response: ProgressSender) { } } struct DoesNothing; impl Closure for DoesNothing { } @@ -50,7 +50,7 @@ struct JustSendOK { url_requested_chan: Sender<()>, } impl Closure for JustSendOK { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { self.url_requested_chan.send(()).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -58,7 +58,7 @@ impl Closure for JustSendOK { struct SendTestImage; impl Closure for SendTestImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(test_image_bin())).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -66,7 +66,7 @@ impl Closure for SendTestImage { struct SendBogusImage; impl Closure for SendBogusImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(vec!())).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -74,7 +74,7 @@ impl Closure for SendBogusImage { struct SendTestImageErr; impl Closure for SendTestImageErr { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(test_image_bin())).unwrap(); response.send(Done(Err("".to_string()))).unwrap(); } @@ -84,7 +84,7 @@ struct WaitSendTestImage { wait_port: Receiver<()>, } impl Closure for WaitSendTestImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { // Don't send the data until after the client requests // the image self.wait_port.recv().unwrap(); @@ -97,7 +97,7 @@ struct WaitSendTestImageErr { wait_port: Receiver<()>, } impl Closure for WaitSendTestImageErr { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { // Don't send the data until after the client requests // the image self.wait_port.recv().unwrap(); From 5c7be5c9c3f593fac14b9eec85b9d697528c78f4 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Mon, 26 Jan 2015 11:56:46 +0000 Subject: [PATCH 03/11] Make async XMLHttpRequest requests use async network events. --- components/script/cors.rs | 22 +- components/script/dom/bindings/cell.rs | 1 + components/script/dom/bindings/trace.rs | 16 ++ components/script/dom/xmlhttprequest.rs | 257 ++++++++++++++++++++++-- 4 files changed, 283 insertions(+), 13 deletions(-) diff --git a/components/script/cors.rs b/components/script/cors.rs index e9521296464..0b669ab8bf8 100644 --- a/components/script/cors.rs +++ b/components/script/cors.rs @@ -10,6 +10,7 @@ //! with CORSRequest being expanded into FetchRequest (etc) use std::ascii::AsciiExt; +use std::borrow::ToOwned; use time; use time::{now, Timespec}; @@ -24,6 +25,15 @@ use hyper::method::Method; use hyper::status::StatusClass::Success; use url::{SchemeData, Url}; +use util::task::spawn_named; + +pub trait AsyncCORSResponseListener { + fn response_available(&self, response: CORSResponse); +} + +pub trait AsyncCORSResponseTarget { + fn invoke_with_listener(&self, response: CORSResponse); +} #[derive(Clone)] pub struct CORSRequest { @@ -88,7 +98,17 @@ impl CORSRequest { } } - /// https://fetch.spec.whatwg.org/#concept-http-fetch + pub fn http_fetch_async(&self, listener: Box) { + // TODO: this exists only to make preflight check non-blocking + // perhaps should be handled by the resource task? + let req = self.clone(); + spawn_named("cors".to_owned(), move || { + let response = req.http_fetch(); + listener.invoke_with_listener(response); + }); + } + + /// http://fetch.spec.whatwg.org/#concept-http-fetch /// This method assumes that the CORS flag is set /// This does not perform the full HTTP fetch, rather it handles part of the CORS filtering /// if self.mode is ForcedPreflight, then the CORS-with-forced-preflight diff --git a/components/script/dom/bindings/cell.rs b/components/script/dom/bindings/cell.rs index b64d80cefc3..177fed9397b 100644 --- a/components/script/dom/bindings/cell.rs +++ b/components/script/dom/bindings/cell.rs @@ -16,6 +16,7 @@ use std::cell::{BorrowState, RefCell, Ref, RefMut}; /// /// This extends the API of `core::cell::RefCell` to allow unsafe access in /// certain situations, with dynamic checking in debug builds. +#[derive(Clone)] pub struct DOMRefCell { value: RefCell, } diff --git a/components/script/dom/bindings/trace.rs b/components/script/dom/bindings/trace.rs index 1695f16ead7..3d32abb610a 100644 --- a/components/script/dom/bindings/trace.rs +++ b/components/script/dom/bindings/trace.rs @@ -207,6 +207,16 @@ impl JSTraceable for Option { } } +impl JSTraceable for Result { + #[inline] + fn trace(&self, trc: *mut JSTracer) { + match *self { + Ok(ref inner) => inner.trace(trc), + Err(ref inner) => inner.trace(trc), + } + } +} + impl JSTraceable for HashMap where K: Hash + Eq + JSTraceable, V: JSTraceable, @@ -297,6 +307,12 @@ impl JSTraceable for Box { } } +impl JSTraceable for () { + #[inline] + fn trace(&self, _trc: *mut JSTracer) { + } +} + /// Holds a set of vectors that need to be rooted pub struct RootedCollectionSet { set: Vec>> diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 8cf605ece13..ecc4c99c5d0 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -44,18 +44,21 @@ use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; use net_traits::ProgressMsg::{Payload, Done}; -use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer}; -use cors::{allow_cross_origin_request, CORSRequest, RequestMode}; +use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer, AsyncResponseTarget}; +use net_traits::{AsyncResponseListener, ResponseAction, Metadata}; +use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener}; +use cors::{AsyncCORSResponseTarget, CORSResponse}; use util::str::DOMString; use util::task::spawn_named; use std::ascii::AsciiExt; use std::borrow::ToOwned; -use std::cell::Cell; +use std::cell::{RefCell, Cell}; use std::sync::mpsc::{Sender, Receiver, channel}; use std::default::Default; use std::old_io::Timer; use std::str::FromStr; +use std::sync::{Mutex, Arc}; use std::time::duration::Duration; use time; use url::{Url, UrlParser}; @@ -159,6 +162,7 @@ pub struct XMLHttpRequest { fetch_time: Cell, terminate_sender: DOMRefCell>>, generation_id: Cell, + response_status: Cell>, } impl XMLHttpRequest { @@ -191,7 +195,8 @@ impl XMLHttpRequest { timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), terminate_sender: DOMRefCell::new(None), - generation_id: Cell::new(GenerationId(0)) + generation_id: Cell::new(GenerationId(0)), + response_status: Cell::new(Ok(())), } } pub fn new(global: GlobalRef) -> Temporary { @@ -210,6 +215,197 @@ impl XMLHttpRequest { xhr.r().process_partial_response(progress); } + #[allow(unsafe_code)] + fn fetch2(xhr: TrustedXHRAddress, script_chan: Box, + resource_task: ResourceTask, load_data: LoadData, sync: bool, + terminate_receiver: Receiver, + cors_request: Result,()>, gen_id: GenerationId) { + let cors_request = match cors_request { + Err(_) => { + // Happens in case of cross-origin non-http URIs + //notify_error_and_return!(Network); + return; //XXXjdm + } + Ok(req) => req, + }; + + #[derive(Clone)] + struct XHRContext { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + cors_request: Option, + buf: DOMRefCell>, + terminate_receiver: Arc>>, + got_response_complete: Cell, + } + + let context = Arc::new(Mutex::new(XHRContext { + xhr: xhr, + cors_request: cors_request.clone(), + gen_id: gen_id, + terminate_receiver: Arc::new(Mutex::new(terminate_receiver)), + buf: DOMRefCell::new(vec!()), + got_response_complete: Cell::new(false), + })); + + if let Some(req) = cors_request { + struct CORSContext { + xhr: Arc>, + load_data: RefCell>, + req: CORSRequest, + script_chan: Box, + resource_task: ResourceTask, + } + + impl AsyncCORSResponseListener for CORSContext { + fn response_available(&self, response: CORSResponse) { + if response.network_error { + //notify_error_and_return!(Network); + return; //XXXjdm + } + + let mut load_data = self.load_data.borrow_mut().take().unwrap(); + load_data.cors = Some(ResourceCORSData { + preflight: self.req.preflight_flag, + origin: self.req.origin.clone() + }); + + initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(), + self.resource_task.clone(), load_data); + } + } + + struct CORSListener { + context: Arc>, + script_chan: Box, + } + + impl AsyncCORSResponseTarget for CORSListener { + fn invoke_with_listener(&self, response: CORSResponse) { + self.script_chan.send(ScriptMsg::RunnableMsg(box CORSRunnable { + context: self.context.clone(), + response: response, + })).unwrap(); + } + } + + struct CORSRunnable { + context: Arc>, + response: CORSResponse, + } + + impl Runnable for CORSRunnable { + fn handler(self: Box) { + let this = *self; + let context = this.context.lock().unwrap(); + context.response_available(this.response); + } + } + + let cors_context = Arc::new(Mutex::new(CORSContext { + xhr: context.clone(), + load_data: RefCell::new(Some(load_data)), + req: req.clone(), + script_chan: script_chan.clone(), + resource_task: resource_task, + })); + + req.http_fetch_async(box CORSListener { + context: cors_context, + script_chan: script_chan + }); + } else { + initiate_async_xhr(context.clone(), script_chan, resource_task, load_data); + } + + impl AsyncResponseListener for XHRContext { + fn headers_available(&self, metadata: Metadata) { + let xhr = self.xhr.to_temporary().root(); + let _decision = xhr.r().process_headers_available(self.cors_request.clone(), + self.gen_id, + metadata); + } + + fn data_available(&self, payload: Vec) { + self.buf.borrow_mut().push_all(payload.as_slice()); + let xhr = self.xhr.to_temporary().root(); + xhr.r().process_data_available(self.gen_id, self.buf.borrow().clone()); + } + + fn response_complete(&self, status: Result<(), String>) { + let xhr = self.xhr.to_temporary().root(); + xhr.r().process_response_complete(self.gen_id, status); + self.got_response_complete.set(true); + } + } + + struct XHRRunnable { + context: Arc>, + action: ResponseAction, + } + + impl Runnable for XHRRunnable { + fn handler(self: Box) { + let this = *self; + + let context = this.context.lock(); + let context = context.unwrap(); + let xhr = context.xhr.to_temporary().root(); + if xhr.r().generation_id.get() != context.gen_id { + return; + } + + { + let terminate_receiver = context.terminate_receiver.lock().unwrap(); + if let Ok(reason) = terminate_receiver.try_recv() { + match reason { + TerminateReason::AbortedOrReopened => return, //Err(Abort) + TerminateReason::TimedOut => { + xhr.r().process_partial_response( + XHRProgress::Errored(context.gen_id, Network)); + return; //Err(Network) + } + } + } + } + + this.action.process(&*context); + } + } + + struct XHRListener { + context: Arc>, + script_chan: Box, + } + + impl AsyncResponseTarget for XHRListener { + fn invoke_with_listener(&self, action: ResponseAction) { + self.script_chan.send(ScriptMsg::RunnableMsg(box XHRRunnable { + context: self.context.clone(), + action: action + })).unwrap(); + } + } + + fn initiate_async_xhr(context: Arc>, + script_chan: Box, + resource_task: ResourceTask, + load_data: LoadData) { + let listener = box XHRListener { + context: context, + script_chan: script_chan, + }; + resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); + } + + if sync { + while !context.lock().unwrap().got_response_complete.get() { + //TODO: spin the event loop + panic!("don't know how to spin the event loop yet"); + } + } + } + #[allow(unsafe_code)] fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, mut load_data: LoadData, terminate_receiver: Receiver, @@ -659,14 +855,8 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { // inflight events queued up in the script task's port. let addr = Trusted::new(self.global.root().r().get_cx(), self, script_chan.clone()); - spawn_named("XHRTask".to_owned(), move || { - let _ = XMLHttpRequest::fetch(&mut SyncOrAsync::Async(addr, script_chan), - resource_task, - load_data, - terminate_receiver, - cors_request, - gen_id); - }); + XMLHttpRequest::fetch2(addr, script_chan, resource_task, load_data, self.sync.get(), + terminate_receiver, cors_request, gen_id); let timeout = self.timeout.get(); if timeout > 0 { self.set_timeout(timeout); @@ -811,6 +1001,10 @@ pub type TrustedXHRAddress = Trusted; trait PrivateXMLHttpRequestHelpers { fn change_ready_state(self, XMLHttpRequestState); + fn process_headers_available(&self, cors_request: Option, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error>; + fn process_data_available(self, gen_id: GenerationId, payload: Vec); + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>); fn process_partial_response(self, progress: XHRProgress); fn terminate_ongoing_fetch(self); fn insert_trusted_header(self, name: String, value: String); @@ -836,6 +1030,38 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { event.r().fire(target); } + fn process_headers_available(&self, cors_request: Option, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error> { + match cors_request { + Some(ref req) => { + match metadata.headers { + Some(ref h) if allow_cross_origin_request(req, h) => {}, + _ => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + return Err(Network); + } + } + }, + + _ => {} + }; + // XXXManishearth Clear cache entries in case of a network error + self.process_partial_response(XHRProgress::HeadersReceived(gen_id, + metadata.headers, metadata.status)); + Ok(()) + } + + fn process_data_available(self, gen_id: GenerationId, payload: Vec) { + self.process_partial_response(XHRProgress::Loading(gen_id, ByteString::new(payload))); + } + + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) { + match status { + Ok(()) => self.process_partial_response(XHRProgress::Done(gen_id)), + Err(_) => self.process_partial_response(XHRProgress::Errored(gen_id, Network)), + } + } + fn process_partial_response(self, progress: XHRProgress) { let msg_id = progress.generation_id(); @@ -852,6 +1078,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Ignore message if it belongs to a terminated fetch return_if_fetch_was_terminated!(); + // Ignore messages coming from previously-errored responses + if self.response_status.get().is_err() { + return; + } + match progress { XHRProgress::HeadersReceived(_, headers, status) => { assert!(self.ready_state.get() == XMLHttpRequestState::Opened); @@ -918,6 +1149,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_response_progress_event("loadend".to_owned()); }, XHRProgress::Errored(_, e) => { + self.response_status.set(Err(())); self.send_flag.set(false); // XXXManishearth set response to NetworkError self.change_ready_state(XMLHttpRequestState::Done); @@ -952,6 +1184,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened)); + self.response_status.set(Ok(())); } fn insert_trusted_header(self, name: String, value: String) { From 17a88f1f815ad54364f3a86a9d23050312f35da7 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Wed, 4 Mar 2015 15:20:31 -0500 Subject: [PATCH 04/11] Make timeouts for async XHR post a runnable. --- components/script/dom/xmlhttprequest.rs | 65 +++++++++++++++---------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index ecc4c99c5d0..f757775ed44 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -218,7 +218,6 @@ impl XMLHttpRequest { #[allow(unsafe_code)] fn fetch2(xhr: TrustedXHRAddress, script_chan: Box, resource_task: ResourceTask, load_data: LoadData, sync: bool, - terminate_receiver: Receiver, cors_request: Result,()>, gen_id: GenerationId) { let cors_request = match cors_request { Err(_) => { @@ -229,13 +228,11 @@ impl XMLHttpRequest { Ok(req) => req, }; - #[derive(Clone)] struct XHRContext { xhr: TrustedXHRAddress, gen_id: GenerationId, cors_request: Option, buf: DOMRefCell>, - terminate_receiver: Arc>>, got_response_complete: Cell, } @@ -243,7 +240,6 @@ impl XMLHttpRequest { xhr: xhr, cors_request: cors_request.clone(), gen_id: gen_id, - terminate_receiver: Arc::new(Mutex::new(terminate_receiver)), buf: DOMRefCell::new(vec!()), got_response_complete: Cell::new(false), })); @@ -260,8 +256,10 @@ impl XMLHttpRequest { impl AsyncCORSResponseListener for CORSContext { fn response_available(&self, response: CORSResponse) { if response.network_error { - //notify_error_and_return!(Network); - return; //XXXjdm + let context = self.xhr.lock().unwrap(); + let xhr = context.xhr.to_temporary().root(); + xhr.r().process_partial_response(XHRProgress::Errored(context.gen_id, Network)); + return; //XXXjdm Err(Network) } let mut load_data = self.load_data.borrow_mut().take().unwrap(); @@ -348,27 +346,12 @@ impl XMLHttpRequest { fn handler(self: Box) { let this = *self; - let context = this.context.lock(); - let context = context.unwrap(); + let context = this.context.lock().unwrap(); let xhr = context.xhr.to_temporary().root(); if xhr.r().generation_id.get() != context.gen_id { return; } - { - let terminate_receiver = context.terminate_receiver.lock().unwrap(); - if let Ok(reason) = terminate_receiver.try_recv() { - match reason { - TerminateReason::AbortedOrReopened => return, //Err(Abort) - TerminateReason::TimedOut => { - xhr.r().process_partial_response( - XHRProgress::Errored(context.gen_id, Network)); - return; //Err(Network) - } - } - } - } - this.action.process(&*context); } } @@ -856,7 +839,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { let addr = Trusted::new(self.global.root().r().get_cx(), self, script_chan.clone()); XMLHttpRequest::fetch2(addr, script_chan, resource_task, load_data, self.sync.get(), - terminate_receiver, cors_request, gen_id); + cors_request, gen_id); let timeout = self.timeout.get(); if timeout > 0 { self.set_timeout(timeout); @@ -1015,6 +998,7 @@ trait PrivateXMLHttpRequestHelpers { fn set_timeout(self, timeout:u32); fn cancel_timeout(self); fn filter_response_headers(self) -> Headers; + fn discard_subsequent_responses(self); } impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { @@ -1078,7 +1062,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Ignore message if it belongs to a terminated fetch return_if_fetch_was_terminated!(); - // Ignore messages coming from previously-errored responses + // Ignore messages coming from previously-errored responses or requests that have timed out if self.response_status.get().is_err() { return; } @@ -1134,6 +1118,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.ready_state.get() == XMLHttpRequestState::Loading || self.sync.get()); + self.cancel_timeout(); + // Part of step 11, send() (processing response end of file) // XXXManishearth handle errors, if any (substep 2) @@ -1149,7 +1135,9 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_response_progress_event("loadend".to_owned()); }, XHRProgress::Errored(_, e) => { - self.response_status.set(Err(())); + self.cancel_timeout(); + + self.discard_subsequent_responses(); self.send_flag.set(false); // XXXManishearth set response to NetworkError self.change_ready_state(XMLHttpRequestState::Done); @@ -1222,14 +1210,37 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_progress_event(false, type_, len, total); } fn set_timeout(self, timeout: u32) { + struct XHRTimeout { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + } + + impl Runnable for XHRTimeout { + fn handler(self: Box) { + let this = *self; + let xhr = this.xhr.to_temporary().root(); + if xhr.r().ready_state.get() != XMLHttpRequestState::Done { + xhr.r().process_partial_response(XHRProgress::Errored(this.gen_id, Timeout)); + } + } + } + // Sets up the object to timeout in a given number of milliseconds // This will cancel all previous timeouts let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); let terminate_sender = (*self.terminate_sender.borrow()).clone(); + let global = self.global.root(); + let script_chan = global.r().script_chan(); + let xhr = Trusted::new(global.r().get_cx(), self, global.r().script_chan()); + let gen_id = self.generation_id.get(); spawn_named("XHR:Timer".to_owned(), move || { match oneshot.recv() { Ok(_) => { + script_chan.send(ScriptMsg::RunnableMsg(box XHRTimeout { + xhr: xhr, + gen_id: gen_id, + })).unwrap(); terminate_sender.map(|s| s.send(TerminateReason::TimedOut)); }, Err(_) => { @@ -1297,6 +1308,10 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // XXXManishearth additional CORS filtering goes here headers } + + fn discard_subsequent_responses(self) { + self.response_status.set(Err(())); + } } trait Extractable { From 2ee21ddbe79fa3a3b01f446ad1e6d23e67c68e46 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Wed, 4 Mar 2015 15:38:25 -0500 Subject: [PATCH 05/11] Make the fetch method only handle sync XHRs. --- components/script/dom/xmlhttprequest.rs | 54 +++++-------------------- 1 file changed, 11 insertions(+), 43 deletions(-) diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index f757775ed44..11766cf26da 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -77,24 +77,6 @@ enum XMLHttpRequestState { Done = 4, } -struct XHRProgressHandler { - addr: TrustedXHRAddress, - progress: XHRProgress, -} - -impl XHRProgressHandler { - fn new(addr: TrustedXHRAddress, progress: XHRProgress) -> XHRProgressHandler { - XHRProgressHandler { addr: addr, progress: progress } - } -} - -impl Runnable for XHRProgressHandler { - fn handler(self: Box) { - let this = *self; - XMLHttpRequest::handle_progress(this.addr, this.progress); - } -} - #[derive(PartialEq, Clone, Copy)] #[jstraceable] pub struct GenerationId(u32); @@ -122,11 +104,6 @@ impl XHRProgress { } } -enum SyncOrAsync<'a> { - Sync(JSRef<'a, XMLHttpRequest>), - Async(TrustedXHRAddress, Box) -} - enum TerminateReason { AbortedOrReopened, TimedOut, @@ -222,8 +199,10 @@ impl XMLHttpRequest { let cors_request = match cors_request { Err(_) => { // Happens in case of cross-origin non-http URIs - //notify_error_and_return!(Network); - return; //XXXjdm + let xhr = xhr.to_temporary().root(); + xhr.r().process_partial_response(XHRProgress::Errored( + xhr.r().generation_id.get(), Network)); + return; //XXXjdm Err(Network) } Ok(req) => req, }; @@ -390,25 +369,14 @@ impl XMLHttpRequest { } #[allow(unsafe_code)] - fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, + fn fetch(xhr: JSRef, resource_task: ResourceTask, mut load_data: LoadData, terminate_receiver: Receiver, cors_request: Result,()>, gen_id: GenerationId) -> ErrorResult { - fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) { - match *fetch_type { - SyncOrAsync::Sync(xhr) => { - xhr.process_partial_response(msg); - }, - SyncOrAsync::Async(ref addr, ref script_chan) => { - script_chan.send(ScriptMsg::RunnableMsg(box XHRProgressHandler::new(addr.clone(), msg))).unwrap(); - } - } - } - macro_rules! notify_error_and_return( ($err:expr) => ({ - notify_partial_progress(fetch_type, XHRProgress::Errored(gen_id, $err)); + xhr.process_partial_response(XHRProgress::Errored(gen_id, $err)); return Err($err) }); ); @@ -481,7 +449,7 @@ impl XMLHttpRequest { _ => {} }; // XXXManishearth Clear cache entries in case of a network error - notify_partial_progress(fetch_type, XHRProgress::HeadersReceived(gen_id, + xhr.process_partial_response(XHRProgress::HeadersReceived(gen_id, response.metadata.headers.clone(), response.metadata.status.clone())); progress_port = response.progress_port; @@ -506,11 +474,11 @@ impl XMLHttpRequest { progress = progress_port.recv() => match progress.unwrap() { Payload(data) => { buf.push_all(data.as_slice()); - notify_partial_progress(fetch_type, - XHRProgress::Loading(gen_id, ByteString::new(buf.clone()))); + xhr.process_partial_response(XHRProgress::Loading( + gen_id, ByteString::new(buf.clone()))); }, Done(Ok(())) => { - notify_partial_progress(fetch_type, XHRProgress::Done(gen_id)); + xhr.process_partial_response(XHRProgress::Done(gen_id)); return Ok(()); }, Done(Err(_)) => { @@ -828,7 +796,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { let gen_id = self.generation_id.get(); if self.sync.get() { - return XMLHttpRequest::fetch(&mut SyncOrAsync::Sync(self), resource_task, load_data, + return XMLHttpRequest::fetch(self, resource_task, load_data, terminate_receiver, cors_request, gen_id); } else { self.fetch_time.set(time::now().to_timespec().sec); From 01e66035ffb621e57466f0a2ff73d0cbd795eb1c Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 09:22:58 -0500 Subject: [PATCH 06/11] Implement sync XHR by creating and spinning on-demand event loops. --- components/script/dom/bindings/global.rs | 20 +- .../script/dom/dedicatedworkerglobalscope.rs | 19 +- components/script/dom/window.rs | 7 +- components/script/dom/workerglobalscope.rs | 22 +- components/script/dom/xmlhttprequest.rs | 331 ++++++++++-------- components/script/script_task.rs | 26 ++ 6 files changed, 269 insertions(+), 156 deletions(-) diff --git a/components/script/dom/bindings/global.rs b/components/script/dom/bindings/global.rs index 233143851e6..bff4c66f89b 100644 --- a/components/script/dom/bindings/global.rs +++ b/components/script/dom/bindings/global.rs @@ -12,8 +12,8 @@ use dom::bindings::js::{JS, JSRef, Root, Unrooted}; use dom::bindings::utils::{Reflectable, Reflector}; use dom::workerglobalscope::{WorkerGlobalScope, WorkerGlobalScopeHelpers}; use dom::window::{self, WindowHelpers}; -use script_task::ScriptChan; use devtools_traits::DevtoolsControlChan; +use script_task::{ScriptChan, ScriptPort, ScriptMsg, ScriptTask}; use msg::constellation_msg::{PipelineId, WorkerId}; use net_traits::ResourceTask; @@ -129,6 +129,24 @@ impl<'a> GlobalRef<'a> { GlobalRef::Worker(ref worker) => worker.script_chan(), } } + + /// `ScriptChan` used to send messages to the event loop of this global's + /// thread. + pub fn new_script_pair(&self) -> (Box, Box) { + match *self { + GlobalRef::Window(ref window) => window.new_script_pair(), + GlobalRef::Worker(ref worker) => worker.new_script_pair(), + } + } + + /// Process a single event as if it were the next event in the task queue for + /// this global. + pub fn process_event(&self, msg: ScriptMsg) { + match *self { + GlobalRef::Window(_) => ScriptTask::process_event(msg), + GlobalRef::Worker(ref worker) => worker.process_event(msg), + } + } } impl<'a> Reflectable for GlobalRef<'a> { diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index 83168d44e2d..8ab0621e0f9 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -21,7 +21,7 @@ use dom::messageevent::MessageEvent; use dom::worker::{TrustedWorkerAddress, WorkerMessageHandler, WorkerEventHandler, WorkerErrorHandler}; use dom::workerglobalscope::{WorkerGlobalScope, WorkerGlobalScopeHelpers}; use dom::workerglobalscope::WorkerGlobalScopeTypeId; -use script_task::{ScriptTask, ScriptChan, ScriptMsg, TimerSource}; +use script_task::{ScriptTask, ScriptChan, ScriptMsg, TimerSource, ScriptPort}; use script_task::StackRootTLS; use msg::constellation_msg::PipelineId; @@ -38,7 +38,7 @@ use js::jsval::JSVal; use js::rust::Cx; use std::rc::Rc; -use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc::{Sender, Receiver, channel}; use url::Url; /// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with @@ -198,6 +198,8 @@ impl DedicatedWorkerGlobalScope { pub trait DedicatedWorkerGlobalScopeHelpers { fn script_chan(self) -> Box; fn pipeline(self) -> PipelineId; + fn new_script_pair(self) -> (Box, Box); + fn process_event(self, msg: ScriptMsg); } impl<'a> DedicatedWorkerGlobalScopeHelpers for JSRef<'a, DedicatedWorkerGlobalScope> { @@ -213,6 +215,19 @@ impl<'a> DedicatedWorkerGlobalScopeHelpers for JSRef<'a, DedicatedWorkerGlobalSc fn pipeline(self) -> PipelineId { self.id } + + fn new_script_pair(self) -> (Box, Box) { + let (tx, rx) = channel(); + let chan = box SendableWorkerScriptChan { + sender: tx, + worker: self.worker.borrow().as_ref().unwrap().clone(), + }; + (chan, box rx) + } + + fn process_event(self, msg: ScriptMsg) { + self.handle_event(msg); + } } trait PrivateDedicatedWorkerGlobalScopeHelpers { diff --git a/components/script/dom/window.rs b/components/script/dom/window.rs index f4e2f5b0732..b46a5af952d 100644 --- a/components/script/dom/window.rs +++ b/components/script/dom/window.rs @@ -29,7 +29,7 @@ use dom::storage::Storage; use layout_interface::{ReflowGoal, ReflowQueryType, LayoutRPC, LayoutChan, Reflow, Msg}; use layout_interface::{ContentBoxResponse, ContentBoxesResponse, ScriptReflow}; use page::Page; -use script_task::{TimerSource, ScriptChan}; +use script_task::{TimerSource, ScriptChan, ScriptPort, NonWorkerScriptChan}; use script_task::ScriptMsg; use script_traits::ScriptControlChan; use timers::{IsInterval, TimerId, TimerManager, TimerCallback}; @@ -198,6 +198,11 @@ impl Window { self.parent_info } + pub fn new_script_pair(&self) -> (Box, Box) { + let (tx, rx) = channel(); + (box NonWorkerScriptChan(tx), box rx) + } + pub fn control_chan<'a>(&'a self) -> &'a ScriptControlChan { &self.control_chan } diff --git a/components/script/dom/workerglobalscope.rs b/components/script/dom/workerglobalscope.rs index 76fd85e0603..d22f45c971a 100644 --- a/components/script/dom/workerglobalscope.rs +++ b/components/script/dom/workerglobalscope.rs @@ -16,7 +16,7 @@ use dom::eventtarget::{EventTarget, EventTargetTypeId}; use dom::workerlocation::WorkerLocation; use dom::workernavigator::WorkerNavigator; use dom::window::{base64_atob, base64_btoa}; -use script_task::{ScriptChan, TimerSource}; +use script_task::{ScriptChan, TimerSource, ScriptPort, ScriptMsg}; use timers::{IsInterval, TimerId, TimerManager, TimerCallback}; use devtools_traits::DevtoolsControlChan; @@ -216,6 +216,8 @@ pub trait WorkerGlobalScopeHelpers { fn handle_fire_timer(self, timer_id: TimerId); fn script_chan(self) -> Box; fn pipeline(self) -> PipelineId; + fn new_script_pair(self) -> (Box, Box); + fn process_event(self, msg: ScriptMsg); fn get_cx(self) -> *mut JSContext; } @@ -238,6 +240,24 @@ impl<'a> WorkerGlobalScopeHelpers for JSRef<'a, WorkerGlobalScope> { } } + fn new_script_pair(self) -> (Box, Box) { + let dedicated: Option> = + DedicatedWorkerGlobalScopeCast::to_ref(self); + match dedicated { + Some(dedicated) => dedicated.new_script_pair(), + None => panic!("need to implement creating isolated event loops for SharedWorker"), + } + } + + fn process_event(self, msg: ScriptMsg) { + let dedicated: Option> = + DedicatedWorkerGlobalScopeCast::to_ref(self); + match dedicated { + Some(dedicated) => dedicated.process_event(msg), + None => panic!("need to implement processing single events for SharedWorker"), + } + } + fn handle_fire_timer(self, timer_id: TimerId) { self.timers.fire_timer(timer_id, self); } diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 11766cf26da..9f90bc2a628 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -26,7 +26,7 @@ use dom::urlsearchparams::URLSearchParamsHelpers; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTarget; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTargetTypeId; use dom::xmlhttprequestupload::XMLHttpRequestUpload; -use script_task::{ScriptChan, ScriptMsg, Runnable}; +use script_task::{ScriptChan, ScriptMsg, Runnable, ScriptPort}; use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; @@ -81,6 +81,16 @@ enum XMLHttpRequestState { #[jstraceable] pub struct GenerationId(u32); +/// Closure of required data for each async network event that comprises the +/// XHR's response. +struct XHRContext { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + cors_request: Option, + buf: DOMRefCell>, + sync_status: DOMRefCell>, +} + #[derive(Clone)] pub enum XHRProgress { /// Notify that headers have been received @@ -138,6 +148,7 @@ pub struct XMLHttpRequest { timer: DOMRefCell, fetch_time: Cell, terminate_sender: DOMRefCell>>, + timeout_target: DOMRefCell>>, generation_id: Cell, response_status: Cell>, } @@ -172,6 +183,7 @@ impl XMLHttpRequest { timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), terminate_sender: DOMRefCell::new(None), + timeout_target: DOMRefCell::new(None), generation_id: Cell::new(GenerationId(0)), response_status: Cell::new(Ok(())), } @@ -187,120 +199,94 @@ impl XMLHttpRequest { Ok(XMLHttpRequest::new(global)) } - pub fn handle_progress(addr: TrustedXHRAddress, progress: XHRProgress) { - let xhr = addr.to_temporary().root(); - xhr.r().process_partial_response(progress); - } - - #[allow(unsafe_code)] - fn fetch2(xhr: TrustedXHRAddress, script_chan: Box, - resource_task: ResourceTask, load_data: LoadData, sync: bool, - cors_request: Result,()>, gen_id: GenerationId) { - let cors_request = match cors_request { - Err(_) => { - // Happens in case of cross-origin non-http URIs - let xhr = xhr.to_temporary().root(); - xhr.r().process_partial_response(XHRProgress::Errored( - xhr.r().generation_id.get(), Network)); - return; //XXXjdm Err(Network) - } - Ok(req) => req, - }; - - struct XHRContext { - xhr: TrustedXHRAddress, - gen_id: GenerationId, - cors_request: Option, - buf: DOMRefCell>, - got_response_complete: Cell, + fn check_cors(context: Arc>, + load_data: LoadData, + req: CORSRequest, + script_chan: Box, + resource_task: ResourceTask) { + struct CORSContext { + xhr: Arc>, + load_data: RefCell>, + req: CORSRequest, + script_chan: Box, + resource_task: ResourceTask, } - let context = Arc::new(Mutex::new(XHRContext { - xhr: xhr, - cors_request: cors_request.clone(), - gen_id: gen_id, - buf: DOMRefCell::new(vec!()), - got_response_complete: Cell::new(false), + impl AsyncCORSResponseListener for CORSContext { + fn response_available(&self, response: CORSResponse) { + if response.network_error { + let mut context = self.xhr.lock().unwrap(); + let xhr = context.xhr.to_temporary().root(); + xhr.r().process_partial_response(XHRProgress::Errored(context.gen_id, Network)); + *context.sync_status.borrow_mut() = Some(Err(Network)); + return; + } + + let mut load_data = self.load_data.borrow_mut().take().unwrap(); + load_data.cors = Some(ResourceCORSData { + preflight: self.req.preflight_flag, + origin: self.req.origin.clone() + }); + + XMLHttpRequest::initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(), + self.resource_task.clone(), load_data); + } + } + + struct CORSListener { + context: Arc>, + script_chan: Box, + } + + impl AsyncCORSResponseTarget for CORSListener { + fn invoke_with_listener(&self, response: CORSResponse) { + self.script_chan.send(ScriptMsg::RunnableMsg(box CORSRunnable { + context: self.context.clone(), + response: response, + })).unwrap(); + } + } + + struct CORSRunnable { + context: Arc>, + response: CORSResponse, + } + + impl Runnable for CORSRunnable { + fn handler(self: Box) { + let this = *self; + let context = this.context.lock().unwrap(); + context.response_available(this.response); + } + } + + let cors_context = Arc::new(Mutex::new(CORSContext { + xhr: context, + load_data: RefCell::new(Some(load_data)), + req: req.clone(), + script_chan: script_chan.clone(), + resource_task: resource_task, })); - if let Some(req) = cors_request { - struct CORSContext { - xhr: Arc>, - load_data: RefCell>, - req: CORSRequest, - script_chan: Box, - resource_task: ResourceTask, - } - - impl AsyncCORSResponseListener for CORSContext { - fn response_available(&self, response: CORSResponse) { - if response.network_error { - let context = self.xhr.lock().unwrap(); - let xhr = context.xhr.to_temporary().root(); - xhr.r().process_partial_response(XHRProgress::Errored(context.gen_id, Network)); - return; //XXXjdm Err(Network) - } - - let mut load_data = self.load_data.borrow_mut().take().unwrap(); - load_data.cors = Some(ResourceCORSData { - preflight: self.req.preflight_flag, - origin: self.req.origin.clone() - }); - - initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(), - self.resource_task.clone(), load_data); - } - } - - struct CORSListener { - context: Arc>, - script_chan: Box, - } - - impl AsyncCORSResponseTarget for CORSListener { - fn invoke_with_listener(&self, response: CORSResponse) { - self.script_chan.send(ScriptMsg::RunnableMsg(box CORSRunnable { - context: self.context.clone(), - response: response, - })).unwrap(); - } - } - - struct CORSRunnable { - context: Arc>, - response: CORSResponse, - } - - impl Runnable for CORSRunnable { - fn handler(self: Box) { - let this = *self; - let context = this.context.lock().unwrap(); - context.response_available(this.response); - } - } - - let cors_context = Arc::new(Mutex::new(CORSContext { - xhr: context.clone(), - load_data: RefCell::new(Some(load_data)), - req: req.clone(), - script_chan: script_chan.clone(), - resource_task: resource_task, - })); - - req.http_fetch_async(box CORSListener { - context: cors_context, - script_chan: script_chan - }); - } else { - initiate_async_xhr(context.clone(), script_chan, resource_task, load_data); - } + req.http_fetch_async(box CORSListener { + context: cors_context, + script_chan: script_chan + }); + } + fn initiate_async_xhr(context: Arc>, + script_chan: Box, + resource_task: ResourceTask, + load_data: LoadData) { impl AsyncResponseListener for XHRContext { fn headers_available(&self, metadata: Metadata) { let xhr = self.xhr.to_temporary().root(); - let _decision = xhr.r().process_headers_available(self.cors_request.clone(), - self.gen_id, - metadata); + let rv = xhr.r().process_headers_available(self.cors_request.clone(), + self.gen_id, + metadata); + if rv.is_err() { + *self.sync_status.borrow_mut() = Some(rv); + } } fn data_available(&self, payload: Vec) { @@ -311,8 +297,8 @@ impl XMLHttpRequest { fn response_complete(&self, status: Result<(), String>) { let xhr = self.xhr.to_temporary().root(); - xhr.r().process_response_complete(self.gen_id, status); - self.got_response_complete.set(true); + let rv = xhr.r().process_response_complete(self.gen_id, status); + *self.sync_status.borrow_mut() = Some(rv); } } @@ -349,23 +335,11 @@ impl XMLHttpRequest { } } - fn initiate_async_xhr(context: Arc>, - script_chan: Box, - resource_task: ResourceTask, - load_data: LoadData) { - let listener = box XHRListener { - context: context, - script_chan: script_chan, - }; - resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); - } - - if sync { - while !context.lock().unwrap().got_response_complete.get() { - //TODO: spin the event loop - panic!("don't know how to spin the event loop yet"); - } - } + let listener = box XHRListener { + context: context, + script_chan: script_chan, + }; + resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); } #[allow(unsafe_code)] @@ -725,8 +699,6 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } - let global = self.global.root(); - let resource_task = global.r().resource_task(); let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap()); load_data.data = extracted; @@ -764,6 +736,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { *self.terminate_sender.borrow_mut() = Some(terminate_sender); // CORS stuff + let global = self.global.root(); let referer_url = self.global.root().r().get_url(); let mode = if self.upload_events.get() { RequestMode::ForcedPreflight @@ -794,24 +767,15 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { debug!("request_headers = {:?}", *self.request_headers.borrow()); - let gen_id = self.generation_id.get(); + self.fetch_time.set(time::now().to_timespec().sec); + let rv = self.fetch2(load_data, cors_request, global.r()); if self.sync.get() { - return XMLHttpRequest::fetch(self, resource_task, load_data, - terminate_receiver, cors_request, gen_id); - } else { - self.fetch_time.set(time::now().to_timespec().sec); - let script_chan = global.r().script_chan(); - // Pin the object before launching the fetch task. This is to ensure that - // the object will stay alive as long as there are (possibly cancelled) - // inflight events queued up in the script task's port. - let addr = Trusted::new(self.global.root().r().get_cx(), self, - script_chan.clone()); - XMLHttpRequest::fetch2(addr, script_chan, resource_task, load_data, self.sync.get(), - cors_request, gen_id); - let timeout = self.timeout.get(); - if timeout > 0 { - self.set_timeout(timeout); - } + return rv; + } + + let timeout = self.timeout.get(); + if timeout > 0 { + self.set_timeout(timeout); } Ok(()) } @@ -955,7 +919,7 @@ trait PrivateXMLHttpRequestHelpers { fn process_headers_available(&self, cors_request: Option, gen_id: GenerationId, metadata: Metadata) -> Result<(), Error>; fn process_data_available(self, gen_id: GenerationId, payload: Vec); - fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>); + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) -> ErrorResult; fn process_partial_response(self, progress: XHRProgress); fn terminate_ongoing_fetch(self); fn insert_trusted_header(self, name: String, value: String); @@ -967,6 +931,8 @@ trait PrivateXMLHttpRequestHelpers { fn cancel_timeout(self); fn filter_response_headers(self) -> Headers; fn discard_subsequent_responses(self); + fn fetch2(self, load_data: LoadData, cors_request: Result,()>, + global: GlobalRef) -> ErrorResult; } impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { @@ -1007,10 +973,17 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.process_partial_response(XHRProgress::Loading(gen_id, ByteString::new(payload))); } - fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) { + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) + -> ErrorResult { match status { - Ok(()) => self.process_partial_response(XHRProgress::Done(gen_id)), - Err(_) => self.process_partial_response(XHRProgress::Errored(gen_id, Network)), + Ok(()) => { + self.process_partial_response(XHRProgress::Done(gen_id)); + Ok(()) + }, + Err(_) => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + Err(Network) + } } } @@ -1140,6 +1113,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened)); + *self.timeout_target.borrow_mut() = None; self.response_status.set(Ok(())); } @@ -1198,14 +1172,14 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); let terminate_sender = (*self.terminate_sender.borrow()).clone(); + let timeout_target = (*self.timeout_target.borrow().as_ref().unwrap()).clone(); let global = self.global.root(); - let script_chan = global.r().script_chan(); let xhr = Trusted::new(global.r().get_cx(), self, global.r().script_chan()); let gen_id = self.generation_id.get(); spawn_named("XHR:Timer".to_owned(), move || { match oneshot.recv() { Ok(_) => { - script_chan.send(ScriptMsg::RunnableMsg(box XHRTimeout { + timeout_target.send(ScriptMsg::RunnableMsg(box XHRTimeout { xhr: xhr, gen_id: gen_id, })).unwrap(); @@ -1280,6 +1254,61 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { fn discard_subsequent_responses(self) { self.response_status.set(Err(())); } + + #[allow(unsafe_code)] + fn fetch2(self, + load_data: LoadData, + cors_request: Result,()>, + global: GlobalRef) -> ErrorResult { + let cors_request = match cors_request { + Err(_) => { + // Happens in case of cross-origin non-http URIs + self.process_partial_response(XHRProgress::Errored( + self.generation_id.get(), Network)); + return Err(Network); + } + Ok(req) => req, + }; + + let xhr = Trusted::new(global.get_cx(), self, global.script_chan()); + + let context = Arc::new(Mutex::new(XHRContext { + xhr: xhr, + cors_request: cors_request.clone(), + gen_id: self.generation_id.get(), + buf: DOMRefCell::new(vec!()), + sync_status: DOMRefCell::new(None), + })); + + let (script_chan, script_port) = if self.sync.get() { + let (tx, rx) = global.new_script_pair(); + (tx, Some(rx)) + } else { + (global.script_chan(), None) + }; + *self.timeout_target.borrow_mut() = Some(script_chan.clone()); + + let resource_task = global.resource_task(); + if let Some(req) = cors_request { + XMLHttpRequest::check_cors(context.clone(), load_data, req.clone(), + script_chan.clone(), resource_task); + } else { + XMLHttpRequest::initiate_async_xhr(context.clone(), script_chan, + resource_task, load_data); + } + + if let Some(script_port) = script_port { + loop { + global.process_event(script_port.recv()); + let context = context.lock().unwrap(); + let sync_status = context.sync_status.borrow(); + if let Some(ref status) = *sync_status { + return status.clone(); + } + } + } + Ok(()) + } } trait Extractable { diff --git a/components/script/script_task.rs b/components/script/script_task.rs index b304a15afda..e43ca69a981 100644 --- a/components/script/script_task.rs +++ b/components/script/script_task.rs @@ -38,6 +38,7 @@ use dom::uievent::UIEvent; use dom::eventtarget::EventTarget; use dom::node::{self, Node, NodeHelpers, NodeDamage, window_from_node}; use dom::window::{Window, WindowHelpers, ScriptHelpers, ReflowReason}; +use dom::worker::TrustedWorkerAddress; use parse::html::{HTMLInput, parse_html}; use layout_interface::{ScriptLayoutChan, LayoutChan, ReflowGoal, ReflowQueryType}; use layout_interface; @@ -200,6 +201,22 @@ pub trait ScriptChan { fn clone(&self) -> Box; } +pub trait ScriptPort { + fn recv(&self) -> ScriptMsg; +} + +impl ScriptPort for Receiver { + fn recv(&self) -> ScriptMsg { + self.recv().unwrap() + } +} + +impl ScriptPort for Receiver<(TrustedWorkerAddress, ScriptMsg)> { + fn recv(&self) -> ScriptMsg { + self.recv().unwrap().1 + } +} + /// Encapsulates internal communication within the script task. #[jstraceable] pub struct NonWorkerScriptChan(pub Sender); @@ -403,6 +420,15 @@ unsafe extern "C" fn debug_gc_callback(_rt: *mut JSRuntime, status: JSGCStatus) } impl ScriptTask { + pub fn process_event(msg: ScriptMsg) { + SCRIPT_TASK_ROOT.with(|root| { + if let Some(script_task) = *root.borrow() { + let script_task = unsafe { &*script_task }; + script_task.handle_msg_from_script(msg); + } + }); + } + /// Creates a new script task. pub fn new(compositor: Box, port: Receiver, From f7ac1f1876bbf139bcf7a32ba6d8a602a8269345 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 09:29:04 -0500 Subject: [PATCH 07/11] Remove old implementation of XHR's fetch. --- components/script/dom/xmlhttprequest.rs | 136 ------------------------ 1 file changed, 136 deletions(-) diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 9f90bc2a628..431400ea5a3 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -43,7 +43,6 @@ use js::jsapi::JS_ClearPendingException; use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; -use net_traits::ProgressMsg::{Payload, Done}; use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer, AsyncResponseTarget}; use net_traits::{AsyncResponseListener, ResponseAction, Metadata}; use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener}; @@ -54,7 +53,6 @@ use util::task::spawn_named; use std::ascii::AsciiExt; use std::borrow::ToOwned; use std::cell::{RefCell, Cell}; -use std::sync::mpsc::{Sender, Receiver, channel}; use std::default::Default; use std::old_io::Timer; use std::str::FromStr; @@ -114,11 +112,6 @@ impl XHRProgress { } } -enum TerminateReason { - AbortedOrReopened, - TimedOut, -} - #[dom_struct] pub struct XMLHttpRequest { eventtarget: XMLHttpRequestEventTarget, @@ -147,7 +140,6 @@ pub struct XMLHttpRequest { global: GlobalField, timer: DOMRefCell, fetch_time: Cell, - terminate_sender: DOMRefCell>>, timeout_target: DOMRefCell>>, generation_id: Cell, response_status: Cell>, @@ -182,7 +174,6 @@ impl XMLHttpRequest { global: GlobalField::from_rooted(&global), timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), - terminate_sender: DOMRefCell::new(None), timeout_target: DOMRefCell::new(None), generation_id: Cell::new(GenerationId(0)), response_status: Cell::new(Ok(())), @@ -341,128 +332,6 @@ impl XMLHttpRequest { }; resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); } - - #[allow(unsafe_code)] - fn fetch(xhr: JSRef, resource_task: ResourceTask, - mut load_data: LoadData, terminate_receiver: Receiver, - cors_request: Result,()>, gen_id: GenerationId) - -> ErrorResult { - - macro_rules! notify_error_and_return( - ($err:expr) => ({ - xhr.process_partial_response(XHRProgress::Errored(gen_id, $err)); - return Err($err) - }); - ); - - macro_rules! terminate( - ($reason:expr) => ( - match $reason { - TerminateReason::AbortedOrReopened => { - return Err(Abort) - } - TerminateReason::TimedOut => { - notify_error_and_return!(Timeout); - } - } - ); - ); - - - match cors_request { - Err(_) => { - // Happens in case of cross-origin non-http URIs - notify_error_and_return!(Network); - } - - Ok(Some(ref req)) => { - let (chan, cors_port) = channel(); - let req2 = req.clone(); - // TODO: this exists only to make preflight check non-blocking - // perhaps should be handled by the resource_loader? - spawn_named("XHR:Cors".to_owned(), move || { - let response = req2.http_fetch(); - chan.send(response).unwrap(); - }); - - select! ( - response = cors_port.recv() => { - let response = response.unwrap(); - if response.network_error { - notify_error_and_return!(Network); - } else { - load_data.cors = Some(ResourceCORSData { - preflight: req.preflight_flag, - origin: req.origin.clone() - }); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); - } - _ => {} - } - - // Step 10, 13 - let (start_chan, start_port) = channel(); - resource_task.send(Load(load_data, LoadConsumer::Channel(start_chan))).unwrap(); - - - let progress_port; - select! ( - response = start_port.recv() => { - let response = response.unwrap(); - match cors_request { - Ok(Some(ref req)) => { - match response.metadata.headers { - Some(ref h) if allow_cross_origin_request(req, h) => {}, - _ => notify_error_and_return!(Network) - } - }, - - _ => {} - }; - // XXXManishearth Clear cache entries in case of a network error - xhr.process_partial_response(XHRProgress::HeadersReceived(gen_id, - response.metadata.headers.clone(), response.metadata.status.clone())); - - progress_port = response.progress_port; - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); - - let mut buf = vec!(); - loop { - // Under most circumstances, progress_port will contain lots of Payload - // events. Since select! does not have any fairness or priority, it - // might always remove the progress_port event, even when there is - // a terminate event waiting in the terminate_receiver. If this happens, - // a timeout or abort will take too long to be processed. To avoid this, - // in each iteration, we check for a terminate event before we block. - match terminate_receiver.try_recv() { - Ok(reason) => terminate!(reason), - Err(_) => () - }; - - select! ( - progress = progress_port.recv() => match progress.unwrap() { - Payload(data) => { - buf.push_all(data.as_slice()); - xhr.process_partial_response(XHRProgress::Loading( - gen_id, ByteString::new(buf.clone()))); - }, - Done(Ok(())) => { - xhr.process_partial_response(XHRProgress::Done(gen_id)); - return Ok(()); - }, - Done(Err(_)) => { - notify_error_and_return!(Network); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); - } - } } impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { @@ -732,8 +601,6 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } load_data.method = (*self.request_method.borrow()).clone(); - let (terminate_sender, terminate_receiver) = channel(); - *self.terminate_sender.borrow_mut() = Some(terminate_sender); // CORS stuff let global = self.global.root(); @@ -1112,7 +979,6 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { fn terminate_ongoing_fetch(self) { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); - self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened)); *self.timeout_target.borrow_mut() = None; self.response_status.set(Ok(())); } @@ -1171,7 +1037,6 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // This will cancel all previous timeouts let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); - let terminate_sender = (*self.terminate_sender.borrow()).clone(); let timeout_target = (*self.timeout_target.borrow().as_ref().unwrap()).clone(); let global = self.global.root(); let xhr = Trusted::new(global.r().get_cx(), self, global.r().script_chan()); @@ -1183,7 +1048,6 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { xhr: xhr, gen_id: gen_id, })).unwrap(); - terminate_sender.map(|s| s.send(TerminateReason::TimedOut)); }, Err(_) => { // This occurs if xhr.timeout (the sender) goes out of scope (i.e, xhr went out of scope) From 1ca9ff56c8de01056ffaf31d6173e893b6567d46 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 10:34:30 -0500 Subject: [PATCH 08/11] Create easy common interface for off-thread network listeners, and remove the CORS-specific reimplementation of async networking. --- components/script/cors.rs | 51 ++++++++++++-- components/script/dom/xmlhttprequest.rs | 88 +++++-------------------- components/script/lib.rs | 1 + components/script/network_listener.rs | 48 ++++++++++++++ 4 files changed, 110 insertions(+), 78 deletions(-) create mode 100644 components/script/network_listener.rs diff --git a/components/script/cors.rs b/components/script/cors.rs index 0b669ab8bf8..235ec43f779 100644 --- a/components/script/cors.rs +++ b/components/script/cors.rs @@ -9,8 +9,14 @@ //! This library will eventually become the core of the Fetch crate //! with CORSRequest being expanded into FetchRequest (etc) +use network_listener::{NetworkListener, PreInvoke}; +use script_task::ScriptChan; +use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction, Metadata}; + use std::ascii::AsciiExt; use std::borrow::ToOwned; +use std::cell::RefCell; +use std::sync::{Arc, Mutex}; use time; use time::{now, Timespec}; @@ -27,14 +33,12 @@ use hyper::status::StatusClass::Success; use url::{SchemeData, Url}; use util::task::spawn_named; +/// Interface for network listeners concerned with CORS checks. Proper network requests +/// should be initiated from this method, based on the response provided. pub trait AsyncCORSResponseListener { fn response_available(&self, response: CORSResponse); } -pub trait AsyncCORSResponseTarget { - fn invoke_with_listener(&self, response: CORSResponse); -} - #[derive(Clone)] pub struct CORSRequest { pub origin: Url, @@ -98,13 +102,48 @@ impl CORSRequest { } } - pub fn http_fetch_async(&self, listener: Box) { + pub fn http_fetch_async(&self, + listener: Box, + script_chan: Box) { + struct CORSContext { + listener: Box, + response: RefCell>, + } + + // This is shoe-horning the CORSReponse stuff into the rest of the async network + // framework right now. It would be worth redesigning http_fetch to do this properly. + impl AsyncResponseListener for CORSContext { + fn headers_available(&self, _metadata: Metadata) { + } + + fn data_available(&self, _payload: Vec) { + } + + fn response_complete(&self, _status: Result<(), String>) { + let response = self.response.borrow_mut().take().unwrap(); + self.listener.response_available(response); + } + } + impl PreInvoke for CORSContext {} + + let context = CORSContext { + listener: listener, + response: RefCell::new(None), + }; + let listener = NetworkListener { + context: Arc::new(Mutex::new(context)), + script_chan: script_chan, + }; + // TODO: this exists only to make preflight check non-blocking // perhaps should be handled by the resource task? let req = self.clone(); spawn_named("cors".to_owned(), move || { let response = req.http_fetch(); - listener.invoke_with_listener(response); + let mut context = listener.context.lock(); + let context = context.as_mut().unwrap(); + *context.response.borrow_mut() = Some(response); + listener.invoke_with_listener(ResponseAction::ResponseComplete(Ok(()))); }); } diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 431400ea5a3..a4e9e767dd5 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -26,6 +26,7 @@ use dom::urlsearchparams::URLSearchParamsHelpers; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTarget; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTargetTypeId; use dom::xmlhttprequestupload::XMLHttpRequestUpload; +use network_listener::{NetworkListener, PreInvoke}; use script_task::{ScriptChan, ScriptMsg, Runnable, ScriptPort}; use encoding::all::UTF_8; @@ -43,10 +44,10 @@ use js::jsapi::JS_ClearPendingException; use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; -use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer, AsyncResponseTarget}; -use net_traits::{AsyncResponseListener, ResponseAction, Metadata}; +use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer}; +use net_traits::{AsyncResponseListener, Metadata}; use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener}; -use cors::{AsyncCORSResponseTarget, CORSResponse}; +use cors::CORSResponse; use util::str::DOMString; use util::task::spawn_named; @@ -224,45 +225,15 @@ impl XMLHttpRequest { } } - struct CORSListener { - context: Arc>, - script_chan: Box, - } - - impl AsyncCORSResponseTarget for CORSListener { - fn invoke_with_listener(&self, response: CORSResponse) { - self.script_chan.send(ScriptMsg::RunnableMsg(box CORSRunnable { - context: self.context.clone(), - response: response, - })).unwrap(); - } - } - - struct CORSRunnable { - context: Arc>, - response: CORSResponse, - } - - impl Runnable for CORSRunnable { - fn handler(self: Box) { - let this = *self; - let context = this.context.lock().unwrap(); - context.response_available(this.response); - } - } - - let cors_context = Arc::new(Mutex::new(CORSContext { + let cors_context = CORSContext { xhr: context, load_data: RefCell::new(Some(load_data)), req: req.clone(), script_chan: script_chan.clone(), resource_task: resource_task, - })); + }; - req.http_fetch_async(box CORSListener { - context: cors_context, - script_chan: script_chan - }); + req.http_fetch_async(box cors_context, script_chan); } fn initiate_async_xhr(context: Arc>, @@ -293,40 +264,14 @@ impl XMLHttpRequest { } } - struct XHRRunnable { - context: Arc>, - action: ResponseAction, - } - - impl Runnable for XHRRunnable { - fn handler(self: Box) { - let this = *self; - - let context = this.context.lock().unwrap(); - let xhr = context.xhr.to_temporary().root(); - if xhr.r().generation_id.get() != context.gen_id { - return; - } - - this.action.process(&*context); + impl PreInvoke for XHRContext { + fn should_invoke(&self) -> bool { + let xhr = self.xhr.to_temporary().root(); + xhr.r().generation_id.get() == self.gen_id } } - struct XHRListener { - context: Arc>, - script_chan: Box, - } - - impl AsyncResponseTarget for XHRListener { - fn invoke_with_listener(&self, action: ResponseAction) { - self.script_chan.send(ScriptMsg::RunnableMsg(box XHRRunnable { - context: self.context.clone(), - action: action - })).unwrap(); - } - } - - let listener = box XHRListener { + let listener = box NetworkListener { context: context, script_chan: script_chan, }; @@ -635,7 +580,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { debug!("request_headers = {:?}", *self.request_headers.borrow()); self.fetch_time.set(time::now().to_timespec().sec); - let rv = self.fetch2(load_data, cors_request, global.r()); + let rv = self.fetch(load_data, cors_request, global.r()); if self.sync.get() { return rv; } @@ -798,8 +743,8 @@ trait PrivateXMLHttpRequestHelpers { fn cancel_timeout(self); fn filter_response_headers(self) -> Headers; fn discard_subsequent_responses(self); - fn fetch2(self, load_data: LoadData, cors_request: Result,()>, - global: GlobalRef) -> ErrorResult; + fn fetch(self, load_data: LoadData, cors_request: Result,()>, + global: GlobalRef) -> ErrorResult; } impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { @@ -1119,8 +1064,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.response_status.set(Err(())); } - #[allow(unsafe_code)] - fn fetch2(self, + fn fetch(self, load_data: LoadData, cors_request: Result,()>, global: GlobalRef) -> ErrorResult { diff --git a/components/script/lib.rs b/components/script/lib.rs index 4a9fcdbd09c..b9f4b42f84d 100644 --- a/components/script/lib.rs +++ b/components/script/lib.rs @@ -61,6 +61,7 @@ pub mod dom; pub mod parse; pub mod layout_interface; +mod network_listener; pub mod page; pub mod script_task; mod timers; diff --git a/components/script/network_listener.rs b/components/script/network_listener.rs new file mode 100644 index 00000000000..5f400eb76b3 --- /dev/null +++ b/components/script/network_listener.rs @@ -0,0 +1,48 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use script_task::{ScriptChan, ScriptMsg, Runnable}; +use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction}; +use std::sync::{Arc, Mutex}; + +/// An off-thread sink for async network event runnables. All such events are forwarded to +/// a target thread, where they are invoked on the provided context object. +pub struct NetworkListener { + pub context: Arc>, + pub script_chan: Box, +} + +impl AsyncResponseTarget for NetworkListener { + fn invoke_with_listener(&self, action: ResponseAction) { + self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable { + context: self.context.clone(), + action: action, + })).unwrap(); + } +} + +/// A gating mechanism that runs before invoking the runnable on the target thread. +/// If the `should_invoke` method returns false, the runnable is discarded without +/// being invoked. +pub trait PreInvoke { + fn should_invoke(&self) -> bool { + true + } +} + +/// A runnable for moving the async network events between threads. +struct ListenerRunnable { + context: Arc>, + action: ResponseAction, +} + +impl Runnable for ListenerRunnable { + fn handler(self: Box>) { + let this = *self; + let context = this.context.lock().unwrap(); + if context.should_invoke() { + this.action.process(&*context); + } + } +} From 1b9684634f245b3488694ee2c61fbd55f7e19615 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 10:51:32 -0500 Subject: [PATCH 09/11] Document async networking interfaces. --- components/net/resource_task.rs | 1 + components/net_traits/lib.rs | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index 0d4bc19a64d..3ce69bfce17 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -35,6 +35,7 @@ use std::thunk::Invoke; static mut HOST_TABLE: Option<*mut HashMap> = None; pub fn global_init() { +<<<<<<< HEAD //TODO: handle bad file path let path = match env::var("HOST_FILE") { Ok(host_file_path) => host_file_path, diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 660e3d05320..fe11294b2a5 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -66,19 +66,31 @@ impl LoadData { } } +/// A listener for asynchronous network events. Cancelling the underlying request is unsupported. pub trait AsyncResponseListener { + /// The response headers for a request have been received. fn headers_available(&self, metadata: Metadata); + /// A portion of the response body has been received. This data is unavailable after + /// this method returned, and must be stored accordingly. fn data_available(&self, payload: Vec); + /// The response is complete. If the provided status is an Err value, there is no guarantee + /// that the response body was completely read. fn response_complete(&self, status: Result<(), String>); } +/// Data for passing between threads/processes to indicate a particular action to +/// take on a provided network listener. pub enum ResponseAction { + /// Invoke headers_available HeadersAvailable(Metadata), + /// Invoke data_available DataAvailable(Vec), + /// Invoke response_complete ResponseComplete(Result<(), String>) } impl ResponseAction { + /// Execute the default action on a provided listener. pub fn process(self, listener: &AsyncResponseListener) { match self { ResponseAction::HeadersAvailable(m) => listener.headers_available(m), @@ -88,10 +100,13 @@ impl ResponseAction { } } +/// A target for async networking events. Commonly used to dispatch a runnable event to another +/// thread storing the wrapped closure for later execution. pub trait AsyncResponseTarget { fn invoke_with_listener(&self, action: ResponseAction); } +/// A wrapper for a network load that can either be channel or event-based. pub enum LoadConsumer { Channel(Sender), Listener(Box), From 7b3043a59bd01c77d22da3cfd0e4951820c8e972 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 11:15:01 -0500 Subject: [PATCH 10/11] Warning fixes. --- components/net/resource_task.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index 3ce69bfce17..0d4bc19a64d 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -35,7 +35,6 @@ use std::thunk::Invoke; static mut HOST_TABLE: Option<*mut HashMap> = None; pub fn global_init() { -<<<<<<< HEAD //TODO: handle bad file path let path = match env::var("HOST_FILE") { Ok(host_file_path) => host_file_path, From d0704d17970987a1b96f1e4d6a50aae986542435 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Thu, 5 Mar 2015 11:19:58 -0500 Subject: [PATCH 11/11] Documentation. --- components/script/dom/bindings/global.rs | 5 +++-- components/script/script_task.rs | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/components/script/dom/bindings/global.rs b/components/script/dom/bindings/global.rs index bff4c66f89b..3c3df454ca1 100644 --- a/components/script/dom/bindings/global.rs +++ b/components/script/dom/bindings/global.rs @@ -130,8 +130,9 @@ impl<'a> GlobalRef<'a> { } } - /// `ScriptChan` used to send messages to the event loop of this global's - /// thread. + /// Create a new sender/receiver pair that can be used to implement an on-demand + /// event loop. Used for implementing web APIs that require blocking semantics + /// without resorting to nested event loops. pub fn new_script_pair(&self) -> (Box, Box) { match *self { GlobalRef::Window(ref window) => window.new_script_pair(), diff --git a/components/script/script_task.rs b/components/script/script_task.rs index e43ca69a981..31116307b3c 100644 --- a/components/script/script_task.rs +++ b/components/script/script_task.rs @@ -201,6 +201,9 @@ pub trait ScriptChan { fn clone(&self) -> Box; } +/// An interface for receiving ScriptMsg values in an event loop. Used for synchronous DOM +/// APIs that need to abstract over multiple kinds of event loops (worker/main thread) with +/// different Receiver interfaces. pub trait ScriptPort { fn recv(&self) -> ScriptMsg; }