Script implement TransformStream and TransformStreamDefaultController (#36739)

Part of https://github.com/servo/servo/issues/34676

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Taym Haddadi 2025-05-08 10:45:57 +02:00 committed by GitHub
parent d39b9f05ff
commit f3f4cc5500
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 1784 additions and 48 deletions

View file

@ -631,6 +631,8 @@ pub(crate) mod webgpu;
pub(crate) use self::webgpu::*;
#[cfg(not(feature = "webgpu"))]
pub(crate) mod gpucanvascontext;
pub(crate) mod transformstream;
pub(crate) mod transformstreamdefaultcontroller;
pub(crate) mod wheelevent;
#[allow(dead_code)]
pub(crate) mod window;

View file

@ -383,7 +383,6 @@ impl ReadableStreamDefaultController {
}
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
#[allow(unsafe_code)]
pub(crate) fn setup(
&self,
stream: DomRoot<ReadableStream>,
@ -866,7 +865,6 @@ impl ReadableStreamDefaultController {
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
#[allow(unused)]
pub(crate) fn has_backpressure(&self) -> bool {
// If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
// Otherwise, return true.

View file

@ -80,7 +80,6 @@ pub(crate) trait ReadableStreamGenericReader {
}
/// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-release>
#[allow(unsafe_code)]
fn generic_release(&self, can_gc: CanGc) -> Fallible<()> {
// Let stream be reader.[[stream]].

View file

@ -0,0 +1,999 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cell::Cell;
use std::ptr::{self};
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::{Heap, IsPromiseObject, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
use script_bindings::callback::ExceptionHandling;
use script_bindings::realms::InRealm;
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use super::promisenativehandler::Callback;
use super::types::{TransformStreamDefaultController, WritableStream};
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
use crate::dom::bindings::conversions::ConversionResult;
use crate::dom::bindings::error::{Error, Fallible};
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::readablestream::{ReadableStream, create_readable_stream};
use crate::dom::types::PromiseNativeHandler;
use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::dom::writablestream::create_writable_stream;
use crate::dom::writablestreamdefaultcontroller::UnderlyingSinkType;
use crate::realms::enter_realm;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
/// Reacting to backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformBackPressureChangePromiseFulfillment {
/// The result of reacting to backpressureChangePromise.
#[ignore_malloc_size_of = "Rc is hard"]
result_promise: Rc<Promise>,
#[ignore_malloc_size_of = "mozjs"]
chunk: Box<Heap<JSVal>>,
/// The writable used in the fulfillment steps
writable: Dom<WritableStream>,
controller: Dom<TransformStreamDefaultController>,
}
impl Callback for TransformBackPressureChangePromiseFulfillment {
/// Reacting to backpressureChangePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Let writable be stream.[[writable]].
// Let state be writable.[[state]].
// If state is "erroring", throw writable.[[storedError]].
if self.writable.is_erroring() {
rooted!(in(*cx) let mut error = UndefinedValue());
self.writable.get_stored_error(error.handle_mut());
self.result_promise.reject(cx, error.handle(), can_gc);
return;
}
// Assert: state is "writable".
assert!(self.writable.is_writable());
// Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
rooted!(in(*cx) let mut chunk = UndefinedValue());
chunk.set(self.chunk.get());
let transform_result = self
.controller
.transform_stream_default_controller_perform_transform(
cx,
&self.writable.global(),
chunk.handle(),
can_gc,
)
.expect("perform transform failed");
// PerformTransformFulfillment and PerformTransformRejection do not need
// to be rooted because they only contain an Rc.
let handler = PromiseNativeHandler::new(
&self.writable.global(),
Some(Box::new(PerformTransformFulfillment {
result_promise: self.result_promise.clone(),
})),
Some(Box::new(PerformTransformRejection {
result_promise: self.result_promise.clone(),
})),
can_gc,
);
let realm = enter_realm(&*self.writable.global());
let comp = InRealm::Entered(&realm);
transform_result.append_native_handler(&handler, comp, can_gc);
}
}
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to fulfillment of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct PerformTransformFulfillment {
#[ignore_malloc_size_of = "Rc is hard"]
result_promise: Rc<Promise>,
}
impl Callback for PerformTransformFulfillment {
fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Fulfilled: resolve the outer promise
self.result_promise.resolve_native(&(), can_gc);
}
}
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to rejection of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct PerformTransformRejection {
#[ignore_malloc_size_of = "Rc is hard"]
result_promise: Rc<Promise>,
}
impl Callback for PerformTransformRejection {
fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Stream already errored in perform_transform, just reject result_promise
self.result_promise.reject(cx, v, can_gc);
}
}
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to rejection of backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct BackpressureChangeRejection {
#[ignore_malloc_size_of = "Rc is hard"]
result_promise: Rc<Promise>,
}
impl Callback for BackpressureChangeRejection {
fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
self.result_promise.reject(cx, reason, can_gc);
}
}
impl js::gc::Rootable for CancelPromiseFulfillment {}
/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseFulfillment {
readable: Dom<ReadableStream>,
controller: Dom<TransformStreamDefaultController>,
#[ignore_malloc_size_of = "mozjs"]
reason: Box<Heap<JSVal>>,
}
impl Callback for CancelPromiseFulfillment {
/// Reacting to backpressureChangePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
if self.readable.is_errored() {
rooted!(in(*cx) let mut error = UndefinedValue());
self.readable.get_stored_error(error.handle_mut());
self.controller
.get_finish_promise()
.expect("finish promise is not set")
.reject_native(&error.handle(), can_gc);
} else {
// Otherwise:
// Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
rooted!(in(*cx) let mut reason = UndefinedValue());
reason.set(self.reason.get());
self.readable
.get_default_controller()
.error(reason.handle(), can_gc);
// Resolve controller.[[finishPromise]] with undefined.
self.controller
.get_finish_promise()
.expect("finish promise is not set")
.resolve_native(&(), can_gc);
}
}
}
impl js::gc::Rootable for CancelPromiseRejection {}
/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseRejection {
readable: Dom<ReadableStream>,
controller: Dom<TransformStreamDefaultController>,
}
impl Callback for CancelPromiseRejection {
/// Reacting to backpressureChangePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
self.readable.get_default_controller().error(v, can_gc);
// Reject controller.[[finishPromise]] with r.
self.controller
.get_finish_promise()
.expect("finish promise is not set")
.reject(cx, v, can_gc);
}
}
impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseFulfillment {
writeable: Dom<WritableStream>,
controller: Dom<TransformStreamDefaultController>,
stream: Dom<TransformStream>,
#[ignore_malloc_size_of = "mozjs"]
reason: Box<Heap<JSVal>>,
}
impl Callback for SourceCancelPromiseFulfillment {
/// Reacting to backpressureChangePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// If cancelPromise was fulfilled, then:
let finish_promise = self
.controller
.get_finish_promise()
.expect("finish promise is not set");
let global = &self.writeable.global();
// If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
if self.writeable.is_errored() {
rooted!(in(*cx) let mut error = UndefinedValue());
self.writeable.get_stored_error(error.handle_mut());
finish_promise.reject(cx, error.handle(), can_gc);
} else {
// Otherwise:
// Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
rooted!(in(*cx) let mut reason = UndefinedValue());
reason.set(self.reason.get());
self.writeable.get_default_controller().error_if_needed(
cx,
reason.handle(),
global,
can_gc,
);
// Perform ! TransformStreamUnblockWrite(stream).
self.stream.unblock_write(global, can_gc);
// Resolve controller.[[finishPromise]] with undefined.
finish_promise.resolve_native(&(), can_gc);
}
}
}
impl js::gc::Rootable for SourceCancelPromiseRejection {}
/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseRejection {
writeable: Dom<WritableStream>,
controller: Dom<TransformStreamDefaultController>,
stream: Dom<TransformStream>,
}
impl Callback for SourceCancelPromiseRejection {
/// Reacting to backpressureChangePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
let global = &self.writeable.global();
self.writeable
.get_default_controller()
.error_if_needed(cx, v, global, can_gc);
// Perform ! TransformStreamUnblockWrite(stream).
self.stream.unblock_write(global, can_gc);
// Reject controller.[[finishPromise]] with r.
self.controller
.get_finish_promise()
.expect("finish promise is not set")
.reject(cx, v, can_gc);
}
}
impl js::gc::Rootable for FlushPromiseFulfillment {}
/// Reacting to fulfillment of the flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseFulfillment {
readable: Dom<ReadableStream>,
controller: Dom<TransformStreamDefaultController>,
}
impl Callback for FlushPromiseFulfillment {
/// Reacting to flushpromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// If flushPromise was fulfilled, then:
let finish_promise = self
.controller
.get_finish_promise()
.expect("finish promise is not set");
// If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
if self.readable.is_errored() {
rooted!(in(*cx) let mut error = UndefinedValue());
self.readable.get_stored_error(error.handle_mut());
finish_promise.reject(cx, error.handle(), can_gc);
} else {
// Otherwise:
// Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
self.readable.get_default_controller().close(can_gc);
// Resolve controller.[[finishPromise]] with undefined.
finish_promise.resolve_native(&(), can_gc);
}
}
}
impl js::gc::Rootable for FlushPromiseRejection {}
/// Reacting to rejection of flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseRejection {
readable: Dom<ReadableStream>,
controller: Dom<TransformStreamDefaultController>,
}
impl Callback for FlushPromiseRejection {
/// Reacting to flushpromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// If flushPromise was rejected with reason r, then:
// Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
self.readable.get_default_controller().error(v, can_gc);
// Reject controller.[[finishPromise]] with r.
self.controller
.get_finish_promise()
.expect("finish promise is not set")
.reject(cx, v, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#ts-class>
#[dom_struct]
pub struct TransformStream {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#transformstream-backpressure>
backpressure: Cell<bool>,
/// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
#[ignore_malloc_size_of = "Rc is hard"]
backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
/// <https://streams.spec.whatwg.org/#transformstream-controller>
controller: MutNullableDom<TransformStreamDefaultController>,
/// <https://streams.spec.whatwg.org/#transformstream-detached>
detached: Cell<bool>,
/// <https://streams.spec.whatwg.org/#transformstream-readable>
readable: MutNullableDom<ReadableStream>,
/// <https://streams.spec.whatwg.org/#transformstream-writable>
writable: MutNullableDom<WritableStream>,
}
impl TransformStream {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
/// <https://streams.spec.whatwg.org/#initialize-transform-stream>
fn new_inherited() -> TransformStream {
TransformStream {
reflector_: Reflector::new(),
backpressure: Default::default(),
backpressure_change_promise: DomRefCell::new(None),
controller: MutNullableDom::new(None),
detached: Cell::new(false),
readable: MutNullableDom::new(None),
writable: MutNullableDom::new(None),
}
}
pub(crate) fn new_with_proto(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
) -> DomRoot<TransformStream> {
reflect_dom_object_with_proto(
Box::new(TransformStream::new_inherited()),
global,
proto,
can_gc,
)
}
pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
self.controller.get().expect("controller is not set")
}
pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
self.writable.get().expect("writable stream is not set")
}
pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
self.readable.get().expect("readable stream is not set")
}
pub(crate) fn get_backpressure(&self) -> bool {
self.backpressure.get()
}
/// <https://streams.spec.whatwg.org/#initialize-transform-stream>
#[allow(clippy::too_many_arguments)]
fn initialize(
&self,
cx: SafeJSContext,
global: &GlobalScope,
start_promise: Rc<Promise>,
writable_high_water_mark: f64,
writable_size_algorithm: Rc<QueuingStrategySize>,
readable_high_water_mark: f64,
readable_size_algorithm: Rc<QueuingStrategySize>,
can_gc: CanGc,
) -> Fallible<()> {
// Let startAlgorithm be an algorithm that returns startPromise.
// Let writeAlgorithm be the following steps, taking a chunk argument:
// Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
// Let abortAlgorithm be the following steps, taking a reason argument:
// Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
// Let closeAlgorithm be the following steps:
// Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
// Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
// closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
// Note: Those steps are implemented using UnderlyingSinkType::Transform.
let writable = create_writable_stream(
cx,
global,
writable_high_water_mark,
writable_size_algorithm,
UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
can_gc,
)?;
self.writable.set(Some(&writable));
// Let pullAlgorithm be the following steps:
// Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
// Let cancelAlgorithm be the following steps, taking a reason argument:
// Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
// Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
// cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
let readable = create_readable_stream(
global,
UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise.clone()),
Some(readable_size_algorithm),
Some(readable_high_water_mark),
can_gc,
);
self.readable.set(Some(&readable));
// Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
// Note: This is done in the constructor.
// Perform ! TransformStreamSetBackpressure(stream, true).
self.set_backpressure(global, true, can_gc);
// Set stream.[[controller]] to undefined.
self.controller.set(None);
Ok(())
}
/// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
// Assert: stream.[[backpressure]] is not backpressure.
assert!(self.backpressure.get() != backpressure);
// If stream.[[backpressureChangePromise]] is not undefined, resolve
// stream.[[backpressureChangePromise]] with undefined.
if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
promise.resolve_native(&(), can_gc);
}
// Set stream.[[backpressureChangePromise]] to a new promise.;
*self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
// Set stream.[[backpressure]] to backpressure.
self.backpressure.set(backpressure);
}
/// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
fn set_up_transform_stream_default_controller(
&self,
controller: &TransformStreamDefaultController,
) {
// Assert: stream implements TransformStream.
// Note: this is checked with type.
// Assert: stream.[[controller]] is undefined.
assert!(self.controller.get().is_none());
// Set controller.[[stream]] to stream.
controller.set_stream(self);
// Set stream.[[controller]] to controller.
self.controller.set(Some(controller));
// Set controller.[[transformAlgorithm]] to transformAlgorithm.
// Set controller.[[flushAlgorithm]] to flushAlgorithm.
// Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
// Note: These are set in the constructor.
}
/// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
fn set_up_transform_stream_default_controller_from_transformer(
&self,
global: &GlobalScope,
transformer_obj: SafeHandleObject,
transformer: &Transformer,
can_gc: CanGc,
) {
// Let controller be a new TransformStreamDefaultController.
let controller = TransformStreamDefaultController::new(global, transformer, can_gc);
// Let transformAlgorithm be the following steps, taking a chunk argument:
// Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
// If result is an abrupt completion, return a promise rejected with result.[[Value]].
// Otherwise, return a promise resolved with undefined.
// Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
// Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
// If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
// takes an argument
// chunk and returns the result of invoking transformerDict["transform"] with argument
// list « chunk, controller »
// and callback this value transformer.
// If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
// the result
// of invoking transformerDict["flush"] with argument list « controller » and callback
// this value transformer.
// If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
// reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
// and callback this value transformer.
controller.set_transform_obj(transformer_obj);
// Perform ! SetUpTransformStreamDefaultController(stream, controller,
// transformAlgorithm, flushAlgorithm, cancelAlgorithm).
self.set_up_transform_stream_default_controller(&controller);
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
pub(crate) fn transform_stream_default_sink_write_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Assert: stream.[[writable]].[[state]] is "writable".
assert!(self.writable.get().is_some());
// Let controller be stream.[[controller]].
let controller = self.controller.get().expect("controller is not set");
// If stream.[[backpressure]] is true,
if self.backpressure.get() {
// Let backpressureChangePromise be stream.[[backpressureChangePromise]].
let backpressure_change_promise = self.backpressure_change_promise.borrow();
// Assert: backpressureChangePromise is not undefined.
assert!(backpressure_change_promise.is_some());
// Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
let result_promise = Promise::new(global, can_gc);
rooted!(in(*cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
controller: Dom::from_ref(&controller),
writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
chunk: Heap::boxed(chunk.get()),
result_promise: result_promise.clone(),
}));
let handler = PromiseNativeHandler::new(
global,
fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
Some(Box::new(BackpressureChangeRejection {
result_promise: result_promise.clone(),
})),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
backpressure_change_promise
.as_ref()
.expect("Promise must be some by now.")
.append_native_handler(&handler, comp, can_gc);
return Ok(result_promise);
}
// Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
controller.transform_stream_default_controller_perform_transform(cx, global, chunk, can_gc)
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
pub(crate) fn transform_stream_default_sink_abort_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Let controller be stream.[[controller]].
let controller = self.controller.get().expect("controller is not set");
// If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
if let Some(finish_promise) = controller.get_finish_promise() {
return Ok(finish_promise);
}
// Let readable be stream.[[readable]].
let readable = self.readable.get().expect("readable stream is not set");
// Let controller.[[finishPromise]] be a new promise.
controller.set_finish_promise(Promise::new(global, can_gc));
// Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
// Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
controller.clear_algorithms();
// React to cancelPromise:
let handler = PromiseNativeHandler::new(
global,
Some(Box::new(CancelPromiseFulfillment {
readable: Dom::from_ref(&readable),
controller: Dom::from_ref(&controller),
reason: Heap::boxed(reason.get()),
})),
Some(Box::new(CancelPromiseRejection {
readable: Dom::from_ref(&readable),
controller: Dom::from_ref(&controller),
})),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
cancel_promise.append_native_handler(&handler, comp, can_gc);
// Return controller.[[finishPromise]].
let finish_promise = controller
.get_finish_promise()
.expect("finish promise is not set");
Ok(finish_promise)
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
pub(crate) fn transform_stream_default_sink_close_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Let controller be stream.[[controller]].
let controller = self
.controller
.get()
.ok_or(Error::Type("controller is not set".to_string()))?;
// If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
if let Some(finish_promise) = controller.get_finish_promise() {
return Ok(finish_promise);
}
// Let readable be stream.[[readable]].
let readable = self
.readable
.get()
.ok_or(Error::Type("readable stream is not set".to_string()))?;
// Let controller.[[finishPromise]] be a new promise.
controller.set_finish_promise(Promise::new(global, can_gc));
// Let flushPromise be the result of performing controller.[[flushAlgorithm]].
let flush_promise = controller.perform_flush(cx, global, can_gc)?;
// Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
controller.clear_algorithms();
// React to flushPromise:
let handler = PromiseNativeHandler::new(
global,
Some(Box::new(FlushPromiseFulfillment {
readable: Dom::from_ref(&readable),
controller: Dom::from_ref(&controller),
})),
Some(Box::new(FlushPromiseRejection {
readable: Dom::from_ref(&readable),
controller: Dom::from_ref(&controller),
})),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
flush_promise.append_native_handler(&handler, comp, can_gc);
// Return controller.[[finishPromise]].
let finish_promise = controller
.get_finish_promise()
.expect("finish promise is not set");
Ok(finish_promise)
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
pub(crate) fn transform_stream_default_source_cancel(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Let controller be stream.[[controller]].
let controller = self
.controller
.get()
.ok_or(Error::Type("controller is not set".to_string()))?;
// If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
if let Some(finish_promise) = controller.get_finish_promise() {
return Ok(finish_promise);
}
// Let writable be stream.[[writable]].
let writable = self
.writable
.get()
.ok_or(Error::Type("writable stream is not set".to_string()))?;
// Let controller.[[finishPromise]] be a new promise.
controller.set_finish_promise(Promise::new(global, can_gc));
// Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
// Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
controller.clear_algorithms();
// React to cancelPromise:
let handler = PromiseNativeHandler::new(
global,
Some(Box::new(SourceCancelPromiseFulfillment {
writeable: Dom::from_ref(&writable),
controller: Dom::from_ref(&controller),
stream: Dom::from_ref(self),
reason: Heap::boxed(reason.get()),
})),
Some(Box::new(SourceCancelPromiseRejection {
writeable: Dom::from_ref(&writable),
controller: Dom::from_ref(&controller),
stream: Dom::from_ref(self),
})),
can_gc,
);
// Return controller.[[finishPromise]].
let finish_promise = controller
.get_finish_promise()
.expect("finish promise is not set");
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
cancel_promise.append_native_handler(&handler, comp, can_gc);
Ok(finish_promise)
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
pub(crate) fn transform_stream_default_source_pull(
&self,
global: &GlobalScope,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Assert: stream.[[backpressure]] is true.
assert!(self.backpressure.get());
// Assert: stream.[[backpressureChangePromise]] is not undefined.
assert!(self.backpressure_change_promise.borrow().is_some());
// Perform ! TransformStreamSetBackpressure(stream, false).
self.set_backpressure(global, false, can_gc);
// Return stream.[[backpressureChangePromise]].
Ok(self
.backpressure_change_promise
.borrow()
.clone()
.expect("Promise must be some by now."))
}
/// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
pub(crate) fn error_writable_and_unblock_write(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
// Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
self.get_controller().clear_algorithms();
// Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
self.get_writable()
.get_default_controller()
.error_if_needed(cx, error, global, can_gc);
// Perform ! TransformStreamUnblockWrite(stream).
self.unblock_write(global, can_gc)
}
/// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
// If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
if self.backpressure.get() {
self.set_backpressure(global, false, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#transform-stream-error>
pub(crate) fn error(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
can_gc: CanGc,
) {
// Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
self.get_readable()
.get_default_controller()
.error(error, can_gc);
// Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
self.error_writable_and_unblock_write(cx, global, error, can_gc);
}
}
impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
/// <https://streams.spec.whatwg.org/#ts-constructor>
#[allow(unsafe_code)]
fn Constructor(
cx: SafeJSContext,
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
transformer: Option<*mut JSObject>,
writable_strategy: &QueuingStrategy,
readable_strategy: &QueuingStrategy,
) -> Fallible<DomRoot<TransformStream>> {
// If transformer is missing, set it to null.
rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
// Let underlyingSinkDict be underlyingSink,
// converted to an IDL value of type UnderlyingSink.
let transformer_dict = if !transformer_obj.is_null() {
rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
match Transformer::new(cx, obj_val.handle()) {
Ok(ConversionResult::Success(val)) => val,
Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
_ => {
return Err(Error::JSFailed);
},
}
} else {
Transformer::empty()
};
// If transformerDict["readableType"] exists, throw a RangeError exception.
if !transformer_dict.readableType.handle().is_undefined() {
return Err(Error::Range("readableType is set".to_string()));
}
// If transformerDict["writableType"] exists, throw a RangeError exception.
if !transformer_dict.writableType.handle().is_undefined() {
return Err(Error::Range("writableType is set".to_string()));
}
// Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
// Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
// Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
// Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
// Let startPromise be a new promise.
let start_promise = Promise::new(global, can_gc);
// Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
// writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
let stream = TransformStream::new_with_proto(global, proto, can_gc);
stream.initialize(
cx,
global,
start_promise.clone(),
writable_high_water_mark,
writable_size_algorithm,
readable_high_water_mark,
readable_size_algorithm,
can_gc,
)?;
// Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
stream.set_up_transform_stream_default_controller_from_transformer(
global,
transformer_obj.handle(),
&transformer_dict,
can_gc,
);
// If transformerDict["start"] exists, then resolve startPromise with the
// result of invoking transformerDict["start"]
// with argument list « this.[[controller]] » and callback this value transformer.
if let Some(start) = &transformer_dict.start {
rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
rooted!(in(*cx) let mut result: JSVal);
rooted!(in(*cx) let this_object = transformer_obj.get());
start.Call_(
&this_object.handle(),
&stream.get_controller(),
result.handle_mut(),
ExceptionHandling::Rethrow,
can_gc,
)?;
let is_promise = unsafe {
if result.is_object() {
result_object.set(result.to_object());
IsPromiseObject(result_object.handle().into_handle())
} else {
false
}
};
let promise = if is_promise {
let promise = Promise::new_with_js_promise(result_object.handle(), cx);
promise
} else {
Promise::new_resolved(global, cx, result.get(), can_gc)
};
start_promise.resolve_native(&promise, can_gc);
} else {
// Otherwise, resolve startPromise with undefined.
start_promise.resolve_native(&(), can_gc);
};
Ok(stream)
}
/// <https://streams.spec.whatwg.org/#ts-readable>
fn Readable(&self) -> DomRoot<ReadableStream> {
// Return this.[[readable]].
self.readable.get().expect("readable stream is not set")
}
/// <https://streams.spec.whatwg.org/#ts-writable>
fn Writable(&self) -> DomRoot<WritableStream> {
// Return this.[[writable]].
self.writable.get().expect("writable stream is not set")
}
}

View file

@ -0,0 +1,420 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cell::RefCell;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::{
ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
};
use js::jsval::UndefinedValue;
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
use super::bindings::cell::DomRefCell;
use super::bindings::codegen::Bindings::TransformerBinding::{
Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
};
use super::types::TransformStream;
use crate::dom::bindings::callback::ExceptionHandling;
use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
impl js::gc::Rootable for TransformTransformPromiseRejection {}
/// Reacting to transformPromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformTransformPromiseRejection {
controller: Dom<TransformStreamDefaultController>,
}
impl Callback for TransformTransformPromiseRejection {
/// Reacting to transformPromise with the following fulfillment steps:
#[allow(unsafe_code)]
fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
// Perform ! TransformStreamError(controller.[[stream]], r).
self.controller
.error(cx, &self.controller.global(), v, can_gc);
// Throw r.
// Note: this is done part of perform_transform().
}
}
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
#[dom_struct]
pub struct TransformStreamDefaultController {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
/// The JS object used as `this` when invoking sink algorithms.
#[ignore_malloc_size_of = "mozjs"]
transform_obj: Heap<*mut JSObject>,
/// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
stream: MutNullableDom<TransformStream>,
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
#[ignore_malloc_size_of = "Rc is hard"]
finish_promise: DomRefCell<Option<Rc<Promise>>>,
}
impl TransformStreamDefaultController {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn new_inherited(transformer: &Transformer) -> TransformStreamDefaultController {
TransformStreamDefaultController {
reflector_: Reflector::new(),
cancel: RefCell::new(transformer.cancel.clone()),
flush: RefCell::new(transformer.flush.clone()),
transform: RefCell::new(transformer.transform.clone()),
finish_promise: DomRefCell::new(None),
stream: MutNullableDom::new(None),
transform_obj: Default::default(),
}
}
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
pub(crate) fn new(
global: &GlobalScope,
transformer: &Transformer,
can_gc: CanGc,
) -> DomRoot<TransformStreamDefaultController> {
reflect_dom_object(
Box::new(TransformStreamDefaultController::new_inherited(transformer)),
global,
can_gc,
)
}
/// Setting the JS object after the heap has settled down.
pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
self.transform_obj.set(*this_object);
}
pub(crate) fn set_stream(&self, stream: &TransformStream) {
self.stream.set(Some(stream));
}
pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
self.finish_promise.borrow().clone()
}
pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
*self.finish_promise.borrow_mut() = Some(promise);
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
pub(crate) fn transform_stream_default_controller_perform_transform(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
// Return the result of reacting to transformPromise with the following rejection steps given the argument r:
rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
controller: Dom::from_ref(self),
}));
let handler = PromiseNativeHandler::new(
global,
None,
reject_handler.take().map(|h| Box::new(h) as Box<_>),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
transform_promise.append_native_handler(&handler, comp, can_gc);
Ok(transform_promise)
}
pub(crate) fn perform_transform(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
// takes an argument chunk and returns the result of invoking transformerDict["transform"] with argument list
// « chunk, controller » and callback this value transformer.
let algo = self.transform.borrow().clone();
let result = if let Some(transform) = algo {
rooted!(in(*cx) let this_object = self.transform_obj.get());
let call_result = transform.Call_(
&this_object.handle(),
chunk,
self,
ExceptionHandling::Rethrow,
can_gc,
);
match call_result {
Ok(p) => p,
Err(e) => {
let p = Promise::new(global, can_gc);
p.reject_error(e, can_gc);
p
},
}
} else {
// Let transformAlgorithm be the following steps, taking a chunk argument:
// Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
// If result is an abrupt completion, return a promise rejected with result.[[Value]].
let promise = if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
rooted!(in(*cx) let mut error_val = UndefinedValue());
error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
Promise::new_rejected(global, cx, error_val.handle(), can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
promise
};
Ok(result)
}
pub(crate) fn perform_cancel(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
// reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
// and callback this value transformer.
let algo = self.cancel.borrow().clone();
let result = if let Some(cancel) = algo {
rooted!(in(*cx) let this_object = self.transform_obj.get());
let call_result = cancel.Call_(
&this_object.handle(),
chunk,
ExceptionHandling::Rethrow,
can_gc,
);
match call_result {
Ok(p) => p,
Err(e) => {
let p = Promise::new(global, can_gc);
p.reject_error(e, can_gc);
p
},
}
} else {
// Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
Ok(result)
}
pub(crate) fn perform_flush(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
// If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns the result of
// invoking transformerDict["flush"] with argument list « controller » and callback this value transformer.
let algo = self.flush.borrow().clone();
let result = if let Some(flush) = algo {
rooted!(in(*cx) let this_object = self.transform_obj.get());
let call_result = flush.Call_(
&this_object.handle(),
self,
ExceptionHandling::Rethrow,
can_gc,
);
match call_result {
Ok(p) => p,
Err(e) => {
let p = Promise::new(global, can_gc);
p.reject_error(e, can_gc);
p
},
}
} else {
// Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
Ok(result)
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
#[allow(unsafe_code)]
pub(crate) fn enqueue(
&self,
cx: SafeJSContext,
global: &GlobalScope,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Fallible<()> {
// Let stream be controller.[[stream]].
let stream = self.stream.get().expect("stream is null");
// Let readableController be stream.[[readable]].[[controller]].
let readable = stream.get_readable();
let readable_controller = readable.get_default_controller();
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
// is false, throw a TypeError exception.
if !readable_controller.can_close_or_enqueue() {
return Err(Error::Type(
"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
));
}
// Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
// If enqueueResult is an abrupt completion,
if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
// Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
rooted!(in(*cx) let mut rooted_error = UndefinedValue());
error
.clone()
.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
// Throw stream.[[readable]].[[storedError]].
unsafe {
if !JS_IsExceptionPending(*cx) {
rooted!(in(*cx) let mut stored_error = UndefinedValue());
readable.get_stored_error(stored_error.handle_mut());
JS_SetPendingException(
*cx,
stored_error.handle().into(),
ExceptionStackBehavior::Capture,
);
}
}
return Err(error);
}
// Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
let backpressure = readable_controller.has_backpressure();
// If backpressure is not stream.[[backpressure]],
if backpressure != stream.get_backpressure() {
// Assert: backpressure is true.
assert!(backpressure);
// Perform ! TransformStreamSetBackpressure(stream, true).
stream.set_backpressure(global, true, can_gc);
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
pub(crate) fn error(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) {
// Perform ! TransformStreamError(controller.[[stream]], e).
self.stream
.get()
.expect("stream is undefined")
.error(cx, global, reason, can_gc);
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
pub(crate) fn clear_algorithms(&self) {
// Set controller.[[transformAlgorithm]] to undefined.
self.transform.replace(None);
// Set controller.[[flushAlgorithm]] to undefined.
self.flush.replace(None);
// Set controller.[[cancelAlgorithm]] to undefined.
self.cancel.replace(None);
}
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
// Let stream be controller.[[stream]].
let stream = self.stream.get().expect("stream is null");
// Let readableController be stream.[[readable]].[[controller]].
let readable = stream.get_readable();
let readable_controller = readable.get_default_controller();
// Perform ! ReadableStreamDefaultControllerClose(readableController).
readable_controller.close(can_gc);
// Let error be a TypeError exception indicating that the stream has been terminated.
let error = Error::Type("stream has been terminated".to_owned());
// Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
rooted!(in(*cx) let mut rooted_error = UndefinedValue());
error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
}
}
impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
for TransformStreamDefaultController
{
/// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
fn GetDesiredSize(&self) -> Option<f64> {
// Let readableController be this.[[stream]].[[readable]].[[controller]].
let readable_controller = self
.stream
.get()
.expect("stream is null")
.get_readable()
.get_default_controller();
// Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
readable_controller.get_desired_size()
}
/// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
// Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
self.enqueue(cx, &self.global(), chunk, can_gc)
}
/// <https://streams.spec.whatwg.org/#ts-default-controller-error>
fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
// Perform ? TransformStreamDefaultControllerError(this, e).
self.error(cx, &self.global(), reason, can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
// Perform ? TransformStreamDefaultControllerTerminate(this).
self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
Ok(())
}
}

View file

@ -20,6 +20,7 @@ use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
use crate::dom::globalscope::GlobalScope;
use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::transformstream::TransformStream;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
/// <https://streams.spec.whatwg.org/#underlying-source-api>
@ -46,8 +47,7 @@ pub(crate) enum UnderlyingSourceType {
/// A struct representing a JS object as underlying source,
/// and the actual JS object for use as `thisArg` in callbacks.
/// This is used for the `TransformStream` API.
#[allow(unused)]
Transform(/* Dom<TransformStream>, Rc<Promise>*/),
Transform(Dom<TransformStream>, Rc<Promise>),
}
impl UnderlyingSourceType {
@ -139,9 +139,9 @@ impl UnderlyingSourceContainer {
// Call the cancel algorithm for the appropriate branch.
tee_underlying_source.cancel_algorithm(cx, global, reason, can_gc)
},
UnderlyingSourceType::Transform() => {
UnderlyingSourceType::Transform(stream, _) => {
// Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
todo!();
Some(stream.transform_stream_default_source_cancel(cx, global, reason, can_gc))
},
UnderlyingSourceType::Transfer(port) => {
// Let cancelAlgorithm be the following steps, taking a reason argument:
@ -213,9 +213,9 @@ impl UnderlyingSourceContainer {
Some(Ok(promise))
},
// Note: other source type have no pull steps for now.
UnderlyingSourceType::Transform() => {
UnderlyingSourceType::Transform(stream, _) => {
// Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
todo!();
Some(stream.transform_stream_default_source_pull(&self.global(), can_gc))
},
_ => None,
}
@ -280,9 +280,9 @@ impl UnderlyingSourceContainer {
// from <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
None
},
UnderlyingSourceType::Transform() => {
// Some(transform_underlying_source.start_algorithm())
todo!();
UnderlyingSourceType::Transform(_, start_promise) => {
// Let startAlgorithm be an algorithm that returns startPromise.
Some(Ok(start_promise.clone()))
},
_ => None,
}

View file

@ -962,14 +962,13 @@ impl WritableStream {
/// <https://streams.spec.whatwg.org/#create-writable-stream>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
#[allow(unused)]
pub(crate) fn create_writable_stream(
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
writable_high_water_mark: f64,
writable_size_algorithm: Rc<QueuingStrategySize>,
underlying_sink_type: UnderlyingSinkType,
can_gc: CanGc,
) -> Fallible<DomRoot<WritableStream>> {
// Assert: ! IsNonNegativeNumber(highWaterMark) is true.
assert!(writable_high_water_mark >= 0.0);

View file

@ -12,6 +12,7 @@ use js::jsval::{JSVal, UndefinedValue};
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use super::types::TransformStream;
use crate::dom::bindings::callback::ExceptionHandling;
use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
@ -290,8 +291,7 @@ pub enum UnderlyingSinkType {
port: Dom<MessagePort>,
},
/// Algorithms supporting transform streams are implemented in Rust.
#[allow(unused)]
Transform(/*Dom<TransformStream>, Rc<Promise>*/),
Transform(Dom<TransformStream>, Rc<Promise>),
}
impl UnderlyingSinkType {
@ -413,7 +413,7 @@ impl WritableStreamDefaultController {
} => {
backpressure_promise.borrow_mut().take();
},
UnderlyingSinkType::Transform() => {
UnderlyingSinkType::Transform(_, _) => {
return;
},
}
@ -423,7 +423,6 @@ impl WritableStreamDefaultController {
}
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
#[allow(unsafe_code)]
pub(crate) fn setup(
&self,
cx: SafeJSContext,
@ -560,9 +559,9 @@ impl WritableStreamDefaultController {
// Let startAlgorithm be an algorithm that returns undefined.
Ok(Promise::new_resolved(global, cx, (), can_gc))
},
UnderlyingSinkType::Transform() => {
UnderlyingSinkType::Transform(_, start_promise) => {
// Let startAlgorithm be an algorithm that returns startPromise.
todo!()
Ok(start_promise.clone())
},
}
}
@ -622,9 +621,11 @@ impl WritableStreamDefaultController {
}
promise
},
UnderlyingSinkType::Transform() => {
UnderlyingSinkType::Transform(stream, _) => {
// Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
todo!()
stream
.transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
.expect("Transform stream default sink abort algorithm should not fail.")
},
};
@ -707,9 +708,11 @@ impl WritableStreamDefaultController {
.append_native_handler(&handler, comp, can_gc);
result_promise
},
UnderlyingSinkType::Transform() => {
UnderlyingSinkType::Transform(stream, _) => {
// Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
todo!()
stream
.transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
.expect("Transform stream default sink write algorithm should not fail.")
},
}
}
@ -757,9 +760,11 @@ impl WritableStreamDefaultController {
// Return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
},
UnderlyingSinkType::Transform() => {
UnderlyingSinkType::Transform(stream, _) => {
// Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
todo!()
stream
.transform_stream_default_sink_close_algorithm(cx, global, can_gc)
.expect("Transform stream default sink close algorithm should not fail.")
},
}
}
@ -1038,7 +1043,7 @@ impl WritableStreamDefaultController {
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
fn error(
pub(crate) fn error(
&self,
stream: &WritableStream,
cx: SafeJSContext,

View file

@ -0,0 +1,18 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/.
*
* The origin of this IDL file is
* https://streams.spec.whatwg.org/#ts-class-definition
*/
[Exposed=*] // [Transferable] - See Bug 1562065
interface TransformStream {
[Throws]
constructor(optional object transformer,
optional QueuingStrategy writableStrategy = {},
optional QueuingStrategy readableStrategy = {});
readonly attribute ReadableStream readable;
readonly attribute WritableStream writable;
};

View file

@ -0,0 +1,15 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/.
*
* The origin of this IDL file is
* https://streams.spec.whatwg.org/#ts-default-controller-class-definition
*/
[Exposed=*]
interface TransformStreamDefaultController {
readonly attribute unrestricted double? desiredSize;
[Throws] undefined enqueue(optional any chunk);
[Throws] undefined error(optional any reason);
[Throws] undefined terminate();
};

View file

@ -0,0 +1,22 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/.
*
* The origin of this IDL file is
* https://streams.spec.whatwg.org/#transformer-api
*/
[GenerateInit]
dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};
callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);

View file

@ -256,6 +256,8 @@ skip: true
skip: true
[writable-streams]
skip: false
[transform-streams]
skip: false
[subresource-integrity]
skip: false
[touch-events]

View file

@ -7,6 +7,3 @@
[Transferring a MessagePort with multiple streams should set `.ports`]
expected: FAIL
[TransformStream must not be serializable]
expected: FAIL

View file

@ -10,14 +10,10 @@
[transform-stream-members.any.shadowrealm-in-window.html]
expected: ERROR
[transform-stream-members.any.html]
expected: ERROR
[transform-stream-members.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[transform-stream-members.any.worker.html]
expected: ERROR
[transform-stream-members.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR

View file

@ -2,14 +2,5 @@
[window.postMessage should be able to transfer a TransformStream]
expected: FAIL
[a TransformStream with a locked writable should not be transferable]
expected: FAIL
[a TransformStream with a locked readable should not be transferable]
expected: FAIL
[a TransformStream with both sides locked should not be transferable]
expected: FAIL
[piping through transferred transforms should work]
expected: FAIL

View file

@ -1,3 +0,0 @@
[writable-stream.html]
[window.postMessage should be able to transfer a {readable, writable} pair]
expected: FAIL

View file

@ -0,0 +1,23 @@
[backpressure.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[backpressure.any.shadowrealm-in-window.html]
expected: ERROR
[backpressure.any.serviceworker.html]
expected: ERROR
[backpressure.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[backpressure.any.sharedworker.html]
expected: ERROR
[backpressure.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[backpressure.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[backpressure.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR

View file

@ -0,0 +1,32 @@
[cancel.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[cancel.any.serviceworker.html]
expected: ERROR
[cancel.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[cancel.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[cancel.any.sharedworker.html]
expected: ERROR
[cancel.any.shadowrealm-in-window.html]
expected: ERROR
[cancel.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[cancel.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[cancel.any.worker.html]
[readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()]
expected: FAIL
[cancel.any.html]
[readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()]
expected: FAIL

View file

@ -0,0 +1,32 @@
[errors.any.sharedworker.html]
expected: ERROR
[errors.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[errors.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[errors.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[errors.any.serviceworker.html]
expected: ERROR
[errors.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[errors.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[errors.any.shadowrealm-in-window.html]
expected: ERROR
[errors.any.html]
[abort should set the close reason for the writable when it happens before cancel during start, and cancel should reject]
expected: FAIL
[errors.any.worker.html]
[abort should set the close reason for the writable when it happens before cancel during start, and cancel should reject]
expected: FAIL

View file

@ -0,0 +1,23 @@
[flush.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[flush.any.shadowrealm-in-window.html]
expected: ERROR
[flush.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[flush.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[flush.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[flush.any.sharedworker.html]
expected: ERROR
[flush.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[flush.any.serviceworker.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[general.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[general.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[general.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[general.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[general.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[general.any.serviceworker.html]
expected: ERROR
[general.any.shadowrealm-in-window.html]
expected: ERROR
[general.any.sharedworker.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[lipfuzz.any.shadowrealm-in-window.html]
expected: ERROR
[lipfuzz.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[lipfuzz.any.serviceworker.html]
expected: ERROR
[lipfuzz.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[lipfuzz.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[lipfuzz.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[lipfuzz.any.sharedworker.html]
expected: ERROR
[lipfuzz.any.shadowrealm-in-sharedworker.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[patched-global.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[patched-global.any.shadowrealm-in-window.html]
expected: ERROR
[patched-global.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[patched-global.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[patched-global.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[patched-global.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[patched-global.any.sharedworker.html]
expected: ERROR
[patched-global.any.serviceworker.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[properties.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[properties.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[properties.any.shadowrealm-in-window.html]
expected: ERROR
[properties.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[properties.any.sharedworker.html]
expected: ERROR
[properties.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[properties.any.serviceworker.html]
expected: ERROR
[properties.any.shadowrealm-in-sharedworker.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[reentrant-strategies.any.shadowrealm-in-window.html]
expected: ERROR
[reentrant-strategies.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[reentrant-strategies.any.sharedworker.html]
expected: ERROR
[reentrant-strategies.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[reentrant-strategies.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[reentrant-strategies.any.serviceworker.html]
expected: ERROR
[reentrant-strategies.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[reentrant-strategies.any.shadowrealm-in-shadowrealm.html]
expected: ERROR

View file

@ -0,0 +1,24 @@
[strategies.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[strategies.any.serviceworker.html]
expected: ERROR
[strategies.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[strategies.any.shadowrealm-in-window.html]
expected: ERROR
[strategies.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[strategies.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[strategies.any.sharedworker.html]
expected: ERROR
[strategies.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR

View file

@ -0,0 +1,23 @@
[terminate.any.serviceworker.html]
expected: ERROR
[terminate.any.shadowrealm-in-sharedworker.html]
expected: ERROR
[terminate.https.any.shadowrealm-in-serviceworker.html]
expected: ERROR
[terminate.any.shadowrealm-in-window.html]
expected: ERROR
[terminate.any.shadowrealm-in-shadowrealm.html]
expected: ERROR
[terminate.https.any.shadowrealm-in-audioworklet.html]
expected: ERROR
[terminate.any.shadowrealm-in-dedicatedworker.html]
expected: ERROR
[terminate.any.sharedworker.html]
expected: ERROR

View file

@ -13575,14 +13575,14 @@
]
],
"interfaces.https.html": [
"76d746b0663ed73865816e678c2536eceff31f2d",
"72918e837726b58740a491a9223eeeb625055ae5",
[
null,
{}
]
],
"interfaces.worker.js": [
"8d109502622fac7266a4564de09684a3ab94118c",
"e86f34f261442aeaa7074c525fb4b1206219769d",
[
"mozilla/interfaces.worker.html",
{}

View file

@ -371,6 +371,8 @@ test_interfaces([
"WritableStream",
"WritableStreamDefaultController",
"WritableStreamDefaultWriter",
"TransformStream",
"TransformStreamDefaultController",
"WGSLLanguageFeatures",
"XMLDocument",
"XMLHttpRequest",

View file

@ -137,6 +137,8 @@ test_interfaces([
"WritableStream",
"WritableStreamDefaultController",
"WritableStreamDefaultWriter",
"TransformStream",
"TransformStreamDefaultController",
"WGSLLanguageFeatures",
"XMLHttpRequest",
"XMLHttpRequestEventTarget",