Streams: Implement stream pipe-to (#35650)

* implement PipeTo, stub pipe_to

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* define a data structure to manage the piping

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement propagation of errors forward and backward, stub shutdown and shutdown with action

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* adding more fine-grain shutdown variants to state

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement progagate closing backward and forward

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement shutdown and actions

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement reading and writing

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement shutdown continuation and finalize

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix typo

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add can_gc arguments

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement writer close with error propagation

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* move and document wait on pending write

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* more docs

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* write pending reads as part of shutdown

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* turn on piping test suite

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add comment about using Rust api
improve comment on result
add comment on backpressure

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix multiple propagations

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix writing of chunks

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix error and close propagation
update test expectations

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix warnings

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* improve docs
remove redundant logic in pending writes

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix clippy

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unnecessary expansion of visibility of enqueued value to_jsval

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unnecessary conditional accessing of streams when propagating states

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* improve docs

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unused result var

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix typo

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove redundant logic dealing with closed sources with pending writes

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add doc links for shutdown actions

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add comments on the need to return early when shutting down before checking close and error states

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fmt

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* Update test expectations

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix can_gc

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
This commit is contained in:
Gregory Terzian 2025-03-18 19:13:09 +08:00 committed by GitHub
parent 67a5f285ed
commit 8d39d7706a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 1319 additions and 29 deletions

View file

@ -263,7 +263,7 @@ impl WritableStreamDefaultWriter {
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
fn write(
pub(crate) fn write(
&self,
cx: SafeJSContext,
global: &GlobalScope,
@ -377,6 +377,52 @@ impl WritableStreamDefaultWriter {
// Set this.[[stream]] to undefined.
self.stream.set(None);
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
pub(crate) fn close_with_error_propagation(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
// Let stream be writer.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Let state be stream.[[state]].
// Used via stream method calls.
// If ! WritableStreamCloseQueuedOrInFlight(stream) is true
// or state is "closed",
if stream.close_queued_or_in_flight() || stream.is_closed() {
// return a promise resolved with undefined.
let promise = Promise::new(global, can_gc);
promise.resolve_native(&(), can_gc);
return promise;
}
// If state is "errored",
if stream.is_errored() {
// return a promise rejected with stream.[[storedError]].
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
let promise = Promise::new(global, can_gc);
promise.reject_native(&error.handle(), can_gc);
return promise;
}
// Assert: state is "writable" or "erroring".
assert!(stream.is_writable() || stream.is_erroring());
// Return ! WritableStreamDefaultWriterClose(writer).
self.close(cx, global, can_gc)
}
pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
self.stream.get()
}
}
impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {