diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index 16d64a86a7c..92b9aec1b00 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -16,9 +16,10 @@ use hyper::method::Method; use hyper::mime::{Mime, SubLevel, TopLevel}; use hyper::status::StatusCode; use mime_guess::guess_mime_type; -use net_traits::AsyncFetchListener; -use net_traits::request::{CacheMode, CredentialsMode, Type, Origin, Window}; +use net_traits::FetchTaskTarget; +use net_traits::request::{CacheMode, CredentialsMode}; use net_traits::request::{RedirectMode, Referer, Request, RequestMode, ResponseTainting}; +use net_traits::request::{Type, Origin, Window}; use net_traits::response::{HttpsState, TerminationReason}; use net_traits::response::{Response, ResponseBody, ResponseType}; use resource_thread::CancellationListener; @@ -27,26 +28,23 @@ use std::fs::File; use std::io::Read; use std::iter::FromIterator; use std::rc::Rc; -use std::thread; +use std::sync::mpsc::{channel, Sender, Receiver}; use unicase::UniCase; use url::{Origin as UrlOrigin, Url}; use util::thread::spawn_named; -pub fn fetch_async(request: Request, listener: Box) { - spawn_named(format!("fetch for {:?}", request.current_url_string()), move || { - let request = Rc::new(request); - let fetch_response = fetch(request); - fetch_response.wait_until_done(); - listener.response_available(fetch_response); - }) -} +pub type Target = Option>; + +type DoneChannel = Option<(Sender<()>, Receiver<()>)>; /// [Fetch](https://fetch.spec.whatwg.org#concept-fetch) -pub fn fetch(request: Rc) -> Response { - fetch_with_cors_cache(request, &mut CORSCache::new()) +pub fn fetch(request: Rc, target: Target) -> Response { + fetch_with_cors_cache(request, &mut CORSCache::new(), target) } -pub fn fetch_with_cors_cache(request: Rc, cache: &mut CORSCache) -> Response { +pub fn fetch_with_cors_cache(request: Rc, + cache: &mut CORSCache, + mut target: Target) -> Response { // Step 1 if request.window.get() == Window::Client { // TODO: Set window to request's client object if client is a Window object @@ -105,12 +103,14 @@ pub fn fetch_with_cors_cache(request: Rc, cache: &mut CORSCache) -> Res if request.is_subresource_request() { // TODO: create a fetch record and append it to request's client's fetch group list } + // Step 7 - main_fetch(request, cache, false, false) + main_fetch(request, cache, false, false, &mut target, &mut None) } /// [Main fetch](https://fetch.spec.whatwg.org/#concept-main-fetch) -fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recursive_flag: bool) -> Response { +fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, + recursive_flag: bool, target: &mut Target, done_chan: &mut DoneChannel) -> Response { // TODO: Implement main fetch spec // Step 1 @@ -128,23 +128,26 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu // TODO be able to execute report CSP // Step 4 - // TODO this step, based off of http_loader.rs + // TODO this step, based off of http_loader.rs (upgrade) // Step 5 - // TODO this step + // TODO this step (CSP port/content blocking) - // Step 6 + // Step 6-7 + // TODO this step (referer policy) + + // Step 8 if request.referer != Referer::NoReferer { // TODO be able to invoke "determine request's referer" } - // Step 7 - // TODO this step + // Step 9 + // TODO this step (HSTS) - // Step 8 + // Step 10 // this step is obsoleted by fetch_async - // Step 9 + // Step 11 let response = match response { Some(response) => response, None => { @@ -160,14 +163,14 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu (current_url.scheme() == "file" && request.same_origin_data.get()) || current_url.scheme() == "about" || request.mode == RequestMode::Navigate { - basic_fetch(request.clone(), cache) + basic_fetch(request.clone(), cache, target, done_chan) } else if request.mode == RequestMode::SameOrigin { Response::network_error() } else if request.mode == RequestMode::NoCORS { request.response_tainting.set(ResponseTainting::Opaque); - basic_fetch(request.clone(), cache) + basic_fetch(request.clone(), cache, target, done_chan) } else if !matches!(current_url.scheme(), "http" | "https") { Response::network_error() @@ -178,7 +181,7 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu request.headers.borrow().iter().any(|h| !is_simple_header(&h)))) { request.response_tainting.set(ResponseTainting::CORSTainting); request.redirect_mode.set(RedirectMode::Error); - let response = http_fetch(request.clone(), cache, true, true, false); + let response = http_fetch(request.clone(), cache, true, true, false, target, done_chan); if response.is_network_error() { // TODO clear cache entries using request } @@ -186,17 +189,17 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu } else { request.response_tainting.set(ResponseTainting::CORSTainting); - http_fetch(request.clone(), cache, true, false, false) + http_fetch(request.clone(), cache, true, false, false, target, done_chan) } } }; - // Step 10 + // Step 12 if recursive_flag { return response; } - // Step 11 + // Step 13 // no need to check if response is a network error, since the type would not be `Default` let response = if response.response_type == ResponseType::Default { let response_type = match request.response_tainting.get() { @@ -210,7 +213,7 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu }; { - // Step 12 + // Step 14 let network_error_res = Response::network_error(); let internal_response = if response.is_network_error() { &network_error_res @@ -218,10 +221,10 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu response.actual_response() }; - // Step 13 - // TODO this step + // Step 15 + // TODO this step (CSP/blocking) - // Step 14 + // Step 16 if !response.is_network_error() && (is_null_body_status(&internal_response.status) || match *request.method.borrow() { Method::Head | Method::Connect => true, @@ -233,7 +236,7 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu *body = ResponseBody::Empty; } - // Step 15 + // Step 17 // TODO be able to compare response integrity against request integrity metadata // if !response.is_network_error() { @@ -248,34 +251,45 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu // } } - // Step 16 + // Step 18 if request.synchronous { - response.actual_response().wait_until_done(); + if !response.is_network_error() { + if let Some(ref ch) = *done_chan { + let _ = ch.1.recv(); + } + } return response; } - // Step 17 + // Step 19 if request.body.borrow().is_some() && matches!(request.current_url().scheme(), "http" | "https") { - // TODO queue a fetch task on request to process end-of-file + if let Some(ref mut target) = *target { + // XXXManishearth: We actually should be calling process_request + // in http_network_fetch. However, we can't yet follow the request + // upload progress, so I'm keeping it here for now and pretending + // the body got sent in one chunk + target.process_request_body(&request); + target.process_request_eof(&request); + } } { - // Step 12 repeated to use internal_response - let network_error_res = Response::network_error(); - let internal_response = if response.is_network_error() { - &network_error_res - } else { - response.actual_response() - }; - - // Step 18 - // TODO this step - - // Step 19 - internal_response.wait_until_done(); - // Step 20 - // TODO this step + if let Some(ref mut target) = *target { + target.process_response(&response); + } + + // Step 21 + if !response.is_network_error() { + if let Some(ref ch) = *done_chan { + let _ = ch.1.recv(); + } + } + + // Step 22 + if let Some(ref mut target) = *target { + target.process_response_eof(&response); + } } // TODO remove this line when only asynchronous fetches are used @@ -283,7 +297,8 @@ fn main_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, recu } /// [Basic fetch](https://fetch.spec.whatwg.org#basic-fetch) -fn basic_fetch(request: Rc, cache: &mut CORSCache) -> Response { +fn basic_fetch(request: Rc, cache: &mut CORSCache, + target: &mut Target, done_chan: &mut DoneChannel) -> Response { let url = request.current_url(); match url.scheme() { @@ -295,7 +310,7 @@ fn basic_fetch(request: Rc, cache: &mut CORSCache) -> Response { }, "http" | "https" => { - http_fetch(request.clone(), cache, false, false, false) + http_fetch(request.clone(), cache, false, false, false, target, done_chan) }, "data" => { @@ -350,7 +365,12 @@ fn http_fetch(request: Rc, cache: &mut CORSCache, cors_flag: bool, cors_preflight_flag: bool, - authentication_fetch_flag: bool) -> Response { + authentication_fetch_flag: bool, + target: &mut Target, + done_chan: &mut DoneChannel) -> Response { + + // This is a new async fetch, reset the channel we are waiting on + *done_chan = None; // Step 1 let mut response: Option = None; @@ -423,7 +443,8 @@ fn http_fetch(request: Rc, }; // Substep 4 - let fetch_result = http_network_or_cache_fetch(request.clone(), credentials, authentication_fetch_flag); + let fetch_result = http_network_or_cache_fetch(request.clone(), credentials, authentication_fetch_flag, + done_chan); // Substep 5 if cors_flag && cors_check(request.clone(), &fetch_result).is_err() { @@ -450,7 +471,8 @@ fn http_fetch(request: Rc, RedirectMode::Follow => { // set back to default response.return_internal.set(true); - http_redirect_fetch(request, cache, Rc::new(response), cors_flag) + http_redirect_fetch(request, cache, Rc::new(response), + cors_flag, target, done_chan) } } }, @@ -472,7 +494,8 @@ fn http_fetch(request: Rc, } // Step 4 - return http_fetch(request, cache, cors_flag, cors_preflight_flag, true); + return http_fetch(request, cache, cors_flag, cors_preflight_flag, + true, target, done_chan); } // Code 407 @@ -489,7 +512,8 @@ fn http_fetch(request: Rc, // Step 4 return http_fetch(request, cache, cors_flag, cors_preflight_flag, - authentication_fetch_flag); + authentication_fetch_flag, target, + done_chan); } _ => { } @@ -510,7 +534,9 @@ fn http_fetch(request: Rc, fn http_redirect_fetch(request: Rc, cache: &mut CORSCache, response: Rc, - cors_flag: bool) -> Response { + cors_flag: bool, + target: &mut Target, + done_chan: &mut DoneChannel) -> Response { // Step 1 assert_eq!(response.return_internal.get(), true); @@ -584,13 +610,14 @@ fn http_redirect_fetch(request: Rc, request.url_list.borrow_mut().push(location_url); // Step 15 - main_fetch(request, cache, cors_flag, true) + main_fetch(request, cache, cors_flag, true, target, done_chan) } /// [HTTP network or cache fetch](https://fetch.spec.whatwg.org#http-network-or-cache-fetch) fn http_network_or_cache_fetch(request: Rc, credentials_flag: bool, - authentication_fetch_flag: bool) -> Response { + authentication_fetch_flag: bool, + done_chan: &mut DoneChannel) -> Response { // TODO: Implement Window enum for Request let request_has_no_window = true; @@ -753,7 +780,7 @@ fn http_network_or_cache_fetch(request: Rc, // Step 18 if response.is_none() { - response = Some(http_network_fetch(request.clone(), http_request.clone(), credentials_flag)); + response = Some(http_network_fetch(request.clone(), http_request.clone(), credentials_flag, done_chan)); } let response = response.unwrap(); @@ -789,7 +816,8 @@ fn http_network_or_cache_fetch(request: Rc, /// [HTTP network fetch](https://fetch.spec.whatwg.org/#http-network-fetch) fn http_network_fetch(request: Rc, _http_request: Rc, - _credentials_flag: bool) -> Response { + _credentials_flag: bool, + done_chan: &mut DoneChannel) -> Response { // TODO: Implement HTTP network fetch spec // Step 1 @@ -822,7 +850,11 @@ fn http_network_fetch(request: Rc, response.headers = res.response.headers.clone(); let res_body = response.body.clone(); - thread::spawn(move || { + + // We're about to spawn a thread to be waited on here + *done_chan = Some(channel()); + let done_sender = done_chan.as_ref().map(|ch| ch.0.clone()); + spawn_named(format!("fetch worker thread"), move || { *res_body.lock().unwrap() = ResponseBody::Receiving(vec![]); loop { @@ -838,6 +870,9 @@ fn http_network_fetch(request: Rc, _ => vec![] }; *res_body.lock().unwrap() = ResponseBody::Done(completed_body); + if let Some(sender) = done_sender { + let _ = sender.send(()); + } break; } } @@ -845,8 +880,9 @@ fn http_network_fetch(request: Rc, } }); }, - Err(_) => - response.termination_reason = Some(TerminationReason::Fatal) + Err(_) => { + response.termination_reason = Some(TerminationReason::Fatal); + } }; // TODO these substeps aren't possible yet @@ -929,7 +965,7 @@ fn cors_preflight_fetch(request: Rc, cache: &mut CORSCache) -> Response // Step 6 let preflight = Rc::new(preflight); - let response = http_network_or_cache_fetch(preflight.clone(), false, false); + let response = http_network_or_cache_fetch(preflight.clone(), false, false, &mut None); // Step 7 if cors_check(request.clone(), &response).is_ok() && diff --git a/components/net_traits/lib.rs b/components/net_traits/lib.rs index 4af820b0ebd..ed17e52c876 100644 --- a/components/net_traits/lib.rs +++ b/components/net_traits/lib.rs @@ -154,9 +154,26 @@ pub trait LoadOrigin { fn pipeline_id(&self) -> Option; } -/// Interface for observing the final response for an asynchronous fetch operation. -pub trait AsyncFetchListener { - fn response_available(&self, response: response::Response); +pub trait FetchTaskTarget { + /// https://fetch.spec.whatwg.org/#process-request-body + /// + /// Fired when a chunk of the request body is transmitted + fn process_request_body(&mut self, request: &request::Request); + + /// https://fetch.spec.whatwg.org/#process-request-end-of-file + /// + /// Fired when the entire request finishes being transmitted + fn process_request_eof(&mut self, request: &request::Request); + + /// https://fetch.spec.whatwg.org/#process-response + /// + /// Fired when headers are received + fn process_response(&mut self, response: &response::Response); + + /// https://fetch.spec.whatwg.org/#process-response-end-of-file + /// + /// Fired when the response is fully fetched + fn process_response_eof(&mut self, response: &response::Response); } /// A listener for asynchronous network events. Cancelling the underlying request is unsupported. diff --git a/components/net_traits/request.rs b/components/net_traits/request.rs index 675a78fd788..be7a09e4b25 100644 --- a/components/net_traits/request.rs +++ b/components/net_traits/request.rs @@ -145,7 +145,7 @@ pub struct Request { pub url_list: RefCell>, pub redirect_count: Cell, pub response_tainting: Cell, - pub done: Cell + pub done: Cell, } impl Request { diff --git a/components/net_traits/response.rs b/components/net_traits/response.rs index d107e219988..84826fed2b1 100644 --- a/components/net_traits/response.rs +++ b/components/net_traits/response.rs @@ -132,18 +132,6 @@ impl Response { } } - pub fn wait_until_done(&self) { - match self.response_type { - // since these response types can't hold a body, they should be considered done - ResponseType::Error | ResponseType::Opaque | ResponseType::OpaqueRedirect => {}, - _ => { - while !self.body.lock().unwrap().is_done() && !self.is_network_error() { - // loop until done - } - } - } - } - pub fn actual_response(&self) -> &Response { if self.return_internal.get() && self.internal_response.is_some() { &**self.internal_response.as_ref().unwrap() diff --git a/tests/unit/net/fetch.rs b/tests/unit/net/fetch.rs index 4803eee5178..16dc664ffff 100644 --- a/tests/unit/net/fetch.rs +++ b/tests/unit/net/fetch.rs @@ -14,13 +14,14 @@ use hyper::server::{Request as HyperRequest, Response as HyperResponse}; use hyper::status::StatusCode; use hyper::uri::RequestUri; use net::fetch::cors_cache::CORSCache; -use net::fetch::methods::{fetch, fetch_async, fetch_with_cors_cache}; -use net_traits::AsyncFetchListener; +use net::fetch::methods::{fetch, fetch_with_cors_cache}; +use net_traits::FetchTaskTarget; use net_traits::request::{Origin, RedirectMode, Referer, Request, RequestMode}; use net_traits::response::{CacheState, Response, ResponseBody, ResponseType}; use std::fs::File; use std::io::Read; use std::rc::Rc; +use std::thread; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{Sender, channel}; use std::sync::{Arc, Mutex}; @@ -35,12 +36,22 @@ struct FetchResponseCollector { sender: Sender, } -impl AsyncFetchListener for FetchResponseCollector { - fn response_available(&self, response: Response) { - let _ = self.sender.send(response); +impl FetchTaskTarget for FetchResponseCollector { + fn process_request_body(&mut self, _: &Request) {} + fn process_request_eof(&mut self, _: &Request) {} + fn process_response(&mut self, _: &Response) {} + /// Fired when the response is fully fetched + fn process_response_eof(&mut self, response: &Response) { + self.sender.send(response.clone()); } } +fn fetch_async(request: Request, target: Box) { + thread::spawn(move || { + fetch(Rc::new(request), Some(target)); + }); +} + fn make_server(handler: H) -> (Listening, Url) { // this is a Listening server because of handle_threads() let server = Server::http("0.0.0.0:0").unwrap().handle_threads(handler, 1).unwrap(); @@ -64,7 +75,7 @@ fn test_fetch_response_is_not_network_error() { request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); if fetch_response.is_network_error() { @@ -85,7 +96,7 @@ fn test_fetch_response_body_matches_const_message() { request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -107,7 +118,7 @@ fn test_fetch_aboutblank() { request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); assert!(!fetch_response.is_network_error()); assert!(*fetch_response.body.lock().unwrap() == ResponseBody::Done(vec![])); } @@ -119,7 +130,7 @@ fn test_fetch_data() { let request = Request::new(url, Some(origin), false); request.same_origin_data.set(true); let expected_resp_body = "

