Support ReadableByteStreamController in stream piping

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
This commit is contained in:
Taym Haddadi 2025-04-15 22:08:43 +02:00
parent 3babf74986
commit 4ed54eee0e
No known key found for this signature in database

View file

@ -16,6 +16,7 @@ use js::rust::{
MutableHandleValue as SafeMutableHandleValue,
};
use js::typedarray::ArrayBufferViewU8;
use script_bindings::codegen::GenericBindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
@ -101,8 +102,7 @@ impl js::gc::Rootable for PipeTo {}
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PipeTo {
/// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
reader: Dom<ReadableStreamDefaultReader>,
reader: ReaderType,
/// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
writer: Dom<WritableStreamDefaultWriter>,
@ -157,7 +157,7 @@ impl Callback for PipeTo {
/// - the state of a stored promise(in some cases).
#[allow(unsafe_code)]
fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
let global = self.reader.global();
let global = self.writer.global();
// Note: we only care about the result of writes when they are rejected,
// and the error is accessed not through handlers,
@ -320,7 +320,7 @@ impl PipeTo {
// Note: if the writer is not ready,
// in order to ensure progress we must
// also react to the closure of the source(because source may close empty).
let closed_promise = self.reader.Closed();
let closed_promise = self.reader.closed();
closed_promise.append_native_handler(&handler, realm, can_gc);
}
}
@ -328,7 +328,7 @@ impl PipeTo {
/// Read a chunk
fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
*self.state.borrow_mut() = PipeToState::PendingRead;
let chunk_promise = self.reader.Read(can_gc);
let chunk_promise = self.reader.read(can_gc);
let handler = PromiseNativeHandler::new(
global,
Some(Box::new(self.clone())),
@ -657,8 +657,6 @@ impl PipeTo {
// If reader implements ReadableStreamBYOBReader,
// perform ! ReadableStreamBYOBReaderRelease(reader).
// TODO.
// Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
self.reader
.release(can_gc)
@ -731,6 +729,8 @@ pub(crate) enum ControllerType {
Default(MutNullableDom<ReadableStreamDefaultController>),
}
impl js::gc::Rootable for ReaderType {}
/// <https://streams.spec.whatwg.org/#readablestream-readerr>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
@ -742,6 +742,57 @@ pub(crate) enum ReaderType {
Default(MutNullableDom<ReadableStreamDefaultReader>),
}
impl Clone for ReaderType {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn clone(&self) -> Self {
match self {
ReaderType::BYOB(reader) => ReaderType::BYOB(MutNullableDom::new(Some(
&reader.get().expect("Reader should be set"),
))),
ReaderType::Default(reader) => ReaderType::Default(MutNullableDom::new(Some(
&reader.get().expect("Reader should be set"),
))),
}
}
}
impl ReaderType {
/// Get the stream from the reader.
pub(crate) fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
match self {
ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").get_stream(),
ReaderType::Default(reader) => reader.get().expect("Reader should be set").get_stream(),
}
}
/// Release the reader.
pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
match self {
ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").release(can_gc),
ReaderType::Default(reader) => {
reader.get().expect("Reader should be set").release(can_gc)
},
}
}
// get reader Closed promise
pub(crate) fn closed(&self) -> Rc<Promise> {
match self {
ReaderType::BYOB(reader) => reader.get().expect("Reader should be set").Closed(),
ReaderType::Default(reader) => reader.get().expect("Reader should be set").Closed(),
}
}
// Read
pub(crate) fn read(&self, can_gc: CanGc) -> Rc<Promise> {
match self {
ReaderType::BYOB(_) => {
unreachable!("Native reading of a chunk can only be done with a default reader.")
},
ReaderType::Default(reader) => reader.get().expect("Reader should be set").Read(can_gc),
}
}
}
impl Eq for ReaderType {}
impl PartialEq for ReaderType {
fn eq(&self, other: &Self) -> bool {
@ -1634,17 +1685,6 @@ impl ReadableStream {
// Assert: ! IsWritableStreamLocked(dest) is false.
assert!(!dest.is_locked());
// If source.[[controller]] implements ReadableByteStreamController,
// let reader be either ! AcquireReadableStreamBYOBReader(source)
// or ! AcquireReadableStreamDefaultReader(source),
// at the user agents discretion.
// Note: for now only using default readers.
// Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
let reader = self
.acquire_default_reader(can_gc)
.expect("Acquiring a default reader for pipe_to cannot fail");
// Let writer be ! AcquireWritableStreamDefaultWriter(dest).
let writer = dest
.aquire_default_writer(cx, global, can_gc)
@ -1664,7 +1704,22 @@ impl ReadableStream {
// In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
rooted!(in(*cx) let pipe_to = PipeTo {
reader: Dom::from_ref(&reader),
// If source.[[controller]] implements ReadableByteStreamController,
// let reader be either ! AcquireReadableStreamBYOBReader(source)
// or ! AcquireReadableStreamDefaultReader(source),
// at the user agents discretion.
// Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
reader: if self.has_byob_reader() {
let byob_reader = &self
.acquire_byob_reader(can_gc)
.expect("Acquiring a BYOB reader for pipe_to should not fail");
ReaderType::BYOB(MutNullableDom::new(Some(byob_reader)))
} else {
let default_reader = &self
.acquire_default_reader(can_gc)
.expect("Acquiring a default reader for pipe_to should not fail");
ReaderType::Default(MutNullableDom::new(Some(default_reader)))
},
writer: Dom::from_ref(&writer),
pending_writes: Default::default(),
state: Default::default(),