servo/components/script/dom/readablestreamdefaultreader.rs
Josh Matthews c94d909a86
script: Limit public exports. (#34915)
* script: Restrict reexport visibility of DOM types.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* script: Mass pub->pub(crate) conversion.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* script: Hide existing dead code warnings.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* Formatting.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* Fix clippy warnings.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* Formatting.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* Fix unit tests.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* Fix clippy.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

* More formatting.

Signed-off-by: Josh Matthews <josh@joshmatthews.net>

---------

Signed-off-by: Josh Matthews <josh@joshmatthews.net>
2025-01-10 08:19:19 +00:00

533 lines
20 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::cell::Cell;
use std::collections::VecDeque;
use std::mem;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::Heap;
use js::jsval::{JSVal, UndefinedValue};
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
use super::bindings::root::MutNullableDom;
use super::types::ReadableStreamDefaultController;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
};
use crate::dom::bindings::error::Error;
use crate::dom::bindings::import::module::Fallible;
use crate::dom::bindings::reflector::{reflect_dom_object_with_proto, DomObject, Reflector};
use crate::dom::bindings::root::{Dom, DomRoot};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::ReadableStream;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
/// <https://streams.spec.whatwg.org/#read-request>
#[derive(Clone, JSTraceable)]
#[crown::unrooted_must_root_lint::must_root]
pub(crate) enum ReadRequest {
/// <https://streams.spec.whatwg.org/#default-reader-read>
Read(Rc<Promise>),
/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
DefaultTee {
tee_read_request: Dom<DefaultTeeReadRequest>,
},
}
impl ReadRequest {
/// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
pub(crate) fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
match self {
ReadRequest::Read(promise) => {
promise.resolve_native(&ReadableStreamReadResult {
done: Some(false),
value: chunk,
});
},
ReadRequest::DefaultTee { tee_read_request } => {
tee_read_request.enqueue_chunk_steps(chunk);
},
}
}
/// <https://streams.spec.whatwg.org/#read-request-close-steps>
pub(crate) fn close_steps(&self) {
match self {
ReadRequest::Read(promise) => {
let result = RootedTraceableBox::new(Heap::default());
result.set(UndefinedValue());
promise.resolve_native(&ReadableStreamReadResult {
done: Some(true),
value: result,
});
},
ReadRequest::DefaultTee { tee_read_request } => {
tee_read_request.close_steps();
},
}
}
/// <https://streams.spec.whatwg.org/#read-request-error-steps>
pub(crate) fn error_steps(&self, e: SafeHandleValue) {
match self {
ReadRequest::Read(promise) => promise.reject_native(&e),
ReadRequest::DefaultTee { tee_read_request } => {
tee_read_request.error_steps();
},
}
}
}
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-stream-tee>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[crown::unrooted_must_root_lint::must_root]
struct ClosedPromiseRejectionHandler {
branch_1_controller: Dom<ReadableStreamDefaultController>,
branch_2_controller: Dom<ReadableStreamDefaultController>,
#[ignore_malloc_size_of = "Rc"]
canceled_1: Rc<Cell<bool>>,
#[ignore_malloc_size_of = "Rc"]
canceled_2: Rc<Cell<bool>>,
#[ignore_malloc_size_of = "Rc"]
cancel_promise: Rc<Promise>,
}
impl Callback for ClosedPromiseRejectionHandler {
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
/// Upon rejection of `reader.closedPromise` with reason `r``,
fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
let branch_1_controller = &self.branch_1_controller;
let branch_2_controller = &self.branch_2_controller;
// Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
branch_1_controller.error(v);
// Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
branch_2_controller.error(v);
// If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
if !self.canceled_1.get() || !self.canceled_2.get() {
self.cancel_promise.resolve_native(&());
}
}
}
/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
#[dom_struct]
pub(crate) struct ReadableStreamDefaultReader {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
stream: MutNullableDom<ReadableStream>,
#[ignore_malloc_size_of = "no VecDeque support"]
read_requests: DomRefCell<VecDeque<ReadRequest>>,
/// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
#[ignore_malloc_size_of = "Rc is hard"]
closed_promise: DomRefCell<Rc<Promise>>,
}
impl ReadableStreamDefaultReader {
/// <https://streams.spec.whatwg.org/#default-reader-constructor>
#[allow(non_snake_case)]
pub(crate) fn Constructor(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
stream: &ReadableStream,
) -> Fallible<DomRoot<Self>> {
let reader = Self::new_with_proto(global, proto, can_gc);
// Perform ? SetUpReadableStreamDefaultReader(this, stream).
Self::set_up(&reader, stream, global, can_gc)?;
Ok(reader)
}
fn new_with_proto(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
) -> DomRoot<ReadableStreamDefaultReader> {
reflect_dom_object_with_proto(
Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
global,
proto,
can_gc,
)
}
pub(crate) fn new_inherited(
global: &GlobalScope,
can_gc: CanGc,
) -> ReadableStreamDefaultReader {
ReadableStreamDefaultReader {
reflector_: Reflector::new(),
stream: MutNullableDom::new(None),
read_requests: DomRefCell::new(Default::default()),
closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
}
}
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
pub(crate) fn set_up(
&self,
stream: &ReadableStream,
global: &GlobalScope,
can_gc: CanGc,
) -> Fallible<()> {
// If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
if stream.is_locked() {
return Err(Error::Type("stream is locked".to_owned()));
}
// Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
self.generic_initialize(global, stream, can_gc)?;
// Set reader.[[readRequests]] to a new empty list.
self.read_requests.borrow_mut().clear();
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-initialize>
pub(crate) fn generic_initialize(
&self,
global: &GlobalScope,
stream: &ReadableStream,
can_gc: CanGc,
) -> Fallible<()> {
// Set reader.[[stream]] to stream.
self.stream.set(Some(stream));
// Set stream.[[reader]] to reader.
stream.set_reader(Some(self));
if stream.is_readable() {
// If stream.[[state]] is "readable
// Set reader.[[closedPromise]] to a new promise.
*self.closed_promise.borrow_mut() = Promise::new(global, can_gc);
} else if stream.is_closed() {
// Otherwise, if stream.[[state]] is "closed",
// Set reader.[[closedPromise]] to a promise resolved with undefined.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
*self.closed_promise.borrow_mut() = Promise::new_resolved(global, cx, rval.handle())?;
} else {
// Assert: stream.[[state]] is "errored"
assert!(stream.is_errored());
// Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
*self.closed_promise.borrow_mut() = Promise::new_rejected(global, cx, error.handle())?;
// Set reader.[[closedPromise]].[[PromiseIsHandled]] to true
self.closed_promise.borrow().set_promise_is_handled();
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-stream-close>
#[allow(crown::unrooted_must_root)]
pub(crate) fn close(&self) {
// Resolve reader.[[closedPromise]] with undefined.
self.closed_promise.borrow().resolve_native(&());
// If reader implements ReadableStreamDefaultReader,
// Let readRequests be reader.[[readRequests]].
let mut read_requests = self.take_read_requests();
// Set reader.[[readRequests]] to an empty list.
// For each readRequest of readRequests,
for request in read_requests.drain(0..) {
// Perform readRequests close steps.
request.close_steps();
}
}
/// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
self.read_requests
.borrow_mut()
.push_back(read_request.clone());
}
/// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
pub(crate) fn get_num_read_requests(&self) -> usize {
self.read_requests.borrow().len()
}
/// <https://streams.spec.whatwg.org/#readable-stream-error>
pub(crate) fn error(&self, e: SafeHandleValue) {
// Reject reader.[[closedPromise]] with e.
self.closed_promise.borrow().reject_native(&e);
// Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
self.closed_promise.borrow().set_promise_is_handled();
// Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
self.error_read_requests(e);
}
/// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
#[allow(crown::unrooted_must_root)]
pub(crate) fn remove_read_request(&self) -> ReadRequest {
self.read_requests
.borrow_mut()
.pop_front()
.expect("Reader must have read request when remove is called into.")
}
/// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-release>
#[allow(unsafe_code)]
pub(crate) fn generic_release(&self) {
// Let stream be reader.[[stream]].
// Assert: stream is not undefined.
assert!(self.stream.get().is_some());
if let Some(stream) = self.stream.get() {
// Assert: stream.[[reader]] is reader.
assert!(stream.has_default_reader());
if stream.is_readable() {
// If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
self.closed_promise
.borrow()
.reject_error(Error::Type("stream state is not readable".to_owned()));
} else {
// Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
unsafe {
Error::Type("Cannot release lock due to stream state.".to_owned()).to_jsval(
*cx,
&self.global(),
error.handle_mut(),
)
};
*self.closed_promise.borrow_mut() =
Promise::new_rejected(&self.global(), cx, error.handle()).unwrap();
}
// Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
self.closed_promise.borrow().set_promise_is_handled();
// Perform ! stream.[[controller]].[[ReleaseSteps]]().
stream.perform_release_steps();
// Set stream.[[reader]] to undefined.
stream.set_reader(None);
// Set reader.[[stream]] to undefined.
self.stream.set(None);
}
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
#[allow(unsafe_code)]
pub(crate) fn release(&self) {
// Perform ! ReadableStreamReaderGenericRelease(reader).
self.generic_release();
// Let e be a new TypeError exception.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
unsafe {
Error::Type("Reader is released".to_owned()).to_jsval(
*cx,
&self.global(),
error.handle_mut(),
)
};
// Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
self.error_read_requests(error.handle());
}
#[allow(crown::unrooted_must_root)]
fn take_read_requests(&self) -> VecDeque<ReadRequest> {
mem::take(&mut *self.read_requests.borrow_mut())
}
/// <https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel>
fn generic_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
// Let stream be reader.[[stream]].
let stream = self.stream.get();
// Assert: stream is not undefined.
let stream =
stream.expect("Reader should have a stream when generic cancel is called into.");
// Return ! ReadableStreamCancel(stream, reason).
stream.cancel(reason, can_gc)
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
#[allow(crown::unrooted_must_root)]
fn error_read_requests(&self, rval: SafeHandleValue) {
// step 1
let mut read_requests = self.take_read_requests();
// step 2 & 3
for request in read_requests.drain(0..) {
request.error_steps(rval);
}
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
pub(crate) fn read(&self, read_request: &ReadRequest, can_gc: CanGc) {
// Let stream be reader.[[stream]].
// Assert: stream is not undefined.
assert!(self.stream.get().is_some());
let stream = self.stream.get().unwrap();
// Set stream.[[disturbed]] to true.
stream.set_is_disturbed(true);
// If stream.[[state]] is "closed", perform readRequests close steps.
if stream.is_closed() {
read_request.close_steps();
} else if stream.is_errored() {
// Otherwise, if stream.[[state]] is "errored",
// perform readRequests error steps given stream.[[storedError]].
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
read_request.error_steps(error.handle());
} else {
// Otherwise
// Assert: stream.[[state]] is "readable".
assert!(stream.is_readable());
// Perform ! stream.[[controller]].[[PullSteps]](readRequest).
stream.perform_pull_steps(read_request, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
pub(crate) fn append_native_handler_to_closed_promise(
&self,
branch_1: &ReadableStream,
branch_2: &ReadableStream,
canceled_1: Rc<Cell<bool>>,
canceled_2: Rc<Cell<bool>>,
cancel_promise: Rc<Promise>,
can_gc: CanGc,
) {
let branch_1_controller = branch_1.get_default_controller();
let branch_2_controller = branch_2.get_default_controller();
let global = self.global();
let handler = PromiseNativeHandler::new(
&global,
None,
Some(Box::new(ClosedPromiseRejectionHandler {
branch_1_controller: Dom::from_ref(&branch_1_controller),
branch_2_controller: Dom::from_ref(&branch_2_controller),
canceled_1,
canceled_2,
cancel_promise,
})),
);
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
self.closed_promise
.borrow()
.append_native_handler(&handler, comp, can_gc);
}
}
impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
/// <https://streams.spec.whatwg.org/#default-reader-constructor>
fn Constructor(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
stream: &ReadableStream,
) -> Fallible<DomRoot<Self>> {
ReadableStreamDefaultReader::Constructor(global, proto, can_gc, stream)
}
/// <https://streams.spec.whatwg.org/#default-reader-read>
#[allow(unsafe_code)]
#[allow(crown::unrooted_must_root)]
fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
// If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
if self.stream.get().is_none() {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
unsafe {
Error::Type("stream is undefined".to_owned()).to_jsval(
*cx,
&self.global(),
error.handle_mut(),
)
};
return Promise::new_rejected(&self.global(), cx, error.handle()).unwrap();
}
// Let promise be a new promise.
let promise = Promise::new(&self.reflector_.global(), can_gc);
// Let readRequest be a new read request with the following items:
// chunk steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → false ]».
//
// close steps
// Resolve promise with «[ "value" → undefined, "done" → true ]».
//
// error steps, given e
// Reject promise with e.
// Rooting(unrooted_must_root): the read request contains only a promise,
// which does not need to be rooted,
// as it is safely managed natively via an Rc.
let read_request = ReadRequest::Read(promise.clone());
// Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
self.read(&read_request, can_gc);
// Return promise.
promise
}
/// <https://streams.spec.whatwg.org/#default-reader-release-lock>
fn ReleaseLock(&self) {
if self.stream.get().is_some() {
// step 2 - Perform ! ReadableStreamDefaultReaderRelease(this).
self.release();
}
// step 1 - If this.[[stream]] is undefined, return.
}
/// <https://streams.spec.whatwg.org/#generic-reader-closed>
fn Closed(&self) -> Rc<Promise> {
self.closed_promise.borrow().clone()
}
/// <https://streams.spec.whatwg.org/#generic-reader-cancel>
fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
if self.stream.get().is_none() {
// If this.[[stream]] is undefined,
// return a promise rejected with a TypeError exception.
let promise = Promise::new(&self.reflector_.global(), can_gc);
promise.reject_error(Error::Type("stream is undefined".to_owned()));
promise
} else {
// Return ! ReadableStreamReaderGenericCancel(this, reason).
self.generic_cancel(reason, can_gc)
}
}
}