Servo

".to_owned(); - let fetch_response = fetch(Rc::new(request)); + let fetch_response = fetch(Rc::new(request), None); assert!(!fetch_response.is_network_error()); assert_eq!(fetch_response.headers.len(), 1); @@ -148,7 +159,7 @@ fn test_fetch_file() { let request = Request::new(url, Some(origin), false); request.same_origin_data.set(true); - let fetch_response = fetch(Rc::new(request)); + let fetch_response = fetch(Rc::new(request), None); assert!(!fetch_response.is_network_error()); assert_eq!(fetch_response.headers.len(), 1); let content_type: &ContentType = fetch_response.headers.get().unwrap(); @@ -192,7 +203,7 @@ fn test_cors_preflight_fetch() { request.mode = RequestMode::CORSMode; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -232,8 +243,8 @@ fn test_cors_preflight_cache_fetch() { let wrapped_request0 = Rc::new(request.clone()); let wrapped_request1 = Rc::new(request); - let fetch_response0 = fetch_with_cors_cache(wrapped_request0.clone(), &mut cache); - let fetch_response1 = fetch_with_cors_cache(wrapped_request1.clone(), &mut cache); + let fetch_response0 = fetch_with_cors_cache(wrapped_request0.clone(), &mut cache, None); + let fetch_response1 = fetch_with_cors_cache(wrapped_request1.clone(), &mut cache, None); let _ = server.close(); assert!(!fetch_response0.is_network_error() && !fetch_response1.is_network_error()); @@ -281,7 +292,7 @@ fn test_cors_preflight_fetch_network_error() { request.mode = RequestMode::CORSMode; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(fetch_response.is_network_error()); @@ -304,7 +315,7 @@ fn test_fetch_response_is_basic_filtered() { request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -352,7 +363,7 @@ fn test_fetch_response_is_cors_filtered() { request.mode = RequestMode::CORSMode; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -385,7 +396,7 @@ fn test_fetch_response_is_opaque_filtered() { request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -435,7 +446,7 @@ fn test_fetch_response_is_opaque_redirect_filtered() { request.redirect_mode.set(RedirectMode::Manual); let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); assert!(!fetch_response.is_network_error()); @@ -473,7 +484,7 @@ fn test_fetch_with_local_urls_only() { request.local_urls_only = true; let wrapped_request = Rc::new(request); - fetch(wrapped_request) + fetch(wrapped_request, None) }; let local_url = Url::parse("about:blank").unwrap(); @@ -512,7 +523,7 @@ fn setup_server_and_fetch(message: &'static [u8], redirect_cap: u32) -> Response request.referer = Referer::NoReferer; let wrapped_request = Rc::new(request); - let fetch_response = fetch(wrapped_request); + let fetch_response = fetch(wrapped_request, None); let _ = server.close(); fetch_response } @@ -598,7 +609,7 @@ fn test_fetch_redirect_updates_method_runner(tx: Sender, status_code: Stat *request.method.borrow_mut() = method; let wrapped_request = Rc::new(request); - let _ = fetch(wrapped_request); + let _ = fetch(wrapped_request, None); let _ = server.close(); }