mirror of
https://github.com/servo/servo.git
synced 2025-07-23 07:13:52 +01:00
implement broadcastchannel
This commit is contained in:
parent
145c89a2d4
commit
eb21d5f738
32 changed files with 763 additions and 216 deletions
|
@ -3,6 +3,7 @@
|
|||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use crate::dom::bindings::cell::DomRefCell;
|
||||
use crate::dom::bindings::codegen::Bindings::BroadcastChannelBinding::BroadcastChannelMethods;
|
||||
use crate::dom::bindings::codegen::Bindings::EventSourceBinding::EventSourceBinding::EventSourceMethods;
|
||||
use crate::dom::bindings::codegen::Bindings::VoidFunctionBinding::VoidFunction;
|
||||
use crate::dom::bindings::codegen::Bindings::WindowBinding::WindowMethods;
|
||||
|
@ -19,6 +20,7 @@ use crate::dom::bindings::structuredclone;
|
|||
use crate::dom::bindings::utils::to_frozen_array;
|
||||
use crate::dom::bindings::weakref::{DOMTracker, WeakRef};
|
||||
use crate::dom::blob::Blob;
|
||||
use crate::dom::broadcastchannel::BroadcastChannel;
|
||||
use crate::dom::crypto::Crypto;
|
||||
use crate::dom::dedicatedworkerglobalscope::DedicatedWorkerGlobalScope;
|
||||
use crate::dom::errorevent::ErrorEvent;
|
||||
|
@ -71,7 +73,9 @@ use js::rust::wrappers::EvaluateUtf8;
|
|||
use js::rust::{get_object_class, CompileOptionsWrapper, ParentRuntime, Runtime};
|
||||
use js::rust::{HandleValue, MutableHandleValue};
|
||||
use js::{JSCLASS_IS_DOMJSCLASS, JSCLASS_IS_GLOBAL};
|
||||
use msg::constellation_msg::{BlobId, MessagePortId, MessagePortRouterId, PipelineId};
|
||||
use msg::constellation_msg::{
|
||||
BlobId, BroadcastChannelRouterId, MessagePortId, MessagePortRouterId, PipelineId,
|
||||
};
|
||||
use net_traits::blob_url_store::{get_blob_origin, BlobBuf};
|
||||
use net_traits::filemanager_thread::{
|
||||
FileManagerResult, FileManagerThreadMsg, ReadFileProgress, RelativePos,
|
||||
|
@ -82,7 +86,8 @@ use profile_traits::{ipc as profile_ipc, mem as profile_mem, time as profile_tim
|
|||
use script_traits::serializable::{BlobData, BlobImpl, FileBlob};
|
||||
use script_traits::transferable::MessagePortImpl;
|
||||
use script_traits::{
|
||||
MessagePortMsg, MsDuration, PortMessageTask, ScriptMsg, ScriptToConstellationChan, TimerEvent,
|
||||
BroadcastMsg, MessagePortMsg, MsDuration, PortMessageTask, ScriptMsg,
|
||||
ScriptToConstellationChan, TimerEvent,
|
||||
};
|
||||
use script_traits::{TimerEventId, TimerSchedulerMsg, TimerSource};
|
||||
use servo_url::{MutableOrigin, ServoUrl};
|
||||
|
@ -124,6 +129,9 @@ pub struct GlobalScope {
|
|||
/// The message-port router id for this global, if it is managing ports.
|
||||
message_port_state: DomRefCell<MessagePortState>,
|
||||
|
||||
/// The broadcast channels state this global, if it is managing any.
|
||||
broadcast_channel_state: DomRefCell<BroadcastChannelState>,
|
||||
|
||||
/// The blobs managed by this global, if any.
|
||||
blob_state: DomRefCell<BlobState>,
|
||||
|
||||
|
@ -237,6 +245,13 @@ struct MessageListener {
|
|||
context: Trusted<GlobalScope>,
|
||||
}
|
||||
|
||||
/// A wrapper for broadcasts coming in over IPC, and the event-loop.
|
||||
struct BroadcastListener {
|
||||
canceller: TaskCanceller,
|
||||
task_source: DOMManipulationTaskSource,
|
||||
context: Trusted<GlobalScope>,
|
||||
}
|
||||
|
||||
/// A wrapper between timer events coming in over IPC, and the event-loop.
|
||||
struct TimerListener {
|
||||
canceller: TaskCanceller,
|
||||
|
@ -310,6 +325,23 @@ pub struct ManagedMessagePort {
|
|||
closed: bool,
|
||||
}
|
||||
|
||||
/// State representing whether this global is currently managing broadcast channels.
|
||||
#[derive(JSTraceable, MallocSizeOf)]
|
||||
#[unrooted_must_root_lint::must_root]
|
||||
pub enum BroadcastChannelState {
|
||||
/// The broadcast-channel router id for this global, and a queue of managed channels.
|
||||
/// Step 9, "sort destinations"
|
||||
/// of https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage
|
||||
/// requires keeping track of creation order, hence the queue.
|
||||
Managed(
|
||||
BroadcastChannelRouterId,
|
||||
/// The map of channel-name to queue of channels, in order of creation.
|
||||
HashMap<DOMString, VecDeque<Dom<BroadcastChannel>>>,
|
||||
),
|
||||
/// This global is not managing any broadcast channels at this time.
|
||||
UnManaged,
|
||||
}
|
||||
|
||||
/// State representing whether this global is currently managing messageports.
|
||||
#[derive(JSTraceable, MallocSizeOf)]
|
||||
#[unrooted_must_root_lint::must_root]
|
||||
|
@ -323,6 +355,29 @@ pub enum MessagePortState {
|
|||
UnManaged,
|
||||
}
|
||||
|
||||
impl BroadcastListener {
|
||||
/// Handle a broadcast coming in over IPC,
|
||||
/// by queueing the appropriate task on the relevant event-loop.
|
||||
fn handle(&self, event: BroadcastMsg) {
|
||||
let context = self.context.clone();
|
||||
|
||||
// Note: strictly speaking we should just queue the message event tasks,
|
||||
// not queue a task that then queues more tasks.
|
||||
// This however seems to be hard to avoid in the light of the IPC.
|
||||
// One can imagine queueing tasks directly,
|
||||
// for channels that would be in the same script-thread.
|
||||
let _ = self.task_source.queue_with_canceller(
|
||||
task!(broadcast_message_event: move || {
|
||||
let global = context.root();
|
||||
// Step 10 of https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage,
|
||||
// For each BroadcastChannel object destination in destinations, queue a task.
|
||||
global.broadcast_message_event(event, None);
|
||||
}),
|
||||
&self.canceller,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl TimerListener {
|
||||
/// Handle a timer-event coming-in over IPC,
|
||||
/// by queuing the appropriate task on the relevant event-loop.
|
||||
|
@ -501,6 +556,7 @@ impl GlobalScope {
|
|||
) -> Self {
|
||||
Self {
|
||||
message_port_state: DomRefCell::new(MessagePortState::UnManaged),
|
||||
broadcast_channel_state: DomRefCell::new(BroadcastChannelState::UnManaged),
|
||||
blob_state: DomRefCell::new(BlobState::UnManaged),
|
||||
eventtarget: EventTarget::new_inherited(),
|
||||
crypto: Default::default(),
|
||||
|
@ -613,11 +669,18 @@ impl GlobalScope {
|
|||
pub fn perform_a_dom_garbage_collection_checkpoint(&self) {
|
||||
self.perform_a_message_port_garbage_collection_checkpoint();
|
||||
self.perform_a_blob_garbage_collection_checkpoint();
|
||||
self.perform_a_broadcast_channel_garbage_collection_checkpoint();
|
||||
}
|
||||
|
||||
/// Remove the routers for ports and broadcast-channels.
|
||||
pub fn remove_web_messaging_infra(&self) {
|
||||
self.remove_message_ports_router();
|
||||
self.remove_broadcast_channel_router();
|
||||
}
|
||||
|
||||
/// Update our state to un-managed,
|
||||
/// and tell the constellation to drop the sender to our message-port router.
|
||||
pub fn remove_message_ports_router(&self) {
|
||||
fn remove_message_ports_router(&self) {
|
||||
if let MessagePortState::Managed(router_id, _message_ports) =
|
||||
&*self.message_port_state.borrow()
|
||||
{
|
||||
|
@ -628,6 +691,22 @@ impl GlobalScope {
|
|||
*self.message_port_state.borrow_mut() = MessagePortState::UnManaged;
|
||||
}
|
||||
|
||||
/// Update our state to un-managed,
|
||||
/// and tell the constellation to drop the sender to our broadcast router.
|
||||
fn remove_broadcast_channel_router(&self) {
|
||||
if let BroadcastChannelState::Managed(router_id, _channels) =
|
||||
&*self.broadcast_channel_state.borrow()
|
||||
{
|
||||
let _ =
|
||||
self.script_to_constellation_chan()
|
||||
.send(ScriptMsg::RemoveBroadcastChannelRouter(
|
||||
router_id.clone(),
|
||||
self.origin().immutable().clone(),
|
||||
));
|
||||
}
|
||||
*self.broadcast_channel_state.borrow_mut() = BroadcastChannelState::UnManaged;
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#entangle>
|
||||
pub fn entangle_ports(&self, port1: MessagePortId, port2: MessagePortId) {
|
||||
if let MessagePortState::Managed(_id, message_ports) =
|
||||
|
@ -789,6 +868,115 @@ impl GlobalScope {
|
|||
.send(ScriptMsg::RerouteMessagePort(port_id, task));
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage>
|
||||
/// Step 7 and following steps.
|
||||
pub fn schedule_broadcast(&self, msg: BroadcastMsg, channel_id: &Uuid) {
|
||||
// First, broadcast locally.
|
||||
self.broadcast_message_event(msg.clone(), Some(channel_id));
|
||||
|
||||
if let BroadcastChannelState::Managed(router_id, _) =
|
||||
&*self.broadcast_channel_state.borrow()
|
||||
{
|
||||
// Second, broadcast to other globals via the constellation.
|
||||
//
|
||||
// Note: for globals in the same script-thread,
|
||||
// we could skip the hop to the constellation.
|
||||
let _ = self
|
||||
.script_to_constellation_chan()
|
||||
.send(ScriptMsg::ScheduleBroadcast(router_id.clone(), msg));
|
||||
} else {
|
||||
panic!("Attemps to broadcast a message via global not managing any channels.");
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage>
|
||||
/// Step 7 and following steps.
|
||||
pub fn broadcast_message_event(&self, event: BroadcastMsg, channel_id: Option<&Uuid>) {
|
||||
if let BroadcastChannelState::Managed(_, channels) = &*self.broadcast_channel_state.borrow()
|
||||
{
|
||||
let BroadcastMsg {
|
||||
data,
|
||||
origin,
|
||||
channel_name,
|
||||
} = event;
|
||||
|
||||
// Step 7, a few preliminary steps.
|
||||
|
||||
// - Check the worker is not closing.
|
||||
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
|
||||
if worker.is_closing() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// - Check the associated document is fully-active.
|
||||
if let Some(window) = self.downcast::<Window>() {
|
||||
if !window.Document().is_fully_active() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// - Check for a case-sensitive match for the name of the channel.
|
||||
let channel_name = DOMString::from_string(channel_name);
|
||||
|
||||
if let Some(channels) = channels.get(&channel_name) {
|
||||
channels
|
||||
.iter()
|
||||
.filter(|ref channel| {
|
||||
// Step 8.
|
||||
// Filter out the sender.
|
||||
if let Some(id) = channel_id {
|
||||
channel.id() != id
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|channel| DomRoot::from_ref(&**channel))
|
||||
// Step 9, sort by creation order,
|
||||
// done by using a queue to store channels in creation order.
|
||||
.for_each(|channel| {
|
||||
let data = data.clone_for_broadcast();
|
||||
let origin = origin.clone();
|
||||
|
||||
// Step 10: Queue a task on the DOM manipulation task-source,
|
||||
// to fire the message event
|
||||
let channel = Trusted::new(&*channel);
|
||||
let global = Trusted::new(&*self);
|
||||
let _ = self.dom_manipulation_task_source().queue(
|
||||
task!(process_pending_port_messages: move || {
|
||||
let destination = channel.root();
|
||||
let global = global.root();
|
||||
|
||||
// 10.1 Check for closed flag.
|
||||
if destination.closed() {
|
||||
return;
|
||||
}
|
||||
|
||||
rooted!(in(*global.get_cx()) let mut message = UndefinedValue());
|
||||
|
||||
// Step 10.3 StructuredDeserialize(serialized, targetRealm).
|
||||
if let Ok(ports) = structuredclone::read(&global, data, message.handle_mut()) {
|
||||
// Step 10.4, Fire an event named message at destination.
|
||||
MessageEvent::dispatch_jsval(
|
||||
&*destination.upcast(),
|
||||
&global,
|
||||
message.handle(),
|
||||
Some(&origin.ascii_serialization()),
|
||||
None,
|
||||
ports,
|
||||
);
|
||||
} else {
|
||||
// Step 10.3, fire an event named messageerror at destination.
|
||||
MessageEvent::dispatch_error(&*destination.upcast(), &global);
|
||||
}
|
||||
}),
|
||||
&self,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Route the task to be handled by the relevant port.
|
||||
pub fn route_task_to_port(&self, port_id: MessagePortId, task: PortMessageTask) {
|
||||
let should_dispatch = if let MessagePortState::Managed(_id, message_ports) =
|
||||
|
@ -905,6 +1093,93 @@ impl GlobalScope {
|
|||
}
|
||||
}
|
||||
|
||||
/// Remove broadcast-channels that are closed.
|
||||
/// TODO: Also remove them if they do not have an event-listener.
|
||||
/// see https://github.com/servo/servo/issues/25772
|
||||
pub fn perform_a_broadcast_channel_garbage_collection_checkpoint(&self) {
|
||||
let is_empty = if let BroadcastChannelState::Managed(router_id, ref mut channels) =
|
||||
&mut *self.broadcast_channel_state.borrow_mut()
|
||||
{
|
||||
channels.retain(|name, ref mut channels| {
|
||||
channels.retain(|ref chan| !chan.closed());
|
||||
if channels.is_empty() {
|
||||
let _ = self.script_to_constellation_chan().send(
|
||||
ScriptMsg::RemoveBroadcastChannelNameInRouter(
|
||||
router_id.clone(),
|
||||
name.to_string(),
|
||||
self.origin().immutable().clone(),
|
||||
),
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
channels.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if is_empty {
|
||||
self.remove_broadcast_channel_router();
|
||||
}
|
||||
}
|
||||
|
||||
/// Start tracking a broadcast-channel.
|
||||
pub fn track_broadcast_channel(&self, dom_channel: &BroadcastChannel) {
|
||||
let mut current_state = self.broadcast_channel_state.borrow_mut();
|
||||
|
||||
if let BroadcastChannelState::UnManaged = &*current_state {
|
||||
// Setup a route for IPC, for broadcasts from the constellation to our channels.
|
||||
let (broadcast_control_sender, broadcast_control_receiver) =
|
||||
ipc::channel().expect("ipc channel failure");
|
||||
let context = Trusted::new(self);
|
||||
let (task_source, canceller) = (
|
||||
self.dom_manipulation_task_source(),
|
||||
self.task_canceller(TaskSourceName::DOMManipulation),
|
||||
);
|
||||
let listener = BroadcastListener {
|
||||
canceller,
|
||||
task_source,
|
||||
context,
|
||||
};
|
||||
ROUTER.add_route(
|
||||
broadcast_control_receiver.to_opaque(),
|
||||
Box::new(move |message| {
|
||||
let msg = message.to();
|
||||
match msg {
|
||||
Ok(msg) => listener.handle(msg),
|
||||
Err(err) => warn!("Error receiving a BroadcastMsg: {:?}", err),
|
||||
}
|
||||
}),
|
||||
);
|
||||
let router_id = BroadcastChannelRouterId::new();
|
||||
*current_state = BroadcastChannelState::Managed(router_id.clone(), HashMap::new());
|
||||
let _ = self
|
||||
.script_to_constellation_chan()
|
||||
.send(ScriptMsg::NewBroadcastChannelRouter(
|
||||
router_id,
|
||||
broadcast_control_sender,
|
||||
self.origin().immutable().clone(),
|
||||
));
|
||||
}
|
||||
|
||||
if let BroadcastChannelState::Managed(router_id, channels) = &mut *current_state {
|
||||
let entry = channels.entry(dom_channel.Name()).or_insert_with(|| {
|
||||
let _ = self.script_to_constellation_chan().send(
|
||||
ScriptMsg::NewBroadcastChannelNameInRouter(
|
||||
router_id.clone(),
|
||||
dom_channel.Name().to_string(),
|
||||
self.origin().immutable().clone(),
|
||||
),
|
||||
);
|
||||
VecDeque::new()
|
||||
});
|
||||
entry.push_back(Dom::from_ref(dom_channel));
|
||||
} else {
|
||||
panic!("track_broadcast_channel should have first switched the state to managed.");
|
||||
}
|
||||
}
|
||||
|
||||
/// Start tracking a message-port
|
||||
pub fn track_message_port(&self, dom_port: &MessagePort, port_impl: Option<MessagePortImpl>) {
|
||||
let mut current_state = self.message_port_state.borrow_mut();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue