servo/components/script/dom/writablestream.rs
Taym Haddadi f3f4cc5500
Script implement TransformStream and TransformStreamDefaultController (#36739)
Part of https://github.com/servo/servo/issues/34676

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
2025-05-08 08:45:57 +00:00

1286 lines
49 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::mem;
use std::ptr::{self};
use std::rc::Rc;
use base::id::{MessagePortId, MessagePortIndex};
use constellation_traits::MessagePortImpl;
use dom_struct::dom_struct;
use js::jsapi::{Heap, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::{
HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
MutableHandleValue as SafeMutableHandleValue,
};
use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
use crate::dom::bindings::conversions::ConversionResult;
use crate::dom::bindings::error::{Error, Fallible};
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::bindings::structuredclone::StructuredData;
use crate::dom::bindings::transferable::Transferable;
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::domexception::{DOMErrorName, DOMException};
use crate::dom::globalscope::GlobalScope;
use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
use crate::dom::writablestreamdefaultcontroller::{
UnderlyingSinkType, WritableStreamDefaultController,
};
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
use crate::js::conversions::ToJSValConvertible;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
/// The fulfillment handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmFulfillmentHandler {
stream: Dom<WritableStream>,
#[ignore_malloc_size_of = "Rc is hard"]
abort_request_promise: Rc<Promise>,
}
impl Callback for AbortAlgorithmFulfillmentHandler {
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Resolve abortRequests promise with undefined.
self.abort_request_promise.resolve_native(&(), can_gc);
// Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
self.stream
.as_rooted()
.reject_close_and_closed_promise_if_needed(cx, can_gc);
}
}
impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
/// The rejection handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmRejectionHandler {
stream: Dom<WritableStream>,
#[ignore_malloc_size_of = "Rc is hard"]
abort_request_promise: Rc<Promise>,
}
impl Callback for AbortAlgorithmRejectionHandler {
fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Reject abortRequests promise with reason.
self.abort_request_promise.reject_native(&reason, can_gc);
// Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
self.stream
.as_rooted()
.reject_close_and_closed_promise_if_needed(cx, can_gc);
}
}
impl js::gc::Rootable for PendingAbortRequest {}
/// <https://streams.spec.whatwg.org/#pending-abort-request>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PendingAbortRequest {
/// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
#[ignore_malloc_size_of = "Rc is hard"]
promise: Rc<Promise>,
/// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
#[ignore_malloc_size_of = "mozjs"]
reason: Box<Heap<JSVal>>,
/// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
was_already_erroring: bool,
}
/// <https://streams.spec.whatwg.org/#writablestream-state>
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
#[default]
Writable,
Closed,
Erroring,
Errored,
}
/// <https://streams.spec.whatwg.org/#ws-class>
#[dom_struct]
pub struct WritableStream {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#writablestream-backpressure>
backpressure: Cell<bool>,
/// <https://streams.spec.whatwg.org/#writablestream-closerequest>
#[ignore_malloc_size_of = "Rc is hard"]
close_request: DomRefCell<Option<Rc<Promise>>>,
/// <https://streams.spec.whatwg.org/#writablestream-controller>
controller: MutNullableDom<WritableStreamDefaultController>,
/// <https://streams.spec.whatwg.org/#writablestream-detached>
detached: Cell<bool>,
/// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
#[ignore_malloc_size_of = "Rc is hard"]
in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
/// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
#[ignore_malloc_size_of = "Rc is hard"]
in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
/// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
/// <https://streams.spec.whatwg.org/#writablestream-state>
state: Cell<WritableStreamState>,
/// <https://streams.spec.whatwg.org/#writablestream-storederror>
#[ignore_malloc_size_of = "mozjs"]
stored_error: Heap<JSVal>,
/// <https://streams.spec.whatwg.org/#writablestream-writer>
writer: MutNullableDom<WritableStreamDefaultWriter>,
/// <https://streams.spec.whatwg.org/#writablestream-writerequests>
#[ignore_malloc_size_of = "Rc is hard"]
write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
impl WritableStream {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
/// <https://streams.spec.whatwg.org/#initialize-writable-stream>
fn new_inherited() -> WritableStream {
WritableStream {
reflector_: Reflector::new(),
backpressure: Default::default(),
close_request: Default::default(),
controller: Default::default(),
detached: Default::default(),
in_flight_write_request: Default::default(),
in_flight_close_request: Default::default(),
pending_abort_request: Default::default(),
state: Default::default(),
stored_error: Default::default(),
writer: Default::default(),
write_requests: Default::default(),
}
}
pub(crate) fn new_with_proto(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
) -> DomRoot<WritableStream> {
reflect_dom_object_with_proto(
Box::new(WritableStream::new_inherited()),
global,
proto,
can_gc,
)
}
/// Used as part of
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
pub(crate) fn assert_no_controller(&self) {
assert!(self.controller.get().is_none());
}
/// Used as part of
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
self.controller.set(Some(controller));
}
#[allow(unused)]
pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
self.controller.get().expect("Controller should be set.")
}
pub(crate) fn is_writable(&self) -> bool {
matches!(self.state.get(), WritableStreamState::Writable)
}
pub(crate) fn is_erroring(&self) -> bool {
matches!(self.state.get(), WritableStreamState::Erroring)
}
pub(crate) fn is_errored(&self) -> bool {
matches!(self.state.get(), WritableStreamState::Errored)
}
pub(crate) fn is_closed(&self) -> bool {
matches!(self.state.get(), WritableStreamState::Closed)
}
pub(crate) fn has_in_flight_write_request(&self) -> bool {
self.in_flight_write_request.borrow().is_some()
}
/// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
pub(crate) fn has_operations_marked_inflight(&self) -> bool {
let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
in_flight_write_requested || in_flight_close_requested
}
/// <https://streams.spec.whatwg.org/#writablestream-storederror>
pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
handle_mut.set(self.stored_error.get());
}
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
// Assert: stream.[[state]] is "erroring".
assert!(self.is_erroring());
// Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
assert!(!self.has_operations_marked_inflight());
// Set stream.[[state]] to "errored".
self.state.set(WritableStreamState::Errored);
// Perform ! stream.[[controller]].[[ErrorSteps]]().
let Some(controller) = self.controller.get() else {
unreachable!("Stream should have a controller.");
};
controller.perform_error_steps();
// Let storedError be stream.[[storedError]].
rooted!(in(*cx) let mut stored_error = UndefinedValue());
self.get_stored_error(stored_error.handle_mut());
// For each writeRequest of stream.[[writeRequests]]:
let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
for request in write_requests {
// Reject writeRequest with storedError.
request.reject(cx, stored_error.handle(), can_gc);
}
// Set stream.[[writeRequests]] to an empty list.
// Done above with `drain`.
// If stream.[[pendingAbortRequest]] is undefined,
if self.pending_abort_request.borrow().is_none() {
// Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
self.reject_close_and_closed_promise_if_needed(cx, can_gc);
// Return.
return;
}
// Let abortRequest be stream.[[pendingAbortRequest]].
// Set stream.[[pendingAbortRequest]] to undefined.
rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
if let Some(pending_abort_request) = &*pending_abort_request {
// If abortRequests was already erroring is true,
if pending_abort_request.was_already_erroring {
// Reject abortRequests promise with storedError.
pending_abort_request
.promise
.reject(cx, stored_error.handle(), can_gc);
// Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
self.reject_close_and_closed_promise_if_needed(cx, can_gc);
// Return.
return;
}
// Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequests reason).
rooted!(in(*cx) let mut reason = UndefinedValue());
reason.set(pending_abort_request.reason.get());
let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
// Upon fulfillment of promise,
rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
stream: Dom::from_ref(self),
abort_request_promise: pending_abort_request.promise.clone(),
}));
// Upon rejection of promise with reason r,
rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
stream: Dom::from_ref(self),
abort_request_promise: pending_abort_request.promise.clone(),
}));
let handler = PromiseNativeHandler::new(
global,
fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
rejection_handler.take().map(|h| Box::new(h) as Box<_>),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
promise.append_native_handler(&handler, comp, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
// Assert: stream.[[state]] is "errored".
assert!(self.is_errored());
rooted!(in(*cx) let mut stored_error = UndefinedValue());
self.get_stored_error(stored_error.handle_mut());
// If stream.[[closeRequest]] is not undefined
let close_request = self.close_request.borrow_mut().take();
if let Some(close_request) = close_request {
// Assert: stream.[[inFlightCloseRequest]] is undefined.
assert!(self.in_flight_close_request.borrow().is_none());
// Reject stream.[[closeRequest]] with stream.[[storedError]].
close_request.reject_native(&stored_error.handle(), can_gc)
// Set stream.[[closeRequest]] to undefined.
// Done with `take` above.
}
// Let writer be stream.[[writer]].
// If writer is not undefined,
if let Some(writer) = self.writer.get() {
// Reject writer.[[closedPromise]] with stream.[[storedError]].
writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
// Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
writer.set_close_promise_is_handled();
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
pub(crate) fn close_queued_or_in_flight(&self) -> bool {
let close_requested = self.close_request.borrow().is_some();
let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
close_requested || in_flight_close_requested
}
/// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
// Assert: stream.[[inFlightWriteRequest]] is not undefined.
unreachable!("Stream should have a write request");
};
// Resolve stream.[[inFlightWriteRequest]] with undefined.
in_flight_write_request.resolve_native(&(), can_gc);
// Set stream.[[inFlightWriteRequest]] to undefined.
// Done above with `take`.
}
/// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
pub(crate) fn start_erroring(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
// Assert: stream.[[storedError]] is undefined.
assert!(self.stored_error.get().is_undefined());
// Assert: stream.[[state]] is "writable".
assert!(self.is_writable());
// Let controller be stream.[[controller]].
let Some(controller) = self.controller.get() else {
// Assert: controller is not undefined.
unreachable!("Stream should have a controller.");
};
// Set stream.[[state]] to "erroring".
self.state.set(WritableStreamState::Erroring);
// Set stream.[[storedError]] to reason.
self.stored_error.set(*error);
// Let writer be stream.[[writer]].
if let Some(writer) = self.writer.get() {
// If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
writer.ensure_ready_promise_rejected(global, error, can_gc);
}
// If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
if !self.has_operations_marked_inflight() && controller.started() {
// perform ! WritableStreamFinishErroring
self.finish_erroring(cx, global, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
pub(crate) fn deal_with_rejection(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
// Let state be stream.[[state]].
// If state is "writable",
if self.is_writable() {
// Perform ! WritableStreamStartErroring(stream, error).
self.start_erroring(cx, global, error, can_gc);
// Return.
return;
}
// Assert: state is "erroring".
assert!(self.is_erroring());
// Perform ! WritableStreamFinishErroring(stream).
self.finish_erroring(cx, global, can_gc);
}
/// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
pub(crate) fn mark_first_write_request_in_flight(&self) {
let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
let mut write_requests = self.write_requests.borrow_mut();
// Assert: stream.[[inFlightWriteRequest]] is undefined.
assert!(in_flight_write_request.is_none());
// Assert: stream.[[writeRequests]] is not empty.
assert!(!write_requests.is_empty());
// Let writeRequest be stream.[[writeRequests]][0].
// Remove writeRequest from stream.[[writeRequests]].
let write_request = write_requests.pop_front().unwrap();
// Set stream.[[inFlightWriteRequest]] to writeRequest.
*in_flight_write_request = Some(write_request);
}
/// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
pub(crate) fn mark_close_request_in_flight(&self) {
let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
let mut close_request = self.close_request.borrow_mut();
// Assert: stream.[[inFlightCloseRequest]] is undefined.
assert!(in_flight_close_request.is_none());
// Assert: stream.[[closeRequest]] is not undefined.
assert!(close_request.is_some());
// Let closeRequest be stream.[[closeRequest]].
// Set stream.[[closeRequest]] to undefined.
let close_request = close_request.take().unwrap();
// Set stream.[[inFlightCloseRequest]] to closeRequest.
*in_flight_close_request = Some(close_request);
}
/// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
// Assert: stream.[[inFlightCloseRequest]] is not undefined.
unreachable!("in_flight_close_request must be Some");
};
// Resolve stream.[[inFlightCloseRequest]] with undefined.
in_flight_close_request.resolve_native(&(), can_gc);
// Set stream.[[inFlightCloseRequest]] to undefined.
// Done with take above.
// Assert: stream.[[state]] is "writable" or "erroring".
assert!(self.is_writable() || self.is_erroring());
// If state is "erroring",
if self.is_erroring() {
// Set stream.[[storedError]] to undefined.
self.stored_error.set(UndefinedValue());
// If stream.[[pendingAbortRequest]] is not undefined,
rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
if let Some(pending_abort_request) = &*pending_abort_request {
// Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
pending_abort_request.promise.resolve_native(&(), can_gc);
// Set stream.[[pendingAbortRequest]] to undefined.
// Done above with `take`.
}
}
// Set stream.[[state]] to "closed".
self.state.set(WritableStreamState::Closed);
// Let writer be stream.[[writer]].
if let Some(writer) = self.writer.get() {
// If writer is not undefined,
// resolve writer.[[closedPromise]] with undefined.
writer.resolve_closed_promise_with_undefined(can_gc);
}
// Assert: stream.[[pendingAbortRequest]] is undefined.
assert!(self.pending_abort_request.borrow().is_none());
// Assert: stream.[[storedError]] is undefined.
assert!(self.stored_error.get().is_undefined());
}
/// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
pub(crate) fn finish_in_flight_close_with_error(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
// Assert: stream.[[inFlightCloseRequest]] is not undefined.
unreachable!("Inflight close request must be defined.");
};
// Reject stream.[[inFlightCloseRequest]] with error.
in_flight_close_request.reject_native(&error, can_gc);
// Set stream.[[inFlightCloseRequest]] to undefined.
// Done above with `take`.
// Assert: stream.[[state]] is "writable" or "erroring".
assert!(self.is_erroring() || self.is_writable());
// If stream.[[pendingAbortRequest]] is not undefined,
rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
if let Some(pending_abort_request) = &*pending_abort_request {
// Reject stream.[[pendingAbortRequest]]'s promise with error.
pending_abort_request.promise.reject_native(&error, can_gc);
// Set stream.[[pendingAbortRequest]] to undefined.
// Done above with `take`.
}
// Perform ! WritableStreamDealWithRejection(stream, error).
self.deal_with_rejection(cx, global, error, can_gc);
}
/// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
pub(crate) fn finish_in_flight_write_with_error(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
// Assert: stream.[[inFlightWriteRequest]] is not undefined.
unreachable!("Inflight write request must be defined.");
};
// Reject stream.[[inFlightWriteRequest]] with error.
in_flight_write_request.reject_native(&error, can_gc);
// Set stream.[[inFlightWriteRequest]] to undefined.
// Done above with `take`.
// Assert: stream.[[state]] is "writable" or "erroring".
assert!(self.is_erroring() || self.is_writable());
// Perform ! WritableStreamDealWithRejection(stream, error).
self.deal_with_rejection(cx, global, error, can_gc);
}
pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
self.writer.get()
}
pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
self.writer.set(writer);
}
pub(crate) fn set_backpressure(&self, backpressure: bool) {
self.backpressure.set(backpressure);
}
pub(crate) fn get_backpressure(&self) -> bool {
self.backpressure.get()
}
/// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
pub(crate) fn is_locked(&self) -> bool {
// If stream.[[writer]] is undefined, return false.
// Return true.
self.get_writer().is_some()
}
/// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
// Assert: ! IsWritableStreamLocked(stream) is true.
assert!(self.is_locked());
// Assert: stream.[[state]] is "writable".
assert!(self.is_writable());
// Let promise be a new promise.
let promise = Promise::new(global, can_gc);
// Append promise to stream.[[writeRequests]].
self.write_requests.borrow_mut().push_back(promise.clone());
// Return promise.
promise
}
// Returns the rooted controller of the stream, if any.
pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
self.controller.get()
}
/// <https://streams.spec.whatwg.org/#writable-stream-abort>
pub(crate) fn abort(
&self,
cx: SafeJSContext,
global: &GlobalScope,
provided_reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
// If stream.[[state]] is "closed" or "errored",
if self.is_closed() || self.is_errored() {
// return a promise resolved with undefined.
return Promise::new_resolved(global, cx, (), can_gc);
}
// TODO: Signal abort on stream.[[controller]].[[abortController]] with reason.
// TODO: If state is "closed" or "errored", return a promise resolved with undefined.
// Note: state may have changed because of signal above.
// If stream.[[pendingAbortRequest]] is not undefined,
if self.pending_abort_request.borrow().is_some() {
// return stream.[[pendingAbortRequest]]'s promise.
return self
.pending_abort_request
.borrow()
.as_ref()
.expect("Pending abort request must be Some.")
.promise
.clone();
}
// Assert: state is "writable" or "erroring".
assert!(self.is_writable() || self.is_erroring());
// Let wasAlreadyErroring be false.
let mut was_already_erroring = false;
rooted!(in(*cx) let undefined_reason = UndefinedValue());
// If state is "erroring",
let reason = if self.is_erroring() {
// Set wasAlreadyErroring to true.
was_already_erroring = true;
// Set reason to undefined.
undefined_reason.handle()
} else {
// Use the provided reason.
provided_reason
};
// Let promise be a new promise.
let promise = Promise::new(global, can_gc);
// Set stream.[[pendingAbortRequest]] to a new pending abort request
// whose promise is promise,
// reason is reason,
// and was already erroring is wasAlreadyErroring.
*self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
promise: promise.clone(),
reason: Heap::boxed(reason.get()),
was_already_erroring,
});
// If wasAlreadyErroring is false,
if !was_already_erroring {
// perform ! WritableStreamStartErroring(stream, reason)
self.start_erroring(cx, global, reason, can_gc);
}
// Return promise.
promise
}
/// <https://streams.spec.whatwg.org/#writable-stream-close>
pub(crate) fn close(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
// Let state be stream.[[state]].
// If state is "closed" or "errored",
if self.is_closed() || self.is_errored() {
// return a promise rejected with a TypeError exception.
let promise = Promise::new(global, can_gc);
promise.reject_error(
Error::Type("Stream is closed or errored.".to_string()),
can_gc,
);
return promise;
}
// Assert: state is "writable" or "erroring".
assert!(self.is_writable() || self.is_erroring());
// Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
assert!(!self.close_queued_or_in_flight());
// Let promise be a new promise.
let promise = Promise::new(global, can_gc);
// Set stream.[[closeRequest]] to promise.
*self.close_request.borrow_mut() = Some(promise.clone());
// Let writer be stream.[[writer]].
// If writer is not undefined,
if let Some(writer) = self.writer.get() {
// and stream.[[backpressure]] is true,
// and state is "writable",
if self.get_backpressure() && self.is_writable() {
// resolve writer.[[readyPromise]] with undefined.
writer.resolve_ready_promise_with_undefined(can_gc);
}
}
// Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
let Some(controller) = self.controller.get() else {
unreachable!("Stream must have a controller.");
};
controller.close(cx, global, can_gc);
// Return promise.
promise
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
/// Note: implement as a stream method, as opposed to a writer one, for convenience.
pub(crate) fn get_desired_size(&self) -> Option<f64> {
// Let stream be writer.[[stream]].
// Stream is `self`.
// Let state be stream.[[state]].
// If state is "errored" or "erroring", return null.
if self.is_errored() || self.is_erroring() {
return None;
}
// If state is "closed", return 0.
if self.is_closed() {
return Some(0.);
}
let Some(controller) = self.controller.get() else {
unreachable!("Stream must have a controller.");
};
Some(controller.get_desired_size())
}
/// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
pub(crate) fn aquire_default_writer(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
// Let writer be a new WritableStreamDefaultWriter object.
let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
// Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
writer.setup(cx, self, can_gc)?;
// Return writer.
Ok(writer)
}
/// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
pub(crate) fn update_backpressure(
&self,
backpressure: bool,
global: &GlobalScope,
can_gc: CanGc,
) {
// Assert: stream.[[state]] is "writable".
self.is_writable();
// Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
assert!(!self.close_queued_or_in_flight());
// Let writer be stream.[[writer]].
let writer = self.get_writer();
if writer.is_some() && backpressure != self.get_backpressure() {
// If writer is not undefined
let writer = writer.expect("Writer is some, as per the above check.");
// and backpressure is not stream.[[backpressure]],
if backpressure {
// If backpressure is true, set writer.[[readyPromise]] to a new promise.
let promise = Promise::new(global, can_gc);
writer.set_ready_promise(promise);
} else {
// Otherwise,
// Assert: backpressure is false.
assert!(!backpressure);
// Resolve writer.[[readyPromise]] with undefined.
writer.resolve_ready_promise_with_undefined(can_gc);
}
};
// Set stream.[[backpressure]] to backpressure.
self.set_backpressure(backpressure);
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
pub(crate) fn setup_cross_realm_transform_writable(
&self,
cx: SafeJSContext,
port: &MessagePort,
can_gc: CanGc,
) {
let port_id = port.message_port_id();
let global = self.global();
// Perform ! InitializeWritableStream(stream).
// Done in `new_inherited`.
// Let sizeAlgorithm be an algorithm that returns 1.
// Re-ordered because of the need to pass it to `new`.
let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
// Note: other algorithms defined in the controller at call site.
// Let backpressurePromise be a new promise.
let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
// Let controller be a new WritableStreamDefaultController.
let controller = WritableStreamDefaultController::new(
&global,
UnderlyingSinkType::Transfer {
backpressure_promise: backpressure_promise.clone(),
port: Dom::from_ref(port),
},
1.0,
size_algorithm,
can_gc,
);
// Add a handler for ports message event with the following steps:
// Add a handler for ports messageerror event with the following steps:
rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
controller: Dom::from_ref(&controller),
backpressure_promise: backpressure_promise.clone(),
});
global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
// Enable ports port message queue.
port.Start(can_gc);
// Perform ! SetUpWritableStreamDefaultController
controller
.setup(cx, &global, self, can_gc)
.expect("Setup for transfer cannot fail");
}
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
#[allow(clippy::too_many_arguments)]
pub(crate) fn setup_from_underlying_sink(
&self,
cx: SafeJSContext,
global: &GlobalScope,
stream: &WritableStream,
underlying_sink_obj: SafeHandleObject,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
can_gc: CanGc,
) -> Result<(), Error> {
// Let controller be a new WritableStreamDefaultController.
// Let startAlgorithm be an algorithm that returns undefined.
// Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
// Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
// Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
// If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
// returns the result of invoking underlyingSinkDict["start"] with argument
// list « controller », exception behavior "rethrow", and callback this value underlyingSink.
// If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
// takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
// with argument list « chunk, controller » and callback this value underlyingSink.
// If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
// returns the result of invoking underlyingSinkDict["close"] with argument
// list «» and callback this value underlyingSink.
// If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
// takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
// with argument list « reason » and callback this value underlyingSink.
let controller = WritableStreamDefaultController::new(
global,
UnderlyingSinkType::new_js(
underlying_sink.abort.clone(),
underlying_sink.start.clone(),
underlying_sink.close.clone(),
underlying_sink.write.clone(),
),
strategy_hwm,
strategy_size,
can_gc,
);
// Note: this must be done before `setup`,
// otherwise `thisOb` is null in the start callback.
controller.set_underlying_sink_this_object(underlying_sink_obj);
// Perform ? SetUpWritableStreamDefaultController
controller.setup(cx, global, stream, can_gc)
}
}
/// <https://streams.spec.whatwg.org/#create-writable-stream>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
pub(crate) fn create_writable_stream(
cx: SafeJSContext,
global: &GlobalScope,
writable_high_water_mark: f64,
writable_size_algorithm: Rc<QueuingStrategySize>,
underlying_sink_type: UnderlyingSinkType,
can_gc: CanGc,
) -> Fallible<DomRoot<WritableStream>> {
// Assert: ! IsNonNegativeNumber(highWaterMark) is true.
assert!(writable_high_water_mark >= 0.0);
// Let stream be a new WritableStream.
// Perform ! InitializeWritableStream(stream).
let stream = WritableStream::new_with_proto(global, None, can_gc);
// Let controller be a new WritableStreamDefaultController.
let controller = WritableStreamDefaultController::new(
global,
underlying_sink_type,
writable_high_water_mark,
writable_size_algorithm,
can_gc,
);
// Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
// closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
controller.setup(cx, global, &stream, can_gc)?;
// Return stream.
Ok(stream)
}
impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
/// <https://streams.spec.whatwg.org/#ws-constructor>
fn Constructor(
cx: SafeJSContext,
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
underlying_sink: Option<*mut JSObject>,
strategy: &QueuingStrategy,
) -> Fallible<DomRoot<WritableStream>> {
// If underlyingSink is missing, set it to null.
rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
// Let underlyingSinkDict be underlyingSink,
// converted to an IDL value of type UnderlyingSink.
let underlying_sink_dict = if !underlying_sink_obj.is_null() {
rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
match UnderlyingSink::new(cx, obj_val.handle()) {
Ok(ConversionResult::Success(val)) => val,
Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
_ => {
return Err(Error::JSFailed);
},
}
} else {
UnderlyingSink::empty()
};
if !underlying_sink_dict.type_.handle().is_undefined() {
// If underlyingSinkDict["type"] exists, throw a RangeError exception.
return Err(Error::Range("type is set".to_string()));
}
// Perform ! InitializeWritableStream(this).
let stream = WritableStream::new_with_proto(global, proto, can_gc);
// Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
let size_algorithm = extract_size_algorithm(strategy, can_gc);
// Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
// Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
// underlyingSinkDict, highWaterMark, sizeAlgorithm).
stream.setup_from_underlying_sink(
cx,
global,
&stream,
underlying_sink_obj.handle(),
&underlying_sink_dict,
high_water_mark,
size_algorithm,
can_gc,
)?;
Ok(stream)
}
/// <https://streams.spec.whatwg.org/#ws-locked>
fn Locked(&self) -> bool {
// Return ! IsWritableStreamLocked(this).
self.is_locked()
}
/// <https://streams.spec.whatwg.org/#ws-abort>
fn Abort(
&self,
cx: SafeJSContext,
reason: SafeHandleValue,
realm: InRealm,
can_gc: CanGc,
) -> Rc<Promise> {
let global = GlobalScope::from_safe_context(cx, realm);
// If ! IsWritableStreamLocked(this) is true,
if self.is_locked() {
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&global, can_gc);
promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
return promise;
}
// Return ! WritableStreamAbort(this, reason).
self.abort(cx, &global, reason, can_gc)
}
/// <https://streams.spec.whatwg.org/#ws-close>
fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
let cx = GlobalScope::get_cx();
let global = GlobalScope::from_safe_context(cx, realm);
// If ! IsWritableStreamLocked(this) is true,
if self.is_locked() {
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&global, can_gc);
promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
return promise;
}
// If ! WritableStreamCloseQueuedOrInFlight(this) is true
if self.close_queued_or_in_flight() {
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&global, can_gc);
promise.reject_error(
Error::Type("Stream has closed queued or in-flight".to_string()),
can_gc,
);
return promise;
}
// Return ! WritableStreamClose(this).
self.close(cx, &global, can_gc)
}
/// <https://streams.spec.whatwg.org/#ws-get-writer>
fn GetWriter(
&self,
realm: InRealm,
can_gc: CanGc,
) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
let cx = GlobalScope::get_cx();
let global = GlobalScope::from_safe_context(cx, realm);
// Return ? AcquireWritableStreamDefaultWriter(this).
self.aquire_default_writer(cx, &global, can_gc)
}
}
impl js::gc::Rootable for CrossRealmTransformWritable {}
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transfered stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformWritable {
/// The controller used in the algorithm.
controller: Dom<WritableStreamDefaultController>,
/// The `backpressurePromise` used in the algorithm.
#[ignore_malloc_size_of = "Rc is hard"]
backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
}
impl CrossRealmTransformWritable {
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
/// Add a handler for ports message event with the following steps:
#[allow(unsafe_code)]
pub(crate) fn handle_message(
&self,
cx: SafeJSContext,
global: &GlobalScope,
message: SafeHandleValue,
_realm: InRealm,
can_gc: CanGc,
) {
rooted!(in(*cx) let mut value = UndefinedValue());
let type_string =
unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
// If type is "pull",
// Done below as the steps are the same for both types.
// Otherwise, if type is "error",
if type_string == "error" {
// Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
self.controller
.error_if_needed(cx, value.handle(), global, can_gc);
}
let backpressure_promise = self.backpressure_promise.borrow_mut().take();
// Note: the below steps are for both "pull" and "error" types.
// If backpressurePromise is not undefined,
if let Some(promise) = backpressure_promise {
// Resolve backpressurePromise with undefined.
promise.resolve_native(&(), can_gc);
// Set backpressurePromise to undefined.
// Done above with `take`.
}
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
/// Add a handler for ports messageerror event with the following steps:
#[allow(unsafe_code)]
pub(crate) fn handle_error(
&self,
cx: SafeJSContext,
global: &GlobalScope,
port: &MessagePort,
_realm: InRealm,
can_gc: CanGc,
) {
// Let error be a new "DataCloneError" DOMException.
let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
rooted!(in(*cx) let mut rooted_error = UndefinedValue());
unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) };
// Perform ! CrossRealmTransformSendError(port, error).
port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
// Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
self.controller
.error_if_needed(cx, rooted_error.handle(), global, can_gc);
// Disentangle port.
global.disentangle_port(port, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#ws-transfer>
impl Transferable for WritableStream {
type Index = MessagePortIndex;
type Data = MessagePortImpl;
/// <https://streams.spec.whatwg.org/#ref-for-writablestream%E2%91%A0%E2%91%A4>
fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> {
// If ! IsWritableStreamLocked(value) is true, throw a "DataCloneError" DOMException.
if self.is_locked() {
return Err(());
}
let global = self.global();
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
let cx = GlobalScope::get_cx();
let can_gc = CanGc::note();
// Let port1 be a new MessagePort in the current Realm.
let port_1 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_1, None);
// Let port2 be a new MessagePort in the current Realm.
let port_2 = MessagePort::new(&global, can_gc);
global.track_message_port(&port_2, None);
// Entangle port1 and port2.
global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
// Let readable be a new ReadableStream in the current Realm.
let readable = ReadableStream::new_with_proto(&global, None, can_gc);
// Perform ! SetUpCrossRealmTransformReadable(readable, port1).
readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
// Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
let promise = readable.pipe_to(cx, &global, self, false, false, false, comp, can_gc);
// Set promise.[[PromiseIsHandled]] to true.
promise.set_promise_is_handled();
// Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
port_2.transfer()
}
/// <https://streams.spec.whatwg.org/#ref-for-writablestream%E2%91%A0%E2%91%A4>
fn transfer_receive(
owner: &GlobalScope,
id: MessagePortId,
port_impl: MessagePortImpl,
) -> Result<DomRoot<Self>, ()> {
let cx = GlobalScope::get_cx();
let can_gc = CanGc::note();
// Their transfer-receiving steps, given dataHolder and value, are:
// Note: dataHolder is used in `structuredclone.rs`, and value is created here.
let value = WritableStream::new_with_proto(owner, None, can_gc);
// Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
// Done with the `Deserialize` derive of `MessagePortImpl`.
// Let port be deserializedRecord.[[Deserialized]].
let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
// Perform ! SetUpCrossRealmTransformWritable(value, port).
value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
Ok(value)
}
/// Note: we are relying on the port transfer, so the data returned here are related to the port.
fn serialized_storage<'a>(
data: StructuredData<'a, '_>,
) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
match data {
StructuredData::Reader(r) => &mut r.port_impls,
StructuredData::Writer(w) => &mut w.ports,
}
}
}