Start adding support for transforms in readable and writable streams (#36470)

Start adding support for transforms in readable and writable streams.
Part of https://github.com/servo/servo/issues/34676
This commit is contained in:
Taym Haddadi 2025-04-28 13:02:55 +02:00 committed by GitHub
parent 02b38adf43
commit 4d975e947b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 394 additions and 161 deletions

View file

@ -3,10 +3,10 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ptr::{self};
use std::rc::Rc;
use std::collections::HashMap;
use base::id::{MessagePortId, MessagePortIndex};
use constellation_traits::MessagePortImpl;
@ -22,12 +22,14 @@ use js::typedarray::ArrayBufferViewU8;
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, StreamPipeOptions
ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
StreamPipeOptions,
};
use script_bindings::str::DOMString;
use crate::dom::domexception::{DOMErrorName, DOMException};
use script_bindings::conversions::StringificationBehavior;
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
@ -640,7 +642,7 @@ impl PipeTo {
.reader
.get_stream()
.expect("Reader should have a stream.");
source.cancel(error.handle(), can_gc)
source.cancel(cx, global, error.handle(), can_gc)
},
ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
self.writer.close_with_error_propagation(cx, global, can_gc)
@ -766,19 +768,19 @@ impl PartialEq for ReaderType {
/// <https://streams.spec.whatwg.org/#create-readable-stream>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn create_readable_stream(
pub(crate) fn create_readable_stream(
global: &GlobalScope,
underlying_source_type: UnderlyingSourceType,
queuing_strategy: QueuingStrategy,
queuing_strategy: Option<Rc<QueuingStrategySize>>,
high_water_mark: Option<f64>,
can_gc: CanGc,
) -> DomRoot<ReadableStream> {
// If highWaterMark was not passed, set it to 1.
let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0);
let high_water_mark = high_water_mark.unwrap_or(1.0);
// If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
let size_algorithm = queuing_strategy
.size
.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
let size_algorithm =
queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
// Assert: ! IsNonNegativeNumber(highWaterMark) is true.
assert!(high_water_mark >= 0.0);
@ -1437,19 +1439,24 @@ impl ReadableStream {
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>
#[allow(unsafe_code)]
pub(crate) fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
pub(crate) fn cancel(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
// Set stream.[[disturbed]] to true.
self.disturbed.set(true);
// If stream.[[state]] is "closed", return a promise resolved with undefined.
if self.is_closed() {
return Promise::new_resolved(&self.global(), GlobalScope::get_cx(), (), can_gc);
return Promise::new_resolved(global, cx, (), can_gc);
}
// If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
if self.is_errored() {
let promise = Promise::new(&self.global(), can_gc);
let promise = Promise::new(global, can_gc);
unsafe {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
self.stored_error.to_jsval(*cx, rval.handle_mut());
promise.reject_native(&rval.handle(), can_gc);
@ -1473,11 +1480,11 @@ impl ReadableStream {
Some(ControllerType::Default(controller)) => controller
.get()
.expect("Stream should have controller.")
.perform_cancel_steps(reason, can_gc),
.perform_cancel_steps(cx, global, reason, can_gc),
Some(ControllerType::Byte(controller)) => controller
.get()
.expect("Stream should have controller.")
.perform_cancel_steps(reason, can_gc),
.perform_cancel_steps(cx, global, reason, can_gc),
None => {
panic!("Stream does not have a controller.");
},
@ -1587,7 +1594,8 @@ impl ReadableStream {
let branch_1 = create_readable_stream(
&self.global(),
underlying_source_type_branch_1,
QueuingStrategy::empty(),
None,
None,
can_gc,
);
tee_source_1.set_branch_1(&branch_1);
@ -1597,7 +1605,8 @@ impl ReadableStream {
let branch_2 = create_readable_stream(
&self.global(),
underlying_source_type_branch_2,
QueuingStrategy::empty(),
None,
None,
can_gc,
);
tee_source_1.set_branch_2(&branch_2);
@ -1908,16 +1917,17 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
}
/// <https://streams.spec.whatwg.org/#rs-cancel>
fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
let global = self.global();
if self.is_locked() {
// If ! IsReadableStreamLocked(this) is true,
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&self.global(), can_gc);
let promise = Promise::new(&global, can_gc);
promise.reject_error(Error::Type("stream is not locked".to_owned()), can_gc);
promise
} else {
// Return ! ReadableStreamCancel(this, reason).
self.cancel(reason, can_gc)
self.cancel(cx, &global, reason, can_gc)
}
}