Refactor constellation broadcast channel (#38077)

- Move the 2 hash maps used to manage channels in their own struct.
- The constellation is still in charge of origin checks since it holds
the pipeline information required.
- BroadcastMsg is renamed to BroadcastChannelMsg for consistency.


Testing: covered by existing tests.
Fixes: #38060

Signed-off-by: webbeef <me@webbeef.org>
This commit is contained in:
webbeef 2025-07-15 06:57:05 -07:00 committed by GitHub
parent 34829dfce7
commit 30b6e289e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 199 additions and 198 deletions

View file

@ -0,0 +1,129 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::collections::HashMap;
use base::id::BroadcastChannelRouterId;
use constellation_traits::BroadcastChannelMsg;
use ipc_channel::ipc::IpcSender;
use log::warn;
use servo_url::ImmutableOrigin;
#[derive(Default)]
pub(crate) struct BroadcastChannels {
/// A map of broadcast routers to their IPC sender.
routers: HashMap<BroadcastChannelRouterId, IpcSender<BroadcastChannelMsg>>,
/// A map of origin to a map of channel name to a list of relevant routers.
channels: HashMap<ImmutableOrigin, HashMap<String, Vec<BroadcastChannelRouterId>>>,
}
impl BroadcastChannels {
/// Add a new broadcast router.
#[servo_tracing::instrument(skip_all)]
pub fn new_broadcast_channel_router(
&mut self,
router_id: BroadcastChannelRouterId,
broadcast_ipc_sender: IpcSender<BroadcastChannelMsg>,
) {
if self
.routers
.insert(router_id, broadcast_ipc_sender)
.is_some()
{
warn!("Multiple attempts to add BroadcastChannel router.");
}
}
/// Remove a broadcast router.
#[servo_tracing::instrument(skip_all)]
pub fn remove_broadcast_channel_router(&mut self, router_id: BroadcastChannelRouterId) {
if self.routers.remove(&router_id).is_none() {
warn!("Attempt to remove unknown BroadcastChannel router.");
}
// Also remove the router_id from the broadcast_channels list.
for channels in self.channels.values_mut() {
for routers in channels.values_mut() {
routers.retain(|router| router != &router_id);
}
}
}
/// Note a new channel-name relevant to a given broadcast router.
#[servo_tracing::instrument(skip_all)]
pub fn new_broadcast_channel_name_in_router(
&mut self,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
let channels = self.channels.entry(origin).or_default();
let routers = channels.entry(channel_name).or_default();
routers.push(router_id);
}
/// Remove a channel-name for a given broadcast router.
#[servo_tracing::instrument(skip_all)]
pub fn remove_broadcast_channel_name_in_router(
&mut self,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
if let Some(channels) = self.channels.get_mut(&origin) {
let is_empty = if let Some(routers) = channels.get_mut(&channel_name) {
routers.retain(|router| router != &router_id);
routers.is_empty()
} else {
return warn!(
"Multiple attempts to remove name for BroadcastChannel {:?} at {:?}",
channel_name, origin
);
};
if is_empty {
channels.remove(&channel_name);
}
} else {
warn!(
"Attempt to remove a channel name for an origin without channels {:?}",
origin
);
}
}
/// Broadcast a message via routers in various event-loops.
#[servo_tracing::instrument(skip_all)]
pub fn schedule_broadcast(
&self,
router_id: BroadcastChannelRouterId,
message: BroadcastChannelMsg,
) {
if let Some(channels) = self.channels.get(&message.origin) {
let routers = match channels.get(&message.channel_name) {
Some(routers) => routers,
None => return warn!("Broadcast to channel name without active routers."),
};
for router in routers {
// Exclude the sender of the broadcast.
// Broadcasting locally is done at the point of sending.
if router == &router_id {
continue;
}
if let Some(broadcast_ipc_sender) = self.routers.get(router) {
if broadcast_ipc_sender.send(message.clone()).is_err() {
warn!("Failed to broadcast message to router: {:?}", router);
}
} else {
warn!("No sender for broadcast router: {:?}", router);
}
}
} else {
warn!(
"Attempt to schedule a broadcast for an origin without routers {:?}",
message.origin
);
}
}
}

View file

@ -100,9 +100,8 @@ use background_hang_monitor_api::{
}; };
use base::Epoch; use base::Epoch;
use base::id::{ use base::id::{
BroadcastChannelRouterId, BrowsingContextGroupId, BrowsingContextId, HistoryStateId, BrowsingContextGroupId, BrowsingContextId, HistoryStateId, MessagePortId, MessagePortRouterId,
MessagePortId, MessagePortRouterId, PipelineId, PipelineNamespace, PipelineNamespaceId, PipelineId, PipelineNamespace, PipelineNamespaceId, PipelineNamespaceRequest, WebViewId,
PipelineNamespaceRequest, WebViewId,
}; };
#[cfg(feature = "bluetooth")] #[cfg(feature = "bluetooth")]
use bluetooth_traits::BluetoothRequest; use bluetooth_traits::BluetoothRequest;
@ -115,7 +114,7 @@ use compositing_traits::{
WebrenderExternalImageRegistry, WebrenderExternalImageRegistry,
}; };
use constellation_traits::{ use constellation_traits::{
AuxiliaryWebViewCreationRequest, AuxiliaryWebViewCreationResponse, BroadcastMsg, DocumentState, AuxiliaryWebViewCreationRequest, AuxiliaryWebViewCreationResponse, DocumentState,
EmbedderToConstellationMessage, IFrameLoadInfo, IFrameLoadInfoWithData, IFrameSandboxState, EmbedderToConstellationMessage, IFrameLoadInfo, IFrameLoadInfoWithData, IFrameSandboxState,
IFrameSizeMsg, Job, LoadData, LoadOrigin, LogEntry, MessagePortMsg, NavigationHistoryBehavior, IFrameSizeMsg, Job, LoadData, LoadOrigin, LogEntry, MessagePortMsg, NavigationHistoryBehavior,
PaintMetricEvent, PortMessageTask, PortTransferInfo, SWManagerMsg, SWManagerSenders, PaintMetricEvent, PortMessageTask, PortTransferInfo, SWManagerMsg, SWManagerSenders,
@ -169,6 +168,7 @@ use webrender::RenderApiSender;
use webrender_api::units::LayoutVector2D; use webrender_api::units::LayoutVector2D;
use webrender_api::{DocumentId, ExternalScrollId, ImageKey}; use webrender_api::{DocumentId, ExternalScrollId, ImageKey};
use crate::broadcastchannel::BroadcastChannels;
use crate::browsingcontext::{ use crate::browsingcontext::{
AllBrowsingContextsIterator, BrowsingContext, FullyActiveBrowsingContextsIterator, AllBrowsingContextsIterator, BrowsingContext, FullyActiveBrowsingContextsIterator,
NewBrowsingContextInfo, NewBrowsingContextInfo,
@ -368,11 +368,8 @@ pub struct Constellation<STF, SWF> {
/// A map of router-id to ipc-sender, to route messages to ports. /// A map of router-id to ipc-sender, to route messages to ports.
message_port_routers: HashMap<MessagePortRouterId, IpcSender<MessagePortMsg>>, message_port_routers: HashMap<MessagePortRouterId, IpcSender<MessagePortMsg>>,
/// A map of broadcast routers to their IPC sender. /// Bookkeeping for BroadcastChannel functionnality.
broadcast_routers: HashMap<BroadcastChannelRouterId, IpcSender<BroadcastMsg>>, broadcast_channels: BroadcastChannels,
/// A map of origin to a map of channel-name to a list of relevant routers.
broadcast_channels: HashMap<ImmutableOrigin, HashMap<String, Vec<BroadcastChannelRouterId>>>,
/// The set of all the pipelines in the browser. (See the `pipeline` module /// The set of all the pipelines in the browser. (See the `pipeline` module
/// for more details.) /// for more details.)
@ -678,8 +675,7 @@ where
browsing_context_group_next_id: Default::default(), browsing_context_group_next_id: Default::default(),
message_ports: HashMap::new(), message_ports: HashMap::new(),
message_port_routers: HashMap::new(), message_port_routers: HashMap::new(),
broadcast_routers: HashMap::new(), broadcast_channels: Default::default(),
broadcast_channels: HashMap::new(),
pipelines: HashMap::new(), pipelines: HashMap::new(),
browsing_contexts: HashMap::new(), browsing_contexts: HashMap::new(),
pending_changes: vec![], pending_changes: vec![],
@ -1570,42 +1566,64 @@ where
response_sender, response_sender,
origin, origin,
) => { ) => {
self.handle_new_broadcast_channel_router( if self
source_pipeline_id, .check_origin_against_pipeline(&source_pipeline_id, &origin)
router_id, .is_err()
response_sender, {
origin, return warn!("Attempt to add broadcast router from an unexpected origin.");
); }
self.broadcast_channels
.new_broadcast_channel_router(router_id, response_sender);
}, },
ScriptToConstellationMessage::NewBroadcastChannelNameInRouter( ScriptToConstellationMessage::NewBroadcastChannelNameInRouter(
router_id, router_id,
channel_name, channel_name,
origin, origin,
) => { ) => {
self.handle_new_broadcast_channel_name_in_router( if self
source_pipeline_id, .check_origin_against_pipeline(&source_pipeline_id, &origin)
router_id, .is_err()
channel_name, {
origin, return warn!("Attempt to add channel name from an unexpected origin.");
); }
self.broadcast_channels
.new_broadcast_channel_name_in_router(router_id, channel_name, origin);
}, },
ScriptToConstellationMessage::RemoveBroadcastChannelNameInRouter( ScriptToConstellationMessage::RemoveBroadcastChannelNameInRouter(
router_id, router_id,
channel_name, channel_name,
origin, origin,
) => { ) => {
self.handle_remove_broadcast_channel_name_in_router( if self
source_pipeline_id, .check_origin_against_pipeline(&source_pipeline_id, &origin)
router_id, .is_err()
channel_name, {
origin, return warn!("Attempt to remove channel name from an unexpected origin.");
); }
self.broadcast_channels
.remove_broadcast_channel_name_in_router(router_id, channel_name, origin);
}, },
ScriptToConstellationMessage::RemoveBroadcastChannelRouter(router_id, origin) => { ScriptToConstellationMessage::RemoveBroadcastChannelRouter(router_id, origin) => {
self.handle_remove_broadcast_channel_router(source_pipeline_id, router_id, origin); if self
.check_origin_against_pipeline(&source_pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to remove broadcast router from an unexpected origin.");
}
self.broadcast_channels
.remove_broadcast_channel_router(router_id);
}, },
ScriptToConstellationMessage::ScheduleBroadcast(router_id, message) => { ScriptToConstellationMessage::ScheduleBroadcast(router_id, message) => {
self.handle_schedule_broadcast(source_pipeline_id, router_id, message); if self
.check_origin_against_pipeline(&source_pipeline_id, &message.origin)
.is_err()
{
return warn!(
"Attempt to schedule broadcast from an origin not matching the origin of the msg."
);
}
self.broadcast_channels
.schedule_broadcast(router_id, message);
}, },
ScriptToConstellationMessage::ForwardToEmbedder(embedder_msg) => { ScriptToConstellationMessage::ForwardToEmbedder(embedder_msg) => {
self.embedder_proxy.send(embedder_msg); self.embedder_proxy.send(embedder_msg);
@ -1890,157 +1908,6 @@ where
Err(()) Err(())
} }
/// Broadcast a message via routers in various event-loops.
#[servo_tracing::instrument(skip_all)]
fn handle_schedule_broadcast(
&self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
message: BroadcastMsg,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &message.origin)
.is_err()
{
return warn!(
"Attempt to schedule broadcast from an origin not matching the origin of the msg."
);
}
if let Some(channels) = self.broadcast_channels.get(&message.origin) {
let routers = match channels.get(&message.channel_name) {
Some(routers) => routers,
None => return warn!("Broadcast to channel name without active routers."),
};
for router in routers {
// Exclude the sender of the broadcast.
// Broadcasting locally is done at the point of sending.
if router == &router_id {
continue;
}
if let Some(broadcast_ipc_sender) = self.broadcast_routers.get(router) {
if broadcast_ipc_sender.send(message.clone()).is_err() {
warn!("Failed to broadcast message to router: {:?}", router);
}
} else {
warn!("No sender for broadcast router: {:?}", router);
}
}
} else {
warn!(
"Attempt to schedule a broadcast for an origin without routers {:?}",
message.origin
);
}
}
/// Remove a channel-name for a given broadcast router.
#[servo_tracing::instrument(skip_all)]
fn handle_remove_broadcast_channel_name_in_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to remove channel name from an unexpected origin.");
}
if let Some(channels) = self.broadcast_channels.get_mut(&origin) {
let is_empty = if let Some(routers) = channels.get_mut(&channel_name) {
routers.retain(|router| router != &router_id);
routers.is_empty()
} else {
return warn!(
"Multiple attempts to remove name for broadcast-channel {:?} at {:?}",
channel_name, origin
);
};
if is_empty {
channels.remove(&channel_name);
}
} else {
warn!(
"Attempt to remove a channel-name for an origin without channels {:?}",
origin
);
}
}
/// Note a new channel-name relevant to a given broadcast router.
#[servo_tracing::instrument(skip_all)]
fn handle_new_broadcast_channel_name_in_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
channel_name: String,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to add channel name from an unexpected origin.");
}
let channels = self.broadcast_channels.entry(origin).or_default();
let routers = channels.entry(channel_name).or_default();
routers.push(router_id);
}
/// Remove a broadcast router.
#[servo_tracing::instrument(skip_all)]
fn handle_remove_broadcast_channel_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to remove broadcast router from an unexpected origin.");
}
if self.broadcast_routers.remove(&router_id).is_none() {
warn!("Attempt to remove unknown broadcast-channel router.");
}
// Also remove the router_id from the broadcast_channels list.
for channels in self.broadcast_channels.values_mut() {
for routers in channels.values_mut() {
routers.retain(|router| router != &router_id);
}
}
}
/// Add a new broadcast router.
#[servo_tracing::instrument(skip_all)]
fn handle_new_broadcast_channel_router(
&mut self,
pipeline_id: PipelineId,
router_id: BroadcastChannelRouterId,
broadcast_ipc_sender: IpcSender<BroadcastMsg>,
origin: ImmutableOrigin,
) {
if self
.check_origin_against_pipeline(&pipeline_id, &origin)
.is_err()
{
return warn!("Attempt to add broadcast router from an unexpected origin.");
}
if self
.broadcast_routers
.insert(router_id, broadcast_ipc_sender)
.is_some()
{
warn!("Multple attempt to add broadcast-channel router.");
}
}
#[servo_tracing::instrument(skip_all)] #[servo_tracing::instrument(skip_all)]
#[cfg(feature = "webgpu")] #[cfg(feature = "webgpu")]
fn handle_wgpu_request( fn handle_wgpu_request(

View file

@ -7,6 +7,7 @@
#[macro_use] #[macro_use]
mod tracing; mod tracing;
mod broadcastchannel;
mod browsingcontext; mod browsingcontext;
mod constellation; mod constellation;
mod constellation_webview; mod constellation_webview;

View file

@ -4,7 +4,7 @@
use std::cell::Cell; use std::cell::Cell;
use constellation_traits::BroadcastMsg; use constellation_traits::BroadcastChannelMsg;
use dom_struct::dom_struct; use dom_struct::dom_struct;
use js::rust::{HandleObject, HandleValue}; use js::rust::{HandleObject, HandleValue};
use uuid::Uuid; use uuid::Uuid;
@ -89,7 +89,7 @@ impl BroadcastChannelMethods<crate::DomTypeHolder> for BroadcastChannel {
let global = self.global(); let global = self.global();
let msg = BroadcastMsg { let msg = BroadcastChannelMsg {
origin: global.origin().immutable().clone(), origin: global.origin().immutable().clone(),
channel_name: self.Name().to_string(), channel_name: self.Name().to_string(),
data, data,

View file

@ -18,8 +18,8 @@ use base::id::{
ServiceWorkerId, ServiceWorkerRegistrationId, WebViewId, ServiceWorkerId, ServiceWorkerRegistrationId, WebViewId,
}; };
use constellation_traits::{ use constellation_traits::{
BlobData, BlobImpl, BroadcastMsg, FileBlob, MessagePortImpl, MessagePortMsg, PortMessageTask, BlobData, BlobImpl, BroadcastChannelMsg, FileBlob, MessagePortImpl, MessagePortMsg,
ScriptToConstellationChan, ScriptToConstellationMessage, PortMessageTask, ScriptToConstellationChan, ScriptToConstellationMessage,
}; };
use content_security_policy::CspList; use content_security_policy::CspList;
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
@ -501,7 +501,7 @@ pub(crate) enum MessagePortState {
impl BroadcastListener { impl BroadcastListener {
/// Handle a broadcast coming in over IPC, /// Handle a broadcast coming in over IPC,
/// by queueing the appropriate task on the relevant event-loop. /// by queueing the appropriate task on the relevant event-loop.
fn handle(&self, event: BroadcastMsg) { fn handle(&self, event: BroadcastChannelMsg) {
let context = self.context.clone(); let context = self.context.clone();
// Note: strictly speaking we should just queue the message event tasks, // Note: strictly speaking we should just queue the message event tasks,
@ -1223,7 +1223,7 @@ impl GlobalScope {
/// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage> /// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage>
/// Step 7 and following steps. /// Step 7 and following steps.
pub(crate) fn schedule_broadcast(&self, msg: BroadcastMsg, channel_id: &Uuid) { pub(crate) fn schedule_broadcast(&self, msg: BroadcastChannelMsg, channel_id: &Uuid) {
// First, broadcast locally. // First, broadcast locally.
self.broadcast_message_event(msg.clone(), Some(channel_id)); self.broadcast_message_event(msg.clone(), Some(channel_id));
@ -1244,10 +1244,14 @@ impl GlobalScope {
/// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage> /// <https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage>
/// Step 7 and following steps. /// Step 7 and following steps.
pub(crate) fn broadcast_message_event(&self, event: BroadcastMsg, channel_id: Option<&Uuid>) { pub(crate) fn broadcast_message_event(
&self,
event: BroadcastChannelMsg,
channel_id: Option<&Uuid>,
) {
if let BroadcastChannelState::Managed(_, channels) = &*self.broadcast_channel_state.borrow() if let BroadcastChannelState::Managed(_, channels) = &*self.broadcast_channel_state.borrow()
{ {
let BroadcastMsg { let BroadcastChannelMsg {
data, data,
origin, origin,
channel_name, channel_name,
@ -1617,7 +1621,7 @@ impl GlobalScope {
broadcast_control_receiver, broadcast_control_receiver,
Box::new(move |message| match message { Box::new(move |message| match message {
Ok(msg) => listener.handle(msg), Ok(msg) => listener.handle(msg),
Err(err) => warn!("Error receiving a BroadcastMsg: {:?}", err), Err(err) => warn!("Error receiving a BroadcastChannelMsg: {:?}", err),
}), }),
); );
let router_id = BroadcastChannelRouterId::new(); let router_id = BroadcastChannelRouterId::new();

View file

@ -36,7 +36,7 @@ use strum_macros::IntoStaticStr;
use webgpu_traits::{WebGPU, WebGPUAdapterResponse}; use webgpu_traits::{WebGPU, WebGPUAdapterResponse};
use webrender_api::ImageKey; use webrender_api::ImageKey;
use crate::structured_data::{BroadcastMsg, StructuredSerializedData}; use crate::structured_data::{BroadcastChannelMsg, StructuredSerializedData};
use crate::{ use crate::{
LogEntry, MessagePortMsg, PortMessageTask, PortTransferInfo, TraversalDirection, WindowSizeType, LogEntry, MessagePortMsg, PortMessageTask, PortTransferInfo, TraversalDirection, WindowSizeType,
}; };
@ -506,7 +506,7 @@ pub enum ScriptToConstellationMessage {
/// A global has started managing broadcast-channels. /// A global has started managing broadcast-channels.
NewBroadcastChannelRouter( NewBroadcastChannelRouter(
BroadcastChannelRouterId, BroadcastChannelRouterId,
IpcSender<BroadcastMsg>, IpcSender<BroadcastChannelMsg>,
ImmutableOrigin, ImmutableOrigin,
), ),
/// A global has stopped managing broadcast-channels. /// A global has stopped managing broadcast-channels.
@ -517,7 +517,7 @@ pub enum ScriptToConstellationMessage {
RemoveBroadcastChannelNameInRouter(BroadcastChannelRouterId, String, ImmutableOrigin), RemoveBroadcastChannelNameInRouter(BroadcastChannelRouterId, String, ImmutableOrigin),
/// Broadcast a message to all same-origin broadcast channels, /// Broadcast a message to all same-origin broadcast channels,
/// excluding the source of the broadcast. /// excluding the source of the broadcast.
ScheduleBroadcast(BroadcastChannelRouterId, BroadcastMsg), ScheduleBroadcast(BroadcastChannelRouterId, BroadcastChannelMsg),
/// Forward a message to the embedder. /// Forward a message to the embedder.
ForwardToEmbedder(EmbedderMsg), ForwardToEmbedder(EmbedderMsg),
/// Broadcast a storage event to every same-origin pipeline. /// Broadcast a storage event to every same-origin pipeline.

View file

@ -74,7 +74,7 @@ impl Serializable {
/// Message for communication between the constellation and a global managing broadcast channels. /// Message for communication between the constellation and a global managing broadcast channels.
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct BroadcastMsg { pub struct BroadcastChannelMsg {
/// The origin of this message. /// The origin of this message.
pub origin: ImmutableOrigin, pub origin: ImmutableOrigin,
/// The name of the channel. /// The name of the channel.
@ -83,9 +83,9 @@ pub struct BroadcastMsg {
pub data: StructuredSerializedData, pub data: StructuredSerializedData,
} }
impl Clone for BroadcastMsg { impl Clone for BroadcastChannelMsg {
fn clone(&self) -> BroadcastMsg { fn clone(&self) -> BroadcastChannelMsg {
BroadcastMsg { BroadcastChannelMsg {
data: self.data.clone_for_broadcast(), data: self.data.clone_for_broadcast(),
origin: self.origin.clone(), origin: self.origin.clone(),
channel_name: self.channel_name.clone(), channel_name: self.channel_name.clone(),