auto merge of #4000 : t29/servo/mime-sniffing, r=jdm

Issue: #3144

This PR addresses the second step of the ticket. i.e. move from a 1:1 sniffer:request task model to a shared sniffer task.
This commit is contained in:
bors-servo 2014-11-28 10:51:44 -07:00
commit 1ac79c64da
9 changed files with 134 additions and 79 deletions

View file

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use resource_task::{LoadResponse, Metadata, Done, LoadData, start_sending};
use resource_task::{TargetedLoadResponse, Metadata, Done, LoadData, start_sending, ResponseSenders};
use file_loader;
use std::io::fs::PathExtensions;
@ -11,10 +11,14 @@ use http::status::Ok as StatusOk;
use servo_util::resource_files::resources_dir_path;
pub fn factory(mut load_data: LoadData, start_chan: Sender<LoadResponse>) {
pub fn factory(mut load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
let senders = ResponseSenders {
immediate_consumer: start_chan.clone(),
eventual_consumer: load_data.consumer.clone(),
};
match load_data.url.non_relative_scheme_data().unwrap() {
"blank" => {
let chan = start_sending(start_chan, Metadata {
let chan = start_sending(senders, Metadata {
final_url: load_data.url,
content_type: Some(("text".to_string(), "html".to_string())),
charset: Some("utf-8".to_string()),
@ -32,7 +36,7 @@ pub fn factory(mut load_data: LoadData, start_chan: Sender<LoadResponse>) {
load_data.url = Url::from_file_path(&path).unwrap();
}
_ => {
start_sending(start_chan, Metadata::default(load_data.url))
start_sending(senders, Metadata::default(load_data.url))
.send(Done(Err("Unknown about: URL.".to_string())));
return
}

View file

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use resource_task::{Done, Payload, Metadata, LoadData, LoadResponse, start_sending};
use resource_task::{Done, Payload, Metadata, LoadData, TargetedLoadResponse, start_sending, ResponseSenders};
use serialize::base64::FromBase64;
@ -11,7 +11,7 @@ use http::headers::content_type::MediaType;
use url::{percent_decode, NonRelativeSchemeData};
pub fn factory(load_data: LoadData, start_chan: Sender<LoadResponse>) {
pub fn factory(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
// NB: we don't spawn a new task.
// Hypothesis: data URLs are too small for parallel base64 etc. to be worth it.
// Should be tested at some point.
@ -19,12 +19,17 @@ pub fn factory(load_data: LoadData, start_chan: Sender<LoadResponse>) {
load(load_data, start_chan)
}
fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
fn load(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
let url = load_data.url;
assert!("data" == url.scheme.as_slice());
let mut metadata = Metadata::default(url.clone());
let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer,
};
// Split out content type and data.
let mut scheme_data = match url.scheme_data {
NonRelativeSchemeData(scheme_data) => scheme_data,
@ -39,7 +44,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
}
let parts: Vec<&str> = scheme_data.as_slice().splitn(1, ',').collect();
if parts.len() != 2 {
start_sending(start_chan, metadata).send(Done(Err("invalid data uri".to_string())));
start_sending(senders, metadata).send(Done(Err("invalid data uri".to_string())));
return;
}
@ -57,7 +62,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
let content_type: Option<MediaType> = from_stream_with_str(ct_str);
metadata.set_content_type(&content_type);
let progress_chan = start_sending(start_chan, metadata);
let progress_chan = start_sending(senders, metadata);
let bytes = percent_decode(parts[1].as_bytes());
if is_base64 {
@ -86,9 +91,11 @@ fn assert_parse(url: &'static str,
data: Option<Vec<u8>>) {
use std::comm;
use url::Url;
use sniffer_task;
let (start_chan, start_port) = comm::channel();
load(LoadData::new(Url::parse(url).unwrap()), start_chan);
let sniffer_task = sniffer_task::new_sniffer_task();
load(LoadData::new(Url::parse(url).unwrap(), start_chan), sniffer_task);
let response = start_port.recv();
assert_eq!(&response.metadata.content_type, &content_type);

View file

@ -2,8 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use resource_task::{ProgressMsg, Metadata, Payload, Done, LoadData, start_sending};
use resource_task::{LoadResponse};
use resource_task::{ProgressMsg, Metadata, Payload, Done, LoadData, start_sending, TargetedLoadResponse, ResponseSenders};
use std::io;
use std::io::File;
@ -30,10 +29,14 @@ fn read_all(reader: &mut io::Stream, progress_chan: &Sender<ProgressMsg>)
}
}
pub fn factory(load_data: LoadData, start_chan: Sender<LoadResponse>) {
pub fn factory(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
let url = load_data.url;
assert!("file" == url.scheme.as_slice());
let progress_chan = start_sending(start_chan, Metadata::default(url.clone()));
let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer,
};
let progress_chan = start_sending(senders, Metadata::default(url.clone()));
spawn_named("file_loader", proc() {
let file_path: Result<Path, ()> = url.to_file_path();
match file_path {

View file

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use resource_task::{Metadata, Payload, Done, LoadResponse, LoadData, start_sending_opt};
use resource_task::{Metadata, Payload, Done, TargetedLoadResponse, LoadData, start_sending_opt, ResponseSenders};
use log;
use std::collections::HashSet;
@ -12,21 +12,21 @@ use std::io::Reader;
use servo_util::task::spawn_named;
use url::Url;
pub fn factory(load_data: LoadData, start_chan: Sender<LoadResponse>) {
pub fn factory(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
spawn_named("http_loader", proc() load(load_data, start_chan))
}
fn send_error(url: Url, err: String, start_chan: Sender<LoadResponse>) {
fn send_error(url: Url, err: String, senders: ResponseSenders) {
let mut metadata = Metadata::default(url);
metadata.status = None;
match start_sending_opt(start_chan, metadata) {
match start_sending_opt(senders, metadata) {
Ok(p) => p.send(Done(Err(err))),
_ => {}
};
}
fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
fn load(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
// FIXME: At the time of writing this FIXME, servo didn't have any central
// location for configuration. If you're reading this and such a
// repository DOES exist, please update this constant to use it.
@ -35,17 +35,22 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
let mut url = load_data.url.clone();
let mut redirected_to = HashSet::new();
let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer
};
// Loop to handle redirects.
loop {
iters = iters + 1;
if iters > max_redirects {
send_error(url, "too many redirects".to_string(), start_chan);
send_error(url, "too many redirects".to_string(), senders);
return;
}
if redirected_to.contains(&url) {
send_error(url, "redirect loop".to_string(), start_chan);
send_error(url, "redirect loop".to_string(), senders);
return;
}
@ -55,7 +60,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
"http" | "https" => {}
_ => {
let s = format!("{:s} request, but we don't support that scheme", url.scheme);
send_error(url, s, start_chan);
send_error(url, s, senders);
return;
}
}
@ -66,7 +71,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
let mut writer = match request {
Ok(w) => box w,
Err(e) => {
send_error(url, e.desc.to_string(), start_chan);
send_error(url, e.desc.to_string(), senders);
return;
}
};
@ -84,7 +89,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
writer.headers.content_length = Some(data.len());
match writer.write(data.as_slice()) {
Err(e) => {
send_error(url, e.desc.to_string(), start_chan);
send_error(url, e.desc.to_string(), senders);
return;
}
_ => {}
@ -95,7 +100,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
let mut response = match writer.read_response() {
Ok(r) => r,
Err((_, e)) => {
send_error(url, e.desc.to_string(), start_chan);
send_error(url, e.desc.to_string(), senders);
return;
}
};
@ -116,7 +121,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
Some(ref c) => {
if c.preflight {
// The preflight lied
send_error(url, "Preflight fetch inconsistent with main fetch".to_string(), start_chan);
send_error(url, "Preflight fetch inconsistent with main fetch".to_string(), senders);
return;
} else {
// XXXManishearth There are some CORS-related steps here,
@ -138,7 +143,7 @@ fn load(load_data: LoadData, start_chan: Sender<LoadResponse>) {
metadata.headers = Some(response.headers.clone());
metadata.status = Some(response.status.clone());
let progress_chan = match start_sending_opt(start_chan, metadata) {
let progress_chan = match start_sending_opt(senders, metadata) {
Ok(p) => p,
_ => return
};

View file

@ -443,7 +443,7 @@ impl ImageCacheTask {
fn load_image_data(url: Url, resource_task: ResourceTask) -> Result<Vec<u8>, ()> {
let (response_chan, response_port) = channel();
resource_task.send(resource_task::Load(LoadData::new(url), response_chan));
resource_task.send(resource_task::Load(LoadData::new(url, response_chan)));
let mut image_data = vec!();
@ -481,7 +481,8 @@ mod tests {
use super::*;
use resource_task;
use resource_task::{ResourceTask, Metadata, start_sending};
use resource_task::{ResourceTask, Metadata, start_sending, ResponseSenders};
use sniffer_task;
use image::base::test_image_bin;
use servo_util::taskpool::TaskPool;
use std::comm;
@ -557,8 +558,13 @@ mod tests {
spawn_listener(proc(port: Receiver<resource_task::ControlMsg>) {
loop {
match port.recv() {
resource_task::Load(_, response) => {
let chan = start_sending(response, Metadata::default(
resource_task::Load(response) => {
let sniffer_task = sniffer_task::new_sniffer_task();
let senders = ResponseSenders {
immediate_consumer: sniffer_task,
eventual_consumer: response.consumer.clone(),
};
let chan = start_sending(senders, Metadata::default(
Url::parse("file:///fake").unwrap()));
on_load.invoke(chan);
}
@ -708,8 +714,13 @@ mod tests {
let mock_resource_task = spawn_listener(proc(port: Receiver<resource_task::ControlMsg>) {
loop {
match port.recv() {
resource_task::Load(_, response) => {
let chan = start_sending(response, Metadata::default(
resource_task::Load(response) => {
let sniffer_task = sniffer_task::new_sniffer_task();
let senders = ResponseSenders {
immediate_consumer: sniffer_task,
eventual_consumer: response.consumer.clone(),
};
let chan = start_sending(senders, Metadata::default(
Url::parse("file:///fake").unwrap()));
chan.send(resource_task::Payload(test_image_bin()));
chan.send(resource_task::Done(Ok(())));
@ -755,8 +766,13 @@ mod tests {
let mock_resource_task = spawn_listener(proc(port: Receiver<resource_task::ControlMsg>) {
loop {
match port.recv() {
resource_task::Load(_, response) => {
let chan = start_sending(response, Metadata::default(
resource_task::Load(response) => {
let sniffer_task = sniffer_task::new_sniffer_task();
let senders = ResponseSenders {
immediate_consumer: sniffer_task,
eventual_consumer: response.consumer.clone(),
};
let chan = start_sending(senders, Metadata::default(
Url::parse("file:///fake").unwrap()));
chan.send(resource_task::Payload(test_image_bin()));
chan.send(resource_task::Done(Err("".to_string())));

View file

@ -9,6 +9,7 @@ use data_loader;
use file_loader;
use http_loader;
use sniffer_task;
use sniffer_task::SnifferTask;
use std::comm::{channel, Receiver, Sender};
use http::headers::content_type::MediaType;
@ -24,7 +25,7 @@ use servo_util::task::spawn_named;
pub enum ControlMsg {
/// Request the data associated with a particular URL
Load(LoadData, Sender<LoadResponse>),
Load(LoadData),
Exit
}
@ -34,17 +35,19 @@ pub struct LoadData {
pub method: Method,
pub headers: RequestHeaderCollection,
pub data: Option<Vec<u8>>,
pub cors: Option<ResourceCORSData>
pub cors: Option<ResourceCORSData>,
pub consumer: Sender<LoadResponse>,
}
impl LoadData {
pub fn new(url: Url) -> LoadData {
pub fn new(url: Url, consumer: Sender<LoadResponse>) -> LoadData {
LoadData {
url: url,
method: Get,
headers: RequestHeaderCollection::new(),
data: None,
cors: None
cors: None,
consumer: consumer,
}
}
}
@ -116,6 +119,17 @@ pub struct LoadResponse {
/// Port for reading data.
pub progress_port: Receiver<ProgressMsg>,
}
/// A LoadResponse directed at a particular consumer
pub struct TargetedLoadResponse {
pub load_response: LoadResponse,
pub consumer: Sender<LoadResponse>,
}
// Data structure containing ports
pub struct ResponseSenders {
pub immediate_consumer: Sender<TargetedLoadResponse>,
pub eventual_consumer: Sender<LoadResponse>,
}
/// Messages sent in response to a `Load` message
#[deriving(PartialEq,Show)]
@ -127,16 +141,19 @@ pub enum ProgressMsg {
}
/// For use by loaders in responding to a Load message.
pub fn start_sending(start_chan: Sender<LoadResponse>, metadata: Metadata) -> Sender<ProgressMsg> {
start_sending_opt(start_chan, metadata).ok().unwrap()
pub fn start_sending(senders: ResponseSenders, metadata: Metadata) -> Sender<ProgressMsg> {
start_sending_opt(senders, metadata).ok().unwrap()
}
/// For use by loaders in responding to a Load message.
pub fn start_sending_opt(start_chan: Sender<LoadResponse>, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
let (progress_chan, progress_port) = channel();
let result = start_chan.send_opt(LoadResponse {
let result = senders.immediate_consumer.send_opt(TargetedLoadResponse {
load_response: LoadResponse {
metadata: metadata,
progress_port: progress_port,
},
consumer: senders.eventual_consumer
});
match result {
Ok(_) => Ok(progress_chan),
@ -148,7 +165,7 @@ pub fn start_sending_opt(start_chan: Sender<LoadResponse>, metadata: Metadata) -
pub fn load_whole_resource(resource_task: &ResourceTask, url: Url)
-> Result<(Metadata, Vec<u8>), String> {
let (start_chan, start_port) = channel();
resource_task.send(Load(LoadData::new(url), start_chan));
resource_task.send(Load(LoadData::new(url, start_chan)));
let response = start_port.recv();
let mut buf = vec!();
@ -167,8 +184,9 @@ pub type ResourceTask = Sender<ControlMsg>;
/// Create a ResourceTask
pub fn new_resource_task(user_agent: Option<String>) -> ResourceTask {
let (setup_chan, setup_port) = channel();
let sniffer_task = sniffer_task::new_sniffer_task();
spawn_named("ResourceManager", proc() {
ResourceManager::new(setup_port, user_agent).start();
ResourceManager::new(setup_port, user_agent, sniffer_task).start();
});
setup_chan
}
@ -176,13 +194,15 @@ pub fn new_resource_task(user_agent: Option<String>) -> ResourceTask {
struct ResourceManager {
from_client: Receiver<ControlMsg>,
user_agent: Option<String>,
sniffer_task: SnifferTask,
}
impl ResourceManager {
fn new(from_client: Receiver<ControlMsg>, user_agent: Option<String>) -> ResourceManager {
fn new(from_client: Receiver<ControlMsg>, user_agent: Option<String>, sniffer_task: SnifferTask) -> ResourceManager {
ResourceManager {
from_client: from_client,
user_agent: user_agent,
sniffer_task: sniffer_task,
}
}
}
@ -192,8 +212,8 @@ impl ResourceManager {
fn start(&self) {
loop {
match self.from_client.recv() {
Load(load_data, start_chan) => {
self.load(load_data, start_chan)
Load(load_data) => {
self.load(load_data)
}
Exit => {
break
@ -202,15 +222,13 @@ impl ResourceManager {
}
}
fn load(&self, load_data: LoadData, start_chan: Sender<LoadResponse>) {
fn load(&self, load_data: LoadData) {
let mut load_data = load_data;
load_data.headers.user_agent = self.user_agent.clone();
// Create new communication channel, create new sniffer task,
// send all the data to the new sniffer task with the send
// end of the pipe, receive all the data.
let sniffer_task = sniffer_task::new_sniffer_task(start_chan.clone());
let senders = ResponseSenders {
immediate_consumer: self.sniffer_task.clone(),
eventual_consumer: load_data.consumer.clone(),
};
let loader = match load_data.url.scheme.as_slice() {
"file" => file_loader::factory,
@ -219,21 +237,21 @@ impl ResourceManager {
"about" => about_loader::factory,
_ => {
debug!("resource_task: no loader for scheme {:s}", load_data.url.scheme);
start_sending(start_chan, Metadata::default(load_data.url))
start_sending(senders, Metadata::default(load_data.url))
.send(Done(Err("no loader for scheme".to_string())));
return
}
};
debug!("resource_task: loading url: {:s}", load_data.url.serialize());
loader(load_data, sniffer_task);
loader(load_data, self.sniffer_task.clone());
}
}
/// Load a URL asynchronously and iterate over chunks of bytes from the response.
pub fn load_bytes_iter(resource_task: &ResourceTask, url: Url) -> (Metadata, ProgressMsgPortIterator) {
let (input_chan, input_port) = channel();
resource_task.send(Load(LoadData::new(url), input_chan));
resource_task.send(Load(LoadData::new(url, input_chan)));
let response = input_port.recv();
let iter = ProgressMsgPortIterator { progress_port: response.progress_port };
@ -269,7 +287,7 @@ fn test_bad_scheme() {
let resource_task = new_resource_task(None);
let (start_chan, start) = channel();
let url = Url::parse("bogus://whatever").unwrap();
resource_task.send(Load(LoadData::new(url), start_chan));
resource_task.send(Load(LoadData::new(url, start_chan)));
let response = start.recv();
match response.progress_port.recv() {
Done(result) => { assert!(result.is_err()) }

View file

@ -5,25 +5,25 @@
//! A task that sniffs data
use std::comm::{channel, Receiver, Sender, Disconnected};
use std::task::TaskBuilder;
use resource_task::{LoadResponse};
use resource_task::{TargetedLoadResponse};
pub type SnifferTask = Sender<LoadResponse>;
pub type SnifferTask = Sender<TargetedLoadResponse>;
pub fn new_sniffer_task(next_rx: Sender<LoadResponse>) -> SnifferTask {
let (sen, rec) = channel();
pub fn new_sniffer_task() -> SnifferTask {
let(sen, rec) = channel();
let builder = TaskBuilder::new().named("SnifferManager");
builder.spawn(proc() {
SnifferManager::new(rec).start(next_rx);
SnifferManager::new(rec).start();
});
sen
}
struct SnifferManager {
data_receiver: Receiver<LoadResponse>,
data_receiver: Receiver<TargetedLoadResponse>,
}
impl SnifferManager {
fn new(data_receiver: Receiver<LoadResponse>) -> SnifferManager {
fn new(data_receiver: Receiver <TargetedLoadResponse>) -> SnifferManager {
SnifferManager {
data_receiver: data_receiver,
}
@ -31,11 +31,11 @@ impl SnifferManager {
}
impl SnifferManager {
fn start(self, next_rx: Sender<LoadResponse>) {
fn start(self) {
loop {
match self.data_receiver.try_recv() {
Ok(snif_data) => {
let result = next_rx.send_opt(snif_data);
let result = snif_data.consumer.send_opt(snif_data.load_response);
if result.is_err() {
break;
}

View file

@ -44,7 +44,7 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};
use libc;
use libc::c_void;
use net::resource_task::{ResourceTask, ResourceCORSData, Load, LoadData, Payload, Done};
use net::resource_task::{ResourceTask, ResourceCORSData, Load, LoadData, LoadResponse, Payload, Done};
use cors::{allow_cross_origin_request, CORSRequest, CORSMode, ForcedPreflightMode};
use script_task::{ScriptChan, XHRProgressMsg, XHRReleaseMsg};
use servo_util::str::DOMString;
@ -207,7 +207,8 @@ impl XMLHttpRequest {
fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask,
mut load_data: LoadData, terminate_receiver: Receiver<TerminateReason>,
cors_request: Result<Option<CORSRequest>,()>, gen_id: GenerationId) -> ErrorResult {
cors_request: Result<Option<CORSRequest>,()>, gen_id: GenerationId,
start_port: Receiver<LoadResponse>) -> ErrorResult {
fn notify_partial_progress(fetch_type: &SyncOrAsync, msg: XHRProgress) {
match *fetch_type {
@ -277,8 +278,7 @@ impl XMLHttpRequest {
}
// Step 10, 13
let (start_chan, start_port) = channel();
resource_task.send(Load(load_data, start_chan));
resource_task.send(Load(load_data));
let progress_port;
@ -557,7 +557,8 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> {
let global = self.global.root();
let resource_task = global.root_ref().resource_task();
let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap());
let (start_chan, start_port) = channel();
let mut load_data = LoadData::new(self.request_url.borrow().clone().unwrap(), start_chan);
load_data.data = extracted;
// Default headers
@ -620,7 +621,7 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> {
let gen_id = self.generation_id.get();
if self.sync.get() {
return XMLHttpRequest::fetch(&mut Sync(self), resource_task, load_data,
terminate_receiver, cors_request, gen_id);
terminate_receiver, cors_request, gen_id, start_port);
} else {
self.fetch_time.set(time::now().to_timespec().sec);
let script_chan = global.root_ref().script_chan().clone();
@ -638,7 +639,8 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> {
load_data,
terminate_receiver,
cors_request,
gen_id);
gen_id,
start_port);
let ScriptChan(ref chan) = script_chan;
chan.send(XHRReleaseMsg(addr));
});

View file

@ -200,13 +200,13 @@ pub fn parse_html(page: &Page,
InputUrl(ref url) => {
// Wait for the LoadResponse so that the parser knows the final URL.
let (input_chan, input_port) = channel();
let mut load_data = LoadData::new(url.clone());
let mut load_data = LoadData::new(url.clone(), input_chan);
msg_load_data.map(|m| {
load_data.headers = m.headers;
load_data.method = m.method;
load_data.data = m.data;
});
resource_task.send(Load(load_data, input_chan));
resource_task.send(Load(load_data));
let load_response = input_port.recv();