diff --git a/components/net/about_loader.rs b/components/net/about_loader.rs index cd7d40671fd..e12fc32839c 100644 --- a/components/net/about_loader.rs +++ b/components/net/about_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata}; +use net_traits::{LoadData, Metadata,ResponseSenders}; use net_traits::ProgressMsg::Done; use mime_classifier::MIMEClassifier; use resource_task::start_sending; @@ -18,10 +18,9 @@ use std::borrow::IntoCow; use std::fs::PathExt; use std::sync::Arc; -pub fn factory(mut load_data: LoadData, classifier: Arc) { +pub fn factory(mut load_data: LoadData, start_chan: ResponseSenders, classifier: Arc) { match load_data.url.non_relative_scheme_data().unwrap() { "blank" => { - let start_chan = load_data.consumer; let chan = start_sending(start_chan, Metadata { final_url: load_data.url, content_type: Some(ContentType(Mime(TopLevel::Text, SubLevel::Html, vec![]))), @@ -40,11 +39,10 @@ pub fn factory(mut load_data: LoadData, classifier: Arc) { load_data.url = Url::from_file_path(&*path).unwrap(); } _ => { - let start_chan = load_data.consumer; start_sending(start_chan, Metadata::default(load_data.url)) .send(Done(Err("Unknown about: URL.".to_string()))).unwrap(); return } }; - file_loader::factory(load_data, classifier) + file_loader::factory(load_data, start_chan, classifier) } diff --git a/components/net/data_loader.rs b/components/net/data_loader.rs index 03ec60be3a8..d4146a0f4b1 100644 --- a/components/net/data_loader.rs +++ b/components/net/data_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata}; +use net_traits::{LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; use resource_task::start_sending; @@ -13,16 +13,15 @@ use hyper::mime::Mime; use std::sync::Arc; use url::{percent_decode, SchemeData}; -pub fn factory(load_data: LoadData, _classifier: Arc) { +pub fn factory(load_data: LoadData, senders: ResponseSenders, _classifier: Arc) { // NB: we don't spawn a new task. // Hypothesis: data URLs are too small for parallel base64 etc. to be worth it. // Should be tested at some point. // Left in separate function to allow easy moving to a task, if desired. - load(load_data) + load(load_data, senders) } -pub fn load(load_data: LoadData) { - let start_chan = load_data.consumer; +pub fn load(load_data: LoadData, start_chan: ResponseSenders) { let url = load_data.url; assert!(&*url.scheme == "data"); diff --git a/components/net/file_loader.rs b/components/net/file_loader.rs index 64539c29477..2d02d9b4c19 100644 --- a/components/net/file_loader.rs +++ b/components/net/file_loader.rs @@ -2,17 +2,16 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{LoadData, Metadata, ProgressMsg}; +use net_traits::{LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; -use resource_task::{start_sending, start_sending_sniffed}; +use resource_task::{start_sending, start_sending_sniffed, ProgressSender}; use std::borrow::ToOwned; use std::fs::File; use std::io::Read; use std::path::PathBuf; use std::sync::Arc; -use std::sync::mpsc::Sender; use util::task::spawn_named; static READ_SIZE: usize = 8192; @@ -34,7 +33,7 @@ fn read_block(reader: &mut File) -> Result { } } -fn read_all(reader: &mut File, progress_chan: &Sender) +fn read_all(reader: &mut File, progress_chan: &ProgressSender) -> Result<(), String> { loop { match try!(read_block(reader)) { @@ -44,9 +43,8 @@ fn read_all(reader: &mut File, progress_chan: &Sender) } } -pub fn factory(load_data: LoadData, classifier: Arc) { +pub fn factory(load_data: LoadData, senders: ResponseSenders, classifier: Arc) { let url = load_data.url; - let start_chan = load_data.consumer; assert!(&*url.scheme == "file"); spawn_named("file_loader".to_owned(), move || { let metadata = Metadata::default(url.clone()); @@ -58,24 +56,24 @@ pub fn factory(load_data: LoadData, classifier: Arc) { let res = read_block(reader); let (res, progress_chan) = match res { Ok(ReadStatus::Partial(buf)) => { - let progress_chan = start_sending_sniffed(start_chan, metadata, + let progress_chan = start_sending_sniffed(senders, metadata, classifier, &buf); progress_chan.send(Payload(buf)).unwrap(); (read_all(reader, &progress_chan), progress_chan) } Ok(ReadStatus::EOF) | Err(_) => - (res.map(|_| ()), start_sending(start_chan, metadata)), + (res.map(|_| ()), start_sending(senders, metadata)), }; progress_chan.send(Done(res)).unwrap(); } Err(e) => { - let progress_chan = start_sending(start_chan, metadata); + let progress_chan = start_sending(senders, metadata); progress_chan.send(Done(Err(e.description().to_string()))).unwrap(); } } } Err(_) => { - let progress_chan = start_sending(start_chan, metadata); + let progress_chan = start_sending(senders, metadata); progress_chan.send(Done(Err(url.to_string()))).unwrap(); } } diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index d544c307401..bccfb03b8a4 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use net_traits::{ControlMsg, CookieSource, LoadData, LoadResponse, Metadata}; +use net_traits::{ControlMsg, CookieSource, LoadData, Metadata, ResponseSenders}; use net_traits::ProgressMsg::{Payload, Done}; use mime_classifier::MIMEClassifier; use resource_task::{start_sending_opt, start_sending_sniffed_opt}; @@ -32,13 +32,13 @@ use url::{Url, UrlParser}; use std::borrow::ToOwned; pub fn factory(cookies_chan: Sender) - -> Box)> + Send> { - box move |(load_data, classifier)| { - spawn_named("http_loader".to_owned(), move || load(load_data, classifier, cookies_chan)) + -> Box)> + Send> { + box move |(load_data, senders, classifier)| { + spawn_named("http_loader".to_owned(), move || load(load_data, senders, classifier, cookies_chan)) } } -fn send_error(url: Url, err: String, start_chan: Sender) { +fn send_error(url: Url, err: String, start_chan: ResponseSenders) { let mut metadata: Metadata = Metadata::default(url); metadata.status = None; @@ -66,13 +66,12 @@ fn read_block(reader: &mut R) -> Result { } } -fn load(mut load_data: LoadData, classifier: Arc, cookies_chan: Sender) { +fn load(mut load_data: LoadData, start_chan: ResponseSenders, classifier: Arc, cookies_chan: Sender) { // FIXME: At the time of writing this FIXME, servo didn't have any central // location for configuration. If you're reading this and such a // repository DOES exist, please update this constant to use it. let max_redirects = 50; let mut iters = 0; - let start_chan = load_data.consumer; let mut url = load_data.url.clone(); let mut redirected_to = HashSet::new(); @@ -140,8 +139,8 @@ reason: \"certificate verify failed\" }]"; ) => { let mut image = resources_dir_path(); image.push("badcert.html"); - let load_data = LoadData::new(Url::from_file_path(&*image).unwrap(), start_chan); - file_loader::factory(load_data, classifier); + let load_data = LoadData::new(Url::from_file_path(&*image).unwrap()); + file_loader::factory(load_data, start_chan, classifier); return; }, Err(e) => { @@ -340,7 +339,7 @@ reason: \"certificate verify failed\" }]"; } fn send_data(reader: &mut R, - start_chan: Sender, + start_chan: ResponseSenders, metadata: Metadata, classifier: Arc) { let (progress_chan, mut chunk) = { diff --git a/components/net/resource_task.rs b/components/net/resource_task.rs index 9952190c23b..0d4bc19a64d 100644 --- a/components/net/resource_task.rs +++ b/components/net/resource_task.rs @@ -12,8 +12,8 @@ use cookie_storage::CookieStorage; use cookie; use mime_classifier::MIMEClassifier; -use net_traits::{ControlMsg, LoadData, LoadResponse}; -use net_traits::{Metadata, ProgressMsg, ResourceTask}; +use net_traits::{ControlMsg, LoadData, LoadResponse, ResponseSenders, LoadConsumer}; +use net_traits::{Metadata, ProgressMsg, ResourceTask, AsyncResponseTarget, ResponseAction}; use net_traits::ProgressMsg::Done; use util::opts; use util::task::spawn_named; @@ -58,22 +58,44 @@ pub fn global_init() { } } +pub enum ProgressSender { + Channel(Sender), + Listener(Box), +} + +impl ProgressSender { + //XXXjdm return actual error + pub fn send(&self, msg: ProgressMsg) -> Result<(), ()> { + match *self { + ProgressSender::Channel(ref c) => c.send(msg).map_err(|_| ()), + ProgressSender::Listener(ref b) => { + let action = match msg { + ProgressMsg::Payload(buf) => ResponseAction::DataAvailable(buf), + ProgressMsg::Done(status) => ResponseAction::ResponseComplete(status), + }; + b.invoke_with_listener(action); + Ok(()) + } + } + } +} + /// For use by loaders in responding to a Load message. -pub fn start_sending(start_chan: Sender, metadata: Metadata) -> Sender { +pub fn start_sending(start_chan: ResponseSenders, metadata: Metadata) -> ProgressSender { start_sending_opt(start_chan, metadata).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. -pub fn start_sending_sniffed(start_chan: Sender, metadata: Metadata, +pub fn start_sending_sniffed(start_chan: ResponseSenders, metadata: Metadata, classifier: Arc, partial_body: &Vec) - -> Sender { + -> ProgressSender { start_sending_sniffed_opt(start_chan, metadata, classifier, partial_body).ok().unwrap() } /// For use by loaders in responding to a Load message that allows content sniffing. -pub fn start_sending_sniffed_opt(start_chan: Sender, mut metadata: Metadata, +pub fn start_sending_sniffed_opt(start_chan: ResponseSenders, mut metadata: Metadata, classifier: Arc, partial_body: &Vec) - -> Result, ()> { + -> Result { if opts::get().sniff_mime_types { // TODO: should be calculated in the resource loader, from pull requeset #4094 let nosniff = false; @@ -94,15 +116,23 @@ pub fn start_sending_sniffed_opt(start_chan: Sender, mut metadata: } /// For use by loaders in responding to a Load message. -pub fn start_sending_opt(start_chan: Sender, metadata: Metadata) -> Result, ()> { - let (progress_chan, progress_port) = channel(); - let result = start_chan.send(LoadResponse { - metadata: metadata, - progress_port: progress_port, - }); - match result { - Ok(_) => Ok(progress_chan), - Err(_) => Err(()) +pub fn start_sending_opt(start_chan: ResponseSenders, metadata: Metadata) -> Result { + match start_chan { + ResponseSenders::Channel(start_chan) => { + let (progress_chan, progress_port) = channel(); + let result = start_chan.send(LoadResponse { + metadata: metadata, + progress_port: progress_port, + }); + match result { + Ok(_) => Ok(ProgressSender::Channel(progress_chan)), + Err(_) => Err(()) + } + } + ResponseSenders::Listener(target) => { + target.invoke_with_listener(ResponseAction::HeadersAvailable(metadata)); + Ok(ProgressSender::Listener(target)) + } } } @@ -176,8 +206,8 @@ impl ResourceManager { fn start(&mut self) { loop { match self.from_client.recv().unwrap() { - ControlMsg::Load(load_data) => { - self.load(load_data) + ControlMsg::Load(load_data, consumer) => { + self.load(load_data, consumer) } ControlMsg::SetCookiesForUrl(request, cookie_list, source) => { let header = Header::parse_header(&[cookie_list.into_bytes()]); @@ -199,7 +229,7 @@ impl ResourceManager { } } - fn load(&mut self, mut load_data: LoadData) { + fn load(&mut self, mut load_data: LoadData, consumer: LoadConsumer) { unsafe { if let Some(host_table) = HOST_TABLE { load_data = replace_hosts(load_data, host_table); @@ -208,13 +238,14 @@ impl ResourceManager { self.user_agent.as_ref().map(|ua| load_data.headers.set(UserAgent(ua.clone()))); - fn from_factory(factory: fn(LoadData, Arc)) - -> Box)> + Send> { - box move |(load_data, classifier)| { - factory(load_data, classifier) + fn from_factory(factory: fn(LoadData, ResponseSenders, Arc)) + -> Box)> + Send> { + box move |(load_data, senders, classifier)| { + factory(load_data, senders, classifier) } } + let senders = ResponseSenders::from_consumer(consumer); let loader = match &*load_data.url.scheme { "file" => from_factory(file_loader::factory), "http" | "https" | "view-source" => http_loader::factory(self.resource_task.clone()), @@ -222,13 +253,13 @@ impl ResourceManager { "about" => from_factory(about_loader::factory), _ => { debug!("resource_task: no loader for scheme {}", load_data.url.scheme); - start_sending(load_data.consumer, Metadata::default(load_data.url)) + start_sending(senders, Metadata::default(load_data.url)) .send(ProgressMsg::Done(Err("no loader for scheme".to_string()))).unwrap(); return } }; debug!("resource_task: loading url: {}", load_data.url.serialize()); - loader.invoke((load_data, self.mime_classifier.clone())); + loader.invoke((load_data, senders, self.mime_classifier.clone())); } } diff --git a/components/net_traits/image_cache_task.rs b/components/net_traits/image_cache_task.rs index 852b4ad9c71..905a7de3611 100644 --- a/components/net_traits/image_cache_task.rs +++ b/components/net_traits/image_cache_task.rs @@ -3,6 +3,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use image::base::Image; +use LoadConsumer::Channel; use {ControlMsg, LoadData, ProgressMsg, ResourceTask}; use url::Url; @@ -84,7 +85,7 @@ impl ImageCacheTaskClient for ImageCacheTask { pub fn load_image_data(url: Url, resource_task: ResourceTask, placeholder: &[u8]) -> Result, ()> { let (response_chan, response_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url.clone(), response_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url.clone()), Channel(response_chan))).unwrap(); let mut image_data = vec!(); diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 8a35fc87762..fe11294b2a5 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -51,11 +51,10 @@ pub struct LoadData { pub preserved_headers: Headers, pub data: Option>, pub cors: Option, - pub consumer: Sender, } impl LoadData { - pub fn new(url: Url, consumer: Sender) -> LoadData { + pub fn new(url: Url) -> LoadData { LoadData { url: url, method: Method::Get, @@ -63,17 +62,62 @@ impl LoadData { preserved_headers: Headers::new(), data: None, cors: None, - consumer: consumer, } } } +/// A listener for asynchronous network events. Cancelling the underlying request is unsupported. +pub trait AsyncResponseListener { + /// The response headers for a request have been received. + fn headers_available(&self, metadata: Metadata); + /// A portion of the response body has been received. This data is unavailable after + /// this method returned, and must be stored accordingly. + fn data_available(&self, payload: Vec); + /// The response is complete. If the provided status is an Err value, there is no guarantee + /// that the response body was completely read. + fn response_complete(&self, status: Result<(), String>); +} + +/// Data for passing between threads/processes to indicate a particular action to +/// take on a provided network listener. +pub enum ResponseAction { + /// Invoke headers_available + HeadersAvailable(Metadata), + /// Invoke data_available + DataAvailable(Vec), + /// Invoke response_complete + ResponseComplete(Result<(), String>) +} + +impl ResponseAction { + /// Execute the default action on a provided listener. + pub fn process(self, listener: &AsyncResponseListener) { + match self { + ResponseAction::HeadersAvailable(m) => listener.headers_available(m), + ResponseAction::DataAvailable(d) => listener.data_available(d), + ResponseAction::ResponseComplete(r) => listener.response_complete(r), + } + } +} + +/// A target for async networking events. Commonly used to dispatch a runnable event to another +/// thread storing the wrapped closure for later execution. +pub trait AsyncResponseTarget { + fn invoke_with_listener(&self, action: ResponseAction); +} + +/// A wrapper for a network load that can either be channel or event-based. +pub enum LoadConsumer { + Channel(Sender), + Listener(Box), +} + /// Handle to a resource task pub type ResourceTask = Sender; pub enum ControlMsg { /// Request the data associated with a particular URL - Load(LoadData), + Load(LoadData, LoadConsumer), /// Store a set of cookies for a given originating URL SetCookiesForUrl(Url, String, CookieSource), /// Retrieve the stored cookies for a given URL @@ -159,6 +203,20 @@ pub enum CookieSource { NonHTTP, } +pub enum ResponseSenders { + Channel(Sender), + Listener(Box), +} + +impl ResponseSenders { + pub fn from_consumer(consumer: LoadConsumer) -> ResponseSenders { + match consumer { + LoadConsumer::Channel(c) => ResponseSenders::Channel(c), + LoadConsumer::Listener(l) => ResponseSenders::Listener(l), + } + } +} + /// Messages sent in response to a `Load` message #[derive(PartialEq,Debug)] pub enum ProgressMsg { @@ -172,7 +230,7 @@ pub enum ProgressMsg { pub fn load_whole_resource(resource_task: &ResourceTask, url: Url) -> Result<(Metadata, Vec), String> { let (start_chan, start_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(start_chan))).unwrap(); let response = start_port.recv().unwrap(); let mut buf = vec!(); @@ -188,7 +246,7 @@ pub fn load_whole_resource(resource_task: &ResourceTask, url: Url) /// Load a URL asynchronously and iterate over chunks of bytes from the response. pub fn load_bytes_iter(resource_task: &ResourceTask, url: Url) -> (Metadata, ProgressMsgPortIterator) { let (input_chan, input_port) = channel(); - resource_task.send(ControlMsg::Load(LoadData::new(url, input_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(input_chan))).unwrap(); let response = input_port.recv().unwrap(); let iter = ProgressMsgPortIterator { progress_port: response.progress_port }; diff --git a/components/script/cors.rs b/components/script/cors.rs index e9521296464..235ec43f779 100644 --- a/components/script/cors.rs +++ b/components/script/cors.rs @@ -9,7 +9,14 @@ //! This library will eventually become the core of the Fetch crate //! with CORSRequest being expanded into FetchRequest (etc) +use network_listener::{NetworkListener, PreInvoke}; +use script_task::ScriptChan; +use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction, Metadata}; + use std::ascii::AsciiExt; +use std::borrow::ToOwned; +use std::cell::RefCell; +use std::sync::{Arc, Mutex}; use time; use time::{now, Timespec}; @@ -24,6 +31,13 @@ use hyper::method::Method; use hyper::status::StatusClass::Success; use url::{SchemeData, Url}; +use util::task::spawn_named; + +/// Interface for network listeners concerned with CORS checks. Proper network requests +/// should be initiated from this method, based on the response provided. +pub trait AsyncCORSResponseListener { + fn response_available(&self, response: CORSResponse); +} #[derive(Clone)] pub struct CORSRequest { @@ -88,7 +102,52 @@ impl CORSRequest { } } - /// https://fetch.spec.whatwg.org/#concept-http-fetch + pub fn http_fetch_async(&self, + listener: Box, + script_chan: Box) { + struct CORSContext { + listener: Box, + response: RefCell>, + } + + // This is shoe-horning the CORSReponse stuff into the rest of the async network + // framework right now. It would be worth redesigning http_fetch to do this properly. + impl AsyncResponseListener for CORSContext { + fn headers_available(&self, _metadata: Metadata) { + } + + fn data_available(&self, _payload: Vec) { + } + + fn response_complete(&self, _status: Result<(), String>) { + let response = self.response.borrow_mut().take().unwrap(); + self.listener.response_available(response); + } + } + impl PreInvoke for CORSContext {} + + let context = CORSContext { + listener: listener, + response: RefCell::new(None), + }; + let listener = NetworkListener { + context: Arc::new(Mutex::new(context)), + script_chan: script_chan, + }; + + // TODO: this exists only to make preflight check non-blocking + // perhaps should be handled by the resource task? + let req = self.clone(); + spawn_named("cors".to_owned(), move || { + let response = req.http_fetch(); + let mut context = listener.context.lock(); + let context = context.as_mut().unwrap(); + *context.response.borrow_mut() = Some(response); + listener.invoke_with_listener(ResponseAction::ResponseComplete(Ok(()))); + }); + } + + /// http://fetch.spec.whatwg.org/#concept-http-fetch /// This method assumes that the CORS flag is set /// This does not perform the full HTTP fetch, rather it handles part of the CORS filtering /// if self.mode is ForcedPreflight, then the CORS-with-forced-preflight diff --git a/components/script/dom/bindings/cell.rs b/components/script/dom/bindings/cell.rs index b64d80cefc3..177fed9397b 100644 --- a/components/script/dom/bindings/cell.rs +++ b/components/script/dom/bindings/cell.rs @@ -16,6 +16,7 @@ use std::cell::{BorrowState, RefCell, Ref, RefMut}; /// /// This extends the API of `core::cell::RefCell` to allow unsafe access in /// certain situations, with dynamic checking in debug builds. +#[derive(Clone)] pub struct DOMRefCell { value: RefCell, } diff --git a/components/script/dom/bindings/global.rs b/components/script/dom/bindings/global.rs index 233143851e6..3c3df454ca1 100644 --- a/components/script/dom/bindings/global.rs +++ b/components/script/dom/bindings/global.rs @@ -12,8 +12,8 @@ use dom::bindings::js::{JS, JSRef, Root, Unrooted}; use dom::bindings::utils::{Reflectable, Reflector}; use dom::workerglobalscope::{WorkerGlobalScope, WorkerGlobalScopeHelpers}; use dom::window::{self, WindowHelpers}; -use script_task::ScriptChan; use devtools_traits::DevtoolsControlChan; +use script_task::{ScriptChan, ScriptPort, ScriptMsg, ScriptTask}; use msg::constellation_msg::{PipelineId, WorkerId}; use net_traits::ResourceTask; @@ -129,6 +129,25 @@ impl<'a> GlobalRef<'a> { GlobalRef::Worker(ref worker) => worker.script_chan(), } } + + /// Create a new sender/receiver pair that can be used to implement an on-demand + /// event loop. Used for implementing web APIs that require blocking semantics + /// without resorting to nested event loops. + pub fn new_script_pair(&self) -> (Box, Box) { + match *self { + GlobalRef::Window(ref window) => window.new_script_pair(), + GlobalRef::Worker(ref worker) => worker.new_script_pair(), + } + } + + /// Process a single event as if it were the next event in the task queue for + /// this global. + pub fn process_event(&self, msg: ScriptMsg) { + match *self { + GlobalRef::Window(_) => ScriptTask::process_event(msg), + GlobalRef::Worker(ref worker) => worker.process_event(msg), + } + } } impl<'a> Reflectable for GlobalRef<'a> { diff --git a/components/script/dom/bindings/trace.rs b/components/script/dom/bindings/trace.rs index 1695f16ead7..3d32abb610a 100644 --- a/components/script/dom/bindings/trace.rs +++ b/components/script/dom/bindings/trace.rs @@ -207,6 +207,16 @@ impl JSTraceable for Option { } } +impl JSTraceable for Result { + #[inline] + fn trace(&self, trc: *mut JSTracer) { + match *self { + Ok(ref inner) => inner.trace(trc), + Err(ref inner) => inner.trace(trc), + } + } +} + impl JSTraceable for HashMap where K: Hash + Eq + JSTraceable, V: JSTraceable, @@ -297,6 +307,12 @@ impl JSTraceable for Box { } } +impl JSTraceable for () { + #[inline] + fn trace(&self, _trc: *mut JSTracer) { + } +} + /// Holds a set of vectors that need to be rooted pub struct RootedCollectionSet { set: Vec>> diff --git a/components/script/dom/dedicatedworkerglobalscope.rs b/components/script/dom/dedicatedworkerglobalscope.rs index 83168d44e2d..8ab0621e0f9 100644 --- a/components/script/dom/dedicatedworkerglobalscope.rs +++ b/components/script/dom/dedicatedworkerglobalscope.rs @@ -21,7 +21,7 @@ use dom::messageevent::MessageEvent; use dom::worker::{TrustedWorkerAddress, WorkerMessageHandler, WorkerEventHandler, WorkerErrorHandler}; use dom::workerglobalscope::{WorkerGlobalScope, WorkerGlobalScopeHelpers}; use dom::workerglobalscope::WorkerGlobalScopeTypeId; -use script_task::{ScriptTask, ScriptChan, ScriptMsg, TimerSource}; +use script_task::{ScriptTask, ScriptChan, ScriptMsg, TimerSource, ScriptPort}; use script_task::StackRootTLS; use msg::constellation_msg::PipelineId; @@ -38,7 +38,7 @@ use js::jsval::JSVal; use js::rust::Cx; use std::rc::Rc; -use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc::{Sender, Receiver, channel}; use url::Url; /// A ScriptChan that can be cloned freely and will silently send a TrustedWorkerAddress with @@ -198,6 +198,8 @@ impl DedicatedWorkerGlobalScope { pub trait DedicatedWorkerGlobalScopeHelpers { fn script_chan(self) -> Box; fn pipeline(self) -> PipelineId; + fn new_script_pair(self) -> (Box, Box); + fn process_event(self, msg: ScriptMsg); } impl<'a> DedicatedWorkerGlobalScopeHelpers for JSRef<'a, DedicatedWorkerGlobalScope> { @@ -213,6 +215,19 @@ impl<'a> DedicatedWorkerGlobalScopeHelpers for JSRef<'a, DedicatedWorkerGlobalSc fn pipeline(self) -> PipelineId { self.id } + + fn new_script_pair(self) -> (Box, Box) { + let (tx, rx) = channel(); + let chan = box SendableWorkerScriptChan { + sender: tx, + worker: self.worker.borrow().as_ref().unwrap().clone(), + }; + (chan, box rx) + } + + fn process_event(self, msg: ScriptMsg) { + self.handle_event(msg); + } } trait PrivateDedicatedWorkerGlobalScopeHelpers { diff --git a/components/script/dom/window.rs b/components/script/dom/window.rs index f4e2f5b0732..b46a5af952d 100644 --- a/components/script/dom/window.rs +++ b/components/script/dom/window.rs @@ -29,7 +29,7 @@ use dom::storage::Storage; use layout_interface::{ReflowGoal, ReflowQueryType, LayoutRPC, LayoutChan, Reflow, Msg}; use layout_interface::{ContentBoxResponse, ContentBoxesResponse, ScriptReflow}; use page::Page; -use script_task::{TimerSource, ScriptChan}; +use script_task::{TimerSource, ScriptChan, ScriptPort, NonWorkerScriptChan}; use script_task::ScriptMsg; use script_traits::ScriptControlChan; use timers::{IsInterval, TimerId, TimerManager, TimerCallback}; @@ -198,6 +198,11 @@ impl Window { self.parent_info } + pub fn new_script_pair(&self) -> (Box, Box) { + let (tx, rx) = channel(); + (box NonWorkerScriptChan(tx), box rx) + } + pub fn control_chan<'a>(&'a self) -> &'a ScriptControlChan { &self.control_chan } diff --git a/components/script/dom/workerglobalscope.rs b/components/script/dom/workerglobalscope.rs index 76fd85e0603..d22f45c971a 100644 --- a/components/script/dom/workerglobalscope.rs +++ b/components/script/dom/workerglobalscope.rs @@ -16,7 +16,7 @@ use dom::eventtarget::{EventTarget, EventTargetTypeId}; use dom::workerlocation::WorkerLocation; use dom::workernavigator::WorkerNavigator; use dom::window::{base64_atob, base64_btoa}; -use script_task::{ScriptChan, TimerSource}; +use script_task::{ScriptChan, TimerSource, ScriptPort, ScriptMsg}; use timers::{IsInterval, TimerId, TimerManager, TimerCallback}; use devtools_traits::DevtoolsControlChan; @@ -216,6 +216,8 @@ pub trait WorkerGlobalScopeHelpers { fn handle_fire_timer(self, timer_id: TimerId); fn script_chan(self) -> Box; fn pipeline(self) -> PipelineId; + fn new_script_pair(self) -> (Box, Box); + fn process_event(self, msg: ScriptMsg); fn get_cx(self) -> *mut JSContext; } @@ -238,6 +240,24 @@ impl<'a> WorkerGlobalScopeHelpers for JSRef<'a, WorkerGlobalScope> { } } + fn new_script_pair(self) -> (Box, Box) { + let dedicated: Option> = + DedicatedWorkerGlobalScopeCast::to_ref(self); + match dedicated { + Some(dedicated) => dedicated.new_script_pair(), + None => panic!("need to implement creating isolated event loops for SharedWorker"), + } + } + + fn process_event(self, msg: ScriptMsg) { + let dedicated: Option> = + DedicatedWorkerGlobalScopeCast::to_ref(self); + match dedicated { + Some(dedicated) => dedicated.process_event(msg), + None => panic!("need to implement processing single events for SharedWorker"), + } + } + fn handle_fire_timer(self, timer_id: TimerId) { self.timers.fire_timer(timer_id, self); } diff --git a/components/script/dom/xmlhttprequest.rs b/components/script/dom/xmlhttprequest.rs index 5d0f76e5463..a4e9e767dd5 100644 --- a/components/script/dom/xmlhttprequest.rs +++ b/components/script/dom/xmlhttprequest.rs @@ -26,7 +26,8 @@ use dom::urlsearchparams::URLSearchParamsHelpers; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTarget; use dom::xmlhttprequesteventtarget::XMLHttpRequestEventTargetTypeId; use dom::xmlhttprequestupload::XMLHttpRequestUpload; -use script_task::{ScriptChan, ScriptMsg, Runnable}; +use network_listener::{NetworkListener, PreInvoke}; +use script_task::{ScriptChan, ScriptMsg, Runnable, ScriptPort}; use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; @@ -43,19 +44,20 @@ use js::jsapi::JS_ClearPendingException; use js::jsval::{JSVal, NullValue, UndefinedValue}; use net_traits::ControlMsg::Load; -use net_traits::ProgressMsg::{Payload, Done}; -use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadResponse}; -use cors::{allow_cross_origin_request, CORSRequest, RequestMode}; +use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer}; +use net_traits::{AsyncResponseListener, Metadata}; +use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener}; +use cors::CORSResponse; use util::str::DOMString; use util::task::spawn_named; use std::ascii::AsciiExt; use std::borrow::ToOwned; -use std::cell::Cell; -use std::sync::mpsc::{Sender, Receiver, channel}; +use std::cell::{RefCell, Cell}; use std::default::Default; use std::old_io::Timer; use std::str::FromStr; +use std::sync::{Mutex, Arc}; use std::time::duration::Duration; use time; use url::{Url, UrlParser}; @@ -74,28 +76,20 @@ enum XMLHttpRequestState { Done = 4, } -struct XHRProgressHandler { - addr: TrustedXHRAddress, - progress: XHRProgress, -} - -impl XHRProgressHandler { - fn new(addr: TrustedXHRAddress, progress: XHRProgress) -> XHRProgressHandler { - XHRProgressHandler { addr: addr, progress: progress } - } -} - -impl Runnable for XHRProgressHandler { - fn handler(self: Box) { - let this = *self; - XMLHttpRequest::handle_progress(this.addr, this.progress); - } -} - #[derive(PartialEq, Clone, Copy)] #[jstraceable] pub struct GenerationId(u32); +/// Closure of required data for each async network event that comprises the +/// XHR's response. +struct XHRContext { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + cors_request: Option, + buf: DOMRefCell>, + sync_status: DOMRefCell>, +} + #[derive(Clone)] pub enum XHRProgress { /// Notify that headers have been received @@ -119,16 +113,6 @@ impl XHRProgress { } } -enum SyncOrAsync<'a> { - Sync(JSRef<'a, XMLHttpRequest>), - Async(TrustedXHRAddress, Box) -} - -enum TerminateReason { - AbortedOrReopened, - TimedOut, -} - #[dom_struct] pub struct XMLHttpRequest { eventtarget: XMLHttpRequestEventTarget, @@ -157,8 +141,9 @@ pub struct XMLHttpRequest { global: GlobalField, timer: DOMRefCell, fetch_time: Cell, - terminate_sender: DOMRefCell>>, + timeout_target: DOMRefCell>>, generation_id: Cell, + response_status: Cell>, } impl XMLHttpRequest { @@ -190,8 +175,9 @@ impl XMLHttpRequest { global: GlobalField::from_rooted(&global), timer: DOMRefCell::new(Timer::new().unwrap()), fetch_time: Cell::new(0), - terminate_sender: DOMRefCell::new(None), - generation_id: Cell::new(GenerationId(0)) + timeout_target: DOMRefCell::new(None), + generation_id: Cell::new(GenerationId(0)), + response_status: Cell::new(Ok(())), } } pub fn new(global: GlobalRef) -> Temporary { @@ -205,141 +191,91 @@ impl XMLHttpRequest { Ok(XMLHttpRequest::new(global)) } - pub fn handle_progress(addr: TrustedXHRAddress, progress: XHRProgress) { - let xhr = addr.to_temporary().root(); - xhr.r().process_partial_response(progress); - } - - #[allow(unsafe_code)] - fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask, - mut load_data: LoadData, terminate_receiver: Receiver, - cors_request: Result,()>, gen_id: GenerationId, - start_port: Receiver) -> ErrorResult { - - fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) { - match *fetch_type { - SyncOrAsync::Sync(xhr) => { - xhr.process_partial_response(msg); - }, - SyncOrAsync::Async(ref addr, ref script_chan) => { - script_chan.send(ScriptMsg::RunnableMsg(box XHRProgressHandler::new(addr.clone(), msg))).unwrap(); - } - } + fn check_cors(context: Arc>, + load_data: LoadData, + req: CORSRequest, + script_chan: Box, + resource_task: ResourceTask) { + struct CORSContext { + xhr: Arc>, + load_data: RefCell>, + req: CORSRequest, + script_chan: Box, + resource_task: ResourceTask, } - macro_rules! notify_error_and_return( - ($err:expr) => ({ - notify_partial_progress(fetch_type, XHRProgress::Errored(gen_id, $err)); - return Err($err) - }); - ); - - macro_rules! terminate( - ($reason:expr) => ( - match $reason { - TerminateReason::AbortedOrReopened => { - return Err(Abort) - } - TerminateReason::TimedOut => { - notify_error_and_return!(Timeout); - } + impl AsyncCORSResponseListener for CORSContext { + fn response_available(&self, response: CORSResponse) { + if response.network_error { + let mut context = self.xhr.lock().unwrap(); + let xhr = context.xhr.to_temporary().root(); + xhr.r().process_partial_response(XHRProgress::Errored(context.gen_id, Network)); + *context.sync_status.borrow_mut() = Some(Err(Network)); + return; } - ); - ); - - match cors_request { - Err(_) => { - // Happens in case of cross-origin non-http URIs - notify_error_and_return!(Network); - } - - Ok(Some(ref req)) => { - let (chan, cors_port) = channel(); - let req2 = req.clone(); - // TODO: this exists only to make preflight check non-blocking - // perhaps should be handled by the resource_loader? - spawn_named("XHR:Cors".to_owned(), move || { - let response = req2.http_fetch(); - chan.send(response).unwrap(); + let mut load_data = self.load_data.borrow_mut().take().unwrap(); + load_data.cors = Some(ResourceCORSData { + preflight: self.req.preflight_flag, + origin: self.req.origin.clone() }); - select! ( - response = cors_port.recv() => { - let response = response.unwrap(); - if response.network_error { - notify_error_and_return!(Network); - } else { - load_data.cors = Some(ResourceCORSData { - preflight: req.preflight_flag, - origin: req.origin.clone() - }); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); + XMLHttpRequest::initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(), + self.resource_task.clone(), load_data); } - _ => {} } - // Step 10, 13 - resource_task.send(Load(load_data)).unwrap(); + let cors_context = CORSContext { + xhr: context, + load_data: RefCell::new(Some(load_data)), + req: req.clone(), + script_chan: script_chan.clone(), + resource_task: resource_task, + }; + req.http_fetch_async(box cors_context, script_chan); + } - let progress_port; - select! ( - response = start_port.recv() => { - let response = response.unwrap(); - match cors_request { - Ok(Some(ref req)) => { - match response.metadata.headers { - Some(ref h) if allow_cross_origin_request(req, h) => {}, - _ => notify_error_and_return!(Network) - } - }, + fn initiate_async_xhr(context: Arc>, + script_chan: Box, + resource_task: ResourceTask, + load_data: LoadData) { + impl AsyncResponseListener for XHRContext { + fn headers_available(&self, metadata: Metadata) { + let xhr = self.xhr.to_temporary().root(); + let rv = xhr.r().process_headers_available(self.cors_request.clone(), + self.gen_id, + metadata); + if rv.is_err() { + *self.sync_status.borrow_mut() = Some(rv); + } + } - _ => {} - }; - // XXXManishearth Clear cache entries in case of a network error - notify_partial_progress(fetch_type, XHRProgress::HeadersReceived(gen_id, - response.metadata.headers.clone(), response.metadata.status.clone())); + fn data_available(&self, payload: Vec) { + self.buf.borrow_mut().push_all(payload.as_slice()); + let xhr = self.xhr.to_temporary().root(); + xhr.r().process_data_available(self.gen_id, self.buf.borrow().clone()); + } - progress_port = response.progress_port; - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); - - let mut buf = vec!(); - loop { - // Under most circumstances, progress_port will contain lots of Payload - // events. Since select! does not have any fairness or priority, it - // might always remove the progress_port event, even when there is - // a terminate event waiting in the terminate_receiver. If this happens, - // a timeout or abort will take too long to be processed. To avoid this, - // in each iteration, we check for a terminate event before we block. - match terminate_receiver.try_recv() { - Ok(reason) => terminate!(reason), - Err(_) => () - }; - - select! ( - progress = progress_port.recv() => match progress.unwrap() { - Payload(data) => { - buf.push_all(data.as_slice()); - notify_partial_progress(fetch_type, - XHRProgress::Loading(gen_id, ByteString::new(buf.clone()))); - }, - Done(Ok(())) => { - notify_partial_progress(fetch_type, XHRProgress::Done(gen_id)); - return Ok(()); - }, - Done(Err(_)) => { - notify_error_and_return!(Network); - } - }, - reason = terminate_receiver.recv() => terminate!(reason.unwrap()) - ); + fn response_complete(&self, status: Result<(), String>) { + let xhr = self.xhr.to_temporary().root(); + let rv = xhr.r().process_response_complete(self.gen_id, status); + *self.sync_status.borrow_mut() = Some(rv); + } } + + impl PreInvoke for XHRContext { + fn should_invoke(&self) -> bool { + let xhr = self.xhr.to_temporary().root(); + xhr.r().generation_id.get() == self.gen_id + } + } + + let listener = box NetworkListener { + context: context, + script_chan: script_chan, + }; + resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap(); } } @@ -577,10 +513,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } - let global = self.global.root(); - let resource_task = global.r().resource_task(); - let (start_chan, start_port) = channel(); - let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap(), start_chan); + let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap()); load_data.data = extracted; #[inline] @@ -613,10 +546,9 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { } load_data.method = (*self.request_method.borrow()).clone(); - let (terminate_sender, terminate_receiver) = channel(); - *self.terminate_sender.borrow_mut() = Some(terminate_sender); // CORS stuff + let global = self.global.root(); let referer_url = self.global.root().r().get_url(); let mode = if self.upload_events.get() { RequestMode::ForcedPreflight @@ -647,31 +579,15 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> { debug!("request_headers = {:?}", *self.request_headers.borrow()); - let gen_id = self.generation_id.get(); + self.fetch_time.set(time::now().to_timespec().sec); + let rv = self.fetch(load_data, cors_request, global.r()); if self.sync.get() { - return XMLHttpRequest::fetch(&mut SyncOrAsync::Sync(self), resource_task, load_data, - terminate_receiver, cors_request, gen_id, start_port); - } else { - self.fetch_time.set(time::now().to_timespec().sec); - let script_chan = global.r().script_chan(); - // Pin the object before launching the fetch task. This is to ensure that - // the object will stay alive as long as there are (possibly cancelled) - // inflight events queued up in the script task's port. - let addr = Trusted::new(self.global.root().r().get_cx(), self, - script_chan.clone()); - spawn_named("XHRTask".to_owned(), move || { - let _ = XMLHttpRequest::fetch(&mut SyncOrAsync::Async(addr, script_chan), - resource_task, - load_data, - terminate_receiver, - cors_request, - gen_id, - start_port); - }); - let timeout = self.timeout.get(); - if timeout > 0 { - self.set_timeout(timeout); - } + return rv; + } + + let timeout = self.timeout.get(); + if timeout > 0 { + self.set_timeout(timeout); } Ok(()) } @@ -812,6 +728,10 @@ pub type TrustedXHRAddress = Trusted; trait PrivateXMLHttpRequestHelpers { fn change_ready_state(self, XMLHttpRequestState); + fn process_headers_available(&self, cors_request: Option, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error>; + fn process_data_available(self, gen_id: GenerationId, payload: Vec); + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) -> ErrorResult; fn process_partial_response(self, progress: XHRProgress); fn terminate_ongoing_fetch(self); fn insert_trusted_header(self, name: String, value: String); @@ -822,6 +742,9 @@ trait PrivateXMLHttpRequestHelpers { fn set_timeout(self, timeout:u32); fn cancel_timeout(self); fn filter_response_headers(self) -> Headers; + fn discard_subsequent_responses(self); + fn fetch(self, load_data: LoadData, cors_request: Result,()>, + global: GlobalRef) -> ErrorResult; } impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { @@ -837,6 +760,45 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { event.r().fire(target); } + fn process_headers_available(&self, cors_request: Option, + gen_id: GenerationId, metadata: Metadata) -> Result<(), Error> { + match cors_request { + Some(ref req) => { + match metadata.headers { + Some(ref h) if allow_cross_origin_request(req, h) => {}, + _ => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + return Err(Network); + } + } + }, + + _ => {} + }; + // XXXManishearth Clear cache entries in case of a network error + self.process_partial_response(XHRProgress::HeadersReceived(gen_id, + metadata.headers, metadata.status)); + Ok(()) + } + + fn process_data_available(self, gen_id: GenerationId, payload: Vec) { + self.process_partial_response(XHRProgress::Loading(gen_id, ByteString::new(payload))); + } + + fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) + -> ErrorResult { + match status { + Ok(()) => { + self.process_partial_response(XHRProgress::Done(gen_id)); + Ok(()) + }, + Err(_) => { + self.process_partial_response(XHRProgress::Errored(gen_id, Network)); + Err(Network) + } + } + } + fn process_partial_response(self, progress: XHRProgress) { let msg_id = progress.generation_id(); @@ -853,6 +815,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // Ignore message if it belongs to a terminated fetch return_if_fetch_was_terminated!(); + // Ignore messages coming from previously-errored responses or requests that have timed out + if self.response_status.get().is_err() { + return; + } + match progress { XHRProgress::HeadersReceived(_, headers, status) => { assert!(self.ready_state.get() == XMLHttpRequestState::Opened); @@ -904,6 +871,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.ready_state.get() == XMLHttpRequestState::Loading || self.sync.get()); + self.cancel_timeout(); + // Part of step 11, send() (processing response end of file) // XXXManishearth handle errors, if any (substep 2) @@ -919,6 +888,9 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_response_progress_event("loadend".to_owned()); }, XHRProgress::Errored(_, e) => { + self.cancel_timeout(); + + self.discard_subsequent_responses(); self.send_flag.set(false); // XXXManishearth set response to NetworkError self.change_ready_state(XMLHttpRequestState::Done); @@ -952,7 +924,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { fn terminate_ongoing_fetch(self) { let GenerationId(prev_id) = self.generation_id.get(); self.generation_id.set(GenerationId(prev_id + 1)); - self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened)); + *self.timeout_target.borrow_mut() = None; + self.response_status.set(Ok(())); } fn insert_trusted_header(self, name: String, value: String) { @@ -990,15 +963,36 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { self.dispatch_progress_event(false, type_, len, total); } fn set_timeout(self, timeout: u32) { + struct XHRTimeout { + xhr: TrustedXHRAddress, + gen_id: GenerationId, + } + + impl Runnable for XHRTimeout { + fn handler(self: Box) { + let this = *self; + let xhr = this.xhr.to_temporary().root(); + if xhr.r().ready_state.get() != XMLHttpRequestState::Done { + xhr.r().process_partial_response(XHRProgress::Errored(this.gen_id, Timeout)); + } + } + } + // Sets up the object to timeout in a given number of milliseconds // This will cancel all previous timeouts let oneshot = self.timer.borrow_mut() .oneshot(Duration::milliseconds(timeout as i64)); - let terminate_sender = (*self.terminate_sender.borrow()).clone(); + let timeout_target = (*self.timeout_target.borrow().as_ref().unwrap()).clone(); + let global = self.global.root(); + let xhr = Trusted::new(global.r().get_cx(), self, global.r().script_chan()); + let gen_id = self.generation_id.get(); spawn_named("XHR:Timer".to_owned(), move || { match oneshot.recv() { Ok(_) => { - terminate_sender.map(|s| s.send(TerminateReason::TimedOut)); + timeout_target.send(ScriptMsg::RunnableMsg(box XHRTimeout { + xhr: xhr, + gen_id: gen_id, + })).unwrap(); }, Err(_) => { // This occurs if xhr.timeout (the sender) goes out of scope (i.e, xhr went out of scope) @@ -1065,6 +1059,64 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> { // XXXManishearth additional CORS filtering goes here headers } + + fn discard_subsequent_responses(self) { + self.response_status.set(Err(())); + } + + fn fetch(self, + load_data: LoadData, + cors_request: Result,()>, + global: GlobalRef) -> ErrorResult { + let cors_request = match cors_request { + Err(_) => { + // Happens in case of cross-origin non-http URIs + self.process_partial_response(XHRProgress::Errored( + self.generation_id.get(), Network)); + return Err(Network); + } + Ok(req) => req, + }; + + let xhr = Trusted::new(global.get_cx(), self, global.script_chan()); + + let context = Arc::new(Mutex::new(XHRContext { + xhr: xhr, + cors_request: cors_request.clone(), + gen_id: self.generation_id.get(), + buf: DOMRefCell::new(vec!()), + sync_status: DOMRefCell::new(None), + })); + + let (script_chan, script_port) = if self.sync.get() { + let (tx, rx) = global.new_script_pair(); + (tx, Some(rx)) + } else { + (global.script_chan(), None) + }; + *self.timeout_target.borrow_mut() = Some(script_chan.clone()); + + let resource_task = global.resource_task(); + if let Some(req) = cors_request { + XMLHttpRequest::check_cors(context.clone(), load_data, req.clone(), + script_chan.clone(), resource_task); + } else { + XMLHttpRequest::initiate_async_xhr(context.clone(), script_chan, + resource_task, load_data); + } + + if let Some(script_port) = script_port { + loop { + global.process_event(script_port.recv()); + let context = context.lock().unwrap(); + let sync_status = context.sync_status.borrow(); + if let Some(ref status) = *sync_status { + return status.clone(); + } + } + } + Ok(()) + } } trait Extractable { diff --git a/components/script/lib.rs b/components/script/lib.rs index 4a9fcdbd09c..b9f4b42f84d 100644 --- a/components/script/lib.rs +++ b/components/script/lib.rs @@ -61,6 +61,7 @@ pub mod dom; pub mod parse; pub mod layout_interface; +mod network_listener; pub mod page; pub mod script_task; mod timers; diff --git a/components/script/network_listener.rs b/components/script/network_listener.rs new file mode 100644 index 00000000000..5f400eb76b3 --- /dev/null +++ b/components/script/network_listener.rs @@ -0,0 +1,48 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use script_task::{ScriptChan, ScriptMsg, Runnable}; +use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction}; +use std::sync::{Arc, Mutex}; + +/// An off-thread sink for async network event runnables. All such events are forwarded to +/// a target thread, where they are invoked on the provided context object. +pub struct NetworkListener { + pub context: Arc>, + pub script_chan: Box, +} + +impl AsyncResponseTarget for NetworkListener { + fn invoke_with_listener(&self, action: ResponseAction) { + self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable { + context: self.context.clone(), + action: action, + })).unwrap(); + } +} + +/// A gating mechanism that runs before invoking the runnable on the target thread. +/// If the `should_invoke` method returns false, the runnable is discarded without +/// being invoked. +pub trait PreInvoke { + fn should_invoke(&self) -> bool { + true + } +} + +/// A runnable for moving the async network events between threads. +struct ListenerRunnable { + context: Arc>, + action: ResponseAction, +} + +impl Runnable for ListenerRunnable { + fn handler(self: Box>) { + let this = *self; + let context = this.context.lock().unwrap(); + if context.should_invoke() { + this.action.process(&*context); + } + } +} diff --git a/components/script/script_task.rs b/components/script/script_task.rs index 877289a27fd..31116307b3c 100644 --- a/components/script/script_task.rs +++ b/components/script/script_task.rs @@ -38,6 +38,7 @@ use dom::uievent::UIEvent; use dom::eventtarget::EventTarget; use dom::node::{self, Node, NodeHelpers, NodeDamage, window_from_node}; use dom::window::{Window, WindowHelpers, ScriptHelpers, ReflowReason}; +use dom::worker::TrustedWorkerAddress; use parse::html::{HTMLInput, parse_html}; use layout_interface::{ScriptLayoutChan, LayoutChan, ReflowGoal, ReflowQueryType}; use layout_interface; @@ -61,7 +62,7 @@ use msg::constellation_msg::{ConstellationChan}; use msg::constellation_msg::{LoadData, PipelineId, SubpageId, MozBrowserEvent, WorkerId}; use msg::constellation_msg::{Failure, WindowSizeData, PipelineExitType}; use msg::constellation_msg::Msg as ConstellationMsg; -use net_traits::{ResourceTask, ControlMsg, LoadResponse}; +use net_traits::{ResourceTask, ControlMsg, LoadResponse, LoadConsumer}; use net_traits::LoadData as NetLoadData; use net_traits::image_cache_task::ImageCacheTask; use net_traits::storage_task::StorageTask; @@ -200,6 +201,25 @@ pub trait ScriptChan { fn clone(&self) -> Box; } +/// An interface for receiving ScriptMsg values in an event loop. Used for synchronous DOM +/// APIs that need to abstract over multiple kinds of event loops (worker/main thread) with +/// different Receiver interfaces. +pub trait ScriptPort { + fn recv(&self) -> ScriptMsg; +} + +impl ScriptPort for Receiver { + fn recv(&self) -> ScriptMsg { + self.recv().unwrap() + } +} + +impl ScriptPort for Receiver<(TrustedWorkerAddress, ScriptMsg)> { + fn recv(&self) -> ScriptMsg { + self.recv().unwrap().1 + } +} + /// Encapsulates internal communication within the script task. #[jstraceable] pub struct NonWorkerScriptChan(pub Sender); @@ -403,6 +423,15 @@ unsafe extern "C" fn debug_gc_callback(_rt: *mut JSRuntime, status: JSGCStatus) } impl ScriptTask { + pub fn process_event(msg: ScriptMsg) { + SCRIPT_TASK_ROOT.with(|root| { + if let Some(script_task) = *root.borrow() { + let script_task = unsafe { &*script_task }; + script_task.handle_msg_from_script(msg); + } + }); + } + /// Creates a new script task. pub fn new(compositor: Box, port: Receiver, @@ -1331,8 +1360,7 @@ impl ScriptTask { preserved_headers: load_data.headers, data: load_data.data, cors: None, - consumer: input_chan, - })).unwrap(); + }, LoadConsumer::Channel(input_chan))).unwrap(); let load_response = input_port.recv().unwrap(); script_chan.send(ScriptMsg::PageFetchComplete(id, subpage, load_response)).unwrap(); diff --git a/components/servo/Cargo.lock b/components/servo/Cargo.lock index 153569455a3..15a82748036 100644 --- a/components/servo/Cargo.lock +++ b/components/servo/Cargo.lock @@ -932,6 +932,7 @@ dependencies = [ "cssparser 0.2.0 (git+https://github.com/servo/rust-cssparser)", "geom 0.1.0 (git+https://github.com/servo/rust-geom)", "gfx 0.0.1", + "hyper 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "net 0.0.1", "net_traits 0.0.1", "profile 0.0.1", diff --git a/tests/unit/net/data_loader.rs b/tests/unit/net/data_loader.rs index 482a7462bea..6f6f8b6c02c 100644 --- a/tests/unit/net/data_loader.rs +++ b/tests/unit/net/data_loader.rs @@ -4,6 +4,7 @@ extern crate hyper; +use net_traits::ResponseSenders::Channel; use net_traits::LoadData; use net_traits::ProgressMsg::{Payload, Done}; use self::hyper::header::ContentType; @@ -19,7 +20,7 @@ fn assert_parse(url: &'static str, use net::data_loader::load; let (start_chan, start_port) = channel(); - load(LoadData::new(Url::parse(url).unwrap(), start_chan)); + load(LoadData::new(Url::parse(url).unwrap()), Channel(start_chan)); let response = start_port.recv().unwrap(); assert_eq!(&response.metadata.content_type, &content_type); diff --git a/tests/unit/net/image_cache_task.rs b/tests/unit/net/image_cache_task.rs index c384411fb99..a12e9b2b2e0 100644 --- a/tests/unit/net/image_cache_task.rs +++ b/tests/unit/net/image_cache_task.rs @@ -6,8 +6,8 @@ use net::image_cache_task::*; use net_traits::image_cache_task::ImageResponseMsg::*; use net_traits::image_cache_task::Msg::*; -use net::resource_task::start_sending; -use net_traits::{ControlMsg, Metadata, ProgressMsg, ResourceTask}; +use net::resource_task::{start_sending, ProgressSender}; +use net_traits::{ControlMsg, Metadata, ProgressMsg, ResourceTask, ResponseSenders}; use net_traits::image_cache_task::{ImageCacheTask, ImageCacheTaskClient, ImageResponseMsg, Msg}; use net_traits::ProgressMsg::{Payload, Done}; use profile::time; @@ -41,7 +41,7 @@ impl ImageCacheTaskHelper for ImageCacheTask { } trait Closure { - fn invoke(&self, _response: Sender) { } + fn invoke(&self, _response: ProgressSender) { } } struct DoesNothing; impl Closure for DoesNothing { } @@ -50,7 +50,7 @@ struct JustSendOK { url_requested_chan: Sender<()>, } impl Closure for JustSendOK { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { self.url_requested_chan.send(()).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -58,7 +58,7 @@ impl Closure for JustSendOK { struct SendTestImage; impl Closure for SendTestImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(test_image_bin())).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -66,7 +66,7 @@ impl Closure for SendTestImage { struct SendBogusImage; impl Closure for SendBogusImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(vec!())).unwrap(); response.send(Done(Ok(()))).unwrap(); } @@ -74,7 +74,7 @@ impl Closure for SendBogusImage { struct SendTestImageErr; impl Closure for SendTestImageErr { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { response.send(Payload(test_image_bin())).unwrap(); response.send(Done(Err("".to_string()))).unwrap(); } @@ -84,7 +84,7 @@ struct WaitSendTestImage { wait_port: Receiver<()>, } impl Closure for WaitSendTestImage { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { // Don't send the data until after the client requests // the image self.wait_port.recv().unwrap(); @@ -97,7 +97,7 @@ struct WaitSendTestImageErr { wait_port: Receiver<()>, } impl Closure for WaitSendTestImageErr { - fn invoke(&self, response: Sender) { + fn invoke(&self, response: ProgressSender) { // Don't send the data until after the client requests // the image self.wait_port.recv().unwrap(); @@ -110,8 +110,8 @@ fn mock_resource_task(on_load: Box) -> ResourceT spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); on_load.invoke(chan); } @@ -280,8 +280,8 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() { let mock_resource_task = spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); chan.send(Payload(test_image_bin())); chan.send(Done(Ok(()))); @@ -329,8 +329,8 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() { let mock_resource_task = spawn_listener(move |port: Receiver| { loop { match port.recv().unwrap() { - ControlMsg::Load(response) => { - let chan = start_sending(response.consumer, Metadata::default( + ControlMsg::Load(response, consumer) => { + let chan = start_sending(ResponseSenders::from_consumer(consumer), Metadata::default( Url::parse("file:///fake").unwrap())); chan.send(Payload(test_image_bin())); chan.send(Done(Err("".to_string()))); diff --git a/tests/unit/net/resource_task.rs b/tests/unit/net/resource_task.rs index da79322a2ff..d9ef7cde4d3 100644 --- a/tests/unit/net/resource_task.rs +++ b/tests/unit/net/resource_task.rs @@ -3,7 +3,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use net::resource_task::{new_resource_task, parse_hostsfile, replace_hosts}; -use net_traits::{ControlMsg, LoadData}; +use net_traits::{ControlMsg, LoadData, LoadConsumer}; use net_traits::ProgressMsg; use std::borrow::ToOwned; use std::boxed; @@ -23,7 +23,7 @@ fn test_bad_scheme() { let resource_task = new_resource_task(None); let (start_chan, start) = channel(); let url = Url::parse("bogus://whatever").unwrap(); - resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan))).unwrap(); + resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(start_chan))).unwrap(); let response = start.recv().unwrap(); match response.progress_port.recv().unwrap() { ProgressMsg::Done(result) => { assert!(result.is_err()) } @@ -173,7 +173,7 @@ fn test_replace_hosts() { let resource_task = new_resource_task(None); let (start_chan, _) = channel(); let url = Url::parse(&format!("http://foo.bar.com:{}", port)).unwrap(); - resource_task.send(ControlMsg::Load(replace_hosts(LoadData::new(url, start_chan), host_table))).unwrap(); + resource_task.send(ControlMsg::Load(replace_hosts(LoadData::new(url), host_table), LoadConsumer::Channel(start_chan))).unwrap(); match listener.accept() { Ok(..) => assert!(true, "received request"),