Shared Sniffer Task

- Added TargetedLoadResponse and ResponseSenders
- LoadData constructor contains the next consumer which means
  SnifferManager doesn't need the next consumer to start
- New SnifferTask is created at new resource_task creation
- Update Unit Tests
This commit is contained in:
Kshitij Parajuli 2014-11-15 13:16:37 -05:00
parent 82050d1e53
commit f5e9ae17cf
9 changed files with 134 additions and 79 deletions

View file

@ -9,6 +9,7 @@ use data_loader;
use file_loader;
use http_loader;
use sniffer_task;
use sniffer_task::SnifferTask;
use std::comm::{channel, Receiver, Sender};
use http::headers::content_type::MediaType;
@ -24,7 +25,7 @@ use servo_util::task::spawn_named;
pub enum ControlMsg {
/// Request the data associated with a particular URL
Load(LoadData, Sender<LoadResponse>),
Load(LoadData),
Exit
}
@ -34,17 +35,19 @@ pub struct LoadData {
pub method: Method,
pub headers: RequestHeaderCollection,
pub data: Option<Vec<u8>>,
pub cors: Option<ResourceCORSData>
pub cors: Option<ResourceCORSData>,
pub consumer: Sender<LoadResponse>,
}
impl LoadData {
pub fn new(url: Url) -> LoadData {
pub fn new(url: Url, consumer: Sender<LoadResponse>) -> LoadData {
LoadData {
url: url,
method: Get,
headers: RequestHeaderCollection::new(),
data: None,
cors: None
cors: None,
consumer: consumer,
}
}
}
@ -116,6 +119,17 @@ pub struct LoadResponse {
/// Port for reading data.
pub progress_port: Receiver<ProgressMsg>,
}
/// A LoadResponse directed at a particular consumer
pub struct TargetedLoadResponse {
pub load_response: LoadResponse,
pub consumer: Sender<LoadResponse>,
}
// Data structure containing ports
pub struct ResponseSenders {
pub immediate_consumer: Sender<TargetedLoadResponse>,
pub eventual_consumer: Sender<LoadResponse>,
}
/// Messages sent in response to a `Load` message
#[deriving(PartialEq,Show)]
@ -127,16 +141,19 @@ pub enum ProgressMsg {
}
/// For use by loaders in responding to a Load message.
pub fn start_sending(start_chan: Sender<LoadResponse>, metadata: Metadata) -> Sender<ProgressMsg> {
start_sending_opt(start_chan, metadata).ok().unwrap()
pub fn start_sending(senders: ResponseSenders, metadata: Metadata) -> Sender<ProgressMsg> {
start_sending_opt(senders, metadata).ok().unwrap()
}
/// For use by loaders in responding to a Load message.
pub fn start_sending_opt(start_chan: Sender<LoadResponse>, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
let (progress_chan, progress_port) = channel();
let result = start_chan.send_opt(LoadResponse {
metadata: metadata,
progress_port: progress_port,
let result = senders.immediate_consumer.send_opt(TargetedLoadResponse {
load_response: LoadResponse {
metadata: metadata,
progress_port: progress_port,
},
consumer: senders.eventual_consumer
});
match result {
Ok(_) => Ok(progress_chan),
@ -148,7 +165,7 @@ pub fn start_sending_opt(start_chan: Sender<LoadResponse>, metadata: Metadata) -
pub fn load_whole_resource(resource_task: &ResourceTask, url: Url)
-> Result<(Metadata, Vec<u8>), String> {
let (start_chan, start_port) = channel();
resource_task.send(Load(LoadData::new(url), start_chan));
resource_task.send(Load(LoadData::new(url, start_chan)));
let response = start_port.recv();
let mut buf = vec!();
@ -167,8 +184,9 @@ pub type ResourceTask = Sender<ControlMsg>;
/// Create a ResourceTask
pub fn new_resource_task(user_agent: Option<String>) -> ResourceTask {
let (setup_chan, setup_port) = channel();
let sniffer_task = sniffer_task::new_sniffer_task();
spawn_named("ResourceManager", proc() {
ResourceManager::new(setup_port, user_agent).start();
ResourceManager::new(setup_port, user_agent, sniffer_task).start();
});
setup_chan
}
@ -176,13 +194,15 @@ pub fn new_resource_task(user_agent: Option<String>) -> ResourceTask {
struct ResourceManager {
from_client: Receiver<ControlMsg>,
user_agent: Option<String>,
sniffer_task: SnifferTask,
}
impl ResourceManager {
fn new(from_client: Receiver<ControlMsg>, user_agent: Option<String>) -> ResourceManager {
fn new(from_client: Receiver<ControlMsg>, user_agent: Option<String>, sniffer_task: SnifferTask) -> ResourceManager {
ResourceManager {
from_client: from_client,
user_agent: user_agent,
sniffer_task: sniffer_task,
}
}
}
@ -192,8 +212,8 @@ impl ResourceManager {
fn start(&self) {
loop {
match self.from_client.recv() {
Load(load_data, start_chan) => {
self.load(load_data, start_chan)
Load(load_data) => {
self.load(load_data)
}
Exit => {
break
@ -202,15 +222,13 @@ impl ResourceManager {
}
}
fn load(&self, load_data: LoadData, start_chan: Sender<LoadResponse>) {
fn load(&self, load_data: LoadData) {
let mut load_data = load_data;
load_data.headers.user_agent = self.user_agent.clone();
// Create new communication channel, create new sniffer task,
// send all the data to the new sniffer task with the send
// end of the pipe, receive all the data.
let sniffer_task = sniffer_task::new_sniffer_task(start_chan.clone());
let senders = ResponseSenders {
immediate_consumer: self.sniffer_task.clone(),
eventual_consumer: load_data.consumer.clone(),
};
let loader = match load_data.url.scheme.as_slice() {
"file" => file_loader::factory,
@ -219,21 +237,21 @@ impl ResourceManager {
"about" => about_loader::factory,
_ => {
debug!("resource_task: no loader for scheme {:s}", load_data.url.scheme);
start_sending(start_chan, Metadata::default(load_data.url))
start_sending(senders, Metadata::default(load_data.url))
.send(Done(Err("no loader for scheme".to_string())));
return
}
};
debug!("resource_task: loading url: {:s}", load_data.url.serialize());
loader(load_data, sniffer_task);
loader(load_data, self.sniffer_task.clone());
}
}
/// Load a URL asynchronously and iterate over chunks of bytes from the response.
pub fn load_bytes_iter(resource_task: &ResourceTask, url: Url) -> (Metadata, ProgressMsgPortIterator) {
let (input_chan, input_port) = channel();
resource_task.send(Load(LoadData::new(url), input_chan));
resource_task.send(Load(LoadData::new(url, input_chan)));
let response = input_port.recv();
let iter = ProgressMsgPortIterator { progress_port: response.progress_port };
@ -269,7 +287,7 @@ fn test_bad_scheme() {
let resource_task = new_resource_task(None);
let (start_chan, start) = channel();
let url = Url::parse("bogus://whatever").unwrap();
resource_task.send(Load(LoadData::new(url), start_chan));
resource_task.send(Load(LoadData::new(url, start_chan)));
let response = start.recv();
match response.progress_port.recv() {
Done(result) => { assert!(result.is_err()) }