AbortController: integrate with stream piping. (#37244)

Start using abort signal in Use in
https://streams.spec.whatwg.org/#readablestream-pipe-to-signal

Part of https://github.com/servo/servo/issues/34866

Will also cover https://github.com/servo/servo/issues/37230 and
https://github.com/servo/servo/issues/37232

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian 2025-06-13 16:52:38 +07:00 committed by GitHub
parent 099fd10317
commit 730fe35b42
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 508 additions and 263 deletions

View file

@ -14,7 +14,7 @@ use dom_struct::dom_struct;
use ipc_channel::ipc::IpcSharedMemory;
use js::conversions::ToJSValConvertible;
use js::jsapi::{Heap, JSObject};
use js::jsval::{JSVal, NullValue, ObjectValue, UndefinedValue};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::{
HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
MutableHandleValue as SafeMutableHandleValue,
@ -29,8 +29,9 @@ use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
use script_bindings::str::DOMString;
use crate::dom::domexception::{DOMErrorName, DOMException};
use script_bindings::conversions::StringificationBehavior;
use script_bindings::conversions::{is_array_like, StringificationBehavior};
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
@ -46,7 +47,7 @@ use crate::dom::bindings::utils::get_dictionary_property;
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promise::{wait_for_all_promise, Promise};
use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
@ -99,6 +100,8 @@ enum ShutdownAction {
ReadableStreamCancel,
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
WritableStreamDefaultWriterCloseWithErrorPropagation,
/// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
Abort,
}
impl js::gc::Rootable for PipeTo {}
@ -113,7 +116,7 @@ impl js::gc::Rootable for PipeTo {}
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PipeTo {
pub(crate) struct PipeTo {
/// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
reader: Dom<ReadableStreamDefaultReader>,
@ -144,10 +147,15 @@ struct PipeTo {
#[ignore_malloc_size_of = "Rc are hard"]
shutting_down: Rc<Cell<bool>>,
/// The abort reason of the abort signal,
/// stored here because we must keep it across a microtask.
#[ignore_malloc_size_of = "mozjs"]
abort_reason: Rc<Heap<JSVal>>,
/// The error potentially passed to shutdown,
/// stored here because we must keep it across a microtask.
#[ignore_malloc_size_of = "mozjs"]
shutdown_error: Rc<Heap<JSVal>>,
shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
/// The promise returned by a shutdown action.
/// We keep it to only continue when it is not pending anymore.
@ -160,6 +168,49 @@ struct PipeTo {
result_promise: Rc<Promise>,
}
impl PipeTo {
/// Run the `abortAlgorithm` defined at
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
pub(crate) fn abort_with_reason(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
realm: InRealm,
can_gc: CanGc,
) {
// Abort should do nothing if we are already shutting down.
if self.shutting_down.get() {
return;
}
// Let error be signals abort reason.
// Note: storing it because it may need to be kept across a microtask,
// and see the note below as to why it is kept separately from `shutdown_error`.
self.abort_reason.set(reason.get());
// Note: setting the error now,
// will result in a rejection of the pipe promise, with this error.
// Unless any shutdown action raise their own error,
// in which case this error will be overwritten by the shutdown action error.
{
let mut error = Some(Heap::default());
// Setting the value on the heap after it has been moved.
if let Some(heap) = error.as_mut() {
heap.set(reason.get())
}
*self.shutdown_error.borrow_mut() = error;
}
// Let actions be an empty ordered set.
// Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
// Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
// and with error.
self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
}
}
impl Callback for PipeTo {
/// The pipe makes progress one microtask at a time.
/// Note: we use one struct as the callback for all promises,
@ -265,6 +316,11 @@ impl Callback for PipeTo {
// Write the chunk.
self.write_chunk(cx, &global, result, can_gc);
// An early return is necessary if the write algorithm aborted the pipe.
if self.shutting_down.get() {
return;
}
// Wait for the writer to be ready again.
self.wait_for_writer_ready(&global, realm, can_gc);
},
@ -296,12 +352,33 @@ impl Callback for PipeTo {
return;
}
let is_array_like = {
if !result.is_object() {
false
} else {
unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
}
};
// Finalize, passing along error if it was given.
if !result.is_undefined() {
// All actions either resolve with undefined,
if !result.is_undefined() && !is_array_like {
// Most actions either resolve with undefined,
// or reject with an error,
// and the error should be used when finalizing.
self.shutdown_error.set(result.get());
// One exception is the `Abort` action,
// which resolves with a list of undefined values.
// If `result` isn't undefined or array-like,
// then it is an error
// and should overwrite the current shutdown error.
{
let mut error = Some(Heap::default());
// Setting the value on the heap after it has been moved.
if let Some(heap) = error.as_mut() {
heap.set(result.get())
}
*self.shutdown_error.borrow_mut() = error;
}
}
self.finalize(cx, &global, can_gc);
},
@ -426,7 +503,14 @@ impl PipeTo {
if source.is_errored() {
rooted!(in(*cx) let mut source_error = UndefinedValue());
source.get_stored_error(source_error.handle_mut());
self.shutdown_error.set(source_error.get());
{
let mut error = Some(Heap::default());
// Setting the value on the heap after it has been moved.
if let Some(heap) = error.as_mut() {
heap.set(source_error.get())
}
*self.shutdown_error.borrow_mut() = error;
}
// If preventAbort is false,
if !self.prevent_abort {
@ -469,7 +553,14 @@ impl PipeTo {
if dest.is_errored() {
rooted!(in(*cx) let mut dest_error = UndefinedValue());
dest.get_stored_error(dest_error.handle_mut());
self.shutdown_error.set(dest_error.get());
{
let mut error = Some(Heap::default());
// Setting the value on the heap after it has been moved.
if let Some(heap) = error.as_mut() {
heap.set(dest_error.get())
}
*self.shutdown_error.borrow_mut() = error;
}
// If preventCancel is false,
if !self.prevent_cancel {
@ -558,7 +649,14 @@ impl PipeTo {
let error =
Error::Type("Destination is closed or has closed queued or in flight".to_string());
error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
self.shutdown_error.set(dest_closed.get());
{
let mut error = Some(Heap::default());
// Setting the value on the heap after it has been moved.
if let Some(heap) = error.as_mut() {
heap.set(dest_closed.get())
}
*self.shutdown_error.borrow_mut() = error;
}
// If preventCancel is false,
if !self.prevent_cancel {
@ -629,7 +727,11 @@ impl PipeTo {
realm: InRealm,
can_gc: CanGc,
) {
rooted!(in(*cx) let mut error = self.shutdown_error.get());
rooted!(in(*cx) let mut error = UndefinedValue());
if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
error.set(shutdown_error.get());
}
*self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
// Let p be the result of performing action.
@ -648,6 +750,55 @@ impl PipeTo {
ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
self.writer.close_with_error_propagation(cx, global, can_gc)
},
ShutdownAction::Abort => {
// Note: implementation of the the `abortAlgorithm`
// of the signal associated with this piping operation.
// Let error be signals abort reason.
rooted!(in(*cx) let mut error = UndefinedValue());
error.set(self.abort_reason.get());
// Let actions be an empty ordered set.
let mut actions = vec![];
// If preventAbort is false, append the following action to actions:
if !self.prevent_abort {
let dest = self
.writer
.get_stream()
.expect("Destination stream must be set");
// If dest.[[state]] is "writable",
let promise = if dest.is_writable() {
// return ! WritableStreamAbort(dest, error)
dest.abort(cx, global, error.handle(), can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
actions.push(promise);
}
// If preventCancel is false, append the following action action to actions:
if !self.prevent_cancel {
let source = self.reader.get_stream().expect("Source stream must be set");
// If source.[[state]] is "readable",
let promise = if source.is_readable() {
// return ! ReadableStreamCancel(source, error).
source.cancel(cx, global, error.handle(), can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(global, cx, (), can_gc)
};
actions.push(promise);
}
// Shutdown with an action consisting
// of getting a promise to wait for all of the actions in actions,
// and with error.
wait_for_all_promise(cx, global, actions, realm, can_gc)
},
};
// Upon fulfillment of p, finalize, passing along originalError if it was given.
@ -679,10 +830,13 @@ impl PipeTo {
.expect("Releasing the reader should not fail");
// If signal is not undefined, remove abortAlgorithm from signal.
// TODO: implement AbortSignal.
// Note: since `self.shutdown` is true at this point,
// the abort algorithm is a no-op,
// so for now not implementing this step.
rooted!(in(*cx) let mut error = self.shutdown_error.get());
if !error.is_null() {
if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
rooted!(in(*cx) let mut error = UndefinedValue());
error.set(shutdown_error.get());
// If error was given, reject promise with error.
self.result_promise.reject_native(&error.handle(), can_gc);
} else {
@ -1636,9 +1790,10 @@ impl ReadableStream {
cx: SafeJSContext,
global: &GlobalScope,
dest: &WritableStream,
prevent_close: bool,
prevent_abort: bool,
prevent_cancel: bool,
prevent_close: bool,
signal: Option<&AbortSignal>,
realm: InRealm,
can_gc: CanGc,
) -> Rc<Promise> {
@ -1649,7 +1804,7 @@ impl ReadableStream {
// If signal was not given, let signal be undefined.
// Assert: either signal is undefined, or signal implements AbortSignal.
// TODO: implement AbortSignal.
// Note: done with the `signal` argument.
// Assert: ! IsReadableStreamLocked(source) is false.
assert!(!self.is_locked());
@ -1682,9 +1837,6 @@ impl ReadableStream {
// Let promise be a new promise.
let promise = Promise::new(global, can_gc);
// If signal is not undefined,
// TODO: implement AbortSignal.
// In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
rooted!(in(*cx) let pipe_to = PipeTo {
reader: Dom::from_ref(&reader),
@ -1695,15 +1847,28 @@ impl ReadableStream {
prevent_cancel,
prevent_close,
shutting_down: Default::default(),
abort_reason: Default::default(),
shutdown_error: Default::default(),
shutdown_action_promise: Default::default(),
result_promise: promise.clone(),
});
// Note: set the shutdown error to null,
// to distinguish it from cases
// where the error is set to undefined.
pipe_to.shutdown_error.set(NullValue());
// If signal is not undefined,
// Note: moving the steps to here, so that the `PipeTo` is available.
if let Some(signal) = signal {
// Let abortAlgorithm be the following steps:
// Note: steps are implemented at call site.
rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
// If signal is aborted, perform abortAlgorithm and return promise.
if signal.aborted() {
signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
return promise;
}
// Add abortAlgorithm to signal.
signal.add(&abort_algorithm);
}
// Note: perfom checks now, since streams can start as closed or errored.
pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
@ -1992,16 +2157,17 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
}
// Let signal be options["signal"] if it exists, or undefined otherwise.
// TODO: implement AbortSignal.
let signal = options.signal.as_deref();
// Return ! ReadableStreamPipeTo.
self.pipe_to(
cx,
&global,
destination,
options.preventClose,
options.preventAbort,
options.preventCancel,
options.preventClose,
signal,
realm,
can_gc,
)
@ -2029,7 +2195,7 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
}
// Let signal be options["signal"] if it exists, or undefined otherwise.
// TODO: implement AbortSignal.
let signal = options.signal.as_deref();
// Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
// options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
@ -2037,9 +2203,10 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
cx,
&global,
&transform.writable,
options.preventClose,
options.preventAbort,
options.preventCancel,
options.preventClose,
signal,
realm,
can_gc,
);
@ -2270,7 +2437,9 @@ impl Transferable for ReadableStream {
writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
// Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
let promise = self.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc);
let promise = self.pipe_to(
cx, &global, &writable, false, false, false, None, comp, can_gc,
);
// Set promise.[[PromiseIsHandled]] to true.
promise.set_promise_is_handled();