Dom: Re-implement ReadableStream Part 1 : Default Reader and Controller (#34064)

* Re-implement readablestream: basics and default reader and controller

---------

Co-authored-by: Jason Tsai <jason@pews.dev>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>

Add remaining WebIDLs of ReadableStream (#32605)

* Add Reader's WebIDL files

* Add necessary methods in ReadableStream.webidl

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

Create safe wrapper for JSFunctions (#32620)

* Create safe wrapper for JSFunctions

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Add assert to check if the  name ends in a null character

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Create macro to wrap unsafe extern "C" function calls

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Remove WRAPPER_FN

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Add macro example documentation

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Use  C-string literals

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Ensure name is Cstr type

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

* Scope #[allow(unsafe_code)]

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: Bentaimia Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>

Start implementation of default controller and reader

Start implementation of default controller and reader

* implement basic internal slots, with todos

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* enum for controller

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* re-implement native controller methods

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add calling into pull algo

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* more details on chunk enqueuing

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add fulfill read request, clean-up warnings

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* read request and reader typing

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* allow for more than one non-native underlying source type

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add todo for should pull

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add underlying source dom struct container

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove rc around source type

* add default controller init in stream constructor

* setup source container with prototype of source dict

* clean-up docs, dispatch of controller in pull algo call

* turn off SM streams

* remove prototype setting on underlying source container

* fix read request promise resolving

* tidy

* clean-up js conversions in read req handlers

* add queue with sizes concept

* use dom in pull promise handlers

* Demonstrate using dictionary as callback this object.

* move value with size to a struct

* fmt

* put readable stream state in a cell

* nits in expectations

* remove allow unroot by passing read result directly to promise resolving

* tidy

* root default controller inside call_pull_if_needed

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>
Signed-off-by: Wu Wayne <yuweiwu@pm.me>

ReadableStream: implement Cancel and Locked (#33136)

* implement Locked

* implement Cancel and close

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

Add GetPromiseIsHandled and SetAnyPromiseIsHandled to Promise

Signed-off-by: Taym <haddadi.taym@gmail.com>

mach fmt

Signed-off-by: Taym <haddadi.taym@gmail.com>

Readablestream default controller: get desired size (#33497)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

stream: implement controller close (#33498)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

implement stream default controller error (#33503)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

Readablestream default controller: enqueue (#33528)

* Implement ReadableStreamDefaultControllerMethods::Enqueue

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

* Add spec comments

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

---------

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

readablestream default controller: fulfill read requests (#33542)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

Fix extract_size_algorithm (#33561)

Signed-off-by: Wu Wayne <yuweiwu@pm.me>

Readablestream default controller: use strategy size (#33551)

* readablestream default controller: use strategy size, fallible enqueue

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

docs

* readablestream default controller: clear strategy size

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* prevent potential re-borrow panics when calling into the strategy size

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* document readablestream constructor

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

Readablestream: impl default controller should pull, start algo (#33586)

* implement should-pull algo for default controller

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add start algorithm setup for default controller

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

implement promise native handling for start and pull algorithms (#33603)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

Implement ReadableStreamDefaultReader (#33160)

* Implement ReadableStreamDefaultReader

 Make the stream mutable
 readable-stream-reader-generic-release
 Proper error types when releasing
 Closed
 Cancel

Signed-off-by: Taym <haddadi.taym@gmail.com>

* follow the spec more closely

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>

Implement ReadableStreamDefaultReader read (#34007)

* Implement ReadableStreamDefaultReader read

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Perform readRequest’s error steps with stream.stored_error

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>

Improve ReadableStreamDefaultReader close (#34014)

* improve ReadableStreamDefaultReader close

Signed-off-by: Taym <haddadi.taym@gmail.com>

* remove resolve_closed_promise

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>

Use Rc<Box<[u8]>> for queue to optimize get_in_memory_bytes

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>

* Improve read_a_chunk and stop_reading implemntation (#34077)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Implement ReadableStreamDefaultReader::Constructor  (#34056)

* Implement ReadableStreamDefaultReader::Constructor

Signed-off-by: Taym <haddadi.taym@gmail.com>

* make start_reading returns ReadableStreamDefaultReader

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Fix can_gc
Signed-off-by: Taym <haddadi.taym@gmail.com>

* Add canGc to ReadableStream::GetReader

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Readablestream fix CanGc (#34080)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* correct ReadableStream::error_native implementation and fix clippy warnings (#34088)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* turn assertion of stream present on controller on a early return with false (#34097)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix already mutably borrowed crash (#34105)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Refactor `get_in_memory_bytes` to return `Option<Vec<u8>> and avoid `unreachable!` panic (#34123)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Set ReadableStream ReadableStreamDefaultReader in start_reading (#34125)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix  Unhandled rejection with value: object `TypeError: stream is not locked` (#34204)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix  assert!(self.is_readable()) crash in ReadableStream::close (#34207)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix call to to_js_object in underlying source algos (#34098)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* do not assume presence of a stream when performing pull steps (#34244)

* do not assume presence of a stream when performing pull steps

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add doc comments

Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* gracefully handle failure of underlying source algorithms (#34243)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* ensure result of calling start algo is an object (#34245)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* return js failed error if underlying source constructor threw (#34246)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Use JSVal for ValueWithSize::value (#34259)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix release reader lock, (#34255)

fix setting stream on controller in new,
fix matching fallthrough,
reduce visibility of controller error method

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* in stream cancel, reject promist if locked (#34271)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix UnderlyingSourceContainer::call_start_algorithm (#34277)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* implement controller cancel steps, fix stream cancel method (#34301)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix conditional in perform pull steps (#34324)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* set reader closed promise to one resolved with undefined if stream closed on init (#34321)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix init of stream and controller (#34323)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Stream: Fix reborrow in controller enqueue, and fix error and exception handling. (#34338)

* fix re-borrow in controller enqueue

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* do not call to_jsval on JSFailed error in enqueue

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix error and exception handling in controller enqueue

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* remove TODO about correctness of stored error, since this was done as part of the switch to a js val.

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Stream: Fix incorrect "this" object in underlying source callbacks (#34368)

* in controller close, throw type error if stream cannot be closed

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* store original js object for underlying source, for use as this object in callbacks

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix conditional logic in enqueue to ensure pull is called into (#34375)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Stream: Fix bytelength queueing strategy (#34376)

* fix handling of value that is not an object in bytelength queuing strategy

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* return type error if strategy size call fails, to prevent panic because no exception is pending

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* set correct  default count queuing size strategy (#34389)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* use proto in stream constructor (#34441)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix edge cases in get_desired_size (#34440)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Stream: fix algo and strategy calls error handling. (#34424)

* fix error handling in cancel steps

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* in pull steps, reject promise if pull algo throws

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* if start algorithm fails, rethrow the error

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* when the strategy size fails, directly get the pending exception and use it to error the stream

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* add error handling to enqueue value with size

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* when enqueueing a value errors, ensure we error and stream with the same error used to throw an exception

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix native use of streams (#34468)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Implement readablestreamdefaulttee (#34405)

* Implement readablestreamdefaulttee

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Create UnderlyingSourceType::Tee each stream

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Use Dom instead of DomRoot

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Queue a microtask for readRequest chunk steps

Signed-off-by: Taym <haddadi.taym@gmail.com>

* fix create_readable_stream

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Remove unnecessary Rc

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Use correct doc link

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Add #[allow(crown::unrooted_must_root)]

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Fix crash in ClosedPromiseRejectionHandler

Signed-off-by: Taym <haddadi.taym@gmail.com>

* reflect TeeReadRequest and TeeUnderlyingSource
Signed-off-by: Taym <haddadi.taym@gmail.com>

* fix can_gc

Signed-off-by: Taym <haddadi.taym@gmail.com>

* reflect tee source, and fix use of mutable dom for tee source and request

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* Fix typo that resolves multiple test failures in 'Tee' tests

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Fix readable-streams/tee.any.js test

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js (#34531)

And fix crate::DomTypeHolder usage

* Align ReadableStreamDefaultReader with spec and fix additional tests in default-reader.any.js

Signed-off-by: Taym <haddadi.taym@gmail.com>

* make reader rooted in Constructor and acquire_default_reader

Signed-off-by: Taym <haddadi.taym@gmail.com>

* Remove spaces

Signed-off-by: Taym <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Streams: fetch stream chunks should be uint8 arrays (#34553)

* fetch stream chunks should be uint8 arrays

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

* fix clippy

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Update wpt test for ReadableStream reimplementation (#34579)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix ignore_malloc_size_of in readablestream tee (#34578)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Remove incorrect use of handle array, fail test safely by giving only one reason (#34560)

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Update more wpt test for ReadableStream reimplementation (#34598)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix doc and rename Tee to DefaultTee (#34612)

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix: Address review comments

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Update response-stream-with-broken-then.any.js.ini test expectation

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* fix reflect_dom_object can_gc

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Fix compositeReason for DefaultTeeUnderlyingSource (#34627)

* Fix compositeReason for DefaultTeeUnderlyingSource

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Update test

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

---------

Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>

* Last fixes stream (#34636)

* remove now unsused from_js method of readable stream

* fix documenation of error steps

* return type error instread of panicking on a todo, when trying to construct a stream of type bytes

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>

* fix crown rooting related errors (#34662)

Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>

---------

Signed-off-by: Taym <haddadi.taym@gmail.com>
Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
Signed-off-by: Taym Haddadi <haddadi.taym@gmail.com>
Signed-off-by: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Co-authored-by: Wu Wayne <yuweiwu@pm.me>
Co-authored-by: Taym Haddadi <haddadi.taym@gmail.com>
This commit is contained in:
Gregory Terzian 2024-12-18 05:14:00 +08:00 committed by GitHub
parent 026d371717
commit 379bbb41dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
48 changed files with 3792 additions and 548 deletions

View file

@ -0,0 +1,878 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::ptr;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::{Heap, JSObject};
use js::jsval::{JSVal, UndefinedValue};
use js::rust::wrappers::JS_GetPendingException;
use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
use js::typedarray::Uint8;
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use crate::dom::bindings::buffer_source::create_buffer_source;
use crate::dom::bindings::callback::ExceptionHandling;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
use crate::dom::bindings::import::module::{throw_dom_exception, Error, Fallible};
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
use crate::dom::bindings::root::{DomRoot, MutNullableDom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::ReadableStream;
use crate::dom::readablestreamdefaultreader::ReadRequest;
use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
use crate::js::conversions::ToJSValConvertible;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext, JSContext as SafeJSContext};
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct PullAlgorithmFulfillmentHandler {
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}
impl Callback for PullAlgorithmFulfillmentHandler {
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
/// Upon fulfillment of pullPromise
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
let controller = self.controller.root();
// Set controller.[[pulling]] to false.
controller.pulling.set(false);
// If controller.[[pullAgain]] is true,
if controller.pull_again.get() {
// Set controller.[[pullAgain]] to false.
controller.pull_again.set(false);
// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
controller.call_pull_if_needed(can_gc);
}
}
}
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct PullAlgorithmRejectionHandler {
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}
impl Callback for PullAlgorithmRejectionHandler {
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
/// Upon rejection of pullPromise with reason e.
fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
let controller = self.controller.root();
// Perform ! ReadableStreamDefaultControllerError(controller, e).
controller.error(v);
}
}
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct StartAlgorithmFulfillmentHandler {
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}
impl Callback for StartAlgorithmFulfillmentHandler {
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
/// Upon fulfillment of startPromise,
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
let controller = self.controller.root();
// Set controller.[[started]] to true.
controller.started.set(true);
// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
controller.call_pull_if_needed(can_gc);
}
}
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct StartAlgorithmRejectionHandler {
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}
impl Callback for StartAlgorithmRejectionHandler {
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
/// Upon rejection of startPromise with reason r,
fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
let controller = self.controller.root();
// Perform ! ReadableStreamDefaultControllerError(controller, r).
controller.error(v);
}
}
/// <https://streams.spec.whatwg.org/#value-with-size>
#[derive(JSTraceable)]
#[crown::unrooted_must_root_lint::must_root]
pub struct ValueWithSize {
value: Box<Heap<JSVal>>,
size: f64,
}
/// <https://streams.spec.whatwg.org/#value-with-size>
#[derive(JSTraceable)]
#[crown::unrooted_must_root_lint::must_root]
pub enum EnqueuedValue {
/// A value enqueued from Rust.
Native(Box<[u8]>),
/// A Js value.
Js(ValueWithSize),
}
impl EnqueuedValue {
fn size(&self) -> f64 {
match self {
EnqueuedValue::Native(v) => v.len() as f64,
EnqueuedValue::Js(v) => v.size,
}
}
#[allow(unsafe_code)]
fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue) {
match self {
EnqueuedValue::Native(chunk) => {
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut())
.expect("failed to create buffer source for native chunk.");
unsafe { array_buffer_ptr.to_jsval(*cx, rval) };
},
EnqueuedValue::Js(value_with_size) => unsafe {
value_with_size.value.to_jsval(*cx, rval);
},
}
}
}
/// <https://streams.spec.whatwg.org/#is-non-negative-number>
fn is_non_negative_number(value: &EnqueuedValue) -> bool {
let value_with_size = match value {
EnqueuedValue::Native(_) => return true,
EnqueuedValue::Js(value_with_size) => value_with_size,
};
// If v is not a Number, return false.
// Checked as part of the WebIDL.
// If v is NaN, return false.
if value_with_size.size.is_nan() {
return false;
}
// If v < 0, return false.
if value_with_size.size.is_sign_negative() {
return false;
}
true
}
/// <https://streams.spec.whatwg.org/#queue-with-sizes>
#[derive(Default, JSTraceable, MallocSizeOf)]
#[crown::unrooted_must_root_lint::must_root]
pub struct QueueWithSizes {
#[ignore_malloc_size_of = "EnqueuedValue::Js"]
queue: VecDeque<EnqueuedValue>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
total_size: f64,
}
impl QueueWithSizes {
/// <https://streams.spec.whatwg.org/#dequeue-value>
#[allow(crown::unrooted_must_root)]
fn dequeue_value(&mut self) -> EnqueuedValue {
let value = self
.queue
.pop_front()
.expect("Buffer cannot be empty when dequeue value is called into.");
self.total_size -= value.size();
value
}
/// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
#[allow(crown::unrooted_must_root)]
fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
// If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
if !is_non_negative_number(&value) {
return Err(Error::Range(
"The size of the enqueued chunk is not a non-negative number.".to_string(),
));
}
// If size is +∞, throw a RangeError exception.
if value.size().is_infinite() {
return Err(Error::Range(
"The size of the enqueued chunk is infinite.".to_string(),
));
}
self.total_size += value.size();
self.queue.push_back(value);
Ok(())
}
fn is_empty(&self) -> bool {
self.queue.is_empty()
}
/// Only used with native sources.
fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
self.queue
.iter()
.try_fold(Vec::new(), |mut acc, value| match value {
EnqueuedValue::Native(chunk) => {
acc.extend(chunk.iter().copied());
Some(acc)
},
_ => {
warn!("get_in_memory_bytes called on a controller with non-native source.");
None
},
})
}
/// <https://streams.spec.whatwg.org/#reset-queue>
fn reset(&mut self) {
self.queue.clear();
self.total_size = Default::default();
}
}
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
#[dom_struct]
pub struct ReadableStreamDefaultController {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
queue: RefCell<QueueWithSizes>,
/// A mutable reference to the underlying source is used to implement these two
/// internal slots:
///
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
underlying_source: MutNullableDom<UnderlyingSourceContainer>,
stream: MutNullableDom<ReadableStream>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
strategy_hwm: f64,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
#[ignore_malloc_size_of = "mozjs"]
strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
close_requested: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
started: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
pulling: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
pull_again: Cell<bool>,
}
impl ReadableStreamDefaultController {
#[allow(crown::unrooted_must_root)]
fn new_inherited(
global: &GlobalScope,
underlying_source_type: UnderlyingSourceType,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
can_gc: CanGc,
) -> ReadableStreamDefaultController {
ReadableStreamDefaultController {
reflector_: Reflector::new(),
queue: RefCell::new(Default::default()),
stream: MutNullableDom::new(None),
underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
global,
underlying_source_type,
can_gc,
))),
strategy_hwm,
strategy_size: RefCell::new(Some(strategy_size)),
close_requested: Default::default(),
started: Default::default(),
pulling: Default::default(),
pull_again: Default::default(),
}
}
#[allow(crown::unrooted_must_root)]
pub fn new(
global: &GlobalScope,
underlying_source: UnderlyingSourceType,
strategy_hwm: f64,
strategy_size: Rc<QueuingStrategySize>,
can_gc: CanGc,
) -> DomRoot<ReadableStreamDefaultController> {
reflect_dom_object(
Box::new(ReadableStreamDefaultController::new_inherited(
global,
underlying_source,
strategy_hwm,
strategy_size,
can_gc,
)),
global,
can_gc,
)
}
/// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
#[allow(unsafe_code)]
pub fn setup(&self, stream: DomRoot<ReadableStream>, can_gc: CanGc) -> Result<(), Error> {
// Assert: stream.[[controller]] is undefined
stream.assert_no_controller();
// Set controller.[[stream]] to stream.
self.stream.set(Some(&stream));
let global = &*self.global();
let rooted_default_controller = DomRoot::from_ref(self);
// Perform ! ResetQueue(controller).
// Set controller.[[started]], controller.[[closeRequested]],
// controller.[[pullAgain]], and controller.[[pulling]] to false.
// Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
// and controller.[[strategyHWM]] to highWaterMark.
// Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
// and controller.[[strategyHWM]] to highWaterMark.
// Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
// Note: the above steps are done in `new`.
// Set stream.[[controller]] to controller.
stream.set_default_controller(&rooted_default_controller);
if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
// Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
let start_result = underlying_source
.call_start_algorithm(
Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
can_gc,
)
.unwrap_or_else(|| {
let promise = Promise::new(global, can_gc);
promise.resolve_native(&());
Ok(promise)
});
// Let startPromise be a promise resolved with startResult.
let start_promise = start_result?;
// Upon fulfillment of startPromise,
let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler {
controller: Trusted::new(&*rooted_default_controller),
});
// Upon rejection of startPromise with reason r,
let rejection_handler = Box::new(StartAlgorithmRejectionHandler {
controller: Trusted::new(&*rooted_default_controller),
});
let handler = PromiseNativeHandler::new(
global,
Some(fulfillment_handler),
Some(rejection_handler),
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
start_promise.append_native_handler(&handler, comp, can_gc);
};
Ok(())
}
/// Setting the JS object after the heap has settled down.
pub fn set_underlying_source_this_object(&self, this_object: HandleObject) {
if let Some(underlying_source) = self.underlying_source.get() {
underlying_source.set_underlying_source_this_object(this_object);
}
}
/// <https://streams.spec.whatwg.org/#dequeue-value>
#[allow(crown::unrooted_must_root)]
fn dequeue_value(&self) -> EnqueuedValue {
let mut queue = self.queue.borrow_mut();
queue.dequeue_value()
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
fn should_call_pull(&self) -> bool {
// Let stream be controller.[[stream]].
// Note: the spec does not assert that stream is not undefined here,
// so we return false if it is.
let Some(stream) = self.stream.get() else {
debug!("`should_call_pull` called on a controller without a stream.");
return false;
};
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
if !self.can_close_or_enqueue() {
return false;
}
// If controller.[[started]] is false, return false.
if !self.started.get() {
return false;
}
// If ! IsReadableStreamLocked(stream) is true
// and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
if stream.is_locked() && stream.get_num_read_requests() > 0 {
return true;
}
// Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
// Assert: desiredSize is not null.
let desired_size = self.get_desired_size().expect("desiredSize is not null.");
if desired_size > 0. {
return true;
}
false
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
#[allow(unsafe_code)]
fn call_pull_if_needed(&self, can_gc: CanGc) {
if !self.should_call_pull() {
return;
}
// If controller.[[pulling]] is true,
if self.pulling.get() {
// Set controller.[[pullAgain]] to true.
self.pull_again.set(true);
return;
}
// Set controller.[[pulling]] to true.
self.pulling.set(true);
// Let pullPromise be the result of performing controller.[[pullAlgorithm]].
// Continues into the resolve and reject handling of the native handler.
let global = self.global();
let rooted_default_controller = DomRoot::from_ref(self);
let controller =
Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
let Some(underlying_source) = self.underlying_source.get() else {
return;
};
let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler {
controller: Trusted::new(&*rooted_default_controller),
});
let rejection_handler = Box::new(PullAlgorithmRejectionHandler {
controller: Trusted::new(&*rooted_default_controller),
});
let handler =
PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
let result = underlying_source
.call_pull_algorithm(controller, can_gc)
.unwrap_or_else(|| {
let promise = Promise::new(&global, can_gc);
promise.resolve_native(&());
Ok(promise)
});
let promise = result.unwrap_or_else(|error| {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
// TODO: check if `self.global()` is the right globalscope.
unsafe {
error
.clone()
.to_jsval(*cx, &self.global(), rval.handle_mut())
};
let promise = Promise::new(&global, can_gc);
promise.reject_native(&rval.handle());
promise
});
promise.append_native_handler(&handler, comp, can_gc);
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
#[allow(unsafe_code)]
pub fn perform_cancel_steps(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
// Perform ! ResetQueue(this).
self.queue.borrow_mut().reset();
let underlying_source = self
.underlying_source
.get()
.expect("Controller should have a source when the cancel steps are called into.");
let global = self.global();
// Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
let result = underlying_source
.call_cancel_algorithm(reason, can_gc)
.unwrap_or_else(|| {
let promise = Promise::new(&global, can_gc);
promise.resolve_native(&());
Ok(promise)
});
let promise = result.unwrap_or_else(|error| {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
// TODO: check if `self.global()` is the right globalscope.
unsafe {
error
.clone()
.to_jsval(*cx, &self.global(), rval.handle_mut())
};
let promise = Promise::new(&global, can_gc);
promise.reject_native(&rval.handle());
promise
});
// Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
self.clear_algorithms();
// Return result(the promise).
promise
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
#[allow(crown::unrooted_must_root)]
pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
// Let stream be this.[[stream]].
// Note: the spec does not assert that there is a stream.
let Some(stream) = self.stream.get() else {
return;
};
// if queue contains bytes, perform chunk steps.
if !self.queue.borrow().is_empty() {
// Rooting: chunk will be tied to a rooted value
// before calling into a function that can GC.
let chunk = self.dequeue_value();
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
let result = RootedTraceableBox::new(Heap::default());
chunk.to_jsval(cx, rval.handle_mut());
result.set(*rval);
// If this.[[closeRequested]] is true and this.[[queue]] is empty
if self.close_requested.get() && self.queue.borrow().is_empty() {
// Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
self.clear_algorithms();
// Perform ! ReadableStreamClose(stream).
stream.close();
} else {
// Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
self.call_pull_if_needed(can_gc);
}
// Perform readRequests chunk steps, given chunk.
read_request.chunk_steps(result);
} else {
// Perform ! ReadableStreamAddReadRequest(stream, readRequest).
stream.add_read_request(read_request);
// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
self.call_pull_if_needed(can_gc);
}
}
/// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
pub fn perform_release_steps(&self) {
// step 1 - Return.
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
#[allow(unsafe_code)]
pub fn enqueue(
&self,
cx: SafeJSContext,
chunk: SafeHandleValue,
can_gc: CanGc,
) -> Result<(), Error> {
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
if !self.can_close_or_enqueue() {
return Ok(());
}
let stream = self
.stream
.get()
.expect("Controller must have a stream when a chunk is enqueued.");
// If ! IsReadableStreamLocked(stream) is true
// and ! ReadableStreamGetNumReadRequests(stream) > 0,
// perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
if stream.is_locked() && stream.get_num_read_requests() > 0 {
stream.fulfill_read_request(chunk, false);
} else {
// Otherwise,
// Let result be the result of performing controller.[[strategySizeAlgorithm]],
// passing in chunk, and interpreting the result as a completion record.
// Note: the clone is necessary to prevent potential re-borrow panics.
let strategy_size = {
let reference = self.strategy_size.borrow();
reference.clone()
};
let size = if let Some(strategy_size) = strategy_size {
// Note: the Rethrow exception handling is necessary,
// otherwise returning JSFailed will panic because no exception is pending.
let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow);
match result {
// Let chunkSize be result.[[Value]].
Ok(size) => size,
Err(error) => {
// If result is an abrupt completion,
rooted!(in(*cx) let mut rval = UndefinedValue());
unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
// Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
self.error(rval.handle());
// Return result.
// Note: we need to return a type error, because no exception is pending.
return Err(error);
},
}
} else {
0.
};
{
// Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
let res = {
let mut queue = self.queue.borrow_mut();
queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
value: Heap::boxed(chunk.get()),
size,
}))
};
if let Err(error) = res {
// If enqueueResult is an abrupt completion,
// First, throw the exception.
// Note: this must be done manually here,
// because `enqueue_value_with_size` does not call into JS.
throw_dom_exception(cx, &self.global(), error);
// Then, get a handle to the JS val for the exception,
// and use that to error the stream.
rooted!(in(*cx) let mut rval = UndefinedValue());
unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
// Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
self.error(rval.handle());
// Return enqueueResult.
// Note: because we threw the exception above,
// there is a pending exception and we can return JSFailed.
return Err(Error::JSFailed);
}
}
}
// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
self.call_pull_if_needed(can_gc);
Ok(())
}
/// Native call to
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
#[allow(crown::unrooted_must_root)]
pub fn enqueue_native(&self, chunk: Vec<u8>) {
let stream = self
.stream
.get()
.expect("Controller must have a stream when a chunk is enqueued.");
if stream.is_locked() && stream.get_num_read_requests() > 0 {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
let enqueued_chunk = EnqueuedValue::Native(chunk.into_boxed_slice());
enqueued_chunk.to_jsval(cx, rval.handle_mut());
stream.fulfill_read_request(rval.handle(), false);
} else {
let mut queue = self.queue.borrow_mut();
queue
.enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
.expect("Enqueuing a chunk from Rust should not fail.");
}
}
/// Does the stream have all data in memory?
pub fn in_memory(&self) -> bool {
let Some(underlying_source) = self.underlying_source.get() else {
return false;
};
underlying_source.in_memory()
}
/// Return bytes synchronously if the stream has all data in memory.
pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
let underlying_source = self.underlying_source.get()?;
if underlying_source.in_memory() {
return self.queue.borrow().get_in_memory_bytes();
}
None
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
fn clear_algorithms(&self) {
// Set controller.[[pullAlgorithm]] to undefined.
// Set controller.[[cancelAlgorithm]] to undefined.
self.underlying_source.set(None);
// Set controller.[[strategySizeAlgorithm]] to undefined.
*self.strategy_size.borrow_mut() = None;
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
pub fn close(&self) {
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
if !self.can_close_or_enqueue() {
return;
}
let Some(stream) = self.stream.get() else {
return;
};
// Set controller.[[closeRequested]] to true.
self.close_requested.set(true);
if self.queue.borrow().is_empty() {
// Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
self.clear_algorithms();
// Perform ! ReadableStreamClose(stream).
stream.close();
}
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
fn get_desired_size(&self) -> Option<f64> {
let stream = self.stream.get()?;
// If state is "errored", return null.
if stream.is_errored() {
return None;
}
// If state is "closed", return 0.
if stream.is_closed() {
return Some(0.0);
}
// Return controller.[[strategyHWM]] controller.[[queueTotalSize]].
let queue = self.queue.borrow();
let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
Some(desired_size.clamp(desired_size, self.strategy_hwm))
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
fn can_close_or_enqueue(&self) -> bool {
let Some(stream) = self.stream.get() else {
return false;
};
// If controller.[[closeRequested]] is false and state is "readable", return true.
if !self.close_requested.get() && stream.is_readable() {
return true;
}
// Otherwise, return false.
false
}
/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
pub fn error(&self, e: SafeHandleValue) {
let Some(stream) = self.stream.get() else {
return;
};
// If stream.[[state]] is not "readable", return.
if !stream.is_readable() {
return;
}
// Perform ! ResetQueue(controller).
self.queue.borrow_mut().reset();
// Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
self.clear_algorithms();
stream.error(e);
}
}
impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
for ReadableStreamDefaultController
{
/// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
fn GetDesiredSize(&self) -> Option<f64> {
self.get_desired_size()
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-close>
fn Close(&self) -> Fallible<()> {
if !self.can_close_or_enqueue() {
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
// throw a TypeError exception.
return Err(Error::Type("Stream cannot be closed.".to_string()));
}
// Perform ! ReadableStreamDefaultControllerClose(this).
self.close();
Ok(())
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
// If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
if !self.can_close_or_enqueue() {
return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
}
// Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
self.enqueue(cx, chunk, can_gc)
}
/// <https://streams.spec.whatwg.org/#rs-default-controller-error>
fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue) -> Fallible<()> {
self.error(e);
Ok(())
}
}