continue messageport, transferable, postmessage options

This commit is contained in:
Gregory Terzian 2019-06-26 00:25:48 +08:00
parent c3b17c1201
commit 2f8932a6a1
100 changed files with 2456 additions and 1171 deletions

View file

@ -9,17 +9,21 @@ use crate::dom::bindings::codegen::Bindings::WorkerGlobalScopeBinding::WorkerGlo
use crate::dom::bindings::conversions::{root_from_object, root_from_object_static};
use crate::dom::bindings::error::{report_pending_exception, ErrorInfo};
use crate::dom::bindings::inheritance::Castable;
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::reflector::DomObject;
use crate::dom::bindings::root::{DomRoot, MutNullableDom};
use crate::dom::bindings::settings_stack::{entry_global, incumbent_global, AutoEntryScript};
use crate::dom::bindings::str::DOMString;
use crate::dom::bindings::weakref::DOMTracker;
use crate::dom::bindings::structuredclone;
use crate::dom::bindings::weakref::{DOMTracker, WeakRef};
use crate::dom::crypto::Crypto;
use crate::dom::dedicatedworkerglobalscope::DedicatedWorkerGlobalScope;
use crate::dom::errorevent::ErrorEvent;
use crate::dom::event::{Event, EventBubbles, EventCancelable, EventStatus};
use crate::dom::eventsource::EventSource;
use crate::dom::eventtarget::EventTarget;
use crate::dom::messageevent::MessageEvent;
use crate::dom::messageport::MessagePort;
use crate::dom::paintworkletglobalscope::PaintWorkletGlobalScope;
use crate::dom::performance::Performance;
use crate::dom::window::Window;
@ -36,34 +40,40 @@ use crate::task_source::performance_timeline::PerformanceTimelineTaskSource;
use crate::task_source::port_message::PortMessageQueue;
use crate::task_source::remote_event::RemoteEventTaskSource;
use crate::task_source::websocket::WebsocketTaskSource;
use crate::task_source::TaskSource;
use crate::task_source::TaskSourceName;
use crate::timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle};
use crate::timers::{OneshotTimers, TimerCallback};
use content_security_policy::CspList;
use devtools_traits::{ScriptToDevtoolsControlMsg, WorkerId};
use dom_struct::dom_struct;
use ipc_channel::ipc::IpcSender;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use js::glue::{IsWrapper, UnwrapObjectDynamic};
use js::jsapi::JSObject;
use js::jsapi::{CurrentGlobalOrNull, GetNonCCWObjectGlobal};
use js::jsapi::{HandleObject, Heap};
use js::jsapi::{JSAutoRealm, JSContext};
use js::jsval::UndefinedValue;
use js::panic::maybe_resume_unwind;
use js::rust::wrappers::EvaluateUtf8;
use js::rust::{get_object_class, CompileOptionsWrapper, ParentRuntime, Runtime};
use js::rust::{HandleValue, MutableHandleValue};
use js::{JSCLASS_IS_DOMJSCLASS, JSCLASS_IS_GLOBAL};
use msg::constellation_msg::PipelineId;
use msg::constellation_msg::{MessagePortId, MessagePortRouterId, PipelineId};
use net_traits::image_cache::ImageCache;
use net_traits::{CoreResourceThread, IpcSend, ResourceThreads};
use profile_traits::{mem as profile_mem, time as profile_time};
use script_traits::{MsDuration, ScriptToConstellationChan, TimerEvent};
use script_traits::transferable::MessagePortImpl;
use script_traits::{
MessagePortMsg, MsDuration, PortMessageTask, ScriptMsg, ScriptToConstellationChan, TimerEvent,
};
use script_traits::{TimerEventId, TimerSchedulerMsg, TimerSource};
use servo_url::{MutableOrigin, ServoUrl};
use std::borrow::Cow;
use std::cell::Cell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::ffi::CString;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
@ -85,6 +95,9 @@ pub struct GlobalScope {
crypto: MutNullableDom<Crypto>,
next_worker_id: Cell<WorkerId>,
/// The message-port router id for this global, if it is managing ports.
message_port_state: DomRefCell<MessagePortState>,
/// Pipeline id associated with this global.
pipeline_id: PipelineId,
@ -168,6 +181,77 @@ pub struct GlobalScope {
user_agent: Cow<'static, str>,
}
/// A wrapper for glue-code between the ipc router and the event-loop.
struct MessageListener {
canceller: TaskCanceller,
task_source: PortMessageQueue,
context: Trusted<GlobalScope>,
}
/// Data representing a message-port managed by this global.
#[derive(JSTraceable, MallocSizeOf)]
pub enum ManagedMessagePort {
/// We keep ports pending when they are first transfer-received,
/// and only add them, and ask the constellation to complete the transfer,
/// in a subsequent task if the port hasn't been re-transfered.
Pending(MessagePortImpl, WeakRef<MessagePort>),
/// A port who was transferred into, or initially created in, this realm,
/// and that hasn't been re-transferred in the same task it was noted.
Added(MessagePortImpl, WeakRef<MessagePort>),
}
/// State representing whether this global is currently managing messageports.
#[derive(JSTraceable, MallocSizeOf)]
pub enum MessagePortState {
/// The message-port router id for this global, and a map of managed ports.
Managed(
MessagePortRouterId,
HashMap<MessagePortId, ManagedMessagePort>,
),
/// This global is not managing any ports at this time.
UnManaged,
}
impl MessageListener {
/// A new message came in, handle it via a task enqueued on the event-loop.
/// A task is required, since we are using a trusted globalscope,
/// and we can only access the root from the event-loop.
fn notify(&self, msg: MessagePortMsg) {
match msg {
MessagePortMsg::CompleteTransfer(port_id, tasks) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_complete_transfer: move || {
let global = context.root();
global.complete_port_transfer(port_id, tasks);
}),
&self.canceller,
);
},
MessagePortMsg::NewTask(port_id, task) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_new_task: move || {
let global = context.root();
global.route_task_to_port(port_id, task);
}),
&self.canceller,
);
},
MessagePortMsg::RemoveMessagePort(port_id) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_remove_message_port: move || {
let global = context.root();
global.remove_message_port(&port_id);
}),
&self.canceller,
);
},
}
}
}
impl GlobalScope {
pub fn new_inherited(
pipeline_id: PipelineId,
@ -184,6 +268,7 @@ impl GlobalScope {
user_agent: Cow<'static, str>,
) -> Self {
Self {
message_port_state: DomRefCell::new(MessagePortState::UnManaged),
eventtarget: EventTarget::new_inherited(),
crypto: Default::default(),
next_worker_id: Cell::new(WorkerId(0)),
@ -209,6 +294,397 @@ impl GlobalScope {
}
}
/// Complete the transfer of a message-port.
fn complete_port_transfer(&self, port_id: MessagePortId, tasks: VecDeque<PortMessageTask>) {
let should_start = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
match message_ports.get_mut(&port_id) {
None => {
panic!("CompleteTransfer msg received in a global not managing the port.");
},
Some(ManagedMessagePort::Pending(_, _)) => {
panic!("CompleteTransfer msg received for a pending port.");
},
Some(ManagedMessagePort::Added(port_impl, _port)) => {
port_impl.complete_transfer(tasks);
port_impl.enabled()
},
}
} else {
return warn!("CompleteTransfer msg received in a global not managing any ports.");
};
if should_start {
self.start_message_port(&port_id);
}
}
/// Update our state to un-managed,
/// and tell the constellation to drop the sender to our message-port router.
pub fn remove_message_ports_router(&self) {
if let MessagePortState::Managed(router_id, _message_ports) =
&*self.message_port_state.borrow()
{
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::RemoveMessagePortRouter(router_id.clone()));
}
*self.message_port_state.borrow_mut() = MessagePortState::UnManaged;
}
/// <https://html.spec.whatwg.org/multipage/#entangle>
pub fn entangle_ports(&self, port1: MessagePortId, port2: MessagePortId) {
if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
for (port_id, entangled_id) in &[(port1, port2), (port2, port1)] {
match message_ports.get_mut(&port_id) {
None => {
return warn!("entangled_ports called on a global not managing the port.");
},
Some(ManagedMessagePort::Pending(port_impl, dom_port)) => {
dom_port
.root()
.expect("Port to be entangled to not have been GC'ed")
.entangle(entangled_id.clone());
port_impl.entangle(entangled_id.clone());
},
Some(ManagedMessagePort::Added(port_impl, dom_port)) => {
dom_port
.root()
.expect("Port to be entangled to not have been GC'ed")
.entangle(entangled_id.clone());
port_impl.entangle(entangled_id.clone());
},
}
}
} else {
panic!("entangled_ports called on a global not managing any ports.");
}
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::EntanglePorts(port1, port2));
}
/// Remove all referrences to a port.
pub fn remove_message_port(&self, port_id: &MessagePortId) {
let is_empty = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
match message_ports.remove(&port_id) {
None => panic!("remove_message_port called on a global not managing the port."),
Some(_) => message_ports.is_empty(),
}
} else {
return warn!("remove_message_port called on a global not managing any ports.");
};
if is_empty {
// Remove our port router,
// it will be setup again if we start managing ports again.
self.remove_message_ports_router();
}
}
/// Handle the transfer of a port in the current task.
pub fn mark_port_as_transferred(&self, port_id: &MessagePortId) -> MessagePortImpl {
if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let mut port = match message_ports.remove(&port_id) {
None => {
panic!("mark_port_as_transferred called on a global not managing the port.")
},
Some(ManagedMessagePort::Pending(port_impl, _)) => port_impl,
Some(ManagedMessagePort::Added(port_impl, _)) => port_impl,
};
port.set_has_been_shipped();
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::MessagePortShipped(port_id.clone()));
port
} else {
panic!("mark_port_as_transferred called on a global not managing any ports.");
}
}
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-start>
pub fn start_message_port(&self, port_id: &MessagePortId) {
if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let port = match message_ports.get_mut(&port_id) {
None => panic!("start_message_port called on a unknown port."),
Some(ManagedMessagePort::Pending(port_impl, _)) => port_impl,
Some(ManagedMessagePort::Added(port_impl, _)) => port_impl,
};
if let Some(message_buffer) = port.start() {
for task in message_buffer {
let port_id = port_id.clone();
let this = Trusted::new(&*self);
let _ = self.port_message_queue().queue(
task!(process_pending_port_messages: move || {
let target_global = this.root();
target_global.route_task_to_port(port_id, task);
}),
&self,
);
}
}
} else {
return warn!("start_message_port called on a global not managing any ports.");
}
}
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-close>
pub fn close_message_port(&self, port_id: &MessagePortId) {
if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let port = match message_ports.get_mut(&port_id) {
None => panic!("close_message_port called on an unknown port."),
Some(ManagedMessagePort::Pending(port_impl, _)) => port_impl,
Some(ManagedMessagePort::Added(port_impl, _)) => port_impl,
};
port.close();
} else {
return warn!("close_message_port called on a global not managing any ports.");
}
}
/// <https://html.spec.whatwg.org/multipage/#message-port-post-message-steps>
// Steps 6 and 7
pub fn post_messageport_msg(&self, port_id: MessagePortId, task: PortMessageTask) {
if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let port = match message_ports.get_mut(&port_id) {
None => panic!("post_messageport_msg called on an unknown port."),
Some(ManagedMessagePort::Pending(port_impl, _)) => port_impl,
Some(ManagedMessagePort::Added(port_impl, _)) => port_impl,
};
if let Some(entangled_id) = port.entangled_port_id() {
// Step 7
let this = Trusted::new(&*self);
let _ = self.port_message_queue().queue(
task!(post_message: move || {
let global = this.root();
// Note: we do this in a task, as this will ensure the global and constellation
// are aware of any transfer that might still take place in the current task.
global.route_task_to_port(entangled_id, task);
}),
self,
);
}
} else {
return warn!("post_messageport_msg called on a global not managing any ports.");
}
}
/// If we don't know about the port,
/// send the message to the constellation for routing.
fn re_route_port_task(&self, port_id: MessagePortId, task: PortMessageTask) {
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::RerouteMessagePort(port_id, task));
}
/// Route the task to be handled by the relevant port.
pub fn route_task_to_port(&self, port_id: MessagePortId, task: PortMessageTask) {
let should_dispatch = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
if !message_ports.contains_key(&port_id) {
self.re_route_port_task(port_id, task);
return;
}
let (port_impl, dom_port) = match message_ports.get_mut(&port_id) {
None => panic!("route_task_to_port called for an unknown port."),
Some(ManagedMessagePort::Pending(port_impl, dom_port)) => (port_impl, dom_port),
Some(ManagedMessagePort::Added(port_impl, dom_port)) => (port_impl, dom_port),
};
// If the port is not enabled yet, or if is awaiting the completion of it's transfer,
// the task will be buffered and dispatched upon enablement or completion of the transfer.
if let Some(task_to_dispatch) = port_impl.handle_incoming(task) {
// Get a corresponding DOM message-port object.
let dom_port = match dom_port.root() {
Some(dom_port) => dom_port,
None => panic!("Messageport Gc'ed too early"),
};
Some((dom_port, task_to_dispatch))
} else {
None
}
} else {
self.re_route_port_task(port_id, task);
return;
};
if let Some((dom_port, PortMessageTask { origin, data })) = should_dispatch {
// Substep 3-4
rooted!(in(*self.get_cx()) let mut message_clone = UndefinedValue());
if let Ok(ports) = structuredclone::read(self, data, message_clone.handle_mut()) {
// Substep 6
// Dispatch the event, using the dom message-port.
MessageEvent::dispatch_jsval(
&dom_port.upcast(),
self,
message_clone.handle(),
Some(&origin.ascii_serialization()),
None,
ports,
);
} else {
// Step 4, fire messageerror event.
MessageEvent::dispatch_error(&dom_port.upcast(), self);
}
}
}
/// Check all ports that have been transfer-received in the previous task,
/// and complete their transfer if they haven't been re-transferred.
pub fn maybe_add_pending_ports(&self) {
if let MessagePortState::Managed(router_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let to_be_added: Vec<MessagePortId> = message_ports
.iter()
.filter_map(|(id, port_info)| match port_info {
ManagedMessagePort::Pending(_, _) => Some(id.clone()),
_ => None,
})
.collect();
for id in to_be_added {
let (id, port_info) = message_ports
.remove_entry(&id)
.expect("Collected port-id to match an entry");
if let ManagedMessagePort::Pending(port_impl, dom_port) = port_info {
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::NewMessagePort(
router_id.clone(),
port_impl.message_port_id().clone(),
));
let new_port_info = ManagedMessagePort::Added(port_impl, dom_port);
let present = message_ports.insert(id, new_port_info);
assert!(present.is_none());
}
}
} else {
warn!("maybe_add_pending_ports called on a global not managing any ports.");
}
}
/// https://html.spec.whatwg.org/multipage/#ports-and-garbage-collection
pub fn perform_a_message_port_garbage_collection_checkpoint(&self) {
let is_empty = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
let to_be_removed: Vec<MessagePortId> = message_ports
.iter()
.filter_map(|(id, port_info)| {
if let ManagedMessagePort::Added(_port_impl, dom_port) = port_info {
if dom_port.root().is_none() {
// Let the constellation know to drop this port and the one it is entangled with,
// and to forward this message to the script-process where the entangled is found.
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::RemoveMessagePort(id.clone()));
return Some(id.clone());
}
}
None
})
.collect();
for id in to_be_removed {
message_ports.remove(&id);
}
message_ports.is_empty()
} else {
false
};
if is_empty {
self.remove_message_ports_router();
}
}
/// Start tracking a message-port
pub fn track_message_port(&self, dom_port: &MessagePort, port_impl: Option<MessagePortImpl>) {
let mut current_state = self.message_port_state.borrow_mut();
if let MessagePortState::UnManaged = &*current_state {
// Setup a route for IPC, for messages from the constellation to our ports.
let (port_control_sender, port_control_receiver) =
ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self);
let (task_source, canceller) = (
self.port_message_queue(),
self.task_canceller(TaskSourceName::PortMessage),
);
let listener = MessageListener {
canceller,
task_source,
context,
};
ROUTER.add_route(
port_control_receiver.to_opaque(),
Box::new(move |message| {
let msg = message.to();
match msg {
Ok(msg) => listener.notify(msg),
Err(err) => warn!("Error receiving a MessagePortMsg: {:?}", err),
}
}),
);
let router_id = MessagePortRouterId::new();
*current_state = MessagePortState::Managed(router_id.clone(), HashMap::new());
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::NewMessagePortRouter(
router_id,
port_control_sender,
));
}
if let MessagePortState::Managed(router_id, message_ports) = &mut *current_state {
if let Some(port_impl) = port_impl {
// We keep transfer-received ports as "pending",
// and only ask the constellation to complete the transfer
// if they're not re-shipped in the current task.
message_ports.insert(
dom_port.message_port_id().clone(),
ManagedMessagePort::Pending(port_impl, WeakRef::new(dom_port)),
);
// Queue a task to complete the transfer,
// unless the port is re-transferred in the current task.
let this = Trusted::new(&*self);
let _ = self.port_message_queue().queue(
task!(process_pending_port_messages: move || {
let target_global = this.root();
target_global.maybe_add_pending_ports();
}),
&self,
);
} else {
// If this is a newly-created port, let the constellation immediately know.
let port_impl = MessagePortImpl::new(dom_port.message_port_id().clone());
message_ports.insert(
dom_port.message_port_id().clone(),
ManagedMessagePort::Added(port_impl, WeakRef::new(dom_port)),
);
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::NewMessagePort(
router_id.clone(),
dom_port.message_port_id().clone(),
));
};
} else {
panic!("track_message_port should have first switched the state to managed.");
}
}
pub fn track_worker(&self, closing_worker: Arc<AtomicBool>) {
self.list_auto_close_worker
.borrow_mut()
@ -550,7 +1026,7 @@ impl GlobalScope {
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.websocket_task_source();
}
unreachable!()
unreachable!();
}
/// Evaluate JS code on this global scope.