net: Use a thread for each AsyncResponseTarget to avoid having to send

trait objects across process boundaries.
This commit is contained in:
Patrick Walton 2015-07-09 16:50:06 -07:00
parent 9c9d7dc93b
commit 44d13f7fd4
10 changed files with 78 additions and 31 deletions

View file

@ -14,6 +14,7 @@ use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::mem;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver, Select};
use std::thread;
use util::resource_files::resources_dir_path;
use util::task::spawn_named;
use util::taskpool::TaskPool;
@ -100,14 +101,17 @@ struct ResourceLoadInfo {
struct ResourceListener {
url: Url,
sender: Sender<ResourceLoadInfo>,
receiver: Receiver<ResponseAction>,
}
impl AsyncResponseTarget for ResourceListener {
fn invoke_with_listener(&self, action: ResponseAction) {
self.sender.send(ResourceLoadInfo {
action: action,
url: self.url.clone(),
}).unwrap();
impl ResourceListener {
fn run(&self) {
while let Ok(action) = self.receiver.recv() {
self.sender.send(ResourceLoadInfo {
action: action,
url: self.url.clone(),
}).unwrap();
}
}
}
@ -330,11 +334,17 @@ impl ImageCache {
e.insert(pending_load);
let load_data = LoadData::new(url.clone(), None);
let (action_sender, action_receiver) = channel();
let listener = box ResourceListener {
url: url,
sender: self.progress_sender.clone(),
receiver: action_receiver,
};
let msg = ControlMsg::Load(load_data, LoadConsumer::Listener(listener));
let msg = ControlMsg::Load(load_data,
LoadConsumer::Listener(AsyncResponseTarget {
sender: action_sender,
}));
thread::spawn(move || listener.run());
self.resource_task.send(msg).unwrap();
}
}

View file

@ -69,7 +69,7 @@ pub fn global_init() {
pub enum ProgressSender {
Channel(Sender<ProgressMsg>),
Listener(Box<AsyncResponseTarget>),
Listener(AsyncResponseTarget),
}
impl ProgressSender {

View file

@ -114,14 +114,20 @@ impl ResponseAction {
/// A target for async networking events. Commonly used to dispatch a runnable event to another
/// thread storing the wrapped closure for later execution.
pub trait AsyncResponseTarget {
fn invoke_with_listener(&self, action: ResponseAction);
pub struct AsyncResponseTarget {
pub sender: Sender<ResponseAction>,
}
impl AsyncResponseTarget {
pub fn invoke_with_listener(&self, action: ResponseAction) {
self.sender.send(action).unwrap()
}
}
/// A wrapper for a network load that can either be channel or event-based.
pub enum LoadConsumer {
Channel(Sender<LoadResponse>),
Listener(Box<AsyncResponseTarget + Send>),
Listener(AsyncResponseTarget),
}
/// Handle to a resource task
@ -195,7 +201,7 @@ impl PendingAsyncLoad {
}
/// Initiate the network request associated with this pending load, using the provided target.
pub fn load_async(mut self, listener: Box<AsyncResponseTarget + Send>) {
pub fn load_async(mut self, listener: AsyncResponseTarget) {
self.guard.neuter();
let load_data = LoadData::new(self.url, self.pipeline);
let consumer = LoadConsumer::Listener(listener);

View file

@ -17,6 +17,7 @@ use net_traits::{SerializableStringResult};
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
use std::cell::RefCell;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use time;
use time::{now, Timespec};
@ -132,9 +133,14 @@ impl CORSRequest {
listener: listener,
response: RefCell::new(None),
};
let (action_sender, action_receiver) = mpsc::channel();
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
script_chan: script_chan,
receiver: action_receiver,
};
let response_target = AsyncResponseTarget {
sender: action_sender,
};
// TODO: this exists only to make preflight check non-blocking
@ -145,7 +151,7 @@ impl CORSRequest {
let mut context = listener.context.lock();
let context = context.as_mut().unwrap();
*context.response.borrow_mut() = Some(response);
listener.invoke_with_listener(ResponseAction::ResponseComplete(
response_target.invoke_with_listener(ResponseAction::ResponseComplete(
SerializableStringResult(Ok(()))));
});
}

View file

@ -73,7 +73,7 @@ impl DocumentLoader {
}
/// Create and initiate a new network request.
pub fn load_async(&mut self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
pub fn load_async(&mut self, load: LoadType, listener: AsyncResponseTarget) {
let pending = self.prepare_async_load(load);
pending.load_async(listener)
}

View file

@ -283,7 +283,7 @@ pub trait DocumentHelpers<'a> {
/// https://w3c.github.io/animation-timing/#dfn-invoke-callbacks-algorithm
fn invoke_animation_callbacks(self);
fn prepare_async_load(self, load: LoadType) -> PendingAsyncLoad;
fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>);
fn load_async(self, load: LoadType, listener: AsyncResponseTarget);
fn load_sync(self, load: LoadType) -> Result<(Metadata, Vec<u8>), String>;
fn finish_load(self, load: LoadType);
fn set_current_parser(self, script: Option<&ServoHTMLParser>);
@ -968,7 +968,7 @@ impl<'a> DocumentHelpers<'a> for &'a Document {
loader.prepare_async_load(load)
}
fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
fn load_async(self, load: LoadType, listener: AsyncResponseTarget) {
let mut loader = self.loader.borrow_mut();
loader.load_async(load, listener)
}

View file

@ -40,12 +40,14 @@ use js::jsval::UndefinedValue;
use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label;
use encoding::types::{Encoding, EncodingRef, DecoderTrap};
use net_traits::{Metadata, AsyncResponseListener};
use net_traits::{Metadata, AsyncResponseListener, AsyncResponseTarget};
use util::str::{DOMString, HTML_SPACE_CHARACTERS, StaticStringVec};
use html5ever::tree_builder::NextParserState;
use std::cell::{RefCell, Cell};
use std::mem;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use string_cache::Atom;
use url::{Url, UrlParser};
@ -330,12 +332,18 @@ impl<'a> HTMLScriptElementHelpers for &'a HTMLScriptElement {
url: url.clone(),
}));
let (action_sender, action_receiver) = mpsc::channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
receiver: action_receiver,
};
let response_target = AsyncResponseTarget {
sender: action_sender,
};
thread::spawn(move || listener.run());
doc.r().load_async(LoadType::Script(url), listener);
doc.r().load_async(LoadType::Script(url), response_target);
if self.parser_inserted.get() {
doc.r().get_current_parser().unwrap().r().suspend();

View file

@ -46,8 +46,8 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};
use net_traits::ControlMsg::Load;
use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer};
use net_traits::{AsyncResponseListener, Metadata, SerializableHeaders, SerializableMethod};
use net_traits::{SerializableUrl};
use net_traits::{AsyncResponseListener, AsyncResponseTarget, Metadata, SerializableHeaders};
use net_traits::{SerializableMethod, SerializableUrl};
use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener};
use cors::CORSResponse;
use util::str::DOMString;
@ -59,7 +59,7 @@ use std::cell::{RefCell, Cell};
use std::default::Default;
use std::sync::{Mutex, Arc};
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread::sleep_ms;
use std::thread::{self, sleep_ms};
use time;
use url::{Url, UrlParser};
@ -271,11 +271,17 @@ impl XMLHttpRequest {
}
}
let (action_sender, action_receiver) = channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
receiver: action_receiver,
};
resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap();
let response_target = AsyncResponseTarget {
sender: action_sender,
};
thread::spawn(move || listener.run());
resource_task.send(Load(load_data, LoadConsumer::Listener(response_target))).unwrap();
}
}

View file

@ -3,7 +3,8 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use script_task::{ScriptChan, ScriptMsg, Runnable};
use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction};
use net_traits::{AsyncResponseListener, ResponseAction};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
/// An off-thread sink for async network event runnables. All such events are forwarded to
@ -11,14 +12,17 @@ use std::sync::{Arc, Mutex};
pub struct NetworkListener<T: AsyncResponseListener + PreInvoke + Send + 'static> {
pub context: Arc<Mutex<T>>,
pub script_chan: Box<ScriptChan+Send>,
pub receiver: Receiver<ResponseAction>,
}
impl<T: AsyncResponseListener + PreInvoke + Send + 'static> AsyncResponseTarget for NetworkListener<T> {
fn invoke_with_listener(&self, action: ResponseAction) {
self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
context: self.context.clone(),
action: action,
})).unwrap();
impl<T: AsyncResponseListener + PreInvoke + Send + 'static> NetworkListener<T> {
pub fn run(&self) {
while let Ok(action) = self.receiver.recv() {
self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
context: self.context.clone(),
action: action,
})).unwrap();
}
}
}

View file

@ -68,7 +68,7 @@ use msg::constellation_msg::{Failure, WindowSizeData, PipelineExitType};
use msg::constellation_msg::Msg as ConstellationMsg;
use msg::webdriver_msg::WebDriverScriptCommand;
use net_traits::LoadData as NetLoadData;
use net_traits::{ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::{AsyncResponseTarget, ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::{SerializableContentType, SerializableHeaders, SerializableMethod};
use net_traits::{SerializableUrl};
use net_traits::image_cache_task::{ImageCacheChan, ImageCacheTask, ImageCacheResult};
@ -105,6 +105,7 @@ use std::rc::Rc;
use std::result::Result;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver, Select};
use std::thread;
use time::Tm;
use hyper::header::{ContentType, HttpDate};
@ -1686,9 +1687,15 @@ impl ScriptTask {
let context = Arc::new(Mutex::new(ParserContext::new(id, subpage, script_chan.clone(),
load_data.url.clone())));
let (action_sender, action_receiver) = channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan.clone(),
receiver: action_receiver,
};
thread::spawn(move || listener.run());
let response_target = AsyncResponseTarget {
sender: action_sender,
};
if load_data.url.scheme == "javascript" {
@ -1703,7 +1710,7 @@ impl ScriptTask {
data: load_data.data,
cors: None,
pipeline_id: Some(id),
}, LoadConsumer::Listener(listener))).unwrap();
}, LoadConsumer::Listener(response_target))).unwrap();
self.incomplete_loads.borrow_mut().push(incomplete);
}