mirror of
https://github.com/servo/servo.git
synced 2025-08-03 12:40:06 +01:00
Re-add support for fetching chunks (and thus xhr download progress)
This commit is contained in:
parent
6e29b872d7
commit
bf99e73cb0
5 changed files with 96 additions and 87 deletions
|
@ -28,6 +28,7 @@ use std::collections::HashSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
use std::mem::swap;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||||
use unicase::UniCase;
|
use unicase::UniCase;
|
||||||
|
@ -36,7 +37,12 @@ use util::thread::spawn_named;
|
||||||
|
|
||||||
pub type Target = Option<Box<FetchTaskTarget + Send>>;
|
pub type Target = Option<Box<FetchTaskTarget + Send>>;
|
||||||
|
|
||||||
type DoneChannel = Option<(Sender<()>, Receiver<()>)>;
|
enum Data {
|
||||||
|
Payload(Vec<u8>),
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;
|
||||||
|
|
||||||
/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
|
/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
|
||||||
pub fn fetch(request: Rc<Request>, target: &mut Target, state: HttpState) -> Response {
|
pub fn fetch(request: Rc<Request>, target: &mut Target, state: HttpState) -> Response {
|
||||||
|
@ -258,8 +264,38 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool,
|
||||||
|
|
||||||
// Step 18
|
// Step 18
|
||||||
if request.synchronous {
|
if request.synchronous {
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
// process_response is not supposed to be used
|
||||||
|
// by sync fetch, but we overload it here for simplicity
|
||||||
|
target.process_response(&response);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref ch) = *done_chan {
|
if let Some(ref ch) = *done_chan {
|
||||||
let _ = ch.1.recv();
|
loop {
|
||||||
|
match ch.1.recv()
|
||||||
|
.expect("fetch worker should always send Done before terminating") {
|
||||||
|
Data::Payload(vec) => {
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
target.process_response_chunk(vec);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Data::Done => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() {
|
||||||
|
// in case there was no channel to wait for, the body was
|
||||||
|
// obtained synchronously via basic_fetch for data/file/about/etc
|
||||||
|
// We should still send the body across as a chunk
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
target.process_response_chunk(vec.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// overloaded similarly to process_response
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
target.process_response_eof(&response);
|
||||||
}
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -283,7 +319,26 @@ fn main_fetch(request: Rc<Request>, cache: &mut CORSCache, cors_flag: bool,
|
||||||
|
|
||||||
// Step 21
|
// Step 21
|
||||||
if let Some(ref ch) = *done_chan {
|
if let Some(ref ch) = *done_chan {
|
||||||
let _ = ch.1.recv();
|
loop {
|
||||||
|
match ch.1.recv()
|
||||||
|
.expect("fetch worker should always send Done before terminating") {
|
||||||
|
Data::Payload(vec) => {
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
target.process_response_chunk(vec);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Data::Done => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if let Some(ref mut target) = *target {
|
||||||
|
if let ResponseBody::Done(ref vec) = *response.body.lock().unwrap() {
|
||||||
|
// in case there was no channel to wait for, the body was
|
||||||
|
// obtained synchronously via basic_fetch for data/file/about/etc
|
||||||
|
// We should still send the body across as a chunk
|
||||||
|
target.process_response_chunk(vec.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 22
|
// Step 22
|
||||||
|
@ -876,19 +931,28 @@ fn http_network_fetch(request: Rc<Request>,
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match read_block(&mut res.response) {
|
match read_block(&mut res.response) {
|
||||||
Ok(ReadResult::Payload(ref mut chunk)) => {
|
Ok(ReadResult::Payload(chunk)) => {
|
||||||
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
|
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
|
||||||
body.append(chunk);
|
|
||||||
|
body.extend_from_slice(&chunk);
|
||||||
|
if let Some(ref sender) = done_sender {
|
||||||
|
let _ = sender.send(Data::Payload(chunk));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(ReadResult::EOF) | Err(_) => {
|
Ok(ReadResult::EOF) | Err(_) => {
|
||||||
|
let mut empty_vec = Vec::new();
|
||||||
let completed_body = match *res_body.lock().unwrap() {
|
let completed_body = match *res_body.lock().unwrap() {
|
||||||
ResponseBody::Receiving(ref body) => (*body).clone(),
|
ResponseBody::Receiving(ref mut body) => {
|
||||||
_ => vec![]
|
// avoid cloning the body
|
||||||
|
swap(body, &mut empty_vec);
|
||||||
|
empty_vec
|
||||||
|
},
|
||||||
|
_ => empty_vec,
|
||||||
};
|
};
|
||||||
*res_body.lock().unwrap() = ResponseBody::Done(completed_body);
|
*res_body.lock().unwrap() = ResponseBody::Done(completed_body);
|
||||||
if let Some(sender) = done_sender {
|
if let Some(ref sender) = done_sender {
|
||||||
let _ = sender.send(());
|
let _ = sender.send(Data::Done);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -494,15 +494,13 @@ impl CoreResourceManager {
|
||||||
blocked_content: BLOCKED_CONTENT_RULES.clone(),
|
blocked_content: BLOCKED_CONTENT_RULES.clone(),
|
||||||
};
|
};
|
||||||
spawn_named(format!("fetch thread for {}", init.url), move || {
|
spawn_named(format!("fetch thread for {}", init.url), move || {
|
||||||
let sync = init.synchronous;
|
|
||||||
let request = Request::from_init(init);
|
let request = Request::from_init(init);
|
||||||
// XXXManishearth: Check origin against pipeline id
|
// XXXManishearth: Check origin against pipeline id
|
||||||
// todo load context / mimesniff in fetch
|
// todo load context / mimesniff in fetch
|
||||||
// todo referrer policy?
|
// todo referrer policy?
|
||||||
// todo service worker stuff
|
// todo service worker stuff
|
||||||
let mut target = Some(Box::new(sender) as Box<FetchTaskTarget + Send + 'static>);
|
let mut target = Some(Box::new(sender) as Box<FetchTaskTarget + Send + 'static>);
|
||||||
let response = fetch(Rc::new(request), &mut target, http_state);
|
fetch(Rc::new(request), &mut target, http_state);
|
||||||
target.unwrap().fetch_done(&response, sync);
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ use hyper::mime::{Attr, Mime};
|
||||||
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
|
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
|
||||||
use msg::constellation_msg::{PipelineId, ReferrerPolicy};
|
use msg::constellation_msg::{PipelineId, ReferrerPolicy};
|
||||||
use request::{Request, RequestInit};
|
use request::{Request, RequestInit};
|
||||||
use response::{HttpsState, Response, ResponseBody};
|
use response::{HttpsState, Response};
|
||||||
use std::io::Error as IOError;
|
use std::io::Error as IOError;
|
||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
@ -164,8 +164,8 @@ pub enum FetchResponseMsg {
|
||||||
ProcessRequestEOF,
|
ProcessRequestEOF,
|
||||||
// todo: send more info about the response (or perhaps the entire Response)
|
// todo: send more info about the response (or perhaps the entire Response)
|
||||||
ProcessResponse(Result<Metadata, NetworkError>),
|
ProcessResponse(Result<Metadata, NetworkError>),
|
||||||
ProcessResponseEOF(Result<Option<Vec<u8>>, NetworkError>),
|
ProcessResponseChunk(Vec<u8>),
|
||||||
FetchDone(Result<(Metadata, Option<Vec<u8>>), NetworkError>),
|
ProcessResponseEOF(Result<(), NetworkError>),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait FetchTaskTarget {
|
pub trait FetchTaskTarget {
|
||||||
|
@ -184,22 +184,21 @@ pub trait FetchTaskTarget {
|
||||||
/// Fired when headers are received
|
/// Fired when headers are received
|
||||||
fn process_response(&mut self, response: &Response);
|
fn process_response(&mut self, response: &Response);
|
||||||
|
|
||||||
|
/// Fired when a chunk of response content is received
|
||||||
|
fn process_response_chunk(&mut self, chunk: Vec<u8>);
|
||||||
|
|
||||||
/// https://fetch.spec.whatwg.org/#process-response-end-of-file
|
/// https://fetch.spec.whatwg.org/#process-response-end-of-file
|
||||||
///
|
///
|
||||||
/// Fired when the response is fully fetched
|
/// Fired when the response is fully fetched
|
||||||
fn process_response_eof(&mut self, response: &Response);
|
fn process_response_eof(&mut self, response: &Response);
|
||||||
|
|
||||||
/// Called when fetch terminates, useful for sync
|
|
||||||
fn fetch_done(&mut self, response: &Response, sync: bool);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait FetchResponseListener {
|
pub trait FetchResponseListener {
|
||||||
fn process_request_body(&mut self);
|
fn process_request_body(&mut self);
|
||||||
fn process_request_eof(&mut self);
|
fn process_request_eof(&mut self);
|
||||||
fn process_response(&mut self, metadata: Result<Metadata, NetworkError>);
|
fn process_response(&mut self, metadata: Result<Metadata, NetworkError>);
|
||||||
fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>);
|
fn process_response_chunk(&mut self, chunk: Vec<u8>);
|
||||||
|
fn process_response_eof(&mut self, response: Result<(), NetworkError>);
|
||||||
fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
|
impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
|
||||||
|
@ -214,57 +213,17 @@ impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
|
||||||
fn process_response(&mut self, response: &Response) {
|
fn process_response(&mut self, response: &Response) {
|
||||||
let _ = self.send(FetchResponseMsg::ProcessResponse(response.metadata()));
|
let _ = self.send(FetchResponseMsg::ProcessResponse(response.metadata()));
|
||||||
}
|
}
|
||||||
|
fn process_response_chunk(&mut self, chunk: Vec<u8>) {
|
||||||
|
let _ = self.send(FetchResponseMsg::ProcessResponseChunk(chunk));
|
||||||
|
}
|
||||||
|
|
||||||
fn process_response_eof(&mut self, response: &Response) {
|
fn process_response_eof(&mut self, response: &Response) {
|
||||||
if response.is_network_error() {
|
if response.is_network_error() {
|
||||||
// todo: finer grained errors
|
// todo: finer grained errors
|
||||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Network error".into()))));
|
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Network error".into()))));
|
||||||
|
} else {
|
||||||
|
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(())));
|
||||||
}
|
}
|
||||||
if let Ok(ref guard) = response.body.lock() {
|
|
||||||
match **guard {
|
|
||||||
ResponseBody::Done(ref vec) => {
|
|
||||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(Some(vec.clone()))));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ResponseBody::Empty => {
|
|
||||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(None)));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => ()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If something goes wrong, log it instead of crashing the resource thread
|
|
||||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(NetworkError::Internal("Incomplete body".into()))));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn fetch_done(&mut self, response: &Response, sync: bool) {
|
|
||||||
if !sync {
|
|
||||||
// fetch_done is only used by sync XHR, avoid pointless data cloning
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if response.is_network_error() {
|
|
||||||
// todo: finer grained errors
|
|
||||||
let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Network error".into()))));
|
|
||||||
}
|
|
||||||
if let Ok(ref guard) = response.body.lock() {
|
|
||||||
match **guard {
|
|
||||||
ResponseBody::Done(ref vec) => {
|
|
||||||
let ret = response.metadata().map(|m| (m, Some(vec.clone())));
|
|
||||||
let _ = self.send(FetchResponseMsg::FetchDone(ret));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ResponseBody::Empty => {
|
|
||||||
let ret = response.metadata().map(|m| (m, None));
|
|
||||||
let _ = self.send(FetchResponseMsg::FetchDone(ret));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => ()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If something goes wrong, log it instead of crashing the resource thread
|
|
||||||
let _ = self.send(FetchResponseMsg::FetchDone(Err(NetworkError::Internal("Incomplete body".into()))));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,8 +274,8 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg {
|
||||||
FetchResponseMsg::ProcessRequestBody => listener.process_request_body(),
|
FetchResponseMsg::ProcessRequestBody => listener.process_request_body(),
|
||||||
FetchResponseMsg::ProcessRequestEOF => listener.process_request_eof(),
|
FetchResponseMsg::ProcessRequestEOF => listener.process_request_eof(),
|
||||||
FetchResponseMsg::ProcessResponse(meta) => listener.process_response(meta),
|
FetchResponseMsg::ProcessResponse(meta) => listener.process_response(meta),
|
||||||
|
FetchResponseMsg::ProcessResponseChunk(data) => listener.process_response_chunk(data),
|
||||||
FetchResponseMsg::ProcessResponseEOF(data) => listener.process_response_eof(data),
|
FetchResponseMsg::ProcessResponseEOF(data) => listener.process_response_eof(data),
|
||||||
FetchResponseMsg::FetchDone(response) => listener.fetch_done(response),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ use net_traits::CoreResourceMsg::Fetch;
|
||||||
use net_traits::trim_http_whitespace;
|
use net_traits::trim_http_whitespace;
|
||||||
use net_traits::{FetchResponseListener, Metadata, NetworkError, RequestSource};
|
use net_traits::{FetchResponseListener, Metadata, NetworkError, RequestSource};
|
||||||
use net_traits::{CoreResourceThread, LoadOrigin};
|
use net_traits::{CoreResourceThread, LoadOrigin};
|
||||||
use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode, Origin};
|
use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode};
|
||||||
use network_listener::{NetworkListener, PreInvoke};
|
use network_listener::{NetworkListener, PreInvoke};
|
||||||
use parse::html::{ParseContext, parse_html};
|
use parse::html::{ParseContext, parse_html};
|
||||||
use parse::xml::{self, parse_xml};
|
use parse::xml::{self, parse_xml};
|
||||||
|
@ -230,14 +230,13 @@ impl XMLHttpRequest {
|
||||||
*self.sync_status.borrow_mut() = Some(rv);
|
*self.sync_status.borrow_mut() = Some(rv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn process_response_eof(&mut self, response: Result<Option<Vec<u8>>, NetworkError>) {
|
fn process_response_chunk(&mut self, mut chunk: Vec<u8>) {
|
||||||
|
self.buf.borrow_mut().append(&mut chunk);
|
||||||
|
self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone());
|
||||||
|
}
|
||||||
|
fn process_response_eof(&mut self, response: Result<(), NetworkError>) {
|
||||||
match response {
|
match response {
|
||||||
Ok(buf) => {
|
Ok(()) => {
|
||||||
if let Some(buf) = buf {
|
|
||||||
*self.buf.borrow_mut() = buf;
|
|
||||||
// todo move to a process_chunk
|
|
||||||
self.xhr.root().process_data_available(self.gen_id, self.buf.borrow().clone());
|
|
||||||
}
|
|
||||||
let rv = self.xhr.root().process_response_complete(self.gen_id, Ok(()));
|
let rv = self.xhr.root().process_response_complete(self.gen_id, Ok(()));
|
||||||
*self.sync_status.borrow_mut() = Some(rv);
|
*self.sync_status.borrow_mut() = Some(rv);
|
||||||
}
|
}
|
||||||
|
@ -247,17 +246,6 @@ impl XMLHttpRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn fetch_done(&mut self, response: Result<(Metadata, Option<Vec<u8>>), NetworkError>) {
|
|
||||||
match response {
|
|
||||||
Ok(response) => {
|
|
||||||
self.process_response(Ok(response.0));
|
|
||||||
self.process_response_eof(Ok(response.1));
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
self.process_response_eof(Err(err));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PreInvoke for XHRContext {
|
impl PreInvoke for XHRContext {
|
||||||
|
|
|
@ -41,11 +41,11 @@ impl FetchTaskTarget for FetchResponseCollector {
|
||||||
fn process_request_body(&mut self, _: &Request) {}
|
fn process_request_body(&mut self, _: &Request) {}
|
||||||
fn process_request_eof(&mut self, _: &Request) {}
|
fn process_request_eof(&mut self, _: &Request) {}
|
||||||
fn process_response(&mut self, _: &Response) {}
|
fn process_response(&mut self, _: &Response) {}
|
||||||
|
fn process_response_chunk(&mut self, _: Vec<u8>) {}
|
||||||
/// Fired when the response is fully fetched
|
/// Fired when the response is fully fetched
|
||||||
fn process_response_eof(&mut self, response: &Response) {
|
fn process_response_eof(&mut self, response: &Response) {
|
||||||
self.sender.send(response.clone());
|
self.sender.send(response.clone());
|
||||||
}
|
}
|
||||||
fn fetch_done(&mut self, _: &Response, _: bool) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_async(request: Request, target: Box<FetchTaskTarget + Send>) {
|
fn fetch_async(request: Request, target: Box<FetchTaskTarget + Send>) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue