set response.body asynchronously in Fetch

This commit is contained in:
Nikki 2016-03-01 14:22:17 -07:00
parent 22ce878edc
commit b187985e49
5 changed files with 114 additions and 73 deletions

View file

@ -5,7 +5,7 @@
use fetch::cors_cache::{BasicCORSCache, CORSCache, CacheRequestDetails};
use fetch::response::ResponseMethods;
use http_loader::{NetworkHttpRequestFactory, WrappedHttpResponse};
use http_loader::{create_http_connector, obtain_response};
use http_loader::{create_http_connector, obtain_response, read_block, ReadResult};
use hyper::client::response::Response as HyperResponse;
use hyper::header::{Accept, CacheControl, IfMatch, IfRange, IfUnmodifiedSince, Location};
use hyper::header::{AcceptLanguage, ContentLength, ContentLanguage, HeaderView, Pragma};
@ -27,6 +27,7 @@ use std::cell::RefCell;
use std::io::Read;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::thread;
use url::idna::domain_to_ascii;
use url::{Origin as UrlOrigin, OpaqueOrigin, Url, UrlParser, whatwg_scheme_type_mapper};
@ -35,8 +36,9 @@ use util::thread::spawn_named;
pub fn fetch_async(request: Request, listener: Box<AsyncFetchListener + Send>) {
spawn_named(format!("fetch for {:?}", request.current_url_string()), move || {
let request = Rc::new(request);
let res = fetch(request);
listener.response_available(res);
let fetch_response = fetch(request);
fetch_response.wait_until_done();
listener.response_available(fetch_response);
})
}
@ -140,9 +142,7 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// TODO this step
// Step 8
if !request.synchronous && !recursive_flag {
// TODO run the remaining steps in parallel
}
// this step is obsoleted by fetch_async
// Step 9
let mut response = if response.is_none() {
@ -228,9 +228,10 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
Method::Head | Method::Connect => true,
_ => false })
{
// when the Fetch implementation does asynchronous retrieval of the body,
// we will need to make sure nothing tries to write to the body at this point
*internal_response.body.borrow_mut() = ResponseBody::Empty;
// when Fetch is used only asynchronously, we will need to make sure
// that nothing tries to write to the body at this point
let mut body = internal_response.body.lock().unwrap();
*body = ResponseBody::Empty;
}
// Step 15
@ -238,7 +239,7 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// if !response.is_network_error() {
// // Substep 1
// // TODO wait for response
// response.wait_until_done();
// // Substep 2
// if response.termination_reason.is_none() {
@ -250,7 +251,7 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// Step 16
if request.synchronous {
// TODO wait for internal_response
response.get_actual_response().wait_until_done();
return response;
}
@ -274,22 +275,14 @@ fn main_fetch(request: Rc<Request>, cors_flag: bool, recursive_flag: bool) -> Re
// Step 18
// TODO this step
match *internal_response.body.borrow() {
// Step 20
ResponseBody::Empty => {
// Substep 1
// Substep 2
},
// Step 19
internal_response.wait_until_done();
// Step 19
_ => {
// Substep 1
// Substep 2
}
};
// Step 20
// TODO this step
}
// TODO remove this line when asynchronous fetches are supported
// TODO remove this line when only asynchronous fetches are used
return response;
}
@ -544,11 +537,12 @@ fn http_redirect_fetch(request: Rc<Request>,
let location = match response.get_actual_response().headers.get::<Location>() {
Some(&Location(ref location)) => location.clone(),
// Step 4
_ => return Response::network_error(),
_ => return Response::network_error()
};
// Step 5
let location_url = UrlParser::new().base_url(&request.current_url()).parse(&*location);
let response_url = response.get_actual_response().url.as_ref().unwrap();
let location_url = UrlParser::new().base_url(response_url).parse(&*location);
// Step 6
let location_url = match location_url {
@ -663,29 +657,38 @@ fn http_network_or_cache_fetch(request: Rc<Request>,
http_request.headers.borrow_mut().set(UserAgent(global_user_agent().to_owned()));
}
// Step 9
if http_request.cache_mode.get() == CacheMode::Default && is_no_store_cache(&http_request.headers.borrow()) {
http_request.cache_mode.set(CacheMode::NoStore);
match http_request.cache_mode.get() {
// Step 9
CacheMode::Default if is_no_store_cache(&http_request.headers.borrow()) => {
http_request.cache_mode.set(CacheMode::NoStore);
},
// Step 10
CacheMode::NoCache if !http_request.headers.borrow().has::<CacheControl>() => {
http_request.headers.borrow_mut().set(CacheControl(vec![CacheDirective::MaxAge(0)]));
},
// Step 11
CacheMode::Reload => {
// Substep 1
if !http_request.headers.borrow().has::<Pragma>() {
http_request.headers.borrow_mut().set(Pragma::NoCache);
}
// Substep 2
if !http_request.headers.borrow().has::<CacheControl>() {
http_request.headers.borrow_mut().set(CacheControl(vec![CacheDirective::NoCache]));
}
},
_ => {}
}
// Step 10
if http_request.cache_mode.get() == CacheMode::Reload {
// Substep 1
if !http_request.headers.borrow().has::<Pragma>() {
http_request.headers.borrow_mut().set(Pragma::NoCache);
}
// Substep 2
if !http_request.headers.borrow().has::<CacheControl>() {
http_request.headers.borrow_mut().set(CacheControl(vec![CacheDirective::NoCache]));
}
}
// Step 11
// modify_request_headers(http_request.headers.borrow());
// Step 12
// modify_request_headers(http_request.headers.borrow());
// Step 13
// TODO some of this step can't be implemented yet
if credentials_flag {
@ -723,13 +726,13 @@ fn http_network_or_cache_fetch(request: Rc<Request>,
}
}
// Step 13
// TODO this step can't be implemented
// Step 14
let mut response: Option<Response> = None;
// TODO this step can't be implemented yet
// Step 15
let mut response: Option<Response> = None;
// Step 16
// TODO have a HTTP cache to check for a completed response
let complete_http_response_from_cache: Option<Response> = None;
if http_request.cache_mode.get() != CacheMode::NoStore &&
@ -761,20 +764,20 @@ fn http_network_or_cache_fetch(request: Rc<Request>,
// TODO this substep
}
// Step 16
// Step 17
// TODO have a HTTP cache to check for a partial response
} else if http_request.cache_mode.get() == CacheMode::Default ||
http_request.cache_mode.get() == CacheMode::ForceCache {
// TODO this substep
}
// Step 17
// Step 18
if response.is_none() {
response = Some(http_network_fetch(request.clone(), http_request.clone(), credentials_flag));
}
let response = response.unwrap();
// Step 18
// Step 19
if let Some(status) = response.status {
if status == StatusCode::NotModified &&
(http_request.cache_mode.get() == CacheMode::Default ||
@ -800,7 +803,7 @@ fn http_network_or_cache_fetch(request: Rc<Request>,
}
}
// Step 19
// Step 20
response
}
@ -835,14 +838,43 @@ fn http_network_fetch(request: Rc<Request>,
let mut response = Response::new();
match wrapped_response {
Ok(mut res) => {
// is it okay for res.version to be unused?
response.url = Some(res.response.url.clone());
response.status = Some(res.response.status);
response.headers = res.response.headers.clone();
let mut body = vec![];
res.response.read_to_end(&mut body);
*response.body.borrow_mut() = ResponseBody::Done(body);
let res_body = response.body.clone();
thread::spawn(move || {
*res_body.lock().unwrap() = ResponseBody::Receiving(vec![]);
let mut new_body = vec![];
res.response.read_to_end(&mut new_body);
let mut body = res_body.lock().unwrap();
assert!(*body != ResponseBody::Empty);
*body = ResponseBody::Done(new_body);
// TODO: the vec storage format is much too slow for these operations,
// response.body needs to use something else before this code can be used
// *res_body.lock().unwrap() = ResponseBody::Receiving(vec![]);
// loop {
// match read_block(&mut res.response) {
// Ok(ReadResult::Payload(ref mut new_body)) => {
// if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
// (body).append(new_body);
// }
// },
// Ok(ReadResult::EOF) | Err(_) => break
// }
// }
// let mut completed_body = res_body.lock().unwrap();
// if let ResponseBody::Receiving(ref body) = *completed_body {
// // TODO cloning seems sub-optimal, but I couldn't figure anything else out
// *res_body.lock().unwrap() = ResponseBody::Done((*body).clone());
// }
});
},
Err(e) =>
response.termination_reason = Some(TerminationReason::Fatal)