Implement cancellation listener for cancelling network requests

This commit is contained in:
Ravi Shankar 2015-11-07 18:48:37 +05:30
parent 92f9e58310
commit 10f5584f78
11 changed files with 191 additions and 53 deletions

View file

@ -19,10 +19,12 @@ use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use mime_classifier::{ApacheBugFlag, MIMEClassifier, NoSniffFlag};
use net_traits::ProgressMsg::Done;
use net_traits::{AsyncResponseTarget, Metadata, ProgressMsg, ResourceTask, ResponseAction};
use net_traits::{ControlMsg, CookieSource, LoadConsumer, LoadData, LoadResponse};
use net_traits::{ControlMsg, CookieSource, LoadConsumer, LoadData, LoadResponse, ResourceId};
use std::borrow::ToOwned;
use std::boxed::FnBox;
use std::sync::mpsc::{Sender, channel};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, RwLock};
use url::Url;
use util::opts;
@ -146,6 +148,7 @@ pub fn new_resource_task(user_agent: String,
};
let (setup_chan, setup_port) = ipc::channel().unwrap();
let setup_chan_clone = setup_chan.clone();
spawn_named("ResourceManager".to_owned(), move || {
let resource_manager = ResourceManager::new(
user_agent, hsts_preload, devtools_chan
@ -155,8 +158,7 @@ pub fn new_resource_task(user_agent: String,
from_client: setup_port,
resource_manager: resource_manager
};
channel_manager.start();
channel_manager.start(setup_chan_clone);
});
setup_chan
}
@ -167,28 +169,85 @@ struct ResourceChannelManager {
}
impl ResourceChannelManager {
fn start(&mut self) {
fn start(&mut self, control_sender: ResourceTask) {
loop {
match self.from_client.recv().unwrap() {
ControlMsg::Load(load_data, consumer) => {
self.resource_manager.load(load_data, consumer)
}
ControlMsg::SetCookiesForUrl(request, cookie_list, source) => {
self.resource_manager.set_cookies_for_url(request, cookie_list, source)
}
ControlMsg::Load(load_data, consumer, id_sender) =>
self.resource_manager.load(load_data, consumer, id_sender, control_sender.clone()),
ControlMsg::SetCookiesForUrl(request, cookie_list, source) =>
self.resource_manager.set_cookies_for_url(request, cookie_list, source),
ControlMsg::GetCookiesForUrl(url, consumer, source) => {
let cookie_jar = &self.resource_manager.cookie_storage;
let mut cookie_jar = cookie_jar.write().unwrap();
consumer.send(cookie_jar.cookies_for_url(&url, source)).unwrap();
}
ControlMsg::Exit => {
break
ControlMsg::Cancel(res_id) => {
if let Some(cancel_sender) = self.resource_manager.cancel_load_map.get(&res_id) {
let _ = cancel_sender.send(());
}
self.resource_manager.cancel_load_map.remove(&res_id);
}
ControlMsg::Exit => break,
}
}
}
}
/// The optional resources required by the `CancellationListener`
pub struct CancellableResource {
/// The receiver which receives a message on load cancellation
cancel_receiver: Receiver<()>,
/// The `CancellationListener` is unique to this `ResourceId`
resource_id: ResourceId,
/// If we haven't initiated any cancel requests, then the loaders ask
/// the listener to remove the `ResourceId` in the `HashMap` of
/// `ResourceManager` once they finish loading
resource_task: ResourceTask,
}
/// A listener which is basically a wrapped optional receiver which looks
/// for the load cancellation message. Some of the loading processes always keep
/// an eye out for this message and stop loading stuff once they receive it.
pub struct CancellationListener {
/// We'll be needing the resources only if we plan to cancel it
cancel_resource: Option<CancellableResource>,
/// This lets us know whether the request has already been cancelled
cancel_status: Cell<bool>,
}
impl CancellationListener {
pub fn new(resources: Option<CancellableResource>) -> CancellationListener {
CancellationListener {
cancel_resource: resources,
cancel_status: Cell::new(false),
}
}
pub fn is_cancelled(&self) -> bool {
match self.cancel_resource {
Some(ref resource) => {
match resource.cancel_receiver.try_recv() {
Ok(_) => {
self.cancel_status.set(true);
true
},
Err(_) => self.cancel_status.get(),
}
},
None => false, // channel doesn't exist!
}
}
}
impl Drop for CancellationListener {
fn drop(&mut self) {
if let Some(ref resource) = self.cancel_resource {
// Ensure that the resource manager stops tracking this request now that it's terminated.
let _ = resource.resource_task.send(ControlMsg::Cancel(resource.resource_id));
}
}
}
pub struct ResourceManager {
user_agent: String,
cookie_storage: Arc<RwLock<CookieStorage>>,
@ -196,6 +255,8 @@ pub struct ResourceManager {
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
hsts_list: Arc<RwLock<HSTSList>>,
connector: Arc<Pool<Connector>>,
cancel_load_map: HashMap<ResourceId, Sender<()>>,
next_resource_id: ResourceId,
}
impl ResourceManager {
@ -209,11 +270,11 @@ impl ResourceManager {
devtools_chan: devtools_channel,
hsts_list: Arc::new(RwLock::new(hsts_list)),
connector: create_http_connector(),
cancel_load_map: HashMap::new(),
next_resource_id: ResourceId(0),
}
}
}
impl ResourceManager {
fn set_cookies_for_url(&mut self, request: Url, cookie_list: String, source: CookieSource) {
let header = Header::parse_header(&[cookie_list.into_bytes()]);
if let Ok(SetCookie(cookies)) = header {
@ -227,15 +288,36 @@ impl ResourceManager {
}
}
fn load(&mut self, load_data: LoadData, consumer: LoadConsumer) {
fn load(&mut self,
load_data: LoadData,
consumer: LoadConsumer,
id_sender: Option<IpcSender<ResourceId>>,
resource_task: ResourceTask) {
fn from_factory(factory: fn(LoadData, LoadConsumer, Arc<MIMEClassifier>))
-> Box<FnBox(LoadData, LoadConsumer, Arc<MIMEClassifier>) + Send> {
box move |load_data, senders, classifier| {
factory(load_data, senders, classifier)
fn from_factory(factory: fn(LoadData, LoadConsumer, Arc<MIMEClassifier>, CancellationListener))
-> Box<FnBox(LoadData,
LoadConsumer,
Arc<MIMEClassifier>,
CancellationListener) + Send> {
box move |load_data, senders, classifier, cancel_listener| {
factory(load_data, senders, classifier, cancel_listener)
}
}
let cancel_resource = id_sender.map(|sender| {
let current_res_id = self.next_resource_id;
let _ = sender.send(current_res_id);
let (cancel_sender, cancel_receiver) = channel();
self.cancel_load_map.insert(current_res_id, cancel_sender);
self.next_resource_id.0 += 1;
CancellableResource {
cancel_receiver: cancel_receiver,
resource_id: current_res_id,
resource_task: resource_task,
}
});
let cancel_listener = CancellationListener::new(cancel_resource);
let loader = match &*load_data.url.scheme {
"file" => from_factory(file_loader::factory),
"http" | "https" | "view-source" =>
@ -254,6 +336,9 @@ impl ResourceManager {
};
debug!("resource_task: loading url: {}", load_data.url.serialize());
loader.call_box((load_data, consumer, self.mime_classifier.clone()));
loader.call_box((load_data,
consumer,
self.mime_classifier.clone(),
cancel_listener));
}
}