implement abort algorithm and integrate with piping shutdown

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
gterzian 2025-06-05 20:26:19 +07:00
parent 5a4c6907c7
commit d110462725
No known key found for this signature in database
GPG key ID: E290318CF2FC84D3
2 changed files with 91 additions and 55 deletions

View file

@ -18,6 +18,7 @@ use crate::dom::bindings::root::{Dom, DomRoot};
use crate::dom::eventtarget::EventTarget; use crate::dom::eventtarget::EventTarget;
use crate::dom::globalscope::GlobalScope; use crate::dom::globalscope::GlobalScope;
use crate::dom::readablestream::PipeTo; use crate::dom::readablestream::PipeTo;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
impl js::gc::Rootable for AbortAlgorithm {} impl js::gc::Rootable for AbortAlgorithm {}
@ -75,6 +76,8 @@ impl AbortSignal {
/// <https://dom.spec.whatwg.org/#abortsignal-signal-abort> /// <https://dom.spec.whatwg.org/#abortsignal-signal-abort>
pub(crate) fn signal_abort(&self, cx: SafeJSContext, reason: HandleValue, can_gc: CanGc) { pub(crate) fn signal_abort(&self, cx: SafeJSContext, reason: HandleValue, can_gc: CanGc) {
let global = self.global(); let global = self.global();
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
// If signal is aborted, then return. // If signal is aborted, then return.
if self.Aborted() { if self.Aborted() {
@ -98,7 +101,7 @@ impl AbortSignal {
// TODO: #36936 // TODO: #36936
// Run the abort steps for signal. // Run the abort steps for signal.
self.run_the_abort_steps(cx, &global, can_gc); self.run_the_abort_steps(cx, &global, comp, can_gc);
// For each dependentSignal of dependentSignalsToAbort, run the abort steps for dependentSignal. // For each dependentSignal of dependentSignalsToAbort, run the abort steps for dependentSignal.
// TODO: #36936 // TODO: #36936
@ -116,26 +119,39 @@ impl AbortSignal {
} }
/// Run a specific abort algorithm. /// Run a specific abort algorithm.
pub(crate) fn run_abort_algorithm(&self, cx: SafeJSContext, global: &GlobalScope, algorithm: &AbortAlgorithm, can_gc: CanGc) { pub(crate) fn run_abort_algorithm(
&self,
cx: SafeJSContext,
global: &GlobalScope,
algorithm: &AbortAlgorithm,
realm: InRealm,
can_gc: CanGc,
) {
match algorithm { match algorithm {
AbortAlgorithm::StreamPiping(pipe) => { AbortAlgorithm::StreamPiping(pipe) => {
rooted!(in(*cx) let mut reason = UndefinedValue()); rooted!(in(*cx) let mut reason = UndefinedValue());
reason.set(self.abort_reason.get()); reason.set(self.abort_reason.get());
pipe.abort_with_reason(cx, global, reason.handle(), can_gc); pipe.abort_with_reason(cx, global, reason.handle(), realm, can_gc);
}, },
_ => { _ => {
// TODO: match on variant and implement algo steps. // TODO: match on variant and implement algo steps.
// See the various items of #34866 // See the various items of #34866
} },
} }
} }
/// <https://dom.spec.whatwg.org/#run-the-abort-steps> /// <https://dom.spec.whatwg.org/#run-the-abort-steps>
fn run_the_abort_steps(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { fn run_the_abort_steps(
&self,
cx: SafeJSContext,
global: &GlobalScope,
realm: InRealm,
can_gc: CanGc,
) {
// For each algorithm of signals abort algorithms: run algorithm. // For each algorithm of signals abort algorithms: run algorithm.
let algos = mem::take(&mut *self.abort_algorithms.borrow_mut()); let algos = mem::take(&mut *self.abort_algorithms.borrow_mut());
for algo in algos { for algo in algos {
self.run_abort_algorithm(cx, global, &algo, can_gc); self.run_abort_algorithm(cx, global, &algo, realm, can_gc);
} }
// Empty signals abort algorithms. // Empty signals abort algorithms.

View file

@ -47,7 +47,7 @@ use crate::dom::bindings::utils::get_dictionary_property;
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader; use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
use crate::dom::globalscope::GlobalScope; use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise; use crate::dom::promise::{wait_for_all_promise, Promise};
use crate::dom::readablebytestreamcontroller::ReadableByteStreamController; use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController; use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
@ -100,6 +100,8 @@ enum ShutdownAction {
ReadableStreamCancel, ReadableStreamCancel,
/// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation> /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterCloseWithErrorPropagation,
/// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
Abort,
} }
impl js::gc::Rootable for PipeTo {} impl js::gc::Rootable for PipeTo {}
@ -145,6 +147,11 @@ pub(crate) struct PipeTo {
#[ignore_malloc_size_of = "Rc are hard"] #[ignore_malloc_size_of = "Rc are hard"]
shutting_down: Rc<Cell<bool>>, shutting_down: Rc<Cell<bool>>,
/// The abort reason of the abort signal,
/// stored here because we must keep it across a microtask.
#[ignore_malloc_size_of = "mozjs"]
abort_reason: Rc<Heap<JSVal>>,
/// The error potentially passed to shutdown, /// The error potentially passed to shutdown,
/// stored here because we must keep it across a microtask. /// stored here because we must keep it across a microtask.
#[ignore_malloc_size_of = "mozjs"] #[ignore_malloc_size_of = "mozjs"]
@ -164,57 +171,23 @@ pub(crate) struct PipeTo {
impl PipeTo { impl PipeTo {
/// Run the `abortAlgorithm` defined at /// Run the `abortAlgorithm` defined at
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to> /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
pub(crate) fn abort_with_reason(&self, cx: SafeJSContext, global: &GlobalScope, error: SafeHandleValue, can_gc: CanGc) { pub(crate) fn abort_with_reason(
&self,
cx: SafeJSContext,
global: &GlobalScope,
error: SafeHandleValue,
realm: InRealm,
can_gc: CanGc,
) {
// Let error be signals abort reason. // Let error be signals abort reason.
// Note: passed as the `reason` argument. self.abort_reason.set(error.get());
// Let actions be an empty ordered set. // Let actions be an empty ordered set.
let mut actions = vec![]; // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
// If preventAbort is false, append the following action to actions: // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
if !self.prevent_abort { // and with error.
let dest = self self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
.writer
.get_stream()
.expect("Destination stream must be set");
// If dest.[[state]] is "writable",
let promise = if dest.is_writable() {
// return ! WritableStreamAbort(dest, error)
dest.abort(cx, global, error, can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(
global,
cx,
&(),
can_gc,
)
};
actions.push(promise);
}
// If preventCancel is false, append the following action action to actions:
if !self.prevent_cancel {
let source = self.reader.get_stream().expect("Source stream must be set");
// If source.[[state]] is "readable",
let promise = if source.is_readable() {
// return ! ReadableStreamCancel(source, error).
source.cancel(cx, global, error, can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(
global,
cx,
&(),
can_gc,
)
};
actions.push(promise);
}
// Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error.
} }
} }
@ -706,6 +679,52 @@ impl PipeTo {
ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => { ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
self.writer.close_with_error_propagation(cx, global, can_gc) self.writer.close_with_error_propagation(cx, global, can_gc)
}, },
ShutdownAction::Abort => {
// Note: implementation of the the `abortAlgorithm`
// of the signal associated with this piping operation.
// Let error be signals abort reason.
rooted!(in(*cx) let mut error = UndefinedValue());
error.set(self.abort_reason.get());
// Let actions be an empty ordered set.
let mut actions = vec![];
// If preventAbort is false, append the following action to actions:
if !self.prevent_abort {
let dest = self
.writer
.get_stream()
.expect("Destination stream must be set");
// If dest.[[state]] is "writable",
let promise = if dest.is_writable() {
// return ! WritableStreamAbort(dest, error)
dest.abort(cx, global, error.handle(), can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(global, cx, &(), can_gc)
};
actions.push(promise);
}
// If preventCancel is false, append the following action action to actions:
if !self.prevent_cancel {
let source = self.reader.get_stream().expect("Source stream must be set");
// If source.[[state]] is "readable",
let promise = if source.is_readable() {
// return ! ReadableStreamCancel(source, error).
source.cancel(cx, global, error.handle(), can_gc)
} else {
// Otherwise, return a promise resolved with undefined.
Promise::new_resolved(global, cx, &(), can_gc)
};
actions.push(promise);
}
wait_for_all_promise(cx, global, actions, realm, can_gc)
},
}; };
// Upon fulfillment of p, finalize, passing along originalError if it was given. // Upon fulfillment of p, finalize, passing along originalError if it was given.
@ -1751,6 +1770,7 @@ impl ReadableStream {
prevent_cancel, prevent_cancel,
prevent_close, prevent_close,
shutting_down: Default::default(), shutting_down: Default::default(),
abort_reason: Default::default(),
shutdown_error: Default::default(), shutdown_error: Default::default(),
shutdown_action_promise: Default::default(), shutdown_action_promise: Default::default(),
result_promise: promise.clone(), result_promise: promise.clone(),
@ -1770,7 +1790,7 @@ impl ReadableStream {
// If signal is aborted, perform abortAlgorithm and return promise. // If signal is aborted, perform abortAlgorithm and return promise.
if signal.aborted() { if signal.aborted() {
signal.run_abort_algorithm(cx, global, &abort_algorithm, can_gc); signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
return promise; return promise;
} }