Streams: add an underlying sink type (#36385)

Introduces the concept of different types of underlying sinks for the
writable controller, and a minor fix to the abort algorithm.

The dead code is already used in the wip at
https://github.com/servo/servo/pull/36181/, and will also be used in a
another wip in parallel to implement transform stream, so the concept is
introduced here with dead code to facilitate the work in parallel and
prevent too much merge conflicts down the road.

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian 2025-04-08 00:48:42 +08:00 committed by GitHub
parent 777b74252a
commit a5c547259f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 105 additions and 50 deletions

View file

@ -28,7 +28,9 @@ use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_alg
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController;
use crate::dom::writablestreamdefaultcontroller::{
UnderlyingSinkType, WritableStreamDefaultController,
};
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@ -155,7 +157,7 @@ pub struct WritableStream {
impl WritableStream {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
/// <https://streams.spec.whatwg.org/#initialize-readable-stream>
/// <https://streams.spec.whatwg.org/#initialize-writable-stream>
fn new_inherited() -> WritableStream {
WritableStream {
reflector_: Reflector::new(),
@ -879,6 +881,7 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
// Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink
let controller = WritableStreamDefaultController::new(
global,
UnderlyingSinkType::Js,
&underlying_sink_dict,
high_water_mark,
size_algorithm,

View file

@ -214,11 +214,26 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
}
/// The type of sink algorithms we are using.
#[allow(dead_code)]
#[derive(JSTraceable, MallocSizeOf, PartialEq)]
pub enum UnderlyingSinkType {
/// Algorithms are provided by Js callbacks.
Js,
/// Algorithms supporting streams transfer are implemented in Rust.
/// TODO: implement transfer.
Transfer,
}
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
#[dom_struct]
pub struct WritableStreamDefaultController {
reflector_: Reflector,
/// The type of underlying sink used. Besides the default JS one,
/// there will be others for stream transfer, and for transform stream.
underlying_sink_type: UnderlyingSinkType,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
#[ignore_malloc_size_of = "Rc is hard"]
abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
@ -256,12 +271,14 @@ impl WritableStreamDefaultController {
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn new_inherited(
underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
) -> WritableStreamDefaultController {
WritableStreamDefaultController {
reflector_: Reflector::new(),
underlying_sink_type,
queue: Default::default(),
stream: Default::default(),
abort: RefCell::new(underlying_sink.abort.clone()),
@ -276,6 +293,7 @@ impl WritableStreamDefaultController {
pub(crate) fn new(
global: &GlobalScope,
underlying_sink_type: UnderlyingSinkType,
underlying_sink: &UnderlyingSink,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
@ -283,6 +301,7 @@ impl WritableStreamDefaultController {
) -> DomRoot<WritableStreamDefaultController> {
reflect_dom_object(
Box::new(WritableStreamDefaultController::new_inherited(
underlying_sink_type,
underlying_sink,
strategy_hwm,
strategy_size,
@ -390,6 +409,7 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
// Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@ -439,71 +459,103 @@ impl WritableStreamDefaultController {
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
let algo = self.abort.borrow().clone();
let result = if let Some(algo) = algo {
algo.Call_(
&this_object.handle(),
Some(reason),
ExceptionHandling::Rethrow,
can_gc,
)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
let result = match self.underlying_sink_type {
UnderlyingSinkType::Js => {
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
let algo = self.abort.borrow().clone();
// Let result be the result of performing this.[[abortAlgorithm]], passing reason.
let result = if let Some(algo) = algo {
algo.Call_(
&this_object.handle(),
Some(reason),
ExceptionHandling::Rethrow,
can_gc,
)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
Promise::new_resolved(global, cx, (), can_gc)
},
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
// Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
self.clear_algorithms();
result
}
pub(crate) fn call_write_algorithm(
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
fn call_write_algorithm(
&self,
cx: SafeJSContext,
chunk: SafeHandleValue,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
let algo = self.write.borrow().clone();
let result = if let Some(algo) = algo {
algo.Call_(
&this_object.handle(),
chunk,
self,
ExceptionHandling::Rethrow,
can_gc,
)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
match self.underlying_sink_type {
UnderlyingSinkType::Js => {
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
let algo = self.write.borrow().clone();
let result = if let Some(algo) = algo {
algo.Call_(
&this_object.handle(),
chunk,
self,
ExceptionHandling::Rethrow,
can_gc,
)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
Promise::new_resolved(global, cx, (), can_gc)
},
}
}
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
fn call_close_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
can_gc: CanGc,
) -> Rc<Promise> {
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
this_object.set(self.underlying_sink_obj.get());
let algo = self.close.borrow().clone();
let result = if let Some(algo) = algo {
algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
match self.underlying_sink_type {
UnderlyingSinkType::Js => {
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
this_object.set(self.underlying_sink_obj.get());
let algo = self.close.borrow().clone();
let result = if let Some(algo) = algo {
algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
} else {
Ok(Promise::new_resolved(global, cx, (), can_gc))
};
result.unwrap_or_else(|e| {
let promise = Promise::new(global, can_gc);
promise.reject_error(e, can_gc);
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
Promise::new_resolved(global, cx, (), can_gc)
},
}
}
/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>