Make transform stream transferrable (#36905)

Part of https://github.com/servo/servo/issues/34676

#36739 needs to be merged first.

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
This commit is contained in:
Taym Haddadi 2025-05-12 18:02:06 +02:00 committed by GitHub
parent aa4ad0f2be
commit 62569979ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 122 additions and 20 deletions

View file

@ -44,7 +44,7 @@ use crate::dom::dompointreadonly::DOMPointReadOnly;
use crate::dom::globalscope::GlobalScope;
use crate::dom::messageport::MessagePort;
use crate::dom::readablestream::ReadableStream;
use crate::dom::types::DOMException;
use crate::dom::types::{DOMException, TransformStream};
use crate::dom::writablestream::WritableStream;
use crate::realms::{AlreadyInRealm, InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@ -65,6 +65,7 @@ pub(super) enum StructuredCloneTags {
ReadableStream = 0xFFFF8006,
DomException = 0xFFFF8007,
WritableStream = 0xFFFF8008,
TransformStream = 0xFFFF8009,
Max = 0xFFFFFFFF,
}
@ -85,6 +86,7 @@ impl From<TransferrableInterface> for StructuredCloneTags {
TransferrableInterface::MessagePort => StructuredCloneTags::MessagePort,
TransferrableInterface::ReadableStream => StructuredCloneTags::ReadableStream,
TransferrableInterface::WritableStream => StructuredCloneTags::WritableStream,
TransferrableInterface::TransformStream => StructuredCloneTags::TransformStream,
}
}
}
@ -265,6 +267,7 @@ fn receiver_for_type(
TransferrableInterface::MessagePort => receive_object::<MessagePort>,
TransferrableInterface::ReadableStream => receive_object::<ReadableStream>,
TransferrableInterface::WritableStream => receive_object::<WritableStream>,
TransferrableInterface::TransformStream => receive_object::<TransformStream>,
}
}
@ -390,6 +393,7 @@ fn transfer_for_type(val: TransferrableInterface) -> TransferOperation {
TransferrableInterface::MessagePort => try_transfer::<MessagePort>,
TransferrableInterface::ReadableStream => try_transfer::<ReadableStream>,
TransferrableInterface::WritableStream => try_transfer::<WritableStream>,
TransferrableInterface::TransformStream => try_transfer::<TransformStream>,
}
}
@ -438,6 +442,7 @@ unsafe fn can_transfer_for_type(
TransferrableInterface::MessagePort => can_transfer::<MessagePort>(obj, cx),
TransferrableInterface::ReadableStream => can_transfer::<ReadableStream>(obj, cx),
TransferrableInterface::WritableStream => can_transfer::<WritableStream>(obj, cx),
TransferrableInterface::TransformStream => can_transfer::<TransformStream>(obj, cx),
}
}

View file

@ -3,9 +3,12 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cell::Cell;
use std::collections::HashMap;
use std::ptr::{self};
use std::rc::Rc;
use base::id::{MessagePortId, MessagePortIndex};
use constellation_traits::MessagePortImpl;
use dom_struct::dom_struct;
use js::jsapi::{Heap, IsPromiseObject, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
@ -14,6 +17,9 @@ use script_bindings::callback::ExceptionHandling;
use script_bindings::realms::InRealm;
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use super::bindings::structuredclone::StructuredData;
use super::bindings::transferable::Transferable;
use super::messageport::MessagePort;
use super::promisenativehandler::Callback;
use super::types::{TransformStreamDefaultController, WritableStream};
use crate::dom::bindings::cell::DomRefCell;
@ -997,3 +1003,103 @@ impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
self.writable.get().expect("writable stream is not set")
}
}
/// <https://streams.spec.whatwg.org/#ts-transfer>
impl Transferable for TransformStream {
type Index = MessagePortIndex;
type Data = MessagePortImpl;
fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> {
let global = self.global();
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
let cx = GlobalScope::get_cx();
let can_gc = CanGc::note();
// Let readable be value.[[readable]].
let readable = self.get_readable();
// Let writable be value.[[writable]].
let writable = self.get_writable();
// If ! IsReadableStreamLocked(readable) is true, throw a "DataCloneError" DOMException.
if readable.is_locked() {
return Err(());
}
// If ! IsWritableStreamLocked(writable) is true, throw a "DataCloneError" DOMException.
if writable.is_locked() {
return Err(());
}
// Create the shared port pair
let port_1 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_1, None);
let port_2 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_2, None);
global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
// Create a proxy WritableStream wired to port_1
let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
proxy_writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
// Pipe readable into the proxy writable (→ port_1)
let pipe1 = readable.pipe_to(
cx,
&global,
&proxy_writable,
false,
false,
false,
comp,
can_gc,
);
pipe1.set_promise_is_handled();
// Create a proxy ReadableStream wired to port_1
let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
proxy_readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
// Pipe proxy readable (← port_1) into writable
let pipe2 =
proxy_readable.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc);
pipe2.set_promise_is_handled();
// Set dataHolder.[[readable]] to ! StructuredSerializeWithTransfer(readable, « readable »).
// Set dataHolder.[[writable]] to ! StructuredSerializeWithTransfer(writable, « writable »).
port_2.transfer()
}
fn transfer_receive(
owner: &GlobalScope,
id: MessagePortId,
port_impl: MessagePortImpl,
) -> Result<DomRoot<Self>, ()> {
let can_gc = CanGc::note();
// Let readableRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[readable]], the current Realm).
// Set value.[[readable]] to readableRecord.[[Deserialized]].
let readable = ReadableStream::transfer_receive(owner, id, port_impl.clone())?;
// Let writableRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[writable]], the current Realm).
let writable = WritableStream::transfer_receive(owner, id, port_impl)?;
// Set value.[[readable]] to readableRecord.[[Deserialized]].
// Set value.[[writable]] to writableRecord.[[Deserialized]].
// Set value.[[backpressure]], value.[[backpressureChangePromise]], and value.[[controller]] to undefined.
let stream = TransformStream::new_with_proto(owner, None, can_gc);
stream.readable.set(Some(&readable));
stream.writable.set(Some(&writable));
Ok(stream)
}
fn serialized_storage<'a>(
data: StructuredData<'a, '_>,
) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
match data {
StructuredData::Reader(r) => &mut r.port_impls,
StructuredData::Writer(w) => &mut w.ports,
}
}
}

View file

@ -149,7 +149,7 @@ pub enum TraversalDirection {
}
/// A task on the <https://html.spec.whatwg.org/multipage/#port-message-queue>
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct PortMessageTask {
/// The origin of this task.
pub origin: ImmutableOrigin,

View file

@ -20,7 +20,7 @@ pub use transferable::*;
/// A data-holder for serialized data and transferred objects.
/// <https://html.spec.whatwg.org/multipage/#structuredserializewithtransfer>
#[derive(Debug, Default, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, MallocSizeOf, Serialize)]
pub struct StructuredSerializedData {
/// Data serialized by SpiderMonkey.
pub serialized: Vec<u8>,
@ -43,6 +43,7 @@ impl StructuredSerializedData {
Transferrable::MessagePort => is_field_empty(&self.ports),
Transferrable::ReadableStream => is_field_empty(&self.ports),
Transferrable::WritableStream => is_field_empty(&self.ports),
Transferrable::TransformStream => is_field_empty(&self.ports),
}
}

View file

@ -88,7 +88,7 @@ impl Clone for BroadcastMsg {
}
/// File-based blob
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct FileBlob {
#[ignore_malloc_size_of = "Uuid are hard(not really)"]
id: Uuid,
@ -164,7 +164,7 @@ impl BroadcastClone for BlobImpl {
}
/// The data backing a DOM Blob.
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct BlobImpl {
/// UUID of the blob.
blob_id: BlobId,
@ -177,7 +177,7 @@ pub struct BlobImpl {
}
/// Different backends of Blob
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub enum BlobData {
/// File-based blob, whose content lives in the net process
File(FileBlob),

View file

@ -24,9 +24,11 @@ pub enum Transferrable {
ReadableStream,
/// The `WritableStream` interface.
WritableStream,
/// The `TransformStream` interface.
TransformStream,
}
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
enum MessagePortState {
/// <https://html.spec.whatwg.org/multipage/#detached>
Detached,
@ -40,7 +42,7 @@ enum MessagePortState {
Disabled(bool),
}
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
/// The data and logic backing the DOM managed MessagePort.
pub struct MessagePortImpl {
/// The current state of the port.

View file

@ -1,9 +0,0 @@
[transfer-with-messageport.window.html]
[Transferring a MessagePort with a TransformStream should set `.ports`]
expected: FAIL
[Transferring a MessagePort with a TransformStream should set `.ports`, advanced]
expected: FAIL
[Transferring a MessagePort with multiple streams should set `.ports`]
expected: FAIL

View file

@ -1,6 +1,3 @@
[transform-stream.html]
[window.postMessage should be able to transfer a TransformStream]
expected: FAIL
[piping through transferred transforms should work]
expected: FAIL