Auto merge of #6586 - pcwalton:resource-task-ipc, r=jdm

script: Make the resource task communication use IPC channels.

This change makes Servo use serialized messages over IPC channels for resource loading. The goal is to make it easier to make Servo multiprocess in the future. This patch does not make Servo multiprocess now; there are many other channels that need to be changed to IPC before that can happen. It does introduce a dependency on https://github.com/serde-rs/serde and https://github.com/pcwalton/ipc-channel for the first time.

At the moment, `ipc-channel` uses JSON for serialization. This is because serde does not yet have official support for bincode. When serde gains support for bincode, I'll switch to that. For now, however, the JSON encoding and decoding will constitute a significant performance regression in resource loading.

To avoid having to send boxed `AsyncResponseTarget` trait objects across process boundaries, this series of commits changes `AsyncResponseTarget` to wrap a sender only. It is then the client's responsibility to spawn a thread to proxy calls from that sender to the consumer of the resource data. This only had to be done in a few places. In the future, we may want to collapse those threads into one per process to reduce overhead. (It is impossible to continue to use `AsyncResponseTarget` as a boxed trait object across processes, regardless of how much work is done on `ipc-channel`. Vtables are fundamentally incompatible with IPC across mutually untrusting processes.)

In general, I was pretty pleased with how this turned out. The main changes are adding serialization functionality to various objects that `serde` does not know how to serialize natively—the most complicated being Hyper objects—and reworking `AsyncResponseTarget`. The overall structure of the code is unchanged, and other than `AsyncResponseTarget` no functionality was lost in moving to serialization and IPC.

r? @jdm

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/servo/servo/6586)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2015-07-31 16:06:36 -06:00
commit 2eb122f394
36 changed files with 232 additions and 129 deletions

View file

@ -76,7 +76,7 @@ rustc-serialize = "0.3"
libc = "0.1"
unicase = "0.1"
num = "0.1.24"
websocket = "0.12"
websocket = "0.12.0"
uuid = "0.1.16"
smallvec = "0.1"
html5ever = { version = "0.2.1", features = ["unstable"] }

View file

@ -11,7 +11,7 @@
use network_listener::{NetworkListener, PreInvoke};
use script_task::ScriptChan;
use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction, Metadata};
use net_traits::{AsyncResponseListener, ResponseAction, Metadata};
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
@ -144,7 +144,7 @@ impl CORSRequest {
let mut context = listener.context.lock();
let context = context.as_mut().unwrap();
*context.response.borrow_mut() = Some(response);
listener.invoke_with_listener(ResponseAction::ResponseComplete(Ok(())));
listener.notify(ResponseAction::ResponseComplete(Ok(())));
});
}

View file

@ -9,6 +9,7 @@ use script_task::{ScriptMsg, ScriptChan};
use msg::constellation_msg::{PipelineId};
use net_traits::{Metadata, load_whole_resource, ResourceTask, PendingAsyncLoad};
use net_traits::AsyncResponseTarget;
use std::sync::Arc;
use url::Url;
#[derive(JSTraceable, PartialEq, Clone, Debug)]
@ -34,7 +35,9 @@ impl LoadType {
#[derive(JSTraceable)]
pub struct DocumentLoader {
pub resource_task: ResourceTask,
/// We use an `Arc<ResourceTask>` here in order to avoid file descriptor exhaustion when there
/// are lots of iframes.
pub resource_task: Arc<ResourceTask>,
notifier_data: Option<NotifierData>,
blocking_loads: Vec<LoadType>,
}
@ -50,7 +53,9 @@ impl DocumentLoader {
DocumentLoader::new_with_task(existing.resource_task.clone(), None, None)
}
pub fn new_with_task(resource_task: ResourceTask,
/// We use an `Arc<ResourceTask>` here in order to avoid file descriptor exhaustion when there
/// are lots of iframes.
pub fn new_with_task(resource_task: Arc<ResourceTask>,
data: Option<NotifierData>,
initial_load: Option<Url>,)
-> DocumentLoader {
@ -69,11 +74,11 @@ impl DocumentLoader {
let url = load.url().clone();
self.blocking_loads.push(load);
let pipeline = self.notifier_data.as_ref().map(|data| data.pipeline);
PendingAsyncLoad::new(self.resource_task.clone(), url, pipeline)
PendingAsyncLoad::new((*self.resource_task).clone(), url, pipeline)
}
/// Create and initiate a new network request.
pub fn load_async(&mut self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
pub fn load_async(&mut self, load: LoadType, listener: AsyncResponseTarget) {
let pending = self.prepare_async_load(load);
pending.load_async(listener)
}

View file

@ -116,7 +116,7 @@ impl<'a> GlobalRef<'a> {
let doc = window.Document();
let doc = doc.r();
let loader = doc.loader();
loader.resource_task.clone()
(*loader.resource_task).clone()
}
GlobalRef::Worker(ref worker) => worker.resource_task().clone(),
}

View file

@ -45,7 +45,7 @@ use euclid::size::Size2D;
use html5ever::tree_builder::QuirksMode;
use hyper::header::Headers;
use hyper::method::Method;
use ipc_channel::ipc::IpcSender;
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use js::jsapi::{JSObject, JSTracer, JSGCTraceKind, JS_CallValueTracer, JS_CallObjectTracer, GCTraceKindToAscii, Heap};
use js::jsapi::JS_CallUnbarrieredObjectTracer;
use js::jsval::JSVal;
@ -57,6 +57,7 @@ use net_traits::image_cache_task::{ImageCacheChan, ImageCacheTask};
use net_traits::storage_task::StorageType;
use script_traits::ScriptControlChan;
use script_traits::UntrustedNodeAddress;
use serde::{Serialize, Deserialize};
use smallvec::SmallVec;
use msg::compositor_msg::ScriptListener;
use msg::constellation_msg::ConstellationChan;
@ -64,7 +65,6 @@ use net_traits::image::base::Image;
use profile_traits::mem::ProfilerChan;
use util::str::{LengthOrPercentageOrAuto};
use selectors::parser::PseudoElement;
use serde::{Deserialize, Serialize};
use std::cell::{Cell, UnsafeCell, RefCell};
use std::collections::{HashMap, HashSet};
use std::collections::hash_state::HashState;
@ -358,7 +358,15 @@ impl JSTraceable for Box<LayoutRPC+'static> {
impl JSTraceable for () {
#[inline]
fn trace(&self, _trc: *mut JSTracer) {
fn trace(&self, _: *mut JSTracer) {
// Do nothing
}
}
impl<T> JSTraceable for IpcReceiver<T> where T: Deserialize + Serialize {
#[inline]
fn trace(&self, _: *mut JSTracer) {
// Do nothing
}
}

View file

@ -181,8 +181,8 @@ impl DedicatedWorkerGlobalScope {
let serialized_url = url.serialize();
let parent_sender_for_reporter = parent_sender.clone();
let global = DedicatedWorkerGlobalScope::new(
url, id, mem_profiler_chan.clone(), devtools_chan, runtime.clone(), resource_task,
constellation_chan, parent_sender, own_sender, receiver);
url, id, mem_profiler_chan.clone(), devtools_chan, runtime.clone(),
resource_task, constellation_chan, parent_sender, own_sender, receiver);
// FIXME(njn): workers currently don't have a unique ID suitable for using in reporter
// registration (#6631), so we instead use a random number and cross our fingers.
let reporter_name = format!("worker-reporter-{}", random::<u64>());

View file

@ -83,6 +83,7 @@ use layout_interface::{ReflowGoal, ReflowQueryType};
use euclid::point::Point2D;
use html5ever::tree_builder::{QuirksMode, NoQuirks, LimitedQuirks, Quirks};
use ipc_channel::ipc;
use layout_interface::{LayoutChan, Msg};
use string_cache::{Atom, QualName};
use url::Url;
@ -283,7 +284,7 @@ pub trait DocumentHelpers<'a> {
/// https://w3c.github.io/animation-timing/#dfn-invoke-callbacks-algorithm
fn invoke_animation_callbacks(self);
fn prepare_async_load(self, load: LoadType) -> PendingAsyncLoad;
fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>);
fn load_async(self, load: LoadType, listener: AsyncResponseTarget);
fn load_sync(self, load: LoadType) -> Result<(Metadata, Vec<u8>), String>;
fn finish_load(self, load: LoadType);
fn set_current_parser(self, script: Option<&ServoHTMLParser>);
@ -968,7 +969,7 @@ impl<'a> DocumentHelpers<'a> for &'a Document {
loader.prepare_async_load(load)
}
fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
fn load_async(self, load: LoadType, listener: AsyncResponseTarget) {
let mut loader = self.loader.borrow_mut();
loader.load_async(load, listener)
}
@ -1720,7 +1721,7 @@ impl<'a> DocumentMethods for &'a Document {
return Err(Security);
}
let window = self.window.root();
let (tx, rx) = channel();
let (tx, rx) = ipc::channel().unwrap();
let _ = window.r().resource_task().send(GetCookiesForUrl(url, tx, NonHTTP));
let cookies = rx.recv().unwrap();
Ok(cookies.unwrap_or("".to_owned()))

View file

@ -40,7 +40,9 @@ use js::jsval::UndefinedValue;
use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label;
use encoding::types::{Encoding, EncodingRef, DecoderTrap};
use net_traits::{Metadata, AsyncResponseListener};
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use net_traits::{Metadata, AsyncResponseListener, AsyncResponseTarget};
use util::str::{DOMString, HTML_SPACE_CHARACTERS, StaticStringVec};
use html5ever::tree_builder::NextParserState;
use std::cell::{RefCell, Cell};
@ -330,12 +332,19 @@ impl<'a> HTMLScriptElementHelpers for &'a HTMLScriptElement {
url: url.clone(),
}));
let (action_sender, action_receiver) = ipc::channel().unwrap();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
};
let response_target = AsyncResponseTarget {
sender: action_sender,
};
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify(message.to().unwrap());
});
doc.r().load_async(LoadType::Script(url), listener);
doc.r().load_async(LoadType::Script(url), response_target);
if self.parser_inserted.get() {
doc.r().get_current_parser().unwrap().r().suspend();

View file

@ -74,8 +74,9 @@ use std::default::Default;
use std::ffi::CString;
use std::mem as std_mem;
use std::rc::Rc;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::sync::mpsc::TryRecvError::{Empty, Disconnected};
use std::sync::mpsc::{channel, Receiver};
use time;
/// Current state of the window object
@ -173,7 +174,7 @@ pub struct Window {
window_size: Cell<Option<WindowSizeData>>,
/// Associated resource task for use by DOM objects like XMLHttpRequest
resource_task: ResourceTask,
resource_task: Arc<ResourceTask>,
/// A handle for communicating messages to the storage task.
storage_task: StorageTask,
@ -883,7 +884,7 @@ impl<'a> WindowHelpers for &'a Window {
}
fn resource_task(self) -> ResourceTask {
self.resource_task.clone()
(*self.resource_task).clone()
}
fn mem_profiler_chan(self) -> mem::ProfilerChan {
@ -1035,7 +1036,7 @@ impl Window {
control_chan: ScriptControlChan,
compositor: ScriptListener,
image_cache_task: ImageCacheTask,
resource_task: ResourceTask,
resource_task: Arc<ResourceTask>,
storage_task: StorageTask,
mem_profiler_chan: mem::ProfilerChan,
devtools_chan: Option<IpcSender<ScriptToDevtoolsControlMsg>>,

View file

@ -46,12 +46,14 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};
use net_traits::ControlMsg::Load;
use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer};
use net_traits::{AsyncResponseListener, Metadata};
use net_traits::{AsyncResponseListener, AsyncResponseTarget, Metadata};
use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener};
use cors::CORSResponse;
use util::str::DOMString;
use util::task::spawn_named;
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
use std::cell::{RefCell, Cell};
@ -270,11 +272,18 @@ impl XMLHttpRequest {
}
}
let (action_sender, action_receiver) = ipc::channel().unwrap();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
};
resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap();
let response_target = AsyncResponseTarget {
sender: action_sender,
};
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify(message.to().unwrap());
});
resource_task.send(Load(load_data, LoadConsumer::Listener(response_target))).unwrap();
}
}
@ -558,8 +567,11 @@ impl<'a> XMLHttpRequestMethods for &'a XMLHttpRequest {
};
let mut combined_headers = load_data.headers.clone();
combined_headers.extend(load_data.preserved_headers.iter());
let cors_request = CORSRequest::maybe_new(referer_url.clone(), load_data.url.clone(), mode,
load_data.method.clone(), combined_headers);
let cors_request = CORSRequest::maybe_new(referer_url.clone(),
load_data.url.clone(),
mode,
load_data.method.clone(),
combined_headers);
match cors_request {
Ok(None) => {
let mut buf = String::new();
@ -781,7 +793,8 @@ impl<'a> PrivateXMLHttpRequestHelpers for &'a XMLHttpRequest {
};
// XXXManishearth Clear cache entries in case of a network error
self.process_partial_response(XHRProgress::HeadersReceived(gen_id,
metadata.headers, metadata.status));
metadata.headers,
metadata.status));
Ok(())
}

View file

@ -3,7 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use script_task::{ScriptChan, ScriptMsg, Runnable};
use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction};
use net_traits::{AsyncResponseListener, ResponseAction};
use std::sync::{Arc, Mutex};
/// An off-thread sink for async network event runnables. All such events are forwarded to
@ -13,12 +13,14 @@ pub struct NetworkListener<T: AsyncResponseListener + PreInvoke + Send + 'static
pub script_chan: Box<ScriptChan+Send>,
}
impl<T: AsyncResponseListener + PreInvoke + Send + 'static> AsyncResponseTarget for NetworkListener<T> {
fn invoke_with_listener(&self, action: ResponseAction) {
self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
impl<T: AsyncResponseListener + PreInvoke + Send + 'static> NetworkListener<T> {
pub fn notify(&self, action: ResponseAction) {
if let Err(err) = self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
context: self.context.clone(),
action: action,
})).unwrap();
})) {
warn!("failed to deliver network data: {:?}", err);
}
}
}

View file

@ -67,8 +67,8 @@ use msg::constellation_msg::{LoadData, PipelineId, SubpageId, MozBrowserEvent, W
use msg::constellation_msg::{Failure, WindowSizeData, PipelineExitType};
use msg::constellation_msg::Msg as ConstellationMsg;
use msg::webdriver_msg::WebDriverScriptCommand;
use net_traits::{ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::LoadData as NetLoadData;
use net_traits::{AsyncResponseTarget, ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::image_cache_task::{ImageCacheChan, ImageCacheTask, ImageCacheResult};
use net_traits::storage_task::StorageTask;
use profile_traits::mem::{self, Report, Reporter, ReporterRequest, ReportKind, ReportsChan};
@ -287,8 +287,9 @@ pub struct ScriptTask {
incomplete_loads: DOMRefCell<Vec<InProgressLoad>>,
/// A handle to the image cache task.
image_cache_task: ImageCacheTask,
/// A handle to the resource task.
resource_task: ResourceTask,
/// A handle to the resource task. This is an `Arc` to avoid running out of file descriptors if
/// there are many iframes.
resource_task: Arc<ResourceTask>,
/// A handle to the storage task.
storage_task: StorageTask,
@ -418,7 +419,7 @@ impl ScriptTaskFactory for ScriptTask {
control_chan,
control_port,
constellation_chan,
resource_task,
Arc::new(resource_task),
storage_task,
image_cache_task,
mem_profiler_chan.clone(),
@ -504,7 +505,7 @@ impl ScriptTask {
control_chan: ScriptControlChan,
control_port: Receiver<ConstellationControlMsg>,
constellation_chan: ConstellationChan,
resource_task: ResourceTask,
resource_task: Arc<ResourceTask>,
storage_task: StorageTask,
image_cache_task: ImageCacheTask,
mem_profiler_chan: mem::ProfilerChan,
@ -1415,7 +1416,9 @@ impl ScriptTask {
});
let content_type = match metadata.content_type {
Some(ContentType(Mime(TopLevel::Text, SubLevel::Plain, _))) => Some("text/plain".to_owned()),
Some(ContentType(Mime(TopLevel::Text, SubLevel::Plain, _))) => {
Some("text/plain".to_owned())
}
_ => None
};
@ -1680,10 +1683,17 @@ impl ScriptTask {
let context = Arc::new(Mutex::new(ParserContext::new(id, subpage, script_chan.clone(),
load_data.url.clone())));
let (action_sender, action_receiver) = ipc::channel().unwrap();
let listener = box NetworkListener {
context: context,
script_chan: script_chan.clone(),
};
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify(message.to().unwrap());
});
let response_target = AsyncResponseTarget {
sender: action_sender,
};
if load_data.url.scheme == "javascript" {
load_data.url = Url::parse("about:blank").unwrap();
@ -1697,7 +1707,7 @@ impl ScriptTask {
data: load_data.data,
cors: None,
pipeline_id: Some(id),
}, LoadConsumer::Listener(listener))).unwrap();
}, LoadConsumer::Listener(response_target))).unwrap();
self.incomplete_loads.borrow_mut().push(incomplete);
}