Stream implement pipeThrough (#36977)

Part of https://github.com/servo/servo/issues/34676

https://github.com/servo/servo/pull/36905 needs to be merged first.

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
This commit is contained in:
Taym Haddadi 2025-05-20 16:33:22 +02:00 committed by GitHub
parent d8294fa423
commit 5b2305784a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 129 additions and 227 deletions

View file

@ -14,7 +14,7 @@ use base::id::{
};
use constellation_traits::{
BlobImpl, DomException, DomPoint, MessagePortImpl, Serializable as SerializableInterface,
StructuredSerializedData, Transferrable as TransferrableInterface,
StructuredSerializedData, Transferrable as TransferrableInterface, TransformStreamData,
};
use js::gc::RootedVec;
use js::glue::{
@ -517,6 +517,8 @@ pub(crate) struct StructuredDataReader<'a> {
/// used as part of the "transfer-receiving" steps of ports,
/// to produce the DOM ports stored in `message_ports` above.
pub(crate) port_impls: Option<HashMap<MessagePortId, MessagePortImpl>>,
/// A map of transform stream implementations,
pub(crate) transform_streams_port_impls: Option<HashMap<MessagePortId, TransformStreamData>>,
/// A map of blob implementations,
/// used as part of the "deserialize" steps of blobs,
/// to produce the DOM blobs stored in `blobs` above.
@ -535,6 +537,8 @@ pub(crate) struct StructuredDataWriter {
pub(crate) errors: DOMErrorRecord,
/// Transferred ports.
pub(crate) ports: Option<HashMap<MessagePortId, MessagePortImpl>>,
/// Transferred transform streams.
pub(crate) transform_streams_port: Option<HashMap<MessagePortId, TransformStreamData>>,
/// Serialized points.
pub(crate) points: Option<HashMap<DomPointId, DomPoint>>,
/// Serialized exceptions.
@ -591,6 +595,7 @@ pub(crate) fn write(
let data = StructuredSerializedData {
serialized: data,
ports: sc_writer.ports.take(),
transform_streams: sc_writer.transform_streams_port.take(),
points: sc_writer.points.take(),
exceptions: sc_writer.exceptions.take(),
blobs: sc_writer.blobs.take(),
@ -613,6 +618,7 @@ pub(crate) fn read(
let mut sc_reader = StructuredDataReader {
roots,
port_impls: data.ports.take(),
transform_streams_port_impls: data.transform_streams.take(),
blob_impls: data.blobs.take(),
points: data.points.take(),
exceptions: data.exceptions.take(),

View file

@ -24,7 +24,7 @@ use js::typedarray::ArrayBufferViewU8;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
StreamPipeOptions,
ReadableWritablePair, StreamPipeOptions,
};
use script_bindings::str::DOMString;
@ -2006,6 +2006,50 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
can_gc,
)
}
/// <https://streams.spec.whatwg.org/#rs-pipe-through>
fn PipeThrough(
&self,
transform: &ReadableWritablePair,
options: &StreamPipeOptions,
realm: InRealm,
can_gc: CanGc,
) -> Fallible<DomRoot<ReadableStream>> {
let global = self.global();
let cx = GlobalScope::get_cx();
// If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
if self.is_locked() {
return Err(Error::Type("Source stream is locked".to_owned()));
}
// If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
if transform.writable.is_locked() {
return Err(Error::Type("Destination stream is locked".to_owned()));
}
// Let signal be options["signal"] if it exists, or undefined otherwise.
// TODO: implement AbortSignal.
// Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
// options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
let promise = self.pipe_to(
cx,
&global,
&transform.writable,
options.preventAbort,
options.preventCancel,
options.preventClose,
realm,
can_gc,
);
// Set promise.[[PromiseIsHandled]] to true.
promise.set_promise_is_handled();
// Return transform["readable"].
Ok(transform.readable.clone())
}
}
#[allow(unsafe_code)]

View file

@ -8,7 +8,7 @@ use std::ptr::{self};
use std::rc::Rc;
use base::id::{MessagePortId, MessagePortIndex};
use constellation_traits::MessagePortImpl;
use constellation_traits::TransformStreamData;
use dom_struct::dom_struct;
use js::jsapi::{Heap, IsPromiseObject, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
@ -1007,9 +1007,9 @@ impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
/// <https://streams.spec.whatwg.org/#ts-transfer>
impl Transferable for TransformStream {
type Index = MessagePortIndex;
type Data = MessagePortImpl;
type Data = TransformStreamData;
fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> {
fn transfer(&self) -> Result<(MessagePortId, TransformStreamData), ()> {
let global = self.global();
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
@ -1023,73 +1023,85 @@ impl Transferable for TransformStream {
let writable = self.get_writable();
// If ! IsReadableStreamLocked(readable) is true, throw a "DataCloneError" DOMException.
if readable.is_locked() {
return Err(());
}
// If ! IsWritableStreamLocked(writable) is true, throw a "DataCloneError" DOMException.
if writable.is_locked() {
if readable.is_locked() || writable.is_locked() {
return Err(());
}
// Create the shared port pair
let port_1 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_1, None);
let port_2 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_2, None);
global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
// First port pair (readable → proxy writable)
let port1 = MessagePort::new(&global, can_gc);
global.track_message_port(&port1, None);
let port1_peer = MessagePort::new(&global, can_gc);
global.track_message_port(&port1_peer, None);
global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
proxy_readable
.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc)
.set_promise_is_handled();
// Second port pair (proxy readable → writable)
let port2 = MessagePort::new(&global, can_gc);
global.track_message_port(&port2, None);
let port2_peer = MessagePort::new(&global, can_gc);
global.track_message_port(&port2_peer, None);
global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
// Create a proxy WritableStream wired to port_1
let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
proxy_writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
proxy_writable.setup_cross_realm_transform_writable(cx, &port2, can_gc);
// Pipe readable into the proxy writable (→ port_1)
let pipe1 = readable.pipe_to(
cx,
&global,
&proxy_writable,
false,
false,
false,
comp,
can_gc,
);
pipe1.set_promise_is_handled();
// Create a proxy ReadableStream wired to port_1
let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
proxy_readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
// Pipe proxy readable (← port_1) into writable
let pipe2 =
proxy_readable.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc);
pipe2.set_promise_is_handled();
readable
.pipe_to(
cx,
&global,
&proxy_writable,
false,
false,
false,
comp,
can_gc,
)
.set_promise_is_handled();
// Set dataHolder.[[readable]] to ! StructuredSerializeWithTransfer(readable, « readable »).
// Set dataHolder.[[writable]] to ! StructuredSerializeWithTransfer(writable, « writable »).
port_2.transfer()
Ok((
*port1_peer.message_port_id(),
TransformStreamData {
readable: port1_peer.transfer()?,
writable: port2_peer.transfer()?,
},
))
}
fn transfer_receive(
owner: &GlobalScope,
id: MessagePortId,
port_impl: MessagePortImpl,
_id: MessagePortId,
data: TransformStreamData,
) -> Result<DomRoot<Self>, ()> {
let can_gc = CanGc::note();
let cx = GlobalScope::get_cx();
let port1 = MessagePort::transfer_receive(owner, data.readable.0, data.readable.1)?;
let port2 = MessagePort::transfer_receive(owner, data.writable.0, data.writable.1)?;
// Let readableRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[readable]], the current Realm).
// Set value.[[readable]] to readableRecord.[[Deserialized]].
let readable = ReadableStream::transfer_receive(owner, id, port_impl.clone())?;
// Let writableRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[writable]], the current Realm).
let writable = WritableStream::transfer_receive(owner, id, port_impl)?;
let proxy_readable = ReadableStream::new_with_proto(owner, None, can_gc);
proxy_readable.setup_cross_realm_transform_readable(cx, &port2, can_gc);
let proxy_writable = WritableStream::new_with_proto(owner, None, can_gc);
proxy_writable.setup_cross_realm_transform_writable(cx, &port1, can_gc);
// Set value.[[readable]] to readableRecord.[[Deserialized]].
// Set value.[[writable]] to writableRecord.[[Deserialized]].
// Set value.[[backpressure]], value.[[backpressureChangePromise]], and value.[[controller]] to undefined.
let stream = TransformStream::new_with_proto(owner, None, can_gc);
stream.readable.set(Some(&readable));
stream.writable.set(Some(&writable));
stream.readable.set(Some(&proxy_readable));
stream.writable.set(Some(&proxy_writable));
Ok(stream)
}
@ -1098,8 +1110,8 @@ impl Transferable for TransformStream {
data: StructuredData<'a, '_>,
) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
match data {
StructuredData::Reader(r) => &mut r.port_impls,
StructuredData::Writer(w) => &mut w.ports,
StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
StructuredData::Writer(w) => &mut w.transform_streams_port,
}
}
}