From 5a4c6907c71844e9851a17185c1592475b9af6a8 Mon Sep 17 00:00:00 2001 From: gterzian <2792687+gterzian@users.noreply.github.com> Date: Thu, 5 Jun 2025 18:58:21 +0700 Subject: [PATCH] implement abort algorithm for stream piping Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --- components/script/dom/abortsignal.rs | 31 ++++++++----- components/script/dom/readablestream.rs | 59 ++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 11 deletions(-) diff --git a/components/script/dom/abortsignal.rs b/components/script/dom/abortsignal.rs index 744e4382dc3..b0f6c4c1b02 100644 --- a/components/script/dom/abortsignal.rs +++ b/components/script/dom/abortsignal.rs @@ -18,7 +18,7 @@ use crate::dom::bindings::root::{Dom, DomRoot}; use crate::dom::eventtarget::EventTarget; use crate::dom::globalscope::GlobalScope; use crate::dom::readablestream::PipeTo; -use crate::script_runtime::{CanGc, JSContext}; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; impl js::gc::Rootable for AbortAlgorithm {} @@ -73,7 +73,9 @@ impl AbortSignal { } /// - pub(crate) fn signal_abort(&self, cx: JSContext, reason: HandleValue, can_gc: CanGc) { + pub(crate) fn signal_abort(&self, cx: SafeJSContext, reason: HandleValue, can_gc: CanGc) { + let global = self.global(); + // If signal is aborted, then return. if self.Aborted() { return; @@ -87,7 +89,7 @@ impl AbortSignal { } else { // otherwise to a new "AbortError" DOMException. rooted!(in(*cx) let mut rooted_error = UndefinedValue()); - Error::Abort.to_jsval(cx, &self.global(), rooted_error.handle_mut(), can_gc); + Error::Abort.to_jsval(cx, &global, rooted_error.handle_mut(), can_gc); self.abort_reason.set(rooted_error.get()) } @@ -96,7 +98,7 @@ impl AbortSignal { // TODO: #36936 // Run the abort steps for signal. - self.run_the_abort_steps(can_gc); + self.run_the_abort_steps(cx, &global, can_gc); // For each dependentSignal of dependentSignalsToAbort, run the abort steps for dependentSignal. // TODO: #36936 @@ -114,17 +116,26 @@ impl AbortSignal { } /// Run a specific abort algorithm. - pub(crate) fn run_abort_algorithm(&self, algorithm: &AbortAlgorithm) { - // TODO: match on variant and implement algo steps. - // See the various items of #34866 + pub(crate) fn run_abort_algorithm(&self, cx: SafeJSContext, global: &GlobalScope, algorithm: &AbortAlgorithm, can_gc: CanGc) { + match algorithm { + AbortAlgorithm::StreamPiping(pipe) => { + rooted!(in(*cx) let mut reason = UndefinedValue()); + reason.set(self.abort_reason.get()); + pipe.abort_with_reason(cx, global, reason.handle(), can_gc); + }, + _ => { + // TODO: match on variant and implement algo steps. + // See the various items of #34866 + } + } } /// - fn run_the_abort_steps(&self, can_gc: CanGc) { + fn run_the_abort_steps(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { // For each algorithm of signal’s abort algorithms: run algorithm. let algos = mem::take(&mut *self.abort_algorithms.borrow_mut()); for algo in algos { - self.run_abort_algorithm(&algo); + self.run_abort_algorithm(cx, global, &algo, can_gc); } // Empty signal’s abort algorithms. @@ -150,7 +161,7 @@ impl AbortSignalMethods for AbortSignal { } /// - fn Reason(&self, _cx: JSContext, mut rval: MutableHandleValue) { + fn Reason(&self, _cx: SafeJSContext, mut rval: MutableHandleValue) { // The reason getter steps are to return this’s abort reason. rval.set(self.abort_reason.get()); } diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index 30d37e70f8f..1d0c5c9dc2c 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -161,6 +161,63 @@ pub(crate) struct PipeTo { result_promise: Rc, } +impl PipeTo { + /// Run the `abortAlgorithm` defined at + /// + pub(crate) fn abort_with_reason(&self, cx: SafeJSContext, global: &GlobalScope, error: SafeHandleValue, can_gc: CanGc) { + // Let error be signal’s abort reason. + // Note: passed as the `reason` argument. + + // Let actions be an empty ordered set. + let mut actions = vec![]; + + // If preventAbort is false, append the following action to actions: + if !self.prevent_abort { + let dest = self + .writer + .get_stream() + .expect("Destination stream must be set"); + + // If dest.[[state]] is "writable", + let promise = if dest.is_writable() { + // return ! WritableStreamAbort(dest, error) + dest.abort(cx, global, error, can_gc) + } else { + // Otherwise, return a promise resolved with undefined. + Promise::new_resolved( + global, + cx, + &(), + can_gc, + ) + }; + actions.push(promise); + } + + // If preventCancel is false, append the following action action to actions: + if !self.prevent_cancel { + let source = self.reader.get_stream().expect("Source stream must be set"); + + // If source.[[state]] is "readable", + let promise = if source.is_readable() { + // return ! ReadableStreamCancel(source, error). + source.cancel(cx, global, error, can_gc) + } else { + // Otherwise, return a promise resolved with undefined. + Promise::new_resolved( + global, + cx, + &(), + can_gc, + ) + }; + actions.push(promise); + } + + // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error. + } +} + impl Callback for PipeTo { /// The pipe makes progress one microtask at a time. /// Note: we use one struct as the callback for all promises, @@ -1713,7 +1770,7 @@ impl ReadableStream { // If signal is aborted, perform abortAlgorithm and return promise. if signal.aborted() { - signal.run_abort_algorithm(&abort_algorithm); + signal.run_abort_algorithm(cx, global, &abort_algorithm, can_gc); return promise; }