From d1104627253407aec8845a62bc94cd733a176db4 Mon Sep 17 00:00:00 2001 From: gterzian <2792687+gterzian@users.noreply.github.com> Date: Thu, 5 Jun 2025 20:26:19 +0700 Subject: [PATCH] implement abort algorithm and integrate with piping shutdown Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --- components/script/dom/abortsignal.rs | 28 ++++-- components/script/dom/readablestream.rs | 118 ++++++++++++++---------- 2 files changed, 91 insertions(+), 55 deletions(-) diff --git a/components/script/dom/abortsignal.rs b/components/script/dom/abortsignal.rs index b0f6c4c1b02..c0feb2541a9 100644 --- a/components/script/dom/abortsignal.rs +++ b/components/script/dom/abortsignal.rs @@ -18,6 +18,7 @@ use crate::dom::bindings::root::{Dom, DomRoot}; use crate::dom::eventtarget::EventTarget; use crate::dom::globalscope::GlobalScope; use crate::dom::readablestream::PipeTo; +use crate::realms::{InRealm, enter_realm}; use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; impl js::gc::Rootable for AbortAlgorithm {} @@ -75,6 +76,8 @@ impl AbortSignal { /// pub(crate) fn signal_abort(&self, cx: SafeJSContext, reason: HandleValue, can_gc: CanGc) { let global = self.global(); + let realm = enter_realm(&*global); + let comp = InRealm::Entered(&realm); // If signal is aborted, then return. if self.Aborted() { @@ -98,7 +101,7 @@ impl AbortSignal { // TODO: #36936 // Run the abort steps for signal. - self.run_the_abort_steps(cx, &global, can_gc); + self.run_the_abort_steps(cx, &global, comp, can_gc); // For each dependentSignal of dependentSignalsToAbort, run the abort steps for dependentSignal. // TODO: #36936 @@ -116,26 +119,39 @@ impl AbortSignal { } /// Run a specific abort algorithm. - pub(crate) fn run_abort_algorithm(&self, cx: SafeJSContext, global: &GlobalScope, algorithm: &AbortAlgorithm, can_gc: CanGc) { + pub(crate) fn run_abort_algorithm( + &self, + cx: SafeJSContext, + global: &GlobalScope, + algorithm: &AbortAlgorithm, + realm: InRealm, + can_gc: CanGc, + ) { match algorithm { AbortAlgorithm::StreamPiping(pipe) => { rooted!(in(*cx) let mut reason = UndefinedValue()); reason.set(self.abort_reason.get()); - pipe.abort_with_reason(cx, global, reason.handle(), can_gc); + pipe.abort_with_reason(cx, global, reason.handle(), realm, can_gc); }, _ => { // TODO: match on variant and implement algo steps. // See the various items of #34866 - } + }, } } /// - fn run_the_abort_steps(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { + fn run_the_abort_steps( + &self, + cx: SafeJSContext, + global: &GlobalScope, + realm: InRealm, + can_gc: CanGc, + ) { // For each algorithm of signal’s abort algorithms: run algorithm. let algos = mem::take(&mut *self.abort_algorithms.borrow_mut()); for algo in algos { - self.run_abort_algorithm(cx, global, &algo, can_gc); + self.run_abort_algorithm(cx, global, &algo, realm, can_gc); } // Empty signal’s abort algorithms. diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index 1d0c5c9dc2c..f605b0a0422 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -47,7 +47,7 @@ use crate::dom::bindings::utils::get_dictionary_property; use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm}; use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader; use crate::dom::globalscope::GlobalScope; -use crate::dom::promise::Promise; +use crate::dom::promise::{wait_for_all_promise, Promise}; use crate::dom::readablebytestreamcontroller::ReadableByteStreamController; use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController; @@ -100,6 +100,8 @@ enum ShutdownAction { ReadableStreamCancel, /// WritableStreamDefaultWriterCloseWithErrorPropagation, + /// + Abort, } impl js::gc::Rootable for PipeTo {} @@ -145,6 +147,11 @@ pub(crate) struct PipeTo { #[ignore_malloc_size_of = "Rc are hard"] shutting_down: Rc>, + /// The abort reason of the abort signal, + /// stored here because we must keep it across a microtask. + #[ignore_malloc_size_of = "mozjs"] + abort_reason: Rc>, + /// The error potentially passed to shutdown, /// stored here because we must keep it across a microtask. #[ignore_malloc_size_of = "mozjs"] @@ -164,57 +171,23 @@ pub(crate) struct PipeTo { impl PipeTo { /// Run the `abortAlgorithm` defined at /// - pub(crate) fn abort_with_reason(&self, cx: SafeJSContext, global: &GlobalScope, error: SafeHandleValue, can_gc: CanGc) { + pub(crate) fn abort_with_reason( + &self, + cx: SafeJSContext, + global: &GlobalScope, + error: SafeHandleValue, + realm: InRealm, + can_gc: CanGc, + ) { // Let error be signal’s abort reason. - // Note: passed as the `reason` argument. + self.abort_reason.set(error.get()); // Let actions be an empty ordered set. - let mut actions = vec![]; + // Note: the actions are defined, and performed, inside `shutdown_with_an_action`. - // If preventAbort is false, append the following action to actions: - if !self.prevent_abort { - let dest = self - .writer - .get_stream() - .expect("Destination stream must be set"); - - // If dest.[[state]] is "writable", - let promise = if dest.is_writable() { - // return ! WritableStreamAbort(dest, error) - dest.abort(cx, global, error, can_gc) - } else { - // Otherwise, return a promise resolved with undefined. - Promise::new_resolved( - global, - cx, - &(), - can_gc, - ) - }; - actions.push(promise); - } - - // If preventCancel is false, append the following action action to actions: - if !self.prevent_cancel { - let source = self.reader.get_stream().expect("Source stream must be set"); - - // If source.[[state]] is "readable", - let promise = if source.is_readable() { - // return ! ReadableStreamCancel(source, error). - source.cancel(cx, global, error, can_gc) - } else { - // Otherwise, return a promise resolved with undefined. - Promise::new_resolved( - global, - cx, - &(), - can_gc, - ) - }; - actions.push(promise); - } - - // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error. + // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, + // and with error. + self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc); } } @@ -706,6 +679,52 @@ impl PipeTo { ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => { self.writer.close_with_error_propagation(cx, global, can_gc) }, + ShutdownAction::Abort => { + // Note: implementation of the the `abortAlgorithm` + // of the signal associated with this piping operation. + + // Let error be signal’s abort reason. + rooted!(in(*cx) let mut error = UndefinedValue()); + error.set(self.abort_reason.get()); + + // Let actions be an empty ordered set. + let mut actions = vec![]; + + // If preventAbort is false, append the following action to actions: + if !self.prevent_abort { + let dest = self + .writer + .get_stream() + .expect("Destination stream must be set"); + + // If dest.[[state]] is "writable", + let promise = if dest.is_writable() { + // return ! WritableStreamAbort(dest, error) + dest.abort(cx, global, error.handle(), can_gc) + } else { + // Otherwise, return a promise resolved with undefined. + Promise::new_resolved(global, cx, &(), can_gc) + }; + actions.push(promise); + } + + // If preventCancel is false, append the following action action to actions: + if !self.prevent_cancel { + let source = self.reader.get_stream().expect("Source stream must be set"); + + // If source.[[state]] is "readable", + let promise = if source.is_readable() { + // return ! ReadableStreamCancel(source, error). + source.cancel(cx, global, error.handle(), can_gc) + } else { + // Otherwise, return a promise resolved with undefined. + Promise::new_resolved(global, cx, &(), can_gc) + }; + actions.push(promise); + } + + wait_for_all_promise(cx, global, actions, realm, can_gc) + }, }; // Upon fulfillment of p, finalize, passing along originalError if it was given. @@ -1751,6 +1770,7 @@ impl ReadableStream { prevent_cancel, prevent_close, shutting_down: Default::default(), + abort_reason: Default::default(), shutdown_error: Default::default(), shutdown_action_promise: Default::default(), result_promise: promise.clone(), @@ -1770,7 +1790,7 @@ impl ReadableStream { // If signal is aborted, perform abortAlgorithm and return promise. if signal.aborted() { - signal.run_abort_algorithm(cx, global, &abort_algorithm, can_gc); + signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc); return promise; }