mirror of
https://github.com/servo/servo.git
synced 2025-08-03 04:30:10 +01:00
script: implement ReadableByteStreamController (#35410)
* script: implement ReadableByteStreamController Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * implement can_copy_data_block_bytes and copy_data_block_bytes Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove BufferSource::Default Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * implement StartAlgorithmFulfillmentHandler, StartAlgorithmRejectionHandler, PullAlgorithmFulfillmentHandler, PullAlgorithmRejectionHandler for ReadableByteStreamController Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * implement perform_pull_into Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix build Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix build Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove RefCell from PullIntoDescriptor and QueueEntry Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove commented code Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * add perform_cancel_steps, perform_release_steps and perform_pull_steps Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix crown Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * unskip readable-byte-streams Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix CRASH Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix more CRASHS Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix more crashes Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix bad-buffers-and-views.any.js test Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Update test expectations Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix BorrowMutError crashes Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix view_byte_length test Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix non-transferable-buffers test Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Pass contexts as much as possible by reference Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Make respond_internal Fallible Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix crwon Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * fix process pull into descriptors using queue logic and resulting double-borrow Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> * Fix clippy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * FIx more crashes Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix timeout tests Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix all tests Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove all error! logs Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Remove #[allow(unsafe_code)] Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix lint Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix tidy Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> * Fix test expectation Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> --------- Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com> Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
parent
459aee27b6
commit
a5cf04c479
30 changed files with 3410 additions and 324 deletions
|
@ -2,7 +2,7 @@
|
|||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::ptr::{self};
|
||||
use std::rc::Rc;
|
||||
|
||||
|
@ -114,6 +114,17 @@ pub(crate) enum ReaderType {
|
|||
Default(MutNullableDom<ReadableStreamDefaultReader>),
|
||||
}
|
||||
|
||||
impl Eq for ReaderType {}
|
||||
impl PartialEq for ReaderType {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
matches!(
|
||||
(self, other),
|
||||
(ReaderType::BYOB(_), ReaderType::BYOB(_)) |
|
||||
(ReaderType::Default(_), ReaderType::Default(_))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#create-readable-stream>
|
||||
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
|
||||
fn create_readable_stream(
|
||||
|
@ -135,12 +146,7 @@ fn create_readable_stream(
|
|||
|
||||
// Let stream be a new ReadableStream.
|
||||
// Perform ! InitializeReadableStream(stream).
|
||||
let stream = ReadableStream::new_with_proto(
|
||||
global,
|
||||
None,
|
||||
ControllerType::Default(MutNullableDom::new(None)),
|
||||
can_gc,
|
||||
);
|
||||
let stream = ReadableStream::new_with_proto(global, None, can_gc);
|
||||
|
||||
// Let controller be a new ReadableStreamDefaultController.
|
||||
let controller = ReadableStreamDefaultController::new(
|
||||
|
@ -169,7 +175,7 @@ pub(crate) struct ReadableStream {
|
|||
/// <https://streams.spec.whatwg.org/#readablestream-controller>
|
||||
/// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
|
||||
/// because it is never unset once set.
|
||||
controller: ControllerType,
|
||||
controller: RefCell<Option<ControllerType>>,
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readablestream-storederror>
|
||||
#[ignore_malloc_size_of = "mozjs"]
|
||||
|
@ -179,7 +185,7 @@ pub(crate) struct ReadableStream {
|
|||
disturbed: Cell<bool>,
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readablestream-reader>
|
||||
reader: ReaderType,
|
||||
reader: RefCell<Option<ReaderType>>,
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readablestream-state>
|
||||
state: Cell<ReadableStreamState>,
|
||||
|
@ -188,17 +194,13 @@ pub(crate) struct ReadableStream {
|
|||
impl ReadableStream {
|
||||
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
|
||||
/// <https://streams.spec.whatwg.org/#initialize-readable-stream>
|
||||
fn new_inherited(controller: ControllerType) -> ReadableStream {
|
||||
let reader = match &controller {
|
||||
ControllerType::Default(_) => ReaderType::Default(MutNullableDom::new(None)),
|
||||
ControllerType::Byte(_) => ReaderType::BYOB(MutNullableDom::new(None)),
|
||||
};
|
||||
fn new_inherited() -> ReadableStream {
|
||||
ReadableStream {
|
||||
reflector_: Reflector::new(),
|
||||
controller,
|
||||
controller: RefCell::new(None),
|
||||
stored_error: Heap::default(),
|
||||
disturbed: Default::default(),
|
||||
reader,
|
||||
reader: RefCell::new(None),
|
||||
state: Cell::new(Default::default()),
|
||||
}
|
||||
}
|
||||
|
@ -207,11 +209,10 @@ impl ReadableStream {
|
|||
fn new_with_proto(
|
||||
global: &GlobalScope,
|
||||
proto: Option<SafeHandleObject>,
|
||||
controller: ControllerType,
|
||||
can_gc: CanGc,
|
||||
) -> DomRoot<ReadableStream> {
|
||||
reflect_dom_object_with_proto(
|
||||
Box::new(ReadableStream::new_inherited(controller)),
|
||||
Box::new(ReadableStream::new_inherited()),
|
||||
global,
|
||||
proto,
|
||||
can_gc,
|
||||
|
@ -221,21 +222,22 @@ impl ReadableStream {
|
|||
/// Used as part of
|
||||
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
|
||||
pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref ctrl) => ctrl.set(Some(controller)),
|
||||
ControllerType::Byte(_) => {
|
||||
unreachable!("set_default_controller called in setup of default controller.")
|
||||
},
|
||||
}
|
||||
*self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
|
||||
controller,
|
||||
))));
|
||||
}
|
||||
|
||||
/// Used as part of
|
||||
/// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
|
||||
pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
|
||||
*self.controller.borrow_mut() =
|
||||
Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
|
||||
}
|
||||
|
||||
/// Used as part of
|
||||
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
|
||||
pub(crate) fn assert_no_controller(&self) {
|
||||
let has_no_controller = match self.controller {
|
||||
ControllerType::Default(ref ctrl) => ctrl.get().is_none(),
|
||||
ControllerType::Byte(ref ctrl) => ctrl.get().is_none(),
|
||||
};
|
||||
let has_no_controller = self.controller.borrow().is_none();
|
||||
assert!(has_no_controller);
|
||||
}
|
||||
|
||||
|
@ -264,12 +266,7 @@ impl ReadableStream {
|
|||
can_gc: CanGc,
|
||||
) -> Fallible<DomRoot<ReadableStream>> {
|
||||
assert!(source.is_native());
|
||||
let stream = ReadableStream::new_with_proto(
|
||||
global,
|
||||
None,
|
||||
ControllerType::Default(MutNullableDom::new(None)),
|
||||
can_gc,
|
||||
);
|
||||
let stream = ReadableStream::new_with_proto(global, None, can_gc);
|
||||
let controller = ReadableStreamDefaultController::new(
|
||||
global,
|
||||
source,
|
||||
|
@ -283,28 +280,43 @@ impl ReadableStream {
|
|||
|
||||
/// Call into the release steps of the controller,
|
||||
pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
|
||||
match &self.controller {
|
||||
ControllerType::Default(controller) => controller
|
||||
.get()
|
||||
.map(|controller_ref| controller_ref.perform_release_steps())
|
||||
.unwrap_or_else(|| Err(Error::Type("Stream should have controller.".to_string()))),
|
||||
ControllerType::Byte(_) => todo!(),
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => {
|
||||
let controller = controller
|
||||
.get()
|
||||
.ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
|
||||
controller.perform_release_steps()
|
||||
},
|
||||
Some(ControllerType::Byte(ref controller)) => {
|
||||
let controller = controller
|
||||
.get()
|
||||
.ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
|
||||
controller.perform_release_steps()
|
||||
},
|
||||
None => Err(Error::Type("Stream should have controller.".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Call into the pull steps of the controller,
|
||||
/// as part of
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
|
||||
pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => controller
|
||||
pub(crate) fn perform_pull_steps(
|
||||
&self,
|
||||
cx: SafeJSContext,
|
||||
read_request: &ReadRequest,
|
||||
can_gc: CanGc,
|
||||
) {
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.perform_pull_steps(read_request, can_gc),
|
||||
ControllerType::Byte(_) => {
|
||||
unreachable!(
|
||||
"Pulling a chunk from a stream with a byte controller using a default reader"
|
||||
)
|
||||
Some(ControllerType::Byte(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.perform_pull_steps(cx, read_request, can_gc),
|
||||
None => {
|
||||
unreachable!("Stream does not have a controller.");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -312,29 +324,31 @@ impl ReadableStream {
|
|||
/// Call into the pull steps of the controller,
|
||||
/// as part of
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
|
||||
pub(crate) fn perform_pull_into_steps(
|
||||
pub(crate) fn perform_pull_into(
|
||||
&self,
|
||||
cx: SafeJSContext,
|
||||
read_into_request: &ReadIntoRequest,
|
||||
view: HeapBufferSource<ArrayBufferViewU8>,
|
||||
options: &ReadableStreamBYOBReaderReadOptions,
|
||||
can_gc: CanGc,
|
||||
) {
|
||||
match self.controller {
|
||||
ControllerType::Byte(ref controller) => controller
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Byte(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.perform_pull_into(read_into_request, view, options, can_gc),
|
||||
ControllerType::Default(_) => unreachable!(
|
||||
"Pulling a chunk from a stream with a default controller using a BYOB reader"
|
||||
),
|
||||
.perform_pull_into(cx, read_into_request, view, options, can_gc),
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Pulling a chunk from a stream with a default controller using a BYOB reader"
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
|
||||
pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
|
||||
match self.reader {
|
||||
// Assert: stream.[[reader]] implements ReadableStreamDefaultReader.
|
||||
ReaderType::Default(ref reader) => {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
panic!("Attempt to add a read request without having first acquired a reader.");
|
||||
};
|
||||
|
@ -345,21 +359,17 @@ impl ReadableStream {
|
|||
// Append readRequest to stream.[[reader]].[[readRequests]].
|
||||
reader.add_read_request(read_request);
|
||||
},
|
||||
ReaderType::BYOB(_) => {
|
||||
_ => {
|
||||
unreachable!("Adding a read request can only be done on a default reader.")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
|
||||
pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
|
||||
match self.reader {
|
||||
match self.reader.borrow().as_ref() {
|
||||
// Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
|
||||
ReaderType::Default(_) => {
|
||||
unreachable!("Adding a read into request can only be done on a BYOB reader.")
|
||||
},
|
||||
ReaderType::BYOB(ref reader) => {
|
||||
Some(ReaderType::BYOB(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
unreachable!(
|
||||
"Attempt to add a read into request without having first acquired a reader."
|
||||
|
@ -372,20 +382,25 @@ impl ReadableStream {
|
|||
// Append readRequest to stream.[[reader]].[[readIntoRequests]].
|
||||
reader.add_read_into_request(read_request);
|
||||
},
|
||||
_ => {
|
||||
unreachable!("Adding a read into request can only be done on a BYOB reader.")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Endpoint to enqueue chunks directly from Rust.
|
||||
/// Note: in other use cases this call happens via the controller.
|
||||
pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => controller
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.enqueue_native(bytes, can_gc),
|
||||
_ => unreachable!(
|
||||
"Enqueueing chunk to a stream from Rust on other than default controller"
|
||||
),
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Enqueueing chunk to a stream from Rust on other than default controller"
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -393,22 +408,37 @@ impl ReadableStream {
|
|||
pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
|
||||
// Assert: stream.[[state]] is "readable".
|
||||
assert!(self.is_readable());
|
||||
|
||||
// Set stream.[[state]] to "errored".
|
||||
self.state.set(ReadableStreamState::Errored);
|
||||
|
||||
// Set stream.[[storedError]] to e.
|
||||
self.stored_error.set(e.get());
|
||||
|
||||
// Let reader be stream.[[reader]].
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
// If reader is undefined, return.
|
||||
return;
|
||||
};
|
||||
|
||||
// Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
|
||||
reader.error(e, can_gc);
|
||||
},
|
||||
// Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
|
||||
_ => todo!(),
|
||||
Some(ReaderType::BYOB(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
// If reader is undefined, return.
|
||||
return;
|
||||
};
|
||||
|
||||
// Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
|
||||
reader.error_read_into_requests(e, can_gc);
|
||||
},
|
||||
None => {
|
||||
// If reader is undefined, return.
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -429,14 +459,14 @@ impl ReadableStream {
|
|||
/// Call into the controller's `Close` method.
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
|
||||
pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => {
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => {
|
||||
let _ = controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.Close(can_gc);
|
||||
},
|
||||
ControllerType::Byte(_) => {
|
||||
_ => {
|
||||
unreachable!("Native closing is only done on default controllers.")
|
||||
},
|
||||
}
|
||||
|
@ -445,26 +475,28 @@ impl ReadableStream {
|
|||
/// Returns a boolean reflecting whether the stream has all data in memory.
|
||||
/// Useful for native source integration only.
|
||||
pub(crate) fn in_memory(&self) -> bool {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => controller
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.in_memory(),
|
||||
ControllerType::Byte(_) => unreachable!(
|
||||
"Checking if source is in memory for a stream with a non-default controller"
|
||||
),
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Checking if source is in memory for a stream with a non-default controller"
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Return bytes for synchronous use, if the stream has all data in memory.
|
||||
/// Useful for native source integration only.
|
||||
pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => controller
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.get_in_memory_bytes(),
|
||||
ControllerType::Byte(_) => {
|
||||
_ => {
|
||||
unreachable!("Getting in-memory bytes for a stream with a non-default controller")
|
||||
},
|
||||
}
|
||||
|
@ -503,13 +535,26 @@ impl ReadableStream {
|
|||
}
|
||||
|
||||
pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
|
||||
match self.controller {
|
||||
ControllerType::Default(ref controller) => {
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => {
|
||||
controller.get().expect("Stream should have controller.")
|
||||
},
|
||||
ControllerType::Byte(_) => unreachable!(
|
||||
"Getting default controller for a stream with a non-default controller"
|
||||
),
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Getting default controller for a stream with a non-default controller"
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
reader.get().expect("Stream should have reader.")
|
||||
},
|
||||
_ => {
|
||||
unreachable!("Getting default reader for a stream with a non-default reader")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -519,8 +564,8 @@ impl ReadableStream {
|
|||
/// Native call to
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
|
||||
pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
unreachable!(
|
||||
"Attempt to read stream chunk without having first acquired a reader."
|
||||
|
@ -528,7 +573,7 @@ impl ReadableStream {
|
|||
};
|
||||
reader.Read(can_gc)
|
||||
},
|
||||
ReaderType::BYOB(_) => {
|
||||
_ => {
|
||||
unreachable!("Native reading of a chunk can only be done with a default reader.")
|
||||
},
|
||||
}
|
||||
|
@ -539,14 +584,18 @@ impl ReadableStream {
|
|||
/// Native call to
|
||||
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
|
||||
pub(crate) fn stop_reading(&self, can_gc: CanGc) {
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
let reader_ref = self.reader.borrow();
|
||||
|
||||
match reader_ref.as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
unreachable!("Attempt to stop reading without having first acquired a reader.");
|
||||
};
|
||||
|
||||
drop(reader_ref);
|
||||
reader.release(can_gc).expect("Reader release cannot fail.");
|
||||
},
|
||||
ReaderType::BYOB(_) => {
|
||||
_ => {
|
||||
unreachable!("Native stop reading can only be done with a default reader.")
|
||||
},
|
||||
}
|
||||
|
@ -554,9 +603,10 @@ impl ReadableStream {
|
|||
|
||||
/// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
|
||||
pub(crate) fn is_locked(&self) -> bool {
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => reader.get().is_some(),
|
||||
ReaderType::BYOB(ref reader) => reader.get().is_some(),
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => reader.get().is_some(),
|
||||
Some(ReaderType::BYOB(ref reader)) => reader.get().is_some(),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -581,39 +631,70 @@ impl ReadableStream {
|
|||
}
|
||||
|
||||
pub(crate) fn has_default_reader(&self) -> bool {
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => reader.get().is_some(),
|
||||
ReaderType::BYOB(_) => false,
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => reader.get().is_some(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_byob_reader(&self) -> bool {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::BYOB(ref reader)) => reader.get().is_some(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_byte_controller(&self) -> bool {
|
||||
matches!(self.controller, ControllerType::Byte(_))
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Byte(ref controller)) => controller.get().is_some(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
|
||||
pub(crate) fn get_num_read_requests(&self) -> usize {
|
||||
assert!(self.has_default_reader());
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let reader = reader
|
||||
.get()
|
||||
.expect("Stream must have a reader when get num read requests is called into.");
|
||||
.expect("Stream must have a reader when getting the number of read requests.");
|
||||
reader.get_num_read_requests()
|
||||
},
|
||||
ReaderType::BYOB(_) => unreachable!(
|
||||
_ => unreachable!(
|
||||
"Stream must have a default reader when get num read requests is called into."
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
|
||||
pub(crate) fn get_num_read_into_requests(&self) -> usize {
|
||||
assert!(self.has_byob_reader());
|
||||
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::BYOB(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
unreachable!(
|
||||
"Stream must have a reader when get num read into requests is called into."
|
||||
);
|
||||
};
|
||||
reader.get_num_read_into_requests()
|
||||
},
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Stream must have a BYOB reader when get num read into requests is called into."
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
|
||||
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
|
||||
pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
|
||||
// step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
|
||||
assert!(self.has_default_reader());
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
// step 2 - Let reader be stream.[[reader]].
|
||||
let reader = reader
|
||||
.get()
|
||||
|
@ -634,12 +715,59 @@ impl ReadableStream {
|
|||
request.chunk_steps(result, can_gc);
|
||||
}
|
||||
},
|
||||
ReaderType::BYOB(_) => unreachable!(
|
||||
"Stream must have a default reader when fulfill read requests is called into."
|
||||
),
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Stream must have a default reader when fulfill read requests is called into."
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
|
||||
pub(crate) fn fulfill_read_into_request(
|
||||
&self,
|
||||
chunk: SafeHandleValue,
|
||||
done: bool,
|
||||
can_gc: CanGc,
|
||||
) {
|
||||
// Assert: ! ReadableStreamHasBYOBReader(stream) is true.
|
||||
assert!(self.has_byob_reader());
|
||||
|
||||
// Let reader be stream.[[reader]].
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::BYOB(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
unreachable!(
|
||||
"Stream must have a reader when a read into request is fulfilled."
|
||||
);
|
||||
};
|
||||
|
||||
// Assert: reader.[[readIntoRequests]] is not empty.
|
||||
assert!(reader.get_num_read_into_requests() > 0);
|
||||
|
||||
// Let readIntoRequest be reader.[[readIntoRequests]][0].
|
||||
// Remove readIntoRequest from reader.[[readIntoRequests]].
|
||||
let read_into_request = reader.remove_read_into_request();
|
||||
|
||||
// If done is true, perform readIntoRequest’s close steps, given chunk.
|
||||
let result = RootedTraceableBox::new(Heap::default());
|
||||
if done {
|
||||
result.set(*chunk);
|
||||
read_into_request.close_steps(Some(result), can_gc);
|
||||
} else {
|
||||
// Otherwise, perform readIntoRequest’s chunk steps, given chunk.
|
||||
result.set(*chunk);
|
||||
read_into_request.chunk_steps(result, can_gc);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Stream must have a BYOB reader when fulfill read into requests is called into."
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#readable-stream-close>
|
||||
pub(crate) fn close(&self, can_gc: CanGc) {
|
||||
// Assert: stream.[[state]] is "readable".
|
||||
|
@ -647,8 +775,8 @@ impl ReadableStream {
|
|||
// Set stream.[[state]] to "closed".
|
||||
self.state.set(ReadableStreamState::Closed);
|
||||
// Let reader be stream.[[reader]].
|
||||
match self.reader {
|
||||
ReaderType::Default(ref reader) => {
|
||||
match self.reader.borrow().as_ref() {
|
||||
Some(ReaderType::Default(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
// If reader is undefined, return.
|
||||
return;
|
||||
|
@ -656,7 +784,17 @@ impl ReadableStream {
|
|||
// step 5 & 6
|
||||
reader.close(can_gc);
|
||||
},
|
||||
ReaderType::BYOB(ref _reader) => {},
|
||||
Some(ReaderType::BYOB(ref reader)) => {
|
||||
let Some(reader) = reader.get() else {
|
||||
// If reader is undefined, return.
|
||||
return;
|
||||
};
|
||||
|
||||
reader.close(can_gc)
|
||||
},
|
||||
None => {
|
||||
// If reader is undefined, return.
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -685,24 +823,26 @@ impl ReadableStream {
|
|||
self.close(can_gc);
|
||||
|
||||
// If reader is not undefined and reader implements ReadableStreamBYOBReader,
|
||||
match self.reader {
|
||||
ReaderType::BYOB(ref reader) => {
|
||||
if let Some(reader) = reader.get() {
|
||||
// step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
|
||||
reader.close(can_gc);
|
||||
}
|
||||
},
|
||||
ReaderType::Default(ref _reader) => {},
|
||||
if let Some(ReaderType::BYOB(ref reader)) = self.reader.borrow().as_ref() {
|
||||
if let Some(reader) = reader.get() {
|
||||
// step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
|
||||
reader.cancel(can_gc);
|
||||
}
|
||||
}
|
||||
|
||||
// Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
|
||||
let source_cancel_promise = match self.controller {
|
||||
ControllerType::Default(ref controller) => controller
|
||||
|
||||
let source_cancel_promise = match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.perform_cancel_steps(reason, can_gc),
|
||||
ControllerType::Byte(_) => {
|
||||
todo!()
|
||||
Some(ControllerType::Byte(ref controller)) => controller
|
||||
.get()
|
||||
.expect("Stream should have controller.")
|
||||
.perform_cancel_steps(reason, can_gc),
|
||||
None => {
|
||||
panic!("Stream does not have a controller.");
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -733,23 +873,7 @@ impl ReadableStream {
|
|||
|
||||
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
|
||||
pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
|
||||
match (&self.reader, new_reader) {
|
||||
(ReaderType::Default(ref reader), Some(ReaderType::Default(new_reader))) => {
|
||||
reader.set(new_reader.get().as_deref());
|
||||
},
|
||||
(ReaderType::BYOB(ref reader), Some(ReaderType::BYOB(new_reader))) => {
|
||||
reader.set(new_reader.get().as_deref());
|
||||
},
|
||||
(ReaderType::Default(ref reader), None) => {
|
||||
reader.set(None);
|
||||
},
|
||||
(ReaderType::BYOB(ref reader), None) => {
|
||||
reader.set(None);
|
||||
},
|
||||
(_, _) => {
|
||||
unreachable!("Setting a mismatched reader type is not allowed.");
|
||||
},
|
||||
}
|
||||
*self.reader.borrow_mut() = new_reader;
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
|
||||
|
@ -865,18 +989,68 @@ impl ReadableStream {
|
|||
// Assert: stream implements ReadableStream.
|
||||
// Assert: cloneForBranch2 is a boolean.
|
||||
|
||||
match self.controller {
|
||||
ControllerType::Default(ref _controller) => {
|
||||
match self.controller.borrow().as_ref() {
|
||||
Some(ControllerType::Default(_)) => {
|
||||
// Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
|
||||
self.default_tee(clone_for_branch_2, can_gc)
|
||||
},
|
||||
ControllerType::Byte(ref _controller) => {
|
||||
Some(ControllerType::Byte(_)) => {
|
||||
// If stream.[[controller]] implements ReadableByteStreamController,
|
||||
// return ? ReadableByteStreamTee(stream).
|
||||
todo!()
|
||||
Err(Error::Type(
|
||||
"Teeing is not yet supported for byte streams".to_owned(),
|
||||
))
|
||||
},
|
||||
None => {
|
||||
unreachable!("Stream should have a controller.");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
|
||||
pub(crate) fn set_up_byte_controller(
|
||||
&self,
|
||||
global: &GlobalScope,
|
||||
underlying_source_dict: JsUnderlyingSource,
|
||||
underlying_source_handle: SafeHandleObject,
|
||||
stream: DomRoot<ReadableStream>,
|
||||
strategy_hwm: f64,
|
||||
can_gc: CanGc,
|
||||
) -> Fallible<()> {
|
||||
// Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
|
||||
// Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
|
||||
// If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
|
||||
// of invoking underlyingSourceDict["start"] with argument list « controller »
|
||||
// and callback this value underlyingSource.
|
||||
// If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
|
||||
// of invoking underlyingSourceDict["pull"] with argument list « controller »
|
||||
// and callback this value underlyingSource.
|
||||
// If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
|
||||
// argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
|
||||
// « reason » and callback this value underlyingSource.
|
||||
|
||||
// Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
|
||||
// if it exists, or undefined otherwise.
|
||||
// If autoAllocateChunkSize is 0, then throw a TypeError exception.
|
||||
if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
|
||||
return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
|
||||
}
|
||||
|
||||
let controller = ReadableByteStreamController::new(
|
||||
UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
|
||||
strategy_hwm,
|
||||
global,
|
||||
can_gc,
|
||||
);
|
||||
|
||||
// Note: this must be done before `setup`,
|
||||
// otherwise `thisOb` is null in the start callback.
|
||||
controller.set_underlying_source_this_object(underlying_source_handle);
|
||||
|
||||
// Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
|
||||
// pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
|
||||
controller.setup(global, stream, can_gc)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
|
||||
|
@ -907,25 +1081,29 @@ impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
|
|||
};
|
||||
|
||||
// Perform ! InitializeReadableStream(this).
|
||||
let stream = if underlying_source_dict.type_.is_some() {
|
||||
ReadableStream::new_with_proto(
|
||||
global,
|
||||
proto,
|
||||
ControllerType::Byte(MutNullableDom::new(None)),
|
||||
can_gc,
|
||||
)
|
||||
} else {
|
||||
ReadableStream::new_with_proto(
|
||||
global,
|
||||
proto,
|
||||
ControllerType::Default(MutNullableDom::new(None)),
|
||||
can_gc,
|
||||
)
|
||||
};
|
||||
let stream = ReadableStream::new_with_proto(global, proto, can_gc);
|
||||
|
||||
if underlying_source_dict.type_.is_some() {
|
||||
// TODO: If underlyingSourceDict["type"] is "bytes"
|
||||
return Err(Error::Type("Bytes streams not implemented".to_string()));
|
||||
// If strategy["size"] exists, throw a RangeError exception.
|
||||
if strategy.size.is_some() {
|
||||
return Err(Error::Range(
|
||||
"size is not supported for byte streams".to_owned(),
|
||||
));
|
||||
}
|
||||
|
||||
// Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
|
||||
let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
|
||||
|
||||
// Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
|
||||
// underlyingSource, underlyingSourceDict, highWaterMark).
|
||||
stream.set_up_byte_controller(
|
||||
global,
|
||||
underlying_source_dict,
|
||||
underlying_source_obj.handle(),
|
||||
stream.clone(),
|
||||
strategy_hwm,
|
||||
can_gc,
|
||||
)?;
|
||||
} else {
|
||||
// Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
|
||||
let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue