mirror of
https://github.com/servo/servo.git
synced 2025-08-08 15:05:35 +01:00
basic integration with stream piping
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
parent
f8e67019a9
commit
fe5882013e
5 changed files with 64 additions and 22 deletions
|
@ -14,21 +14,24 @@ use script_bindings::inheritance::Castable;
|
||||||
use crate::dom::bindings::codegen::Bindings::AbortSignalBinding::AbortSignalMethods;
|
use crate::dom::bindings::codegen::Bindings::AbortSignalBinding::AbortSignalMethods;
|
||||||
use crate::dom::bindings::error::{Error, ErrorToJsval};
|
use crate::dom::bindings::error::{Error, ErrorToJsval};
|
||||||
use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
|
use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
|
||||||
use crate::dom::bindings::root::DomRoot;
|
use crate::dom::bindings::root::{Dom, DomRoot};
|
||||||
use crate::dom::eventtarget::EventTarget;
|
use crate::dom::eventtarget::EventTarget;
|
||||||
use crate::dom::globalscope::GlobalScope;
|
use crate::dom::globalscope::GlobalScope;
|
||||||
|
use crate::dom::readablestream::PipeTo;
|
||||||
use crate::script_runtime::{CanGc, JSContext};
|
use crate::script_runtime::{CanGc, JSContext};
|
||||||
|
|
||||||
|
impl js::gc::Rootable for AbortAlgorithm {}
|
||||||
|
|
||||||
/// <https://dom.spec.whatwg.org/#abortcontroller-api-integration>
|
/// <https://dom.spec.whatwg.org/#abortcontroller-api-integration>
|
||||||
/// TODO: implement algorithms at call point,
|
/// TODO: implement algorithms at call point,
|
||||||
/// in order to integrate the abort signal with its various use cases.
|
/// in order to integrate the abort signal with its various use cases.
|
||||||
#[derive(JSTraceable, MallocSizeOf)]
|
#[derive(Clone, JSTraceable, MallocSizeOf)]
|
||||||
#[allow(dead_code)]
|
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
|
||||||
enum AbortAlgorithm {
|
pub(crate) enum AbortAlgorithm {
|
||||||
/// <https://dom.spec.whatwg.org/#add-an-event-listener>
|
/// <https://dom.spec.whatwg.org/#add-an-event-listener>
|
||||||
DomEventLister,
|
DomEventLister,
|
||||||
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
|
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
|
||||||
StreamPiping,
|
StreamPiping(PipeTo),
|
||||||
/// <https://fetch.spec.whatwg.org/#dom-global-fetch>
|
/// <https://fetch.spec.whatwg.org/#dom-global-fetch>
|
||||||
Fetch,
|
Fetch,
|
||||||
}
|
}
|
||||||
|
@ -99,13 +102,29 @@ impl AbortSignal {
|
||||||
// TODO: #36936
|
// TODO: #36936
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <https://dom.spec.whatwg.org/#abortsignal-add>
|
||||||
|
pub(crate) fn add(&self, algorithm: &AbortAlgorithm) {
|
||||||
|
// If signal is aborted, then return.
|
||||||
|
if self.aborted() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append algorithm to signal’s abort algorithms.
|
||||||
|
self.abort_algorithms.borrow_mut().push(algorithm.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run a specific abort algorithm.
|
||||||
|
pub(crate) fn run_abort_algorithm(&self, algorithm: &AbortAlgorithm) {
|
||||||
|
// TODO: match on variant and implement algo steps.
|
||||||
|
// See the various items of #34866
|
||||||
|
}
|
||||||
|
|
||||||
/// <https://dom.spec.whatwg.org/#run-the-abort-steps>
|
/// <https://dom.spec.whatwg.org/#run-the-abort-steps>
|
||||||
fn run_the_abort_steps(&self, can_gc: CanGc) {
|
fn run_the_abort_steps(&self, can_gc: CanGc) {
|
||||||
// For each algorithm of signal’s abort algorithms: run algorithm.
|
// For each algorithm of signal’s abort algorithms: run algorithm.
|
||||||
let algos = mem::take(&mut *self.abort_algorithms.borrow_mut());
|
let algos = mem::take(&mut *self.abort_algorithms.borrow_mut());
|
||||||
for _algo in algos {
|
for algo in algos {
|
||||||
// TODO: match on variant and implement algo steps.
|
self.run_abort_algorithm(&algo);
|
||||||
// See the various items of #34866
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empty signal’s abort algorithms.
|
// Empty signal’s abort algorithms.
|
||||||
|
@ -117,7 +136,7 @@ impl AbortSignal {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <https://dom.spec.whatwg.org/#abortsignal-aborted>
|
/// <https://dom.spec.whatwg.org/#abortsignal-aborted>
|
||||||
fn aborted(&self) -> bool {
|
pub(crate) fn aborted(&self) -> bool {
|
||||||
// An AbortSignal object is aborted when its abort reason is not undefined.
|
// An AbortSignal object is aborted when its abort reason is not undefined.
|
||||||
!self.abort_reason.get().is_undefined()
|
!self.abort_reason.get().is_undefined()
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ use script_bindings::str::DOMString;
|
||||||
use crate::dom::domexception::{DOMErrorName, DOMException};
|
use crate::dom::domexception::{DOMErrorName, DOMException};
|
||||||
use script_bindings::conversions::StringificationBehavior;
|
use script_bindings::conversions::StringificationBehavior;
|
||||||
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
|
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
|
||||||
|
use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
|
||||||
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
|
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
|
||||||
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
|
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
|
||||||
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
|
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
|
||||||
|
@ -113,7 +114,7 @@ impl js::gc::Rootable for PipeTo {}
|
||||||
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
|
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
|
||||||
#[derive(Clone, JSTraceable, MallocSizeOf)]
|
#[derive(Clone, JSTraceable, MallocSizeOf)]
|
||||||
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
|
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
|
||||||
struct PipeTo {
|
pub(crate) struct PipeTo {
|
||||||
/// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
|
/// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
|
||||||
reader: Dom<ReadableStreamDefaultReader>,
|
reader: Dom<ReadableStreamDefaultReader>,
|
||||||
|
|
||||||
|
@ -1636,9 +1637,10 @@ impl ReadableStream {
|
||||||
cx: SafeJSContext,
|
cx: SafeJSContext,
|
||||||
global: &GlobalScope,
|
global: &GlobalScope,
|
||||||
dest: &WritableStream,
|
dest: &WritableStream,
|
||||||
|
prevent_close: bool,
|
||||||
prevent_abort: bool,
|
prevent_abort: bool,
|
||||||
prevent_cancel: bool,
|
prevent_cancel: bool,
|
||||||
prevent_close: bool,
|
signal: Option<&AbortSignal>,
|
||||||
realm: InRealm,
|
realm: InRealm,
|
||||||
can_gc: CanGc,
|
can_gc: CanGc,
|
||||||
) -> Rc<Promise> {
|
) -> Rc<Promise> {
|
||||||
|
@ -1682,9 +1684,6 @@ impl ReadableStream {
|
||||||
// Let promise be a new promise.
|
// Let promise be a new promise.
|
||||||
let promise = Promise::new(global, can_gc);
|
let promise = Promise::new(global, can_gc);
|
||||||
|
|
||||||
// If signal is not undefined,
|
|
||||||
// TODO: implement AbortSignal.
|
|
||||||
|
|
||||||
// In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
|
// In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
|
||||||
rooted!(in(*cx) let pipe_to = PipeTo {
|
rooted!(in(*cx) let pipe_to = PipeTo {
|
||||||
reader: Dom::from_ref(&reader),
|
reader: Dom::from_ref(&reader),
|
||||||
|
@ -1705,6 +1704,23 @@ impl ReadableStream {
|
||||||
// where the error is set to undefined.
|
// where the error is set to undefined.
|
||||||
pipe_to.shutdown_error.set(NullValue());
|
pipe_to.shutdown_error.set(NullValue());
|
||||||
|
|
||||||
|
// If signal is not undefined,
|
||||||
|
// Note: moving the steps to here, so that the `PipeTo` is available.
|
||||||
|
if let Some(signal) = signal {
|
||||||
|
// Let abortAlgorithm be the following steps:
|
||||||
|
// Note: steps are implemented at call site.
|
||||||
|
rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
|
||||||
|
|
||||||
|
// If signal is aborted, perform abortAlgorithm and return promise.
|
||||||
|
if signal.aborted() {
|
||||||
|
signal.run_abort_algorithm(&abort_algorithm);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add abortAlgorithm to signal.
|
||||||
|
signal.add(&abort_algorithm);
|
||||||
|
}
|
||||||
|
|
||||||
// Note: perfom checks now, since streams can start as closed or errored.
|
// Note: perfom checks now, since streams can start as closed or errored.
|
||||||
pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
|
pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
|
||||||
pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
|
pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
|
||||||
|
@ -1992,16 +2008,17 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let signal be options["signal"] if it exists, or undefined otherwise.
|
// Let signal be options["signal"] if it exists, or undefined otherwise.
|
||||||
// TODO: implement AbortSignal.
|
let signal = options.signal.as_ref().map(|signal| &**signal);
|
||||||
|
|
||||||
// Return ! ReadableStreamPipeTo.
|
// Return ! ReadableStreamPipeTo.
|
||||||
self.pipe_to(
|
self.pipe_to(
|
||||||
cx,
|
cx,
|
||||||
&global,
|
&global,
|
||||||
destination,
|
destination,
|
||||||
|
options.preventClose,
|
||||||
options.preventAbort,
|
options.preventAbort,
|
||||||
options.preventCancel,
|
options.preventCancel,
|
||||||
options.preventClose,
|
signal,
|
||||||
realm,
|
realm,
|
||||||
can_gc,
|
can_gc,
|
||||||
)
|
)
|
||||||
|
@ -2029,7 +2046,7 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let signal be options["signal"] if it exists, or undefined otherwise.
|
// Let signal be options["signal"] if it exists, or undefined otherwise.
|
||||||
// TODO: implement AbortSignal.
|
let signal = options.signal.as_ref().map(|signal| &**signal);
|
||||||
|
|
||||||
// Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
|
// Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
|
||||||
// options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
|
// options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
|
||||||
|
@ -2037,9 +2054,10 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
|
||||||
cx,
|
cx,
|
||||||
&global,
|
&global,
|
||||||
&transform.writable,
|
&transform.writable,
|
||||||
|
options.preventClose,
|
||||||
options.preventAbort,
|
options.preventAbort,
|
||||||
options.preventCancel,
|
options.preventCancel,
|
||||||
options.preventClose,
|
signal,
|
||||||
realm,
|
realm,
|
||||||
can_gc,
|
can_gc,
|
||||||
);
|
);
|
||||||
|
@ -2270,7 +2288,9 @@ impl Transferable for ReadableStream {
|
||||||
writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
|
writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
|
||||||
|
|
||||||
// Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
|
// Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
|
||||||
let promise = self.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc);
|
let promise = self.pipe_to(
|
||||||
|
cx, &global, &writable, false, false, false, None, comp, can_gc,
|
||||||
|
);
|
||||||
|
|
||||||
// Set promise.[[PromiseIsHandled]] to true.
|
// Set promise.[[PromiseIsHandled]] to true.
|
||||||
promise.set_promise_is_handled();
|
promise.set_promise_is_handled();
|
||||||
|
|
|
@ -1053,7 +1053,9 @@ impl Transferable for TransformStream {
|
||||||
let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
|
let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
|
||||||
proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
|
proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
|
||||||
proxy_readable
|
proxy_readable
|
||||||
.pipe_to(cx, &global, &writable, false, false, false, comp, can_gc)
|
.pipe_to(
|
||||||
|
cx, &global, &writable, false, false, false, None, comp, can_gc,
|
||||||
|
)
|
||||||
.set_promise_is_handled();
|
.set_promise_is_handled();
|
||||||
|
|
||||||
// Second port pair (proxy readable → writable)
|
// Second port pair (proxy readable → writable)
|
||||||
|
@ -1075,6 +1077,7 @@ impl Transferable for TransformStream {
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
|
None,
|
||||||
comp,
|
comp,
|
||||||
can_gc,
|
can_gc,
|
||||||
)
|
)
|
||||||
|
|
|
@ -1241,7 +1241,7 @@ impl Transferable for WritableStream {
|
||||||
readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
|
readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
|
||||||
|
|
||||||
// Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
|
// Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
|
||||||
let promise = readable.pipe_to(cx, &global, self, false, false, false, comp, can_gc);
|
let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
|
||||||
|
|
||||||
// Set promise.[[PromiseIsHandled]] to true.
|
// Set promise.[[PromiseIsHandled]] to true.
|
||||||
promise.set_promise_is_handled();
|
promise.set_promise_is_handled();
|
||||||
|
|
|
@ -56,5 +56,5 @@ dictionary StreamPipeOptions {
|
||||||
boolean preventClose = false;
|
boolean preventClose = false;
|
||||||
boolean preventAbort = false;
|
boolean preventAbort = false;
|
||||||
boolean preventCancel = false;
|
boolean preventCancel = false;
|
||||||
// AbortSignal signal;
|
AbortSignal signal;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue