Auto merge of #19274 - Manishearth:xhr-cancel, r=jdm

Fetch cancellation

This PR implements cancellation for fetch, and uses it for XHR. This means that fetch clients can now send a message to the fetch task asking for the network request to be aborted.

Previously, clients like XHR had abort functionality but would implement it by simply ignoring future messages from the network task; and would not actually cancel the network fetch.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/19274)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2017-11-20 20:48:17 -06:00 committed by GitHub
commit 00b3612fe9
12 changed files with 98 additions and 31 deletions

View file

@ -15,6 +15,7 @@ use hyper::header::{Header, HeaderFormat, HeaderView, Headers, Referer as Refere
use hyper::method::Method;
use hyper::mime::{Mime, SubLevel, TopLevel};
use hyper::status::StatusCode;
use ipc_channel::ipc::IpcReceiver;
use mime_guess::guess_mime_type;
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy};
use net_traits::request::{CredentialsMode, Destination, Referrer, Request, RequestMode};
@ -27,7 +28,7 @@ use std::fs::File;
use std::io::Read;
use std::mem;
use std::str;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver};
use subresource_integrity::is_response_integrity_valid;
@ -36,6 +37,7 @@ pub type Target<'a> = &'a mut (FetchTaskTarget + Send);
pub enum Data {
Payload(Vec<u8>),
Done,
Cancelled,
}
pub struct FetchContext {
@ -43,8 +45,37 @@ pub struct FetchContext {
pub user_agent: Cow<'static, str>,
pub devtools_chan: Option<Sender<DevtoolsControlMsg>>,
pub filemanager: FileManager,
pub cancellation_listener: Arc<Mutex<CancellationListener>>,
}
pub struct CancellationListener {
cancel_chan: Option<IpcReceiver<()>>,
cancelled: bool,
}
impl CancellationListener {
pub fn new(cancel_chan: Option<IpcReceiver<()>>) -> Self {
Self {
cancel_chan: cancel_chan,
cancelled: false,
}
}
pub fn cancelled(&mut self) -> bool {
if let Some(ref cancel_chan) = self.cancel_chan {
if self.cancelled {
true
} else if cancel_chan.try_recv().is_ok() {
self.cancelled = true;
true
} else {
false
}
} else {
false
}
}
}
pub type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;
/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
@ -317,7 +348,7 @@ pub fn main_fetch(request: &mut Request,
};
// Execute deferred rebinding of response.
let response = if let Some(error) = internal_error {
let mut response = if let Some(error) = internal_error {
Response::network_error(error)
} else {
response
@ -325,9 +356,9 @@ pub fn main_fetch(request: &mut Request,
// Step 19.
let mut response_loaded = false;
let response = if !response.is_network_error() && !request.integrity_metadata.is_empty() {
let mut response = if !response.is_network_error() && !request.integrity_metadata.is_empty() {
// Step 19.1.
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
response_loaded = true;
// Step 19.2.
@ -346,9 +377,9 @@ pub fn main_fetch(request: &mut Request,
if request.synchronous {
// process_response is not supposed to be used
// by sync fetch, but we overload it here for simplicity
target.process_response(&response);
target.process_response(&mut response);
if !response_loaded {
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
}
// overloaded similarly to process_response
target.process_response_eof(&response);
@ -370,7 +401,7 @@ pub fn main_fetch(request: &mut Request,
// Step 23.
if !response_loaded {
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
}
// Step 24.
@ -381,7 +412,7 @@ pub fn main_fetch(request: &mut Request,
response
}
fn wait_for_response(response: &Response, target: Target, done_chan: &mut DoneChannel) {
fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut DoneChannel) {
if let Some(ref ch) = *done_chan {
loop {
match ch.1.recv()
@ -390,6 +421,10 @@ fn wait_for_response(response: &Response, target: Target, done_chan: &mut DoneCh
target.process_response_chunk(vec);
},
Data::Done => break,
Data::Cancelled => {
response.aborted = true;
break;
}
}
}
} else {

View file

@ -1103,6 +1103,10 @@ fn http_network_fetch(request: &Request,
let devtools_sender = context.devtools_chan.clone();
let meta_status = meta.status.clone();
let meta_headers = meta.headers.clone();
let cancellation_listener = context.cancellation_listener.clone();
if cancellation_listener.lock().unwrap().cancelled() {
return Response::network_error(NetworkError::Internal("Fetch aborted".into()))
}
thread::Builder::new().name(format!("fetch worker thread")).spawn(move || {
match StreamedResponse::from_http_response(res) {
Ok(mut res) => {
@ -1125,6 +1129,11 @@ fn http_network_fetch(request: &Request,
}
loop {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return;
}
match read_block(&mut res) {
Ok(Data::Payload(chunk)) => {
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
@ -1144,6 +1153,7 @@ fn http_network_fetch(request: &Request,
let _ = done_sender.send(Data::Done);
break;
}
Ok(Data::Cancelled) => unreachable!() // read_block doesn't return Data::Cancelled
}
}
}

View file

@ -9,7 +9,7 @@ use cookie_rs;
use cookie_storage::CookieStorage;
use devtools_traits::DevtoolsControlMsg;
use fetch::cors_cache::CorsCache;
use fetch::methods::{FetchContext, fetch};
use fetch::methods::{CancellationListener, FetchContext, fetch};
use filemanager_thread::{FileManager, TFDProvider};
use hsts::HstsList;
use http_cache::HttpCache;
@ -36,7 +36,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::thread;
use storage_thread::StorageThreadFactory;
@ -160,14 +160,14 @@ impl ResourceChannelManager {
match msg {
CoreResourceMsg::Fetch(req_init, channels) => {
match channels {
FetchChannels::ResponseMsg(sender) =>
self.resource_manager.fetch(req_init, None, sender, http_state),
FetchChannels::ResponseMsg(sender, cancel_chan) =>
self.resource_manager.fetch(req_init, None, sender, http_state, cancel_chan),
FetchChannels::WebSocket { event_sender, action_receiver } =>
self.resource_manager.websocket_connect(req_init, event_sender, action_receiver, http_state),
}
}
CoreResourceMsg::FetchRedirect(req_init, res_init, sender) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state),
CoreResourceMsg::FetchRedirect(req_init, res_init, sender, cancel_chan) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state, cancel_chan),
CoreResourceMsg::SetCookieForUrl(request, cookie, source) =>
self.resource_manager.set_cookie_for_url(&request, cookie.into_inner(), source, http_state),
CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
@ -332,7 +332,8 @@ impl CoreResourceManager {
req_init: RequestInit,
res_init_: Option<ResponseInit>,
mut sender: IpcSender<FetchResponseMsg>,
http_state: &Arc<HttpState>) {
http_state: &Arc<HttpState>,
cancel_chan: Option<IpcReceiver<()>>) {
let http_state = http_state.clone();
let ua = self.user_agent.clone();
let dc = self.devtools_chan.clone();
@ -349,6 +350,7 @@ impl CoreResourceManager {
user_agent: ua,
devtools_chan: dc,
filemanager: filemanager,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))),
};
match res_init_ {