From 4ed54eee0e4c6620126fb5769190ab52eab1732b Mon Sep 17 00:00:00 2001 From: Taym Haddadi Date: Tue, 15 Apr 2025 22:08:43 +0200 Subject: [PATCH 1/2] Support ReadableByteStreamController in stream piping Signed-off-by: Taym Haddadi --- components/script/dom/readablestream.rs | 93 ++++++++++++++++++++----- 1 file changed, 74 insertions(+), 19 deletions(-) diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index b5048a4644d..a5bd5858651 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -16,6 +16,7 @@ use js::rust::{ MutableHandleValue as SafeMutableHandleValue, }; use js::typedarray::ArrayBufferViewU8; +use script_bindings::codegen::GenericBindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ @@ -101,8 +102,7 @@ impl js::gc::Rootable for PipeTo {} #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] struct PipeTo { /// - reader: Dom, - + reader: ReaderType, /// writer: Dom, @@ -157,7 +157,7 @@ impl Callback for PipeTo { /// - the state of a stored promise(in some cases). #[allow(unsafe_code)] fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) { - let global = self.reader.global(); + let global = self.writer.global(); // Note: we only care about the result of writes when they are rejected, // and the error is accessed not through handlers, @@ -320,7 +320,7 @@ impl PipeTo { // Note: if the writer is not ready, // in order to ensure progress we must // also react to the closure of the source(because source may close empty). - let closed_promise = self.reader.Closed(); + let closed_promise = self.reader.closed(); closed_promise.append_native_handler(&handler, realm, can_gc); } } @@ -328,7 +328,7 @@ impl PipeTo { /// Read a chunk fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) { *self.state.borrow_mut() = PipeToState::PendingRead; - let chunk_promise = self.reader.Read(can_gc); + let chunk_promise = self.reader.read(can_gc); let handler = PromiseNativeHandler::new( global, Some(Box::new(self.clone())), @@ -657,8 +657,6 @@ impl PipeTo { // If reader implements ReadableStreamBYOBReader, // perform ! ReadableStreamBYOBReaderRelease(reader). - // TODO. - // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader). self.reader .release(can_gc) @@ -731,6 +729,8 @@ pub(crate) enum ControllerType { Default(MutNullableDom), } +impl js::gc::Rootable for ReaderType {} + /// #[derive(JSTraceable, MallocSizeOf)] #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] @@ -742,6 +742,57 @@ pub(crate) enum ReaderType { Default(MutNullableDom), } +impl Clone for ReaderType { + #[cfg_attr(crown, allow(crown::unrooted_must_root))] + fn clone(&self) -> Self { + match self { + ReaderType::BYOB(reader) => ReaderType::BYOB(MutNullableDom::new(Some( + &reader.get().expect("Reader should be set"), + ))), + ReaderType::Default(reader) => ReaderType::Default(MutNullableDom::new(Some( + &reader.get().expect("Reader should be set"), + ))), + } + } +} + +impl ReaderType { + /// Get the stream from the reader. + pub(crate) fn get_stream(&self) -> Option> { + match self { + ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").get_stream(), + ReaderType::Default(reader) => reader.get().expect("Reader should be set").get_stream(), + } + } + + /// Release the reader. + pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> { + match self { + ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").release(can_gc), + ReaderType::Default(reader) => { + reader.get().expect("Reader should be set").release(can_gc) + }, + } + } + + // get reader Closed promise + pub(crate) fn closed(&self) -> Rc { + match self { + ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").Closed(), + ReaderType::Default(reader) => reader.get().expect("Reader should be set").Closed(), + } + } + + // Read + pub(crate) fn read(&self, can_gc: CanGc) -> Rc { + match self { + ReaderType::BYOB(_) => { + unreachable!("Native reading of a chunk can only be done with a default reader.") + }, + ReaderType::Default(reader) => reader.get().expect("Reader should be set").Read(can_gc), + } + } +} impl Eq for ReaderType {} impl PartialEq for ReaderType { fn eq(&self, other: &Self) -> bool { @@ -1634,17 +1685,6 @@ impl ReadableStream { // Assert: ! IsWritableStreamLocked(dest) is false. assert!(!dest.is_locked()); - // If source.[[controller]] implements ReadableByteStreamController, - // let reader be either ! AcquireReadableStreamBYOBReader(source) - // or ! AcquireReadableStreamDefaultReader(source), - // at the user agent’s discretion. - // Note: for now only using default readers. - - // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source). - let reader = self - .acquire_default_reader(can_gc) - .expect("Acquiring a default reader for pipe_to cannot fail"); - // Let writer be ! AcquireWritableStreamDefaultWriter(dest). let writer = dest .aquire_default_writer(cx, global, can_gc) @@ -1664,7 +1704,22 @@ impl ReadableStream { // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest. rooted!(in(*cx) let pipe_to = PipeTo { - reader: Dom::from_ref(&reader), + // If source.[[controller]] implements ReadableByteStreamController, + // let reader be either ! AcquireReadableStreamBYOBReader(source) + // or ! AcquireReadableStreamDefaultReader(source), + // at the user agent’s discretion. + // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source). + reader: if self.has_byob_reader() { + let byob_reader = &self + .acquire_byob_reader(can_gc) + .expect("Acquiring a BYOB reader for pipe_to should not fail"); + ReaderType::BYOB(MutNullableDom::new(Some(byob_reader))) + } else { + let default_reader = &self + .acquire_default_reader(can_gc) + .expect("Acquiring a default reader for pipe_to should not fail"); + ReaderType::Default(MutNullableDom::new(Some(default_reader))) + }, writer: Dom::from_ref(&writer), pending_writes: Default::default(), state: Default::default(), From 3bdcaf1cf62d3d5aaef026f726776e47bb37542b Mon Sep 17 00:00:00 2001 From: Taym Haddadi Date: Tue, 15 Apr 2025 22:44:06 +0200 Subject: [PATCH 2/2] Add pipeTo with different readers type Signed-off-by: Taym Haddadi --- .../mozilla/stream/pipeTo-reader-locks.any.js | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 tests/wpt/mozilla/tests/mozilla/stream/pipeTo-reader-locks.any.js diff --git a/tests/wpt/mozilla/tests/mozilla/stream/pipeTo-reader-locks.any.js b/tests/wpt/mozilla/tests/mozilla/stream/pipeTo-reader-locks.any.js new file mode 100644 index 00000000000..e4ef1799ff0 --- /dev/null +++ b/tests/wpt/mozilla/tests/mozilla/stream/pipeTo-reader-locks.any.js @@ -0,0 +1,33 @@ +'use strict'; + +promise_test(t => { + const rs = new ReadableStream(); + const ws = new WritableStream(); + + rs.getReader(); + + assert_true(rs.locked, 'sanity check: the ReadableStream starts locked'); + assert_false(ws.locked, 'sanity check: the WritableStream does not start locked'); + + return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => { + assert_false(ws.locked, 'the WritableStream must still be unlocked'); + }); + +}, 'pipeTo must fail if the ReadableStream is locked with a Default reader, and not lock the WritableStream'); + + +promise_test(t => { + const rs = new ReadableStream({ + type: 'bytes', + pull(controller) { + // No chunks are enqueued; the stream remains readable but empty. + } + }); + const ws = new WritableStream(); + rs.getReader({ mode: 'byob' }); + assert_true(rs.locked, 'sanity check: the ReadableStream is locked by the BYOB reader'); + assert_false(ws.locked, 'sanity check: the WritableStream starts unlocked'); + return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => { + assert_false(ws.locked, 'the WritableStream must remain unlocked after pipeTo rejection'); + }); +}, 'pipeTo must fail if the ReadableStream is locked with a BYOB reader, and must not lock the WritableStream');