servo/components/script/dom/writablestreamdefaultwriter.rs
Gregory Terzian 8d39d7706a
Streams: Implement stream pipe-to (#35650)
* implement PipeTo, stub pipe_to

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* define a data structure to manage the piping

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement propagation of errors forward and backward, stub shutdown and shutdown with action

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* adding more fine-grain shutdown variants to state

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement progagate closing backward and forward

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement shutdown and actions

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement reading and writing

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement shutdown continuation and finalize

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix typo

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add can_gc arguments

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* implement writer close with error propagation

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* move and document wait on pending write

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* more docs

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* write pending reads as part of shutdown

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* turn on piping test suite

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add comment about using Rust api
improve comment on result
add comment on backpressure

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix multiple propagations

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix writing of chunks

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix error and close propagation
update test expectations

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix warnings

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* improve docs
remove redundant logic in pending writes

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix clippy

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unnecessary expansion of visibility of enqueued value to_jsval

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unnecessary conditional accessing of streams when propagating states

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* improve docs

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove unused result var

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix typo

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove redundant logic dealing with closed sources with pending writes

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add doc links for shutdown actions

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add comments on the need to return early when shutting down before checking close and error states

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fmt

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* Update test expectations

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix can_gc

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
2025-03-18 11:13:09 +00:00

558 lines
21 KiB
Rust

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cell::RefCell;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsval::UndefinedValue;
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval};
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{DomRoot, MutNullableDom};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::writablestream::WritableStream;
use crate::realms::InRealm;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
#[dom_struct]
pub struct WritableStreamDefaultWriter {
reflector_: Reflector,
#[ignore_malloc_size_of = "Rc is hard"]
ready_promise: RefCell<Rc<Promise>>,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
#[ignore_malloc_size_of = "Rc is hard"]
closed_promise: RefCell<Rc<Promise>>,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
stream: MutNullableDom<WritableStream>,
}
impl WritableStreamDefaultWriter {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
/// The parts that create a new promise.
fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
WritableStreamDefaultWriter {
reflector_: Reflector::new(),
stream: Default::default(),
closed_promise: RefCell::new(Promise::new(global, can_gc)),
ready_promise: RefCell::new(Promise::new(global, can_gc)),
}
}
pub(crate) fn new(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
) -> DomRoot<WritableStreamDefaultWriter> {
reflect_dom_object_with_proto(
Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
global,
proto,
can_gc,
)
}
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
/// Continuing from `new_inherited`, the rest.
pub(crate) fn setup(
&self,
cx: SafeJSContext,
stream: &WritableStream,
can_gc: CanGc,
) -> Result<(), Error> {
// If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
if stream.is_locked() {
return Err(Error::Type("Stream is locked".to_string()));
}
// Set writer.[[stream]] to stream.
self.stream.set(Some(stream));
// Set stream.[[writer]] to writer.
stream.set_writer(Some(self));
// Let state be stream.[[state]].
// If state is "writable",
if stream.is_writable() {
// If ! WritableStreamCloseQueuedOrInFlight(stream) is false
// and stream.[[backpressure]] is true,
if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
// set writer.[[readyPromise]] to a new promise.
// Done in `new_inherited`.
} else {
// Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
// Note: new promise created in `new_inherited`.
self.ready_promise.borrow().resolve_native(&(), can_gc);
}
// Set writer.[[closedPromise]] to a new promise.
// Done in `new_inherited`.
return Ok(());
}
// Otherwise, if state is "erroring",
if stream.is_erroring() {
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
// Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
// Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
// Note: new promise created in `new_inherited`.
let ready_promise = self.ready_promise.borrow();
ready_promise.reject_native(&error.handle(), can_gc);
ready_promise.set_promise_is_handled();
// Set writer.[[closedPromise]] to a new promise.
// Done in `new_inherited`.
return Ok(());
}
// Otherwise, if state is "closed",
if stream.is_closed() {
// Set writer.[[readyPromise]] to a promise resolved with undefined.
// Note: new promise created in `new_inherited`.
self.ready_promise.borrow().resolve_native(&(), can_gc);
// Set writer.[[closedPromise]] to a promise resolved with undefined.
// Note: new promise created in `new_inherited`.
self.closed_promise.borrow().resolve_native(&(), can_gc);
return Ok(());
}
// Otherwise,
// Assert: state is "errored".
assert!(stream.is_errored());
// Let storedError be stream.[[storedError]].
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
// Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
// Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
// Note: new promise created in `new_inherited`.
let ready_promise = self.ready_promise.borrow();
ready_promise.reject_native(&error.handle(), can_gc);
ready_promise.set_promise_is_handled();
// Set writer.[[closedPromise]] to a promise rejected with storedError.
// Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
// Note: new promise created in `new_inherited`.
let ready_promise = self.closed_promise.borrow();
ready_promise.reject_native(&error.handle(), can_gc);
ready_promise.set_promise_is_handled();
Ok(())
}
pub(crate) fn reject_closed_promise_with_stored_error(
&self,
error: &SafeHandleValue,
can_gc: CanGc,
) {
self.closed_promise.borrow().reject_native(error, can_gc);
}
pub(crate) fn set_close_promise_is_handled(&self) {
self.closed_promise.borrow().set_promise_is_handled();
}
pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
*self.ready_promise.borrow_mut() = promise;
}
pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
self.ready_promise.borrow().resolve_native(&(), can_gc);
}
pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
self.closed_promise.borrow().resolve_native(&(), can_gc);
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
pub(crate) fn ensure_ready_promise_rejected(
&self,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
let ready_promise = self.ready_promise.borrow().clone();
// If writer.[[readyPromise]].[[PromiseState]] is "pending",
if ready_promise.is_pending() {
// reject writer.[[readyPromise]] with error.
ready_promise.reject_native(&error, can_gc);
// Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
ready_promise.set_promise_is_handled();
} else {
// Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
let promise = Promise::new(global, can_gc);
promise.reject_native(&error, can_gc);
// Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
promise.set_promise_is_handled();
*self.ready_promise.borrow_mut() = promise;
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
pub(crate) fn ensure_closed_promise_rejected(
&self,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
let closed_promise = self.closed_promise.borrow().clone();
// If writer.[[closedPromise]].[[PromiseState]] is "pending",
if closed_promise.is_pending() {
// reject writer.[[closedPromise]] with error.
closed_promise.reject_native(&error, can_gc);
// Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
closed_promise.set_promise_is_handled();
} else {
// Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
let promise = Promise::new(global, can_gc);
promise.reject_native(&error, can_gc);
// Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
promise.set_promise_is_handled();
*self.closed_promise.borrow_mut() = promise;
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
fn abort(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
// Let stream be writer.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Return ! WritableStreamAbort(stream, reason).
stream.abort(cx, global, reason, can_gc)
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
// Let stream be writer.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Return ! WritableStreamClose(stream).
stream.close(cx, global, can_gc)
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
pub(crate) fn write(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
// Let stream be writer.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Let controller be stream.[[controller]].
// Note: asserting controller is some.
let Some(controller) = stream.get_controller() else {
unreachable!("Controller should be set.");
};
// Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
// If stream is not equal to writer.[[stream]],
// return a promise rejected with a TypeError exception.
if !self
.stream
.get()
.is_some_and(|current_stream| current_stream == stream)
{
let promise = Promise::new(global, can_gc);
promise.reject_error(
Error::Type("Stream is not equal to writer stream".to_string()),
can_gc,
);
return promise;
}
// Let state be stream.[[state]].
// If state is "errored",
if stream.is_errored() {
// return a promise rejected with stream.[[storedError]].
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
let promise = Promise::new(global, can_gc);
promise.reject_native(&error.handle(), can_gc);
return promise;
}
// If ! WritableStreamCloseQueuedOrInFlight(stream) is true
// or state is "closed",
if stream.close_queued_or_in_flight() || stream.is_closed() {
// return a promise rejected with a TypeError exception
// indicating that the stream is closing or closed
let promise = Promise::new(global, can_gc);
promise.reject_error(
Error::Type("Stream has been closed, or has close queued or in-flight".to_string()),
can_gc,
);
return promise;
}
// If state is "erroring",
if stream.is_erroring() {
// return a promise rejected with stream.[[storedError]].
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
let promise = Promise::new(global, can_gc);
promise.reject_native(&error.handle(), can_gc);
return promise;
}
// Assert: state is "writable".
assert!(stream.is_writable());
// Let promise be ! WritableStreamAddWriteRequest(stream).
let promise = stream.add_write_request(global, can_gc);
// Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
controller.write(cx, global, chunk, chunk_size, can_gc);
// Return promise.
promise
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
// Let stream be this.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Assert: stream.[[writer]] is writer.
assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
// Let releasedError be a new TypeError.
let released_error = Error::Type("Writer has been released".to_string());
// Root the js val of the error.
rooted!(in(*cx) let mut error = UndefinedValue());
released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
// Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
// Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
// Set stream.[[writer]] to undefined.
stream.set_writer(None);
// Set this.[[stream]] to undefined.
self.stream.set(None);
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
pub(crate) fn close_with_error_propagation(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
// Let stream be writer.[[stream]].
let Some(stream) = self.stream.get() else {
// Assert: stream is not undefined.
unreachable!("Stream should be set.");
};
// Let state be stream.[[state]].
// Used via stream method calls.
// If ! WritableStreamCloseQueuedOrInFlight(stream) is true
// or state is "closed",
if stream.close_queued_or_in_flight() || stream.is_closed() {
// return a promise resolved with undefined.
let promise = Promise::new(global, can_gc);
promise.resolve_native(&(), can_gc);
return promise;
}
// If state is "errored",
if stream.is_errored() {
// return a promise rejected with stream.[[storedError]].
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
let promise = Promise::new(global, can_gc);
promise.reject_native(&error.handle(), can_gc);
return promise;
}
// Assert: state is "writable" or "erroring".
assert!(stream.is_writable() || stream.is_erroring());
// Return ! WritableStreamDefaultWriterClose(writer).
self.close(cx, global, can_gc)
}
pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
self.stream.get()
}
}
impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
/// <https://streams.spec.whatwg.org/#default-writer-closed>
fn Closed(&self) -> Rc<Promise> {
// Return this.[[closedPromise]].
return self.closed_promise.borrow().clone();
}
/// <https://streams.spec.whatwg.org/#default-writer-desired-size>
fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
// If this.[[stream]] is undefined, throw a TypeError exception.
let Some(stream) = self.stream.get() else {
return Err(Error::Type("Stream is undefined".to_string()));
};
// Return ! WritableStreamDefaultWriterGetDesiredSize(this).
Ok(stream.get_desired_size())
}
/// <https://streams.spec.whatwg.org/#default-writer-ready>
fn Ready(&self) -> Rc<Promise> {
// Return this.[[readyPromise]].
return self.ready_promise.borrow().clone();
}
/// <https://streams.spec.whatwg.org/#default-writer-abort>
fn Abort(
&self,
cx: SafeJSContext,
reason: SafeHandleValue,
realm: InRealm,
can_gc: CanGc,
) -> Rc<Promise> {
let global = GlobalScope::from_safe_context(cx, realm);
// If this.[[stream]] is undefined,
if self.stream.get().is_none() {
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&global, can_gc);
promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
return promise;
}
// Return ! WritableStreamDefaultWriterAbort(this, reason).
self.abort(cx, &global, reason, can_gc)
}
/// <https://streams.spec.whatwg.org/#default-writer-close>
fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
let cx = GlobalScope::get_cx();
let global = GlobalScope::from_safe_context(cx, in_realm);
let promise = Promise::new(&global, can_gc);
// Let stream be this.[[stream]].
let Some(stream) = self.stream.get() else {
// If stream is undefined,
// return a promise rejected with a TypeError exception.
promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
return promise;
};
// If ! WritableStreamCloseQueuedOrInFlight(stream) is true
if stream.close_queued_or_in_flight() {
// return a promise rejected with a TypeError exception.
promise.reject_error(
Error::Type("Stream has closed queued or in-flight".to_string()),
can_gc,
);
return promise;
}
self.close(cx, &global, can_gc)
}
/// <https://streams.spec.whatwg.org/#default-writer-release-lock>
fn ReleaseLock(&self, can_gc: CanGc) {
// Let stream be this.[[stream]].
let Some(stream) = self.stream.get() else {
// If stream is undefined, return.
return;
};
// Assert: stream.[[writer]] is not undefined.
assert!(stream.get_writer().is_some());
let global = self.global();
let cx = GlobalScope::get_cx();
// Perform ! WritableStreamDefaultWriterRelease(this).
self.release(cx, &global, can_gc);
}
/// <https://streams.spec.whatwg.org/#default-writer-write>
fn Write(
&self,
cx: SafeJSContext,
chunk: SafeHandleValue,
realm: InRealm,
can_gc: CanGc,
) -> Rc<Promise> {
let global = GlobalScope::from_safe_context(cx, realm);
// If this.[[stream]] is undefined,
if self.stream.get().is_none() {
// return a promise rejected with a TypeError exception.
let global = GlobalScope::from_safe_context(cx, realm);
let promise = Promise::new(&global, can_gc);
promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
return promise;
}
// Return ! WritableStreamDefaultWriterWrite(this, chunk).
self.write(cx, &global, chunk, can_gc)
}
/// <https://streams.spec.whatwg.org/#default-writer-constructor>
fn Constructor(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
stream: &WritableStream,
) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
let cx = GlobalScope::get_cx();
// Perform ? SetUpWritableStreamDefaultWriter(this, stream).
writer.setup(cx, stream, can_gc)?;
Ok(writer)
}
}