servo/components/script/dom/readablestreambyobreader.rs
Taym Haddadi a5cf04c479
script: implement ReadableByteStreamController (#35410)
* script: implement ReadableByteStreamController

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* implement can_copy_data_block_bytes and copy_data_block_bytes

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove BufferSource::Default

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* implement StartAlgorithmFulfillmentHandler, StartAlgorithmRejectionHandler, PullAlgorithmFulfillmentHandler, PullAlgorithmRejectionHandler for ReadableByteStreamController

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* implement perform_pull_into

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix build

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix clippy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix build

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove RefCell from PullIntoDescriptor and QueueEntry

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove commented code

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* add perform_cancel_steps, perform_release_steps and perform_pull_steps

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix clippy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix crown

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* unskip readable-byte-streams

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix CRASH

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix clippy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix more CRASHS

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix more crashes

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix bad-buffers-and-views.any.js test

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Update test expectations

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix BorrowMutError crashes

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix view_byte_length  test

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix non-transferable-buffers test

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Pass contexts as much as possible by reference

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Make respond_internal Fallible

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix crwon

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix process pull into descriptors using queue logic and resulting double-borrow

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* Fix clippy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* FIx more crashes

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix timeout tests

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix all tests

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove all error! logs

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove  #[allow(unsafe_code)]

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix lint
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix tidy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix test expectation

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
2025-03-10 19:43:04 +00:00

429 lines
16 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#![allow(dead_code)]
use std::collections::VecDeque;
use std::mem;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::gc::CustomAutoRooterGuard;
use js::jsapi::Heap;
use js::jsval::{JSVal, UndefinedValue};
use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
use super::bindings::buffer_source::HeapBufferSource;
use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
use super::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
use super::bindings::reflector::reflect_dom_object;
use super::readablestreamgenericreader::ReadableStreamGenericReader;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
use crate::dom::bindings::error::{Error, ErrorToJsval};
use crate::dom::bindings::import::module::Fallible;
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{DomRoot, MutNullableDom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::readablestream::ReadableStream;
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
/// <https://streams.spec.whatwg.org/#read-into-request>
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
/// <https://streams.spec.whatwg.org/#byob-reader-read>
Read(#[ignore_malloc_size_of = "Rc is hard"] Rc<Promise>),
}
impl ReadIntoRequest {
/// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps>
pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
// chunk steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → false ]».
match self {
ReadIntoRequest::Read(promise) => {
promise.resolve_native(
&ReadableStreamReadResult {
done: Some(false),
value: chunk,
},
can_gc,
);
},
}
}
/// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
// close steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → true ]».
match self {
ReadIntoRequest::Read(promise) => match chunk {
Some(chunk) => promise.resolve_native(
&ReadableStreamReadResult {
done: Some(true),
value: chunk,
},
can_gc,
),
None => {
let result = RootedTraceableBox::new(Heap::default());
result.set(UndefinedValue());
promise.resolve_native(
&ReadableStreamReadResult {
done: Some(true),
value: result,
},
can_gc,
);
},
},
}
}
/// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps>
pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
// error steps, given e
// Reject promise with e.
match self {
ReadIntoRequest::Read(promise) => promise.reject_native(&e, can_gc),
}
}
}
/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
stream: MutNullableDom<ReadableStream>,
read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
/// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
#[ignore_malloc_size_of = "Rc is hard"]
closed_promise: DomRefCell<Rc<Promise>>,
}
impl ReadableStreamBYOBReader {
fn new_with_proto(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
) -> DomRoot<ReadableStreamBYOBReader> {
reflect_dom_object_with_proto(
Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
global,
proto,
can_gc,
)
}
fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
ReadableStreamBYOBReader {
reflector_: Reflector::new(),
stream: MutNullableDom::new(None),
read_into_requests: DomRefCell::new(Default::default()),
closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
}
}
pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
reflect_dom_object(
Box::new(Self::new_inherited(global, can_gc)),
global,
can_gc,
)
}
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
pub(crate) fn set_up(
&self,
stream: &ReadableStream,
global: &GlobalScope,
can_gc: CanGc,
) -> Fallible<()> {
// If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
if stream.is_locked() {
return Err(Error::Type("stream is locked".to_owned()));
}
// If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
if !stream.has_byte_controller() {
return Err(Error::Type(
"stream controller is not a byte stream controller".to_owned(),
));
}
// Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
self.generic_initialize(global, stream, can_gc);
// Set reader.[[readIntoRequests]] to a new empty list.
self.read_into_requests.borrow_mut().clear();
Ok(())
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
// Perform ! ReadableStreamReaderGenericRelease(reader).
self.generic_release(can_gc)?;
// Let e be a new TypeError exception.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
Error::Type("Reader is released".to_owned()).to_jsval(
cx,
&self.global(),
error.handle_mut(),
);
// Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
self.error_read_into_requests(error.handle(), can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
// Reject reader.[[closedPromise]] with e.
self.closed_promise.borrow().reject_native(&e, can_gc);
// Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
self.closed_promise.borrow().set_promise_is_handled();
// Let readRequests be reader.[[readRequests]].
let mut read_into_requests = self.take_read_into_requests();
// Set reader.[[readIntoRequests]] to a new empty list.
for request in read_into_requests.drain(0..) {
// Perform readIntoRequests error steps, given e.
request.error_steps(e, can_gc);
}
}
fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
mem::take(&mut *self.read_into_requests.borrow_mut())
}
/// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
self.read_into_requests
.borrow_mut()
.push_back(read_request.clone());
}
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>
pub(crate) fn cancel(&self, can_gc: CanGc) {
// If reader is not undefined and reader implements ReadableStreamBYOBReader,
// Let readIntoRequests be reader.[[readIntoRequests]].
let mut read_into_requests = self.take_read_into_requests();
// Set reader.[[readIntoRequests]] to an empty list.
// Perform readIntoRequests close steps, given undefined.
for request in read_into_requests.drain(0..) {
// Perform readIntoRequests close steps, given undefined.
request.close_steps(None, can_gc);
}
}
pub(crate) fn close(&self, can_gc: CanGc) {
// Resolve reader.[[closedPromise]] with undefined.
self.closed_promise.borrow().resolve_native(&(), can_gc);
}
/// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
pub(crate) fn read(
&self,
cx: SafeJSContext,
view: HeapBufferSource<ArrayBufferViewU8>,
options: &ReadableStreamBYOBReaderReadOptions,
read_into_request: &ReadIntoRequest,
can_gc: CanGc,
) {
// Let stream be reader.[[stream]].
// Assert: stream is not undefined.
assert!(self.stream.get().is_some());
let stream = self.stream.get().unwrap();
// Set stream.[[disturbed]] to true.
stream.set_is_disturbed(true);
// If stream.[[state]] is "errored", perform readIntoRequests error steps given stream.[[storedError]].
if stream.is_errored() {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut error = UndefinedValue());
stream.get_stored_error(error.handle_mut());
read_into_request.error_steps(error.handle(), can_gc);
} else {
// Otherwise,
// perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
stream.perform_pull_into(cx, read_into_request, view, options, can_gc);
}
}
pub(crate) fn get_num_read_into_requests(&self) -> usize {
self.read_into_requests.borrow().len()
}
pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
self.read_into_requests
.borrow_mut()
.pop_front()
.expect("read into requests is empty")
}
}
impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
/// <https://streams.spec.whatwg.org/#byob-reader-constructor>
fn Constructor(
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
stream: &ReadableStream,
) -> Fallible<DomRoot<Self>> {
let reader = Self::new_with_proto(global, proto, can_gc);
// Perform ? SetUpReadableStreamBYOBReader(this, stream).
Self::set_up(&reader, stream, global, can_gc)?;
Ok(reader)
}
/// <https://streams.spec.whatwg.org/#byob-reader-read>
fn Read(
&self,
view: CustomAutoRooterGuard<ArrayBufferView>,
options: &ReadableStreamBYOBReaderReadOptions,
can_gc: CanGc,
) -> Rc<Promise> {
let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
// Let promise be a new promise.
let promise = Promise::new(&self.global(), can_gc);
let cx = GlobalScope::get_cx();
// If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
if view.byte_length() == 0 {
promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
return promise;
}
// If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
// return a promise rejected with a TypeError exception.
if view.viewed_buffer_array_byte_length(cx) == 0 {
promise.reject_error(
Error::Type("viewed buffer byte length is 0".to_owned()),
can_gc,
);
return promise;
}
// If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
// return a promise rejected with a TypeError exception.
if view.is_detached_buffer(cx) {
promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
return promise;
}
// If options["min"] is 0, return a promise rejected with a TypeError exception.
if options.min == 0 {
promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
return promise;
}
// If view has a [[TypedArrayName]] internal slot,
if view.has_typed_array_name() {
// If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
if options.min > (view.get_typed_array_length() as u64) {
promise.reject_error(
Error::Range("min is greater than array length".to_owned()),
can_gc,
);
return promise;
}
} else {
// Otherwise (i.e., it is a DataView),
// If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
if options.min > (view.byte_length() as u64) {
promise.reject_error(
Error::Range("min is greater than byte length".to_owned()),
can_gc,
);
return promise;
}
}
// If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
if self.stream.get().is_none() {
promise.reject_error(
Error::Type("min is greater than byte length".to_owned()),
can_gc,
);
return promise;
}
// Let readIntoRequest be a new read-into request with the following items:
//
// chunk steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → false ]».
//
// close steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → true ]».
//
// error steps, given e
// Reject promise with e
let read_into_request = ReadIntoRequest::Read(promise.clone());
// Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
self.read(cx, view, options, &read_into_request, can_gc);
// Return promise.
promise
}
/// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
if self.stream.get().is_none() {
// If this.[[stream]] is undefined, return.
return Ok(());
}
// Perform !ReadableStreamBYOBReaderRelease(this).
self.release(can_gc)
}
/// <https://streams.spec.whatwg.org/#generic-reader-closed>
fn Closed(&self) -> Rc<Promise> {
self.closed()
}
/// <https://streams.spec.whatwg.org/#generic-reader-cancel>
fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
self.generic_cancel(&self.global(), reason, can_gc)
}
}
impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
fn get_closed_promise(&self) -> Rc<Promise> {
self.closed_promise.borrow().clone()
}
fn set_closed_promise(&self, promise: Rc<Promise>) {
*self.closed_promise.borrow_mut() = promise;
}
fn set_stream(&self, stream: Option<&ReadableStream>) {
self.stream.set(stream);
}
fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
self.stream.get()
}
fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
Some(self)
}
}