Auto merge of #9753 - nikkisquared:async_fetch, r=jdm

Make Fetch Protocol Asynchronous

I'm working on making it possible to run Fetch Asynchronously, as required for some steps, such as Main Fetch. It looks like somebody has already laid some groundwork for that, with a AsyncFetchListener trait and two async fetch functions defined, which I'm building on top of.

So far, as a sort of proof of concept, I've written a test to asynchronously retrieve a fetch response, which uses a simple function to check if the fetch response is complete or not. I'd like to be checked if I'm on the right path, to see if I need to rework anything so far, and what my next step can be.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.svg" height="40" alt="Review on Reviewable"/>](https://reviewable.io/reviews/servo/servo/9753)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2016-03-02 01:49:29 +05:30
commit 22ce878edc
5 changed files with 264 additions and 169 deletions

View file

@ -154,7 +154,7 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
false
};
if (!cors_flag && same_origin) ||
if (same_origin && !cors_flag ) ||
(current_url.scheme == "data" && request.same_origin_data.get()) ||
current_url.scheme == "about" ||
request.mode == RequestMode::Navigate {
@ -200,52 +200,54 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// Step 11
// no need to check if response is a network error, since the type would not be `Default`
let mut response = if response.response_type == ResponseType::Default {
let old_response = Rc::new(response);
let response_type = match request.response_tainting.get() {
ResponseTainting::Basic => ResponseType::Basic,
ResponseTainting::CORSTainting => ResponseType::CORS,
ResponseTainting::Opaque => ResponseType::Opaque,
};
Response::to_filtered(old_response, response_type)
response.to_filtered(response_type)
} else {
response
};
// Step 12
let mut internal_response = if response.is_network_error() {
Rc::new(Response::network_error())
} else {
response.internal_response.clone().unwrap()
};
{
// Step 12
let network_error_res = Response::network_error();
let mut internal_response = if response.is_network_error() {
&network_error_res
} else {
response.get_actual_response()
};
// Step 13
// TODO this step
// Step 13
// TODO this step
// Step 14
if !response.is_network_error() && (is_null_body_status(&internal_response.status) ||
match *request.method.borrow() {
Method::Head | Method::Connect => true,
_ => false })
{
// when the Fetch implementation does asynchronous retrieval of the body,
// we will need to make sure nothing tries to write to the body at this point
*internal_response.body.borrow_mut() = ResponseBody::Empty;
// Step 14
if !response.is_network_error() && (is_null_body_status(&internal_response.status) ||
match *request.method.borrow() {
Method::Head | Method::Connect => true,
_ => false })
{
// when the Fetch implementation does asynchronous retrieval of the body,
// we will need to make sure nothing tries to write to the body at this point
*internal_response.body.borrow_mut() = ResponseBody::Empty;
}
// Step 15
// TODO be able to compare response integrity against request integrity metadata
// if !response.is_network_error() {
// // Substep 1
// // TODO wait for response
// // Substep 2
// if response.termination_reason.is_none() {
// response = Response::network_error();
// internal_response = Response::network_error();
// }
// }
}
// Step 15
// TODO be able to compare response integrity against request integrity metadata
// if !response.is_network_error() {
// // Substep 1
// // TODO wait for response
// // Substep 2
// if response.termination_reason.is_none() {
// response = Response::network_error();
// internal_response = Response::network_error();
// }
// }
// Step 16
if request.synchronous {
// TODO wait for internal_response
@ -260,22 +262,32 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// TODO queue a fetch task on request to process end-of-file
}
// Step 18
// TODO this step
{
// Step 12 repeated to use internal_response
let network_error_res = Response::network_error();
let mut internal_response = if response.is_network_error() {
&network_error_res
} else {
response.get_actual_response()
};
match *internal_response.body.borrow() {
// Step 20
ResponseBody::Empty => {
// Substep 1
// Substep 2
},
// Step 18
// TODO this step
// Step 19
_ => {
// Substep 1
// Substep 2
}
};
match *internal_response.body.borrow() {
// Step 20
ResponseBody::Empty => {
// Substep 1
// Substep 2
},
// Step 19
_ => {
// Substep 1
// Substep 2
}
};
}
// TODO remove this line when asynchronous fetches are supported
return response;
@ -338,10 +350,10 @@ fn http_fetch(request: Rc<Request>,
authentication_fetch_flag: bool) -> Response {
// Step 1
let mut response: Option<Rc<Response>> = None;
let mut response: Option<Response> = None;
// Step 2
let mut actual_response: Option<Rc<Response>> = None;
// nothing to do, since actual_response is a function on response
// Step 3
if !request.skip_service_worker.get() && !request.is_service_worker_global_scope {
@ -352,10 +364,7 @@ fn http_fetch(request: Rc<Request>,
if let Some(ref res) = response {
// Substep 2
actual_response = match res.internal_response {
Some(ref internal_res) => Some(internal_res.clone()),
None => Some(res.clone())
};
// nothing to do, since actual_response is a function on response
// Substep 3
if (res.response_type == ResponseType::Opaque &&
@ -367,17 +376,16 @@ fn http_fetch(request: Rc<Request>,
res.response_type == ResponseType::Error {
return Response::network_error();
}
}
// Substep 4
if let Some(ref res) = actual_response {
if res.url_list.borrow().is_empty() {
*res.url_list.borrow_mut() = request.url_list.borrow().clone();
// Substep 4
let actual_response = res.get_actual_response();
if actual_response.url_list.borrow().is_empty() {
*actual_response.url_list.borrow_mut() = request.url_list.borrow().clone();
}
}
// Substep 5
// TODO: set response's CSP list on actual_response
// Substep 5
// TODO: set response's CSP list on actual_response
}
}
// Step 4
@ -437,29 +445,32 @@ fn http_fetch(request: Rc<Request>,
return Response::network_error();
}
response = Some(Rc::new(fetch_result));
actual_response = response.clone();
fetch_result.return_internal.set(false);
response = Some(fetch_result);
}
// response and actual_response are guaranteed to be something by now
// response is guaranteed to be something by now
let mut response = response.unwrap();
let actual_response = actual_response.unwrap();
// Step 5
match actual_response.status.unwrap() {
match response.get_actual_response().status.unwrap() {
// Code 301, 302, 303, 307, 308
StatusCode::MovedPermanently | StatusCode::Found | StatusCode::SeeOther |
StatusCode::TemporaryRedirect | StatusCode::PermanentRedirect => {
response = match request.redirect_mode.get() {
RedirectMode::Error => Rc::new(Response::network_error()),
RedirectMode::Error => Response::network_error(),
RedirectMode::Manual => {
Rc::new(Response::to_filtered(actual_response, ResponseType::OpaqueRedirect))
response.to_filtered(ResponseType::OpaqueRedirect)
},
RedirectMode::Follow => Rc::new(http_redirect_fetch(request, response, cors_flag))
RedirectMode::Follow => {
// set back to default
response.return_internal.set(true);
http_redirect_fetch(request, Rc::new(response), cors_flag)
}
}
}
},
// Code 401
StatusCode::Unauthorized => {
@ -467,8 +478,7 @@ fn http_fetch(request: Rc<Request>,
// Step 1
// FIXME: Figure out what to do with request window objects
if cors_flag || request.credentials_mode != CredentialsMode::Include {
drop(actual_response);
return Rc::try_unwrap(response).ok().unwrap();
return response;
}
// Step 2
@ -501,7 +511,7 @@ fn http_fetch(request: Rc<Request>,
authentication_fetch_flag);
}
_ => drop(actual_response)
_ => { }
}
// Step 6
@ -509,8 +519,10 @@ fn http_fetch(request: Rc<Request>,
// TODO: Create authentication entry for this request
}
// set back to default
response.return_internal.set(true);
// Step 7
Rc::try_unwrap(response).ok().unwrap()
response
}
/// [HTTP redirect fetch](https://fetch.spec.whatwg.org#http-redirect-fetch)
@ -519,21 +531,17 @@ fn http_redirect_fetch(request: Rc<Request>,
cors_flag: bool) -> Response {
// Step 1
let actual_response = match response.internal_response {
Some(ref res) => res.clone(),
_ => response.clone()
};
assert_eq!(response.return_internal.get(), true);
// Step 3
// this step is done early, because querying if Location is available says
// if it is None or Some, making it easy to seperate from the retrieval failure case
if !actual_response.headers.has::<Location>() {
drop(actual_response);
if !response.get_actual_response().headers.has::<Location>() {
return Rc::try_unwrap(response).ok().unwrap();
}
// Step 2
let location = match actual_response.headers.get::<Location>() {
let location = match response.get_actual_response().headers.get::<Location>() {
Some(&Location(ref location)) => location.clone(),
// Step 4
_ => return Response::network_error(),
@ -582,7 +590,7 @@ fn http_redirect_fetch(request: Rc<Request>,
}
// Step 13
let status_code = actual_response.status.unwrap();
let status_code = response.get_actual_response().status.unwrap();
if ((status_code == StatusCode::MovedPermanently || status_code == StatusCode::Found) &&
*request.method.borrow() == Method::Post) ||
status_code == StatusCode::SeeOther {

View file

@ -2,18 +2,17 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use hyper::header::{AccessControlExposeHeaders, Headers};
use hyper::header::Headers;
use hyper::status::StatusCode;
use net_traits::response::{CacheState, HttpsState, Response, ResponseBody, ResponseType};
use std::ascii::AsciiExt;
use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::mpsc::Receiver;
use url::Url;
pub trait ResponseMethods {
fn new() -> Response;
fn to_filtered(Rc<Response>, ResponseType) -> Response;
}
impl ResponseMethods for Response {
@ -28,77 +27,9 @@ impl ResponseMethods for Response {
body: RefCell::new(ResponseBody::Empty),
cache_state: CacheState::None,
https_state: HttpsState::None,
internal_response: None
internal_response: None,
return_internal: Cell::new(true)
}
}
/// Convert to a filtered response, of type `filter_type`.
/// Do not use with type Error or Default
fn to_filtered(old_response: Rc<Response>, filter_type: ResponseType) -> Response {
assert!(filter_type != ResponseType::Error);
assert!(filter_type != ResponseType::Default);
if Response::is_network_error(&old_response) {
return Response::network_error();
}
let old_headers = old_response.headers.clone();
let mut response = (*old_response).clone();
response.internal_response = Some(old_response);
response.response_type = filter_type;
match filter_type {
ResponseType::Default | ResponseType::Error => unreachable!(),
ResponseType::Basic => {
let headers = old_headers.iter().filter(|header| {
match &*header.name().to_ascii_lowercase() {
"set-cookie" | "set-cookie2" => false,
_ => true
}
}).collect();
response.headers = headers;
},
ResponseType::CORS => {
let access = old_headers.get::<AccessControlExposeHeaders>();
let allowed_headers = access.as_ref().map(|v| &v[..]).unwrap_or(&[]);
let headers = old_headers.iter().filter(|header| {
match &*header.name().to_ascii_lowercase() {
"cache-control" | "content-language" | "content-type" |
"expires" | "last-modified" | "pragma" => true,
"set-cookie" | "set-cookie2" => false,
header => {
let result =
allowed_headers.iter().find(|h| *header == *h.to_ascii_lowercase());
result.is_some()
}
}
}).collect();
response.headers = headers;
},
ResponseType::Opaque => {
response.url_list = RefCell::new(vec![]);
response.url = None;
response.headers = Headers::new();
response.status = None;
response.body = RefCell::new(ResponseBody::Empty);
response.cache_state = CacheState::None;
},
ResponseType::OpaqueRedirect => {
response.headers = Headers::new();
response.status = None;
response.body = RefCell::new(ResponseBody::Empty);
response.cache_state = CacheState::None;
}
}
response
}
}

View file

@ -111,7 +111,7 @@ pub enum CORSSettings {
pub struct Request {
pub method: RefCell<Method>,
pub local_urls_only: bool,
pub sanboxed_storage_area_urls: bool,
pub sandboxed_storage_area_urls: bool,
pub headers: RefCell<Headers>,
pub unsafe_request: bool,
pub body: RefCell<Option<Vec<u8>>>,
@ -155,7 +155,7 @@ impl Request {
Request {
method: RefCell::new(Method::Get),
local_urls_only: false,
sanboxed_storage_area_urls: false,
sandboxed_storage_area_urls: false,
headers: RefCell::new(Headers::new()),
unsafe_request: false,
body: RefCell::new(None),
@ -193,7 +193,7 @@ impl Request {
Request {
method: RefCell::new(Method::Get),
local_urls_only: false,
sanboxed_storage_area_urls: false,
sandboxed_storage_area_urls: false,
headers: RefCell::new(Headers::new()),
unsafe_request: false,
body: RefCell::new(None),

View file

@ -4,10 +4,10 @@
//! The [Response](https://fetch.spec.whatwg.org/#responses) object
//! resulting from a [fetch operation](https://fetch.spec.whatwg.org/#concept-fetch)
use hyper::header::Headers;
use hyper::header::{AccessControlExposeHeaders, Headers};
use hyper::status::StatusCode;
use std::cell::RefCell;
use std::rc::Rc;
use std::ascii::AsciiExt;
use std::cell::{Cell, RefCell};
use url::Url;
/// [Response type](https://fetch.spec.whatwg.org/#concept-response-type)
@ -38,6 +38,16 @@ pub enum ResponseBody {
Done(Vec<u8>),
}
impl ResponseBody {
pub fn is_done(&self) -> bool {
match *self {
ResponseBody::Done(..) => true,
ResponseBody::Empty | ResponseBody::Receiving(..) => false
}
}
}
/// [Cache state](https://fetch.spec.whatwg.org/#concept-response-cache-state)
#[derive(Clone, Debug)]
pub enum CacheState {
@ -76,7 +86,9 @@ pub struct Response {
pub https_state: HttpsState,
/// [Internal response](https://fetch.spec.whatwg.org/#concept-internal-response), only used if the Response
/// is a filtered response
pub internal_response: Option<Rc<Response>>,
pub internal_response: Option<Box<Response>>,
/// whether or not to try to return the internal_response when asked for actual_response
pub return_internal: Cell<bool>,
}
impl Response {
@ -91,7 +103,8 @@ impl Response {
body: RefCell::new(ResponseBody::Empty),
cache_state: CacheState::None,
https_state: HttpsState::None,
internal_response: None
internal_response: None,
return_internal: Cell::new(true)
}
}
@ -101,4 +114,92 @@ impl Response {
_ => false
}
}
pub fn get_actual_response(&self) -> &Response {
if self.return_internal.get() && self.internal_response.is_some() {
&**self.internal_response.as_ref().unwrap()
} else {
self
}
}
pub fn to_actual(self) -> Response {
if self.return_internal.get() && self.internal_response.is_some() {
*self.internal_response.unwrap()
} else {
self
}
}
/// Convert to a filtered response, of type `filter_type`.
/// Do not use with type Error or Default
pub fn to_filtered(self, filter_type: ResponseType) -> Response {
assert!(filter_type != ResponseType::Error);
assert!(filter_type != ResponseType::Default);
let old_response = self.to_actual();
if Response::is_network_error(&old_response) {
return Response::network_error();
}
let old_headers = old_response.headers.clone();
let mut response = old_response.clone();
response.internal_response = Some(Box::new(old_response));
response.response_type = filter_type;
match filter_type {
ResponseType::Default | ResponseType::Error => unreachable!(),
ResponseType::Basic => {
let headers = old_headers.iter().filter(|header| {
match &*header.name().to_ascii_lowercase() {
"set-cookie" | "set-cookie2" => false,
_ => true
}
}).collect();
response.headers = headers;
},
ResponseType::CORS => {
let access = old_headers.get::<AccessControlExposeHeaders>();
let allowed_headers = access.as_ref().map(|v| &v[..]).unwrap_or(&[]);
let headers = old_headers.iter().filter(|header| {
match &*header.name().to_ascii_lowercase() {
"cache-control" | "content-language" | "content-type" |
"expires" | "last-modified" | "pragma" => true,
"set-cookie" | "set-cookie2" => false,
header => {
let result =
allowed_headers.iter().find(|h| *header == *h.to_ascii_lowercase());
result.is_some()
}
}
}).collect();
response.headers = headers;
},
ResponseType::Opaque => {
response.url_list = RefCell::new(vec![]);
response.url = None;
response.headers = Headers::new();
response.status = None;
response.body = RefCell::new(ResponseBody::Empty);
response.cache_state = CacheState::None;
},
ResponseType::OpaqueRedirect => {
response.headers = Headers::new();
response.status = None;
response.body = RefCell::new(ResponseBody::Empty);
response.cache_state = CacheState::None;
}
}
response
}
}

View file

@ -10,18 +10,31 @@ use hyper::server::{Handler, Listening, Server};
use hyper::server::{Request as HyperRequest, Response as HyperResponse};
use hyper::status::StatusCode;
use hyper::uri::RequestUri;
use net::fetch::methods::fetch;
use net::fetch::methods::{fetch, fetch_async};
use net::fetch::response::ResponseMethods;
use net_traits::request::{Origin, RedirectMode, Referer, Request, RequestMode};
use net_traits::response::{CacheState, Response, ResponseBody, ResponseType};
use net_traits::{AsyncFetchListener};
use std::cell::Cell;
use std::rc::Rc;
use std::sync::{Arc, Mutex, mpsc};
use std::sync::mpsc::{Sender, channel};
use std::sync::{Arc, Mutex};
use time::{self, Duration};
use unicase::UniCase;
use url::{Origin as UrlOrigin, OpaqueOrigin, Url};
// TODO write a struct that impls Handler for storing test values
struct FetchResponseCollector {
sender: Sender<Response>,
}
impl AsyncFetchListener for FetchResponseCollector {
fn response_available(&self, response: Response) {
let _ = self.sender.send(response);
}
}
fn make_server<H: Handler + 'static>(handler: H) -> (Listening, Url) {
// this is a Listening server because of handle_threads()
@ -327,7 +340,7 @@ fn test_fetch_redirect_count_failure() {
};
}
fn test_fetch_redirect_updates_method_runner(tx: mpsc::Sender<bool>, status_code: StatusCode, method: Method) {
fn test_fetch_redirect_updates_method_runner(tx: Sender<bool>, status_code: StatusCode, method: Method) {
let handler_method = method.clone();
let handler_tx = Arc::new(Mutex::new(tx));
@ -384,7 +397,7 @@ fn test_fetch_redirect_updates_method_runner(tx: mpsc::Sender<bool>, status_code
#[test]
fn test_fetch_redirect_updates_method() {
let (tx, rx) = mpsc::channel();
let (tx, rx) = channel();
test_fetch_redirect_updates_method_runner(tx.clone(), StatusCode::MovedPermanently, Method::Post);
assert_eq!(rx.recv().unwrap(), true);
@ -421,3 +434,45 @@ fn test_fetch_redirect_updates_method() {
assert_eq!(rx.recv().unwrap(), true);
assert_eq!(rx.try_recv().is_err(), true);
}
fn response_is_done(response: &Response) -> bool {
let response_complete = match response.response_type {
ResponseType::Default | ResponseType::Basic | ResponseType::CORS => response.body.borrow().is_done(),
// if the internal response cannot have a body, it shouldn't block the "done" state
ResponseType::Opaque | ResponseType::OpaqueRedirect | ResponseType::Error => true
};
let internal_complete = if let Some(ref res) = response.internal_response {
res.body.borrow().is_done()
} else {
true
};
response_complete && internal_complete
}
#[test]
fn test_fetch_async_returns_complete_response() {
static MESSAGE: &'static [u8] = b"";
let handler = move |_: HyperRequest, response: HyperResponse| {
response.send(MESSAGE).unwrap();
};
let (mut server, url) = make_server(handler);
let origin = Origin::Origin(url.origin());
let mut request = Request::new(url, Some(origin), false);
request.referer = Referer::NoReferer;
let (tx, rx) = channel();
let listener = Box::new(FetchResponseCollector {
sender: tx.clone()
});
fetch_async(request, listener);
let fetch_response = rx.recv().unwrap();
let _ = server.close();
assert_eq!(response_is_done(&fetch_response), true);
}