mirror of
https://github.com/servo/servo.git
synced 2025-08-04 21:20:23 +01:00
MessagePort: implement disentanglement (#36654)
Implement [disentangle](https://html.spec.whatwg.org/multipage/#disentangle) Remove bespoke gc logic which now becomes unnecessary. Adds a wpt test that hits the "disentangle while in transfer" logic. Updates streams code, fixing an error where disentanglement is conditional on an error. Test coverage: there are existing tests in `/webmessaging/message-channels/close-event/explicitly-closed.tentative.window.js` for the no transfer case, and the simple completed transfer case, and this PR adds a test for the more complicated transfer in progress case. Fix https://github.com/servo/servo/issues/36465 --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
parent
c46402e222
commit
af5d665efa
19 changed files with 356 additions and 191 deletions
|
@ -457,8 +457,9 @@ pub(crate) struct ManagedMessagePort {
|
|||
/// and only add them, and ask the constellation to complete the transfer,
|
||||
/// in a subsequent task if the port hasn't been re-transfered.
|
||||
pending: bool,
|
||||
/// Has the port been closed? If closed, it can be dropped and later GC'ed.
|
||||
closed: bool,
|
||||
/// Whether the port has been closed by script in this global,
|
||||
/// so it can be removed.
|
||||
explicitly_closed: bool,
|
||||
/// Note: it may seem strange to use a pair of options, versus for example an enum.
|
||||
/// But it looks like tranform streams will require both of those in their transfer.
|
||||
/// This will be resolved when we reach that point of the implementation.
|
||||
|
@ -546,12 +547,17 @@ impl MessageListener {
|
|||
let mut succeeded = vec![];
|
||||
let mut failed = HashMap::new();
|
||||
|
||||
for (id, buffer) in ports.into_iter() {
|
||||
for (id, info) in ports.into_iter() {
|
||||
if global.is_managing_port(&id) {
|
||||
succeeded.push(id);
|
||||
global.complete_port_transfer(id, buffer);
|
||||
global.complete_port_transfer(
|
||||
id,
|
||||
info.port_message_queue,
|
||||
info.disentangled,
|
||||
CanGc::note()
|
||||
);
|
||||
} else {
|
||||
failed.insert(id, buffer);
|
||||
failed.insert(id, info);
|
||||
}
|
||||
}
|
||||
let _ = global.script_to_constellation_chan().send(
|
||||
|
@ -560,13 +566,21 @@ impl MessageListener {
|
|||
})
|
||||
);
|
||||
},
|
||||
MessagePortMsg::CompletePendingTransfer(port_id, buffer) => {
|
||||
MessagePortMsg::CompletePendingTransfer(port_id, info) => {
|
||||
let context = self.context.clone();
|
||||
self.task_source.queue(task!(complete_pending: move || {
|
||||
let global = context.root();
|
||||
global.complete_port_transfer(port_id, buffer);
|
||||
global.complete_port_transfer(port_id, info.port_message_queue, info.disentangled, CanGc::note());
|
||||
}));
|
||||
},
|
||||
MessagePortMsg::CompleteDisentanglement(port_id) => {
|
||||
let context = self.context.clone();
|
||||
self.task_source
|
||||
.queue(task!(try_complete_disentanglement: move || {
|
||||
let global = context.root();
|
||||
global.try_complete_disentanglement(port_id, CanGc::note());
|
||||
}));
|
||||
},
|
||||
MessagePortMsg::NewTask(port_id, task) => {
|
||||
let context = self.context.clone();
|
||||
self.task_source.queue(task!(process_new_task: move || {
|
||||
|
@ -574,14 +588,6 @@ impl MessageListener {
|
|||
global.route_task_to_port(port_id, task, CanGc::note());
|
||||
}));
|
||||
},
|
||||
MessagePortMsg::RemoveMessagePort(port_id) => {
|
||||
let context = self.context.clone();
|
||||
self.task_source
|
||||
.queue(task!(process_remove_message_port: move || {
|
||||
let global = context.root();
|
||||
global.note_entangled_port_removed(&port_id);
|
||||
}));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -871,7 +877,13 @@ impl GlobalScope {
|
|||
}
|
||||
|
||||
/// Complete the transfer of a message-port.
|
||||
fn complete_port_transfer(&self, port_id: MessagePortId, tasks: VecDeque<PortMessageTask>) {
|
||||
fn complete_port_transfer(
|
||||
&self,
|
||||
port_id: MessagePortId,
|
||||
tasks: VecDeque<PortMessageTask>,
|
||||
disentangled: bool,
|
||||
can_gc: CanGc,
|
||||
) {
|
||||
let should_start = if let MessagePortState::Managed(_id, message_ports) =
|
||||
&mut *self.message_port_state.borrow_mut()
|
||||
{
|
||||
|
@ -885,6 +897,10 @@ impl GlobalScope {
|
|||
}
|
||||
if let Some(port_impl) = managed_port.port_impl.as_mut() {
|
||||
port_impl.complete_transfer(tasks);
|
||||
if disentangled {
|
||||
port_impl.disentangle();
|
||||
managed_port.dom_port.disentangle();
|
||||
}
|
||||
port_impl.enabled()
|
||||
} else {
|
||||
panic!("managed-port has no port-impl.");
|
||||
|
@ -895,7 +911,45 @@ impl GlobalScope {
|
|||
panic!("complete_port_transfer called for an unknown port.");
|
||||
};
|
||||
if should_start {
|
||||
self.start_message_port(&port_id);
|
||||
self.start_message_port(&port_id, can_gc);
|
||||
}
|
||||
}
|
||||
|
||||
/// The closing of `otherPort`, if it is in a different global.
|
||||
/// <https://html.spec.whatwg.org/multipage/#disentangle>
|
||||
fn try_complete_disentanglement(&self, port_id: MessagePortId, can_gc: CanGc) {
|
||||
let dom_port = if let MessagePortState::Managed(_id, message_ports) =
|
||||
&mut *self.message_port_state.borrow_mut()
|
||||
{
|
||||
let dom_port = if let Some(managed_port) = message_ports.get_mut(&port_id) {
|
||||
if managed_port.pending {
|
||||
unreachable!("CompleteDisentanglement msg received for a pending port.");
|
||||
}
|
||||
let port_impl = managed_port
|
||||
.port_impl
|
||||
.as_mut()
|
||||
.expect("managed-port has no port-impl.");
|
||||
port_impl.disentangle();
|
||||
managed_port.dom_port.as_rooted()
|
||||
} else {
|
||||
// Note: this, and the other return below,
|
||||
// can happen if the port has already been transferred out of this global,
|
||||
// in which case the disentanglement will complete along with the transfer.
|
||||
return;
|
||||
};
|
||||
dom_port
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Fire an event named close at otherPort.
|
||||
dom_port.upcast().fire_event(atom!("close"), can_gc);
|
||||
|
||||
let res = self.script_to_constellation_chan().send(
|
||||
ScriptToConstellationMessage::DisentanglePorts(port_id, None),
|
||||
);
|
||||
if res.is_err() {
|
||||
warn!("Sending DisentanglePorts failed");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -951,8 +1005,64 @@ impl GlobalScope {
|
|||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#disentangle>
|
||||
pub(crate) fn disentangle_port(&self, _port: &MessagePort) {
|
||||
// TODO: #36465
|
||||
pub(crate) fn disentangle_port(&self, port: &MessagePort, can_gc: CanGc) {
|
||||
let initiator_port = port.message_port_id();
|
||||
// Let otherPort be the MessagePort which initiatorPort was entangled with.
|
||||
let Some(other_port) = port.disentangle() else {
|
||||
// Assert: otherPort exists.
|
||||
// Note: ignoring the assert,
|
||||
// because the streams spec seems to disentangle ports that are disentangled already.
|
||||
return;
|
||||
};
|
||||
|
||||
// Disentangle initiatorPort and otherPort, so that they are no longer entangled or associated with each other.
|
||||
// Note: this is done in part here, and in part at the constellation(if otherPort is in another global).
|
||||
let dom_port = if let MessagePortState::Managed(_id, message_ports) =
|
||||
&mut *self.message_port_state.borrow_mut()
|
||||
{
|
||||
let mut dom_port = None;
|
||||
for port_id in &[initiator_port, &other_port] {
|
||||
match message_ports.get_mut(port_id) {
|
||||
None => {
|
||||
continue;
|
||||
},
|
||||
Some(managed_port) => {
|
||||
let port_impl = managed_port
|
||||
.port_impl
|
||||
.as_mut()
|
||||
.expect("managed-port has no port-impl.");
|
||||
managed_port.dom_port.disentangle();
|
||||
port_impl.disentangle();
|
||||
|
||||
if **port_id == other_port {
|
||||
dom_port = Some(managed_port.dom_port.as_rooted())
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
dom_port
|
||||
} else {
|
||||
panic!("disentangle_port called on a global not managing any ports.");
|
||||
};
|
||||
|
||||
// Fire an event named close at `otherPort`.
|
||||
// Note: done here if the port is managed by the same global as `initialPort`.
|
||||
if let Some(dom_port) = dom_port {
|
||||
dom_port.upcast().fire_event(atom!("close"), can_gc);
|
||||
}
|
||||
|
||||
let chan = self.script_to_constellation_chan().clone();
|
||||
let initiator_port = *initiator_port;
|
||||
self.task_manager()
|
||||
.port_message_queue()
|
||||
.queue(task!(post_message: move || {
|
||||
// Note: we do this in a task to ensure it doesn't affect messages that are still to be routed,
|
||||
// see the task queueing in `post_messageport_msg`.
|
||||
let res = chan.send(ScriptToConstellationMessage::DisentanglePorts(initiator_port, Some(other_port)));
|
||||
if res.is_err() {
|
||||
warn!("Sending DisentanglePorts failed");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#entangle>
|
||||
|
@ -984,18 +1094,6 @@ impl GlobalScope {
|
|||
.send(ScriptToConstellationMessage::EntanglePorts(port1, port2));
|
||||
}
|
||||
|
||||
/// Note that the entangled port of `port_id` has been removed in another global.
|
||||
pub(crate) fn note_entangled_port_removed(&self, port_id: &MessagePortId) {
|
||||
// Note: currently this is a no-op,
|
||||
// as we only use the `close` method to manage the local lifecyle of a port.
|
||||
// This could be used as part of lifecyle management to determine a port can be GC'ed.
|
||||
// See https://github.com/servo/servo/issues/25772
|
||||
warn!(
|
||||
"Entangled port of {:?} has been removed in another global",
|
||||
port_id
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle the transfer of a port in the current task.
|
||||
pub(crate) fn mark_port_as_transferred(&self, port_id: &MessagePortId) -> MessagePortImpl {
|
||||
if let MessagePortState::Managed(_id, message_ports) =
|
||||
|
@ -1021,20 +1119,21 @@ impl GlobalScope {
|
|||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-start>
|
||||
pub(crate) fn start_message_port(&self, port_id: &MessagePortId) {
|
||||
let message_buffer = if let MessagePortState::Managed(_id, message_ports) =
|
||||
pub(crate) fn start_message_port(&self, port_id: &MessagePortId, can_gc: CanGc) {
|
||||
let (message_buffer, dom_port) = if let MessagePortState::Managed(_id, message_ports) =
|
||||
&mut *self.message_port_state.borrow_mut()
|
||||
{
|
||||
match message_ports.get_mut(port_id) {
|
||||
let (message_buffer, dom_port) = match message_ports.get_mut(port_id) {
|
||||
None => panic!("start_message_port called on a unknown port."),
|
||||
Some(managed_port) => {
|
||||
if let Some(port_impl) = managed_port.port_impl.as_mut() {
|
||||
port_impl.start()
|
||||
(port_impl.start(), managed_port.dom_port.as_rooted())
|
||||
} else {
|
||||
panic!("managed-port has no port-impl.");
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
(message_buffer, dom_port)
|
||||
} else {
|
||||
return warn!("start_message_port called on a global not managing any ports.");
|
||||
};
|
||||
|
@ -1042,6 +1141,18 @@ impl GlobalScope {
|
|||
for task in message_buffer {
|
||||
self.route_task_to_port(*port_id, task, CanGc::note());
|
||||
}
|
||||
if dom_port.disentangled() {
|
||||
// <https://html.spec.whatwg.org/multipage/#disentangle>
|
||||
// Fire an event named close at otherPort.
|
||||
dom_port.upcast().fire_event(atom!("close"), can_gc);
|
||||
|
||||
let res = self.script_to_constellation_chan().send(
|
||||
ScriptToConstellationMessage::DisentanglePorts(*port_id, None),
|
||||
);
|
||||
if res.is_err() {
|
||||
warn!("Sending DisentanglePorts failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1055,7 +1166,7 @@ impl GlobalScope {
|
|||
Some(managed_port) => {
|
||||
if let Some(port_impl) = managed_port.port_impl.as_mut() {
|
||||
port_impl.close();
|
||||
managed_port.closed = true;
|
||||
managed_port.explicitly_closed = true;
|
||||
} else {
|
||||
panic!("managed-port has no port-impl.");
|
||||
}
|
||||
|
@ -1436,12 +1547,7 @@ impl GlobalScope {
|
|||
let to_be_removed: Vec<MessagePortId> = message_ports
|
||||
.iter()
|
||||
.filter_map(|(id, managed_port)| {
|
||||
if managed_port.closed {
|
||||
// Let the constellation know to drop this port and the one it is entangled with,
|
||||
// and to forward this message to the script-process where the entangled is found.
|
||||
let _ = self
|
||||
.script_to_constellation_chan()
|
||||
.send(ScriptToConstellationMessage::RemoveMessagePort(*id));
|
||||
if managed_port.explicitly_closed {
|
||||
Some(*id)
|
||||
} else {
|
||||
None
|
||||
|
@ -1451,6 +1557,9 @@ impl GlobalScope {
|
|||
for id in to_be_removed {
|
||||
message_ports.remove(&id);
|
||||
}
|
||||
// Note: ports are only removed throught explicit closure by script in this global.
|
||||
// TODO: #25772
|
||||
// TODO: remove ports when we can be sure their port message queue is empty(via the constellation).
|
||||
message_ports.is_empty()
|
||||
} else {
|
||||
false
|
||||
|
@ -1581,7 +1690,7 @@ impl GlobalScope {
|
|||
port_impl: Some(port_impl),
|
||||
dom_port: Dom::from_ref(dom_port),
|
||||
pending: true,
|
||||
closed: false,
|
||||
explicitly_closed: false,
|
||||
cross_realm_transform_readable: None,
|
||||
cross_realm_transform_writable: None,
|
||||
},
|
||||
|
@ -1605,7 +1714,7 @@ impl GlobalScope {
|
|||
port_impl: Some(port_impl),
|
||||
dom_port: Dom::from_ref(dom_port),
|
||||
pending: false,
|
||||
closed: false,
|
||||
explicitly_closed: false,
|
||||
cross_realm_transform_readable: None,
|
||||
cross_realm_transform_writable: None,
|
||||
},
|
||||
|
|
|
@ -83,6 +83,20 @@ impl MessagePort {
|
|||
*self.entangled_port.borrow_mut() = Some(other_id);
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#disentangle>
|
||||
pub(crate) fn disentangle(&self) -> Option<MessagePortId> {
|
||||
// Disentangle initiatorPort and otherPort, so that they are no longer entangled or associated with each other.
|
||||
// Note: called from `disentangle_port` in the global, where the rest happens.
|
||||
self.entangled_port.borrow_mut().take()
|
||||
}
|
||||
|
||||
/// Has the port been disentangled?
|
||||
/// Used when starting the port to fire the `close` event,
|
||||
/// to cover the case of a disentanglement while in transfer.
|
||||
pub(crate) fn disentangled(&self) -> bool {
|
||||
self.entangled_port.borrow().is_none()
|
||||
}
|
||||
|
||||
pub(crate) fn message_port_id(&self) -> &MessagePortId {
|
||||
&self.message_port_id
|
||||
}
|
||||
|
@ -314,20 +328,28 @@ impl MessagePortMethods<crate::DomTypeHolder> for MessagePort {
|
|||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-start>
|
||||
fn Start(&self) {
|
||||
fn Start(&self, can_gc: CanGc) {
|
||||
if self.detached.get() {
|
||||
return;
|
||||
}
|
||||
self.global().start_message_port(self.message_port_id());
|
||||
self.global()
|
||||
.start_message_port(self.message_port_id(), can_gc);
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-close>
|
||||
fn Close(&self) {
|
||||
fn Close(&self, can_gc: CanGc) {
|
||||
if self.detached.get() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Set this's [[Detached]] internal slot value to true.
|
||||
self.detached.set(true);
|
||||
self.global().close_message_port(self.message_port_id());
|
||||
|
||||
let global = self.global();
|
||||
global.close_message_port(self.message_port_id());
|
||||
|
||||
// If this is entangled, disentangle it.
|
||||
global.disentangle_port(self, can_gc);
|
||||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#handler-messageport-onmessage>
|
||||
|
@ -340,15 +362,19 @@ impl MessagePortMethods<crate::DomTypeHolder> for MessagePort {
|
|||
}
|
||||
|
||||
/// <https://html.spec.whatwg.org/multipage/#handler-messageport-onmessage>
|
||||
fn SetOnmessage(&self, listener: Option<Rc<EventHandlerNonNull>>) {
|
||||
fn SetOnmessage(&self, listener: Option<Rc<EventHandlerNonNull>>, can_gc: CanGc) {
|
||||
if self.detached.get() {
|
||||
return;
|
||||
}
|
||||
self.set_onmessage(listener);
|
||||
// Note: we cannot use the event_handler macro, due to the need to start the port.
|
||||
self.global().start_message_port(self.message_port_id());
|
||||
self.global()
|
||||
.start_message_port(self.message_port_id(), can_gc);
|
||||
}
|
||||
|
||||
// <https://html.spec.whatwg.org/multipage/#handler-messageport-onmessageerror>
|
||||
event_handler!(messageerror, GetOnmessageerror, SetOnmessageerror);
|
||||
|
||||
// <https://html.spec.whatwg.org/multipage/#handler-messageport-onclose>
|
||||
event_handler!(close, GetOnclose, SetOnclose);
|
||||
}
|
||||
|
|
|
@ -1825,7 +1825,7 @@ impl ReadableStream {
|
|||
global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
|
||||
|
||||
// Enable port’s port message queue.
|
||||
port.Start();
|
||||
port.Start(can_gc);
|
||||
|
||||
// Perform ! SetUpReadableStreamDefaultController
|
||||
controller
|
||||
|
@ -2093,7 +2093,7 @@ impl CrossRealmTransformReadable {
|
|||
self.controller.close(can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
}
|
||||
|
||||
// Otherwise, if type is "error",
|
||||
|
@ -2102,7 +2102,7 @@ impl CrossRealmTransformReadable {
|
|||
self.controller.error(value.handle(), can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2129,7 +2129,7 @@ impl CrossRealmTransformReadable {
|
|||
self.controller.error(rooted_error.handle(), can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -151,7 +151,7 @@ impl UnderlyingSourceContainer {
|
|||
let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
self.global().disentangle_port(port);
|
||||
self.global().disentangle_port(port, can_gc);
|
||||
|
||||
let promise = Promise::new(&self.global(), can_gc);
|
||||
|
||||
|
|
|
@ -893,7 +893,7 @@ impl WritableStream {
|
|||
global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
|
||||
|
||||
// Enable port’s port message queue.
|
||||
port.Start();
|
||||
port.Start(can_gc);
|
||||
|
||||
// Perform ! SetUpWritableStreamDefaultController
|
||||
controller
|
||||
|
@ -1202,7 +1202,7 @@ impl CrossRealmTransformWritable {
|
|||
.error_if_needed(cx, rooted_error.handle(), global, can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -173,11 +173,11 @@ impl Callback for TransferBackPressurePromiseReaction {
|
|||
self.port
|
||||
.pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(&self.port);
|
||||
|
||||
// If result is an abrupt completion,
|
||||
if let Err(error) = result {
|
||||
// Disentangle port.
|
||||
global.disentangle_port(&self.port, can_gc);
|
||||
|
||||
// Return a promise rejected with result.[[Value]].
|
||||
self.result_promise.reject_error(error, can_gc);
|
||||
} else {
|
||||
|
@ -609,7 +609,7 @@ impl WritableStreamDefaultController {
|
|||
let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
|
||||
let promise = Promise::new(global, can_gc);
|
||||
|
||||
|
@ -752,7 +752,7 @@ impl WritableStreamDefaultController {
|
|||
.expect("Sending close should not fail.");
|
||||
|
||||
// Disentangle port.
|
||||
global.disentangle_port(port);
|
||||
global.disentangle_port(port, can_gc);
|
||||
|
||||
// Return a promise resolved with undefined.
|
||||
Promise::new_resolved(global, cx, (), can_gc)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue