Transfer ReadableStream (#36181)

<!-- Please describe your changes on the following line: -->

Add transfer support to ReadableStream. Part of
https://github.com/servo/servo/issues/34676

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by
`[X]` when the step is complete, and replace `___` with appropriate
data: -->
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #___ (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox
is checked, so that we can help you if you get stuck somewhere along the
way.-->

<!-- Pull requests that do not address these steps are welcome, but they
will require additional verification as part of the review process. -->

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian 2025-04-15 15:39:26 +08:00 committed by GitHub
parent c9489ca04f
commit f8b6b9f7b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 983 additions and 75 deletions

View file

@ -19,9 +19,10 @@ use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
};
use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval};
use crate::dom::bindings::reflector::{Reflector, reflect_dom_object};
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::globalscope::GlobalScope;
use crate::dom::messageport::MessagePort;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
@ -135,6 +136,57 @@ impl Callback for StartAlgorithmRejectionHandler {
}
}
impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransferBackPressurePromiseReaction {
/// The result of reacting to backpressurePromise.
#[ignore_malloc_size_of = "Rc is hard"]
result_promise: Rc<Promise>,
/// The backpressurePromise.
#[ignore_malloc_size_of = "Rc is hard"]
backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
/// The chunk received by the `writeAlgorithm`.
#[ignore_malloc_size_of = "mozjs"]
chunk: Box<Heap<JSVal>>,
/// The port used in the algorithm.
port: Dom<MessagePort>,
}
impl Callback for TransferBackPressurePromiseReaction {
/// Reacting to backpressurePromise with the following fulfillment steps:
fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
let global = self.result_promise.global();
// Set backpressurePromise to a new promise.
*self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc));
// Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
rooted!(in(*cx) let mut chunk = UndefinedValue());
chunk.set(self.chunk.get());
let result =
self.port
.pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
// Disentangle port.
global.disentangle_port(&self.port);
// If result is an abrupt completion,
if let Err(error) = result {
// Return a promise rejected with result.[[Value]].
self.result_promise.reject_error(error, can_gc);
} else {
// Otherwise, return a promise resolved with undefined.
self.result_promise.resolve_native(&(), can_gc);
}
}
}
impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
/// The fulfillment handler for
@ -215,14 +267,16 @@ impl Callback for WriteAlgorithmRejectionHandler {
}
/// The type of sink algorithms we are using.
#[allow(dead_code)]
#[derive(JSTraceable, MallocSizeOf, PartialEq)]
#[derive(JSTraceable, PartialEq)]
pub enum UnderlyingSinkType {
/// Algorithms are provided by Js callbacks.
Js,
/// Algorithms supporting streams transfer are implemented in Rust.
/// TODO: implement transfer.
Transfer,
/// The promise and port used in those algorithms are stored here.
Transfer {
backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
port: Dom<MessagePort>,
},
}
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
@ -230,8 +284,7 @@ pub enum UnderlyingSinkType {
pub struct WritableStreamDefaultController {
reflector_: Reflector,
/// The type of underlying sink used. Besides the default JS one,
/// there will be others for stream transfer, and for transform stream.
#[ignore_malloc_size_of = "Rc is hard"]
underlying_sink_type: UnderlyingSinkType,
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
@ -409,6 +462,11 @@ impl WritableStreamDefaultController {
Promise::new_resolved(global, cx, result.get(), can_gc)
}
} else {
// Note: we are either here because the Js algorithm is none,
// or because we are suppporting a stream transfer as
// part of #abstract-opdef-setupcrossrealmtransformwritable
// and the logic is the same for both.
// Let startAlgorithm be an algorithm that returns undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
@ -480,9 +538,26 @@ impl WritableStreamDefaultController {
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
Promise::new_resolved(global, cx, (), can_gc)
UnderlyingSinkType::Transfer { ref port, .. } => {
// The steps from the `abortAlgorithm` at
// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
// Let result be PackAndPostMessageHandlingError(port, "error", reason).
let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
// Disentangle port.
global.disentangle_port(port);
let promise = Promise::new(global, can_gc);
// If result is an abrupt completion, return a promise rejected with result.[[Value]]
if let Err(error) = result {
promise.reject_error(error, can_gc);
} else {
// Otherwise, return a promise resolved with undefined.
promise.reject_native(&(), can_gc);
}
promise
},
};
@ -521,9 +596,45 @@ impl WritableStreamDefaultController {
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
Promise::new_resolved(global, cx, (), can_gc)
UnderlyingSinkType::Transfer {
ref backpressure_promise,
ref port,
..
} => {
// The steps from the `writeAlgorithm` at
// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
{
// If backpressurePromise is undefined,
// set backpressurePromise to a promise resolved with undefined.
let mut backpressure_promise = backpressure_promise.borrow_mut();
if backpressure_promise.is_none() {
*backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc));
}
}
// Return the result of reacting to backpressurePromise with the following fulfillment steps:
let result_promise = Promise::new(global, can_gc);
rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
port: port.clone(),
backpressure_promise: backpressure_promise.clone(),
chunk: Heap::boxed(chunk.get()),
result_promise: result_promise.clone(),
}));
let handler = PromiseNativeHandler::new(
global,
fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
None,
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
backpressure_promise
.borrow()
.as_ref()
.expect("Promise must be some by now.")
.append_native_handler(&handler, comp, can_gc);
result_promise
},
}
}
@ -551,8 +662,19 @@ impl WritableStreamDefaultController {
promise
})
},
UnderlyingSinkType::Transfer => {
// TODO: implement transfer.
UnderlyingSinkType::Transfer { ref port, .. } => {
// The steps from the `closeAlgorithm` at
// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
// Perform ! PackAndPostMessage(port, "close", undefined).
rooted!(in(*cx) let mut value = UndefinedValue());
port.pack_and_post_message("close", value.handle(), can_gc)
.expect("Sending close should not fail.");
// Disentangle port.
global.disentangle_port(port);
// Return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
},
}