diff --git a/components/script/dom/defaultteereadrequest.rs b/components/script/dom/defaultteereadrequest.rs index debc084e068..94e285da72b 100644 --- a/components/script/dom/defaultteereadrequest.rs +++ b/components/script/dom/defaultteereadrequest.rs @@ -21,7 +21,7 @@ use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::readablestream::ReadableStream; use crate::microtask::Microtask; -use crate::script_runtime::CanGc; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; #[derive(JSTraceable, MallocSizeOf)] #[cfg_attr(crown, allow(crown::unrooted_must_root))] @@ -32,8 +32,8 @@ pub(crate) struct DefaultTeeReadRequestMicrotask { } impl DefaultTeeReadRequestMicrotask { - pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) { - self.tee_read_request.chunk_steps(&self.chunk, can_gc) + pub(crate) fn microtask_chunk_steps(&self, cx: SafeJSContext, can_gc: CanGc) { + self.tee_read_request.chunk_steps(cx, &self.chunk, can_gc) } } @@ -94,8 +94,14 @@ impl DefaultTeeReadRequest { } /// Call into cancel of the stream, /// - pub(crate) fn stream_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) { - self.stream.cancel(reason, can_gc); + pub(crate) fn stream_cancel( + &self, + cx: SafeJSContext, + global: &GlobalScope, + reason: SafeHandleValue, + can_gc: CanGc, + ) { + self.stream.cancel(cx, global, reason, can_gc); } /// Enqueue a microtask to perform the chunk steps /// @@ -115,13 +121,13 @@ impl DefaultTeeReadRequest { } /// #[allow(clippy::borrowed_box)] - pub(crate) fn chunk_steps(&self, chunk: &Box>, can_gc: CanGc) { + pub(crate) fn chunk_steps(&self, cx: SafeJSContext, chunk: &Box>, can_gc: CanGc) { + let global = &self.stream.global(); // Set readAgain to false. self.read_again.set(false); // Let chunk1 and chunk2 be chunk. let chunk1 = chunk; let chunk2 = chunk; - let cx = GlobalScope::get_cx(); rooted!(in(*cx) let chunk1_value = chunk1.get()); rooted!(in(*cx) let chunk2_value = chunk2.get()); @@ -131,9 +137,7 @@ impl DefaultTeeReadRequest { rooted!(in(*cx) let mut clone_result = UndefinedValue()); let data = structuredclone::write(cx, chunk2_value.handle(), None).unwrap(); // If cloneResult is an abrupt completion, - if structuredclone::read(&self.stream.global(), data, clone_result.handle_mut()) - .is_err() - { + if structuredclone::read(global, data, clone_result.handle_mut()).is_err() { // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]). self.readable_stream_default_controller_error( &self.branch_1, @@ -148,7 +152,7 @@ impl DefaultTeeReadRequest { can_gc, ); // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). - self.stream_cancel(clone_result.handle(), can_gc); + self.stream_cancel(cx, global, clone_result.handle(), can_gc); // Return. return; } else { diff --git a/components/script/dom/defaultteeunderlyingsource.rs b/components/script/dom/defaultteeunderlyingsource.rs index 5895297d982..7935c388842 100644 --- a/components/script/dom/defaultteeunderlyingsource.rs +++ b/components/script/dom/defaultteeunderlyingsource.rs @@ -19,7 +19,7 @@ use crate::dom::defaultteereadrequest::DefaultTeeReadRequest; use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::readablestreamdefaultreader::ReadRequest; -use crate::script_runtime::CanGc; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; #[derive(JSTraceable, MallocSizeOf)] pub(crate) enum TeeCancelAlgorithm { @@ -156,6 +156,8 @@ impl DefaultTeeUnderlyingSource { #[allow(unsafe_code)] pub(crate) fn cancel_algorithm( &self, + cx: SafeJSContext, + global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, ) -> Option, Error>> { @@ -169,7 +171,7 @@ impl DefaultTeeUnderlyingSource { // If canceled_2 is true, if self.canceled_2.get() { - self.resolve_cancel_promise(can_gc); + self.resolve_cancel_promise(cx, global, can_gc); } // Return cancelPromise. Some(Ok(self.cancel_promise.clone())) @@ -183,7 +185,7 @@ impl DefaultTeeUnderlyingSource { // If canceled_1 is true, if self.canceled_1.get() { - self.resolve_cancel_promise(can_gc); + self.resolve_cancel_promise(cx, global, can_gc); } // Return cancelPromise. Some(Ok(self.cancel_promise.clone())) @@ -192,9 +194,8 @@ impl DefaultTeeUnderlyingSource { } #[allow(unsafe_code)] - fn resolve_cancel_promise(&self, can_gc: CanGc) { + fn resolve_cancel_promise(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »). - let cx = GlobalScope::get_cx(); rooted_vec!(let mut reasons_values); reasons_values.push(self.reason_1.get()); reasons_values.push(self.reason_2.get()); @@ -204,7 +205,9 @@ impl DefaultTeeUnderlyingSource { rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get())); // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). - let cancel_result = self.stream.cancel(reasons_value.handle(), can_gc); + let cancel_result = self + .stream + .cancel(cx, global, reasons_value.handle(), can_gc); // Resolve cancelPromise with cancelResult. self.cancel_promise.resolve_native(&cancel_result, can_gc); diff --git a/components/script/dom/readablebytestreamcontroller.rs b/components/script/dom/readablebytestreamcontroller.rs index 340e6d04eab..8f28a9a1215 100644 --- a/components/script/dom/readablebytestreamcontroller.rs +++ b/components/script/dom/readablebytestreamcontroller.rs @@ -1612,7 +1612,7 @@ impl ReadableByteStreamController { let realm = enter_realm(&*global); let comp = InRealm::Entered(&realm); let result = underlying_source - .call_pull_algorithm(controller, can_gc) + .call_pull_algorithm(controller, &global, can_gc) .unwrap_or_else(|| { let promise = Promise::new(&global, can_gc); promise.resolve_native(&(), can_gc); @@ -1781,6 +1781,8 @@ impl ReadableByteStreamController { /// pub(crate) fn perform_cancel_steps( &self, + cx: SafeJSContext, + global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, ) -> Rc { @@ -1794,13 +1796,12 @@ impl ReadableByteStreamController { .underlying_source .get() .expect("Controller should have a source when the cancel steps are called into."); - let global = self.global(); // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason. let result = underlying_source - .call_cancel_algorithm(reason, can_gc) + .call_cancel_algorithm(cx, global, reason, can_gc) .unwrap_or_else(|| { - let promise = Promise::new(&global, can_gc); + let promise = Promise::new(global, can_gc); promise.resolve_native(&(), can_gc); Ok(promise) }); @@ -1808,11 +1809,10 @@ impl ReadableByteStreamController { let promise = result.unwrap_or_else(|error| { let cx = GlobalScope::get_cx(); rooted!(in(*cx) let mut rval = UndefinedValue()); - // TODO: check if `self.global()` is the right globalscope. error .clone() - .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc); - let promise = Promise::new(&global, can_gc); + .to_jsval(cx, global, rval.handle_mut(), can_gc); + let promise = Promise::new(global, can_gc); promise.reject_native(&rval.handle(), can_gc); promise }); diff --git a/components/script/dom/readablestream.rs b/components/script/dom/readablestream.rs index 37899f18fec..51393ab33ae 100644 --- a/components/script/dom/readablestream.rs +++ b/components/script/dom/readablestream.rs @@ -3,10 +3,10 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::cell::{Cell, RefCell}; +use std::collections::HashMap; use std::collections::VecDeque; use std::ptr::{self}; use std::rc::Rc; -use std::collections::HashMap; use base::id::{MessagePortId, MessagePortIndex}; use constellation_traits::MessagePortImpl; @@ -22,12 +22,14 @@ use js::typedarray::ArrayBufferViewU8; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{ - ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, StreamPipeOptions + ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode, + StreamPipeOptions, }; use script_bindings::str::DOMString; use crate::dom::domexception::{DOMErrorName, DOMException}; use script_bindings::conversions::StringificationBehavior; +use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods; use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods; use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource; @@ -640,7 +642,7 @@ impl PipeTo { .reader .get_stream() .expect("Reader should have a stream."); - source.cancel(error.handle(), can_gc) + source.cancel(cx, global, error.handle(), can_gc) }, ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => { self.writer.close_with_error_propagation(cx, global, can_gc) @@ -766,19 +768,19 @@ impl PartialEq for ReaderType { /// #[cfg_attr(crown, allow(crown::unrooted_must_root))] -fn create_readable_stream( +pub(crate) fn create_readable_stream( global: &GlobalScope, underlying_source_type: UnderlyingSourceType, - queuing_strategy: QueuingStrategy, + queuing_strategy: Option>, + high_water_mark: Option, can_gc: CanGc, ) -> DomRoot { // If highWaterMark was not passed, set it to 1. - let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0); + let high_water_mark = high_water_mark.unwrap_or(1.0); // If sizeAlgorithm was not passed, set it to an algorithm that returns 1. - let size_algorithm = queuing_strategy - .size - .unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc)); + let size_algorithm = + queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc)); // Assert: ! IsNonNegativeNumber(highWaterMark) is true. assert!(high_water_mark >= 0.0); @@ -1437,19 +1439,24 @@ impl ReadableStream { /// #[allow(unsafe_code)] - pub(crate) fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + pub(crate) fn cancel( + &self, + cx: SafeJSContext, + global: &GlobalScope, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Rc { // Set stream.[[disturbed]] to true. self.disturbed.set(true); // If stream.[[state]] is "closed", return a promise resolved with undefined. if self.is_closed() { - return Promise::new_resolved(&self.global(), GlobalScope::get_cx(), (), can_gc); + return Promise::new_resolved(global, cx, (), can_gc); } // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]]. if self.is_errored() { - let promise = Promise::new(&self.global(), can_gc); + let promise = Promise::new(global, can_gc); unsafe { - let cx = GlobalScope::get_cx(); rooted!(in(*cx) let mut rval = UndefinedValue()); self.stored_error.to_jsval(*cx, rval.handle_mut()); promise.reject_native(&rval.handle(), can_gc); @@ -1473,11 +1480,11 @@ impl ReadableStream { Some(ControllerType::Default(controller)) => controller .get() .expect("Stream should have controller.") - .perform_cancel_steps(reason, can_gc), + .perform_cancel_steps(cx, global, reason, can_gc), Some(ControllerType::Byte(controller)) => controller .get() .expect("Stream should have controller.") - .perform_cancel_steps(reason, can_gc), + .perform_cancel_steps(cx, global, reason, can_gc), None => { panic!("Stream does not have a controller."); }, @@ -1587,7 +1594,8 @@ impl ReadableStream { let branch_1 = create_readable_stream( &self.global(), underlying_source_type_branch_1, - QueuingStrategy::empty(), + None, + None, can_gc, ); tee_source_1.set_branch_1(&branch_1); @@ -1597,7 +1605,8 @@ impl ReadableStream { let branch_2 = create_readable_stream( &self.global(), underlying_source_type_branch_2, - QueuingStrategy::empty(), + None, + None, can_gc, ); tee_source_1.set_branch_2(&branch_2); @@ -1908,16 +1917,17 @@ impl ReadableStreamMethods for ReadableStream { } /// - fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + let global = self.global(); if self.is_locked() { // If ! IsReadableStreamLocked(this) is true, // return a promise rejected with a TypeError exception. - let promise = Promise::new(&self.global(), can_gc); + let promise = Promise::new(&global, can_gc); promise.reject_error(Error::Type("stream is not locked".to_owned()), can_gc); promise } else { // Return ! ReadableStreamCancel(this, reason). - self.cancel(reason, can_gc) + self.cancel(cx, &global, reason, can_gc) } } diff --git a/components/script/dom/readablestreambyobreader.rs b/components/script/dom/readablestreambyobreader.rs index 16827c1add6..3ccfb255009 100644 --- a/components/script/dom/readablestreambyobreader.rs +++ b/components/script/dom/readablestreambyobreader.rs @@ -401,8 +401,8 @@ impl ReadableStreamBYOBReaderMethods for ReadableStreamBYO } /// - fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { - self.generic_cancel(&self.global(), reason, can_gc) + fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + self.generic_cancel(cx, &self.global(), reason, can_gc) } } diff --git a/components/script/dom/readablestreamdefaultcontroller.rs b/components/script/dom/readablestreamdefaultcontroller.rs index 66ba3d209c7..c52fb712a03 100644 --- a/components/script/dom/readablestreamdefaultcontroller.rs +++ b/components/script/dom/readablestreamdefaultcontroller.rs @@ -540,7 +540,7 @@ impl ReadableStreamDefaultController { let realm = enter_realm(&*global); let comp = InRealm::Entered(&realm); let result = underlying_source - .call_pull_algorithm(controller, can_gc) + .call_pull_algorithm(controller, &global, can_gc) .unwrap_or_else(|| { let promise = Promise::new(&global, can_gc); promise.resolve_native(&(), can_gc); @@ -563,6 +563,8 @@ impl ReadableStreamDefaultController { /// pub(crate) fn perform_cancel_steps( &self, + cx: SafeJSContext, + global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, ) -> Rc { @@ -573,24 +575,21 @@ impl ReadableStreamDefaultController { .underlying_source .get() .expect("Controller should have a source when the cancel steps are called into."); - let global = self.global(); - // Let result be the result of performing this.[[cancelAlgorithm]], passing reason. let result = underlying_source - .call_cancel_algorithm(reason, can_gc) + .call_cancel_algorithm(cx, global, reason, can_gc) .unwrap_or_else(|| { - let promise = Promise::new(&global, can_gc); + let promise = Promise::new(global, can_gc); promise.resolve_native(&(), can_gc); Ok(promise) }); let promise = result.unwrap_or_else(|error| { - let cx = GlobalScope::get_cx(); rooted!(in(*cx) let mut rval = UndefinedValue()); - // TODO: check if `self.global()` is the right globalscope. + error .clone() - .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc); - let promise = Promise::new(&global, can_gc); + .to_jsval(cx, global, rval.handle_mut(), can_gc); + let promise = Promise::new(global, can_gc); promise.reject_native(&rval.handle(), can_gc); promise }); @@ -812,7 +811,7 @@ impl ReadableStreamDefaultController { } /// - fn get_desired_size(&self) -> Option { + pub(crate) fn get_desired_size(&self) -> Option { let stream = self.stream.get()?; // If state is "errored", return null. @@ -832,7 +831,7 @@ impl ReadableStreamDefaultController { } /// - fn can_close_or_enqueue(&self) -> bool { + pub(crate) fn can_close_or_enqueue(&self) -> bool { let Some(stream) = self.stream.get() else { return false; }; @@ -865,6 +864,14 @@ impl ReadableStreamDefaultController { stream.error(e, can_gc); } + + /// + #[allow(unused)] + pub(crate) fn has_backpressure(&self) -> bool { + // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false. + // Otherwise, return true. + !self.should_call_pull() + } } impl ReadableStreamDefaultControllerMethods diff --git a/components/script/dom/readablestreamdefaultreader.rs b/components/script/dom/readablestreamdefaultreader.rs index f490627a2ee..7fd243b0b56 100644 --- a/components/script/dom/readablestreamdefaultreader.rs +++ b/components/script/dom/readablestreamdefaultreader.rs @@ -605,8 +605,8 @@ impl ReadableStreamDefaultReaderMethods for ReadableStream } /// - fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { - self.generic_cancel(&self.global(), reason, can_gc) + fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + self.generic_cancel(cx, &self.global(), reason, can_gc) } } diff --git a/components/script/dom/readablestreamgenericreader.rs b/components/script/dom/readablestreamgenericreader.rs index b437605953b..8ba1149bcb5 100644 --- a/components/script/dom/readablestreamgenericreader.rs +++ b/components/script/dom/readablestreamgenericreader.rs @@ -16,7 +16,7 @@ use crate::dom::globalscope::GlobalScope; use crate::dom::promise::Promise; use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader; use crate::dom::readablestreamdefaultreader::ReadableStreamDefaultReader; -use crate::script_runtime::CanGc; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; /// pub(crate) trait ReadableStreamGenericReader { @@ -61,7 +61,13 @@ pub(crate) trait ReadableStreamGenericReader { } /// - fn reader_generic_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc { + fn reader_generic_cancel( + &self, + cx: SafeJSContext, + global: &GlobalScope, + reason: SafeHandleValue, + can_gc: CanGc, + ) -> Rc { // Let stream be reader.[[stream]]. let stream = self.get_stream(); @@ -70,7 +76,7 @@ pub(crate) trait ReadableStreamGenericReader { stream.expect("Reader should have a stream when generic cancel is called into."); // Return ! ReadableStreamCancel(stream, reason). - stream.cancel(reason, can_gc) + stream.cancel(cx, global, reason, can_gc) } /// @@ -135,6 +141,7 @@ pub(crate) trait ReadableStreamGenericReader { // fn generic_cancel( &self, + cx: SafeJSContext, global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, @@ -147,7 +154,7 @@ pub(crate) trait ReadableStreamGenericReader { promise } else { // Return ! ReadableStreamReaderGenericCancel(this, reason). - self.reader_generic_cancel(reason, can_gc) + self.reader_generic_cancel(cx, global, reason, can_gc) } } diff --git a/components/script/dom/underlyingsourcecontainer.rs b/components/script/dom/underlyingsourcecontainer.rs index cf396825d4f..541a831693a 100644 --- a/components/script/dom/underlyingsourcecontainer.rs +++ b/components/script/dom/underlyingsourcecontainer.rs @@ -20,7 +20,7 @@ use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource; use crate::dom::globalscope::GlobalScope; use crate::dom::messageport::MessagePort; use crate::dom::promise::Promise; -use crate::script_runtime::CanGc; +use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; /// /// The `Js` variant corresponds to @@ -43,6 +43,11 @@ pub(crate) enum UnderlyingSourceType { Tee(Dom), /// Transfer, with the port used in some of the algorithms. Transfer(Dom), + /// A struct representing a JS object as underlying source, + /// and the actual JS object for use as `thisArg` in callbacks. + /// This is used for the `TransformStream` API. + #[allow(unused)] + Transform(/* Dom, Rc*/), } impl UnderlyingSourceType { @@ -110,6 +115,8 @@ impl UnderlyingSourceContainer { #[allow(unsafe_code)] pub(crate) fn call_cancel_algorithm( &self, + cx: SafeJSContext, + global: &GlobalScope, reason: SafeHandleValue, can_gc: CanGc, ) -> Option, Error>> { @@ -128,9 +135,13 @@ impl UnderlyingSourceContainer { } None }, - UnderlyingSourceType::Tee(tee_underlyin_source) => { + UnderlyingSourceType::Tee(tee_underlying_source) => { // Call the cancel algorithm for the appropriate branch. - tee_underlyin_source.cancel_algorithm(reason, can_gc) + tee_underlying_source.cancel_algorithm(cx, global, reason, can_gc) + }, + UnderlyingSourceType::Transform() => { + // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason). + todo!(); }, UnderlyingSourceType::Transfer(port) => { // Let cancelAlgorithm be the following steps, taking a reason argument: @@ -163,6 +174,7 @@ impl UnderlyingSourceContainer { pub(crate) fn call_pull_algorithm( &self, controller: Controller, + _global: &GlobalScope, can_gc: CanGc, ) -> Option, Error>> { match &self.underlying_source_type { @@ -180,9 +192,9 @@ impl UnderlyingSourceContainer { } None }, - UnderlyingSourceType::Tee(tee_underlyin_source) => { + UnderlyingSourceType::Tee(tee_underlying_source) => { // Call the pull algorithm for the appropriate branch. - Some(Ok(tee_underlyin_source.pull_algorithm(can_gc))) + Some(Ok(tee_underlying_source.pull_algorithm(can_gc))) }, UnderlyingSourceType::Transfer(port) => { // Let pullAlgorithm be the following steps: @@ -201,6 +213,10 @@ impl UnderlyingSourceContainer { Some(Ok(promise)) }, // Note: other source type have no pull steps for now. + UnderlyingSourceType::Transform() => { + // Return ! TransformStreamDefaultSourcePullAlgorithm(stream). + todo!(); + }, _ => None, } } @@ -264,6 +280,10 @@ impl UnderlyingSourceContainer { // from { + // Some(transform_underlying_source.start_algorithm()) + todo!(); + }, _ => None, } } diff --git a/components/script/dom/writablestream.rs b/components/script/dom/writablestream.rs index e7e9ce906a6..8c2b2434cd2 100644 --- a/components/script/dom/writablestream.rs +++ b/components/script/dom/writablestream.rs @@ -19,6 +19,7 @@ use js::rust::{ }; use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods; +use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; use crate::dom::bindings::cell::DomRefCell; use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy; use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink; @@ -209,6 +210,11 @@ impl WritableStream { self.controller.set(Some(controller)); } + #[allow(unused)] + pub(crate) fn get_default_controller(&self) -> DomRoot { + self.controller.get().expect("Controller should be set.") + } + pub(crate) fn is_writable(&self) -> bool { matches!(self.state.get(), WritableStreamState::Writable) } @@ -873,7 +879,6 @@ impl WritableStream { backpressure_promise: backpressure_promise.clone(), port: Dom::from_ref(port), }, - &UnderlyingSink::empty(), 1.0, size_algorithm, can_gc, @@ -892,9 +897,102 @@ impl WritableStream { // Perform ! SetUpWritableStreamDefaultController controller - .setup(cx, &global, self, &None, can_gc) + .setup(cx, &global, self, can_gc) .expect("Setup for transfer cannot fail"); } + /// + #[allow(clippy::too_many_arguments)] + pub(crate) fn setup_from_underlying_sink( + &self, + cx: SafeJSContext, + global: &GlobalScope, + stream: &WritableStream, + underlying_sink_obj: SafeHandleObject, + underlying_sink: &UnderlyingSink, + strategy_hwm: f64, + strategy_size: Rc, + can_gc: CanGc, + ) -> Result<(), Error> { + // Let controller be a new WritableStreamDefaultController. + + // Let startAlgorithm be an algorithm that returns undefined. + + // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined. + + // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined. + + // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined. + + // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which + // returns the result of invoking underlyingSinkDict["start"] with argument + // list « controller », exception behavior "rethrow", and callback this value underlyingSink. + + // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which + // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"] + // with argument list « chunk, controller » and callback this value underlyingSink. + + // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which + // returns the result of invoking underlyingSinkDict["close"] with argument + // list «» and callback this value underlyingSink. + + // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which + // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"] + // with argument list « reason » and callback this value underlyingSink. + let controller = WritableStreamDefaultController::new( + global, + UnderlyingSinkType::new_js( + underlying_sink.abort.clone(), + underlying_sink.start.clone(), + underlying_sink.close.clone(), + underlying_sink.write.clone(), + ), + strategy_hwm, + strategy_size, + can_gc, + ); + + // Note: this must be done before `setup`, + // otherwise `thisOb` is null in the start callback. + controller.set_underlying_sink_this_object(underlying_sink_obj); + + // Perform ? SetUpWritableStreamDefaultController + controller.setup(cx, global, stream, can_gc) + } +} + +/// +#[cfg_attr(crown, allow(crown::unrooted_must_root))] +#[allow(unused)] +pub(crate) fn create_writable_stream( + cx: SafeJSContext, + global: &GlobalScope, + can_gc: CanGc, + writable_high_water_mark: f64, + writable_size_algorithm: Rc, + underlying_sink_type: UnderlyingSinkType, +) -> Fallible> { + // Assert: ! IsNonNegativeNumber(highWaterMark) is true. + assert!(writable_high_water_mark >= 0.0); + + // Let stream be a new WritableStream. + // Perform ! InitializeWritableStream(stream). + let stream = WritableStream::new_with_proto(global, None, can_gc); + + // Let controller be a new WritableStreamDefaultController. + let controller = WritableStreamDefaultController::new( + global, + underlying_sink_type, + writable_high_water_mark, + writable_size_algorithm, + can_gc, + ); + + // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, + // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm). + controller.setup(cx, global, &stream, can_gc)?; + + // Return stream. + Ok(stream) } impl WritableStreamMethods for WritableStream { @@ -939,22 +1037,18 @@ impl WritableStreamMethods for WritableStream { // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1). let high_water_mark = extract_high_water_mark(strategy, 1.0)?; - // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink - let controller = WritableStreamDefaultController::new( + // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, + // underlyingSinkDict, highWaterMark, sizeAlgorithm). + stream.setup_from_underlying_sink( + cx, global, - UnderlyingSinkType::Js, + &stream, + underlying_sink_obj.handle(), &underlying_sink_dict, high_water_mark, size_algorithm, can_gc, - ); - - // Note: this must be done before `setup`, - // otherwise `thisOb` is null in the start callback. - controller.set_underlying_sink_this_object(underlying_sink_obj.handle()); - - // Perform ? SetUpWritableStreamDefaultController - controller.setup(cx, global, &stream, &underlying_sink_dict.start, can_gc)?; + )?; Ok(stream) } diff --git a/components/script/dom/writablestreamdefaultcontroller.rs b/components/script/dom/writablestreamdefaultcontroller.rs index 751f5d8d976..301404ffdb2 100644 --- a/components/script/dom/writablestreamdefaultcontroller.rs +++ b/components/script/dom/writablestreamdefaultcontroller.rs @@ -14,11 +14,11 @@ use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; use crate::dom::bindings::callback::ExceptionHandling; use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{ - UnderlyingSink, UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, - UnderlyingSinkStartCallback, UnderlyingSinkWriteCallback, + UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback, + UnderlyingSinkWriteCallback, }; use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods; -use crate::dom::bindings::error::{Error, ErrorToJsval}; +use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible}; use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object}; use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; use crate::dom::globalscope::GlobalScope; @@ -268,15 +268,46 @@ impl Callback for WriteAlgorithmRejectionHandler { /// The type of sink algorithms we are using. #[derive(JSTraceable, PartialEq)] +#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] pub enum UnderlyingSinkType { /// Algorithms are provided by Js callbacks. - Js, + Js { + /// + abort: RefCell>>, + + start: RefCell>>, + + /// + close: RefCell>>, + + /// + write: RefCell>>, + }, /// Algorithms supporting streams transfer are implemented in Rust. /// The promise and port used in those algorithms are stored here. Transfer { backpressure_promise: Rc>>>, port: Dom, }, + /// Algorithms supporting transform streams are implemented in Rust. + #[allow(unused)] + Transform(/*Dom, Rc*/), +} + +impl UnderlyingSinkType { + pub(crate) fn new_js( + abort: Option>, + start: Option>, + close: Option>, + write: Option>, + ) -> Self { + UnderlyingSinkType::Js { + abort: RefCell::new(abort), + start: RefCell::new(start), + close: RefCell::new(close), + write: RefCell::new(write), + } + } } /// @@ -284,21 +315,11 @@ pub enum UnderlyingSinkType { pub struct WritableStreamDefaultController { reflector_: Reflector, - #[ignore_malloc_size_of = "Rc is hard"] + /// The type of underlying sink used. Besides the default JS one, + /// there will be others for stream transfer, and for transform stream. + #[ignore_malloc_size_of = "underlying_sink_type"] underlying_sink_type: UnderlyingSinkType, - /// - #[ignore_malloc_size_of = "Rc is hard"] - abort: RefCell>>, - - /// - #[ignore_malloc_size_of = "Rc is hard"] - close: RefCell>>, - - /// - #[ignore_malloc_size_of = "Rc is hard"] - write: RefCell>>, - /// The JS object used as `this` when invoking sink algorithms. #[ignore_malloc_size_of = "mozjs"] underlying_sink_obj: Heap<*mut JSObject>, @@ -325,7 +346,6 @@ impl WritableStreamDefaultController { #[cfg_attr(crown, allow(crown::unrooted_must_root))] fn new_inherited( underlying_sink_type: UnderlyingSinkType, - underlying_sink: &UnderlyingSink, strategy_hwm: f64, strategy_size: Rc, ) -> WritableStreamDefaultController { @@ -334,9 +354,6 @@ impl WritableStreamDefaultController { underlying_sink_type, queue: Default::default(), stream: Default::default(), - abort: RefCell::new(underlying_sink.abort.clone()), - close: RefCell::new(underlying_sink.close.clone()), - write: RefCell::new(underlying_sink.write.clone()), underlying_sink_obj: Default::default(), strategy_hwm, strategy_size: RefCell::new(Some(strategy_size)), @@ -344,10 +361,10 @@ impl WritableStreamDefaultController { } } + #[cfg_attr(crown, allow(crown::unrooted_must_root))] pub(crate) fn new( global: &GlobalScope, underlying_sink_type: UnderlyingSinkType, - underlying_sink: &UnderlyingSink, strategy_hwm: f64, strategy_size: Rc, can_gc: CanGc, @@ -355,7 +372,6 @@ impl WritableStreamDefaultController { reflect_dom_object( Box::new(WritableStreamDefaultController::new_inherited( underlying_sink_type, - underlying_sink, strategy_hwm, strategy_size, )), @@ -375,27 +391,44 @@ impl WritableStreamDefaultController { /// fn clear_algorithms(&self) { - // Set controller.[[writeAlgorithm]] to undefined. - self.write.borrow_mut().take(); + match &self.underlying_sink_type { + UnderlyingSinkType::Js { + abort, + start: _, + close, + write, + } => { + // Set controller.[[writeAlgorithm]] to undefined. + write.borrow_mut().take(); - // Set controller.[[closeAlgorithm]] to undefined. - self.close.borrow_mut().take(); + // Set controller.[[closeAlgorithm]] to undefined. + close.borrow_mut().take(); - // Set controller.[[abortAlgorithm]] to undefined. - self.abort.borrow_mut().take(); + // Set controller.[[abortAlgorithm]] to undefined. + abort.borrow_mut().take(); + }, + UnderlyingSinkType::Transfer { + backpressure_promise, + .. + } => { + backpressure_promise.borrow_mut().take(); + }, + UnderlyingSinkType::Transform() => { + return; + }, + } // Set controller.[[strategySizeAlgorithm]] to undefined. self.strategy_size.borrow_mut().take(); } - /// + /// #[allow(unsafe_code)] pub(crate) fn setup( &self, cx: SafeJSContext, global: &GlobalScope, stream: &WritableStream, - start: &Option>, can_gc: CanGc, ) -> Result<(), Error> { // Assert: stream implements WritableStream. @@ -436,40 +469,7 @@ impl WritableStreamDefaultController { // Let startResult be the result of performing startAlgorithm. (This may throw an exception.) // Let startPromise be a promise resolved with startResult. - let start_promise = if let Some(start) = start { - rooted!(in(*cx) let mut result_object = ptr::null_mut::()); - rooted!(in(*cx) let mut result: JSVal); - rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); - start.Call_( - &this_object.handle(), - self, - result.handle_mut(), - ExceptionHandling::Rethrow, - can_gc, - )?; - let is_promise = unsafe { - if result.is_object() { - result_object.set(result.to_object()); - IsPromiseObject(result_object.handle().into_handle()) - } else { - false - } - }; - if is_promise { - let promise = Promise::new_with_js_promise(result_object.handle(), cx); - promise - } else { - Promise::new_resolved(global, cx, result.get(), can_gc) - } - } else { - // Note: we are either here because the Js algorithm is none, - // or because we are suppporting a stream transfer as - // part of #abstract-opdef-setupcrossrealmtransformwritable - // and the logic is the same for both. - - // Let startAlgorithm be an algorithm that returns undefined. - Promise::new_resolved(global, cx, (), can_gc) - }; + let start_promise = self.start_algorithm(cx, global, can_gc)?; let rooted_default_controller = DomRoot::from_ref(self); @@ -509,6 +509,64 @@ impl WritableStreamDefaultController { self.advance_queue_if_needed(cx, global, can_gc); } + #[allow(unsafe_code)] + fn start_algorithm( + &self, + cx: SafeJSContext, + global: &GlobalScope, + can_gc: CanGc, + ) -> Fallible> { + match &self.underlying_sink_type { + UnderlyingSinkType::Js { + start, + abort: _, + close: _, + write: _, + } => { + let algo = start.borrow().clone(); + let start_promise = if let Some(start) = algo { + rooted!(in(*cx) let mut result_object = ptr::null_mut::()); + rooted!(in(*cx) let mut result: JSVal); + rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); + start.Call_( + &this_object.handle(), + self, + result.handle_mut(), + ExceptionHandling::Rethrow, + can_gc, + )?; + let is_promise = unsafe { + if result.is_object() { + result_object.set(result.to_object()); + IsPromiseObject(result_object.handle().into_handle()) + } else { + false + } + }; + if is_promise { + let promise = Promise::new_with_js_promise(result_object.handle(), cx); + promise + } else { + Promise::new_resolved(global, cx, result.get(), can_gc) + } + } else { + // Let startAlgorithm be an algorithm that returns undefined. + Promise::new_resolved(global, cx, (), can_gc) + }; + + Ok(start_promise) + }, + UnderlyingSinkType::Transfer { .. } => { + // Let startAlgorithm be an algorithm that returns undefined. + Ok(Promise::new_resolved(global, cx, (), can_gc)) + }, + UnderlyingSinkType::Transform() => { + // Let startAlgorithm be an algorithm that returns startPromise. + todo!() + }, + } + } + /// pub(crate) fn abort_steps( &self, @@ -517,10 +575,15 @@ impl WritableStreamDefaultController { reason: SafeHandleValue, can_gc: CanGc, ) -> Rc { - let result = match self.underlying_sink_type { - UnderlyingSinkType::Js => { + let result = match &self.underlying_sink_type { + UnderlyingSinkType::Js { + abort, + start: _, + close: _, + write: _, + } => { rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); - let algo = self.abort.borrow().clone(); + let algo = abort.borrow().clone(); // Let result be the result of performing this.[[abortAlgorithm]], passing reason. let result = if let Some(algo) = algo { algo.Call_( @@ -538,7 +601,7 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer { ref port, .. } => { + UnderlyingSinkType::Transfer { port, .. } => { // The steps from the `abortAlgorithm` at // @@ -559,6 +622,10 @@ impl WritableStreamDefaultController { } promise }, + UnderlyingSinkType::Transform() => { + // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason). + todo!() + }, }; // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). @@ -575,10 +642,15 @@ impl WritableStreamDefaultController { global: &GlobalScope, can_gc: CanGc, ) -> Rc { - match self.underlying_sink_type { - UnderlyingSinkType::Js => { + match &self.underlying_sink_type { + UnderlyingSinkType::Js { + abort: _, + start: _, + close: _, + write, + } => { rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); - let algo = self.write.borrow().clone(); + let algo = write.borrow().clone(); let result = if let Some(algo) = algo { algo.Call_( &this_object.handle(), @@ -597,9 +669,8 @@ impl WritableStreamDefaultController { }) }, UnderlyingSinkType::Transfer { - ref backpressure_promise, - ref port, - .. + backpressure_promise, + port, } => { // The steps from the `writeAlgorithm` at // @@ -636,6 +707,10 @@ impl WritableStreamDefaultController { .append_native_handler(&handler, comp, can_gc); result_promise }, + UnderlyingSinkType::Transform() => { + // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk). + todo!() + }, } } @@ -646,11 +721,16 @@ impl WritableStreamDefaultController { global: &GlobalScope, can_gc: CanGc, ) -> Rc { - match self.underlying_sink_type { - UnderlyingSinkType::Js => { + match &self.underlying_sink_type { + UnderlyingSinkType::Js { + abort: _, + start: _, + close, + write: _, + } => { rooted!(in(*cx) let mut this_object = ptr::null_mut::()); this_object.set(self.underlying_sink_obj.get()); - let algo = self.close.borrow().clone(); + let algo = close.borrow().clone(); let result = if let Some(algo) = algo { algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc) } else { @@ -662,7 +742,7 @@ impl WritableStreamDefaultController { promise }) }, - UnderlyingSinkType::Transfer { ref port, .. } => { + UnderlyingSinkType::Transfer { port, .. } => { // The steps from the `closeAlgorithm` at // @@ -677,6 +757,10 @@ impl WritableStreamDefaultController { // Return a promise resolved with undefined. Promise::new_resolved(global, cx, (), can_gc) }, + UnderlyingSinkType::Transform() => { + // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream). + todo!() + }, } } diff --git a/components/script/microtask.rs b/components/script/microtask.rs index 57f558c1aac..453e587e892 100644 --- a/components/script/microtask.rs +++ b/components/script/microtask.rs @@ -147,7 +147,7 @@ impl MicrotaskQueue { MutationObserver::notify_mutation_observers(can_gc); }, Microtask::ReadableStreamTeeReadRequest(ref task) => { - task.microtask_chunk_steps(can_gc) + task.microtask_chunk_steps(cx, can_gc) }, } } diff --git a/components/script_bindings/codegen/Bindings.conf b/components/script_bindings/codegen/Bindings.conf index c457bf70b85..c50bc31a7f5 100644 --- a/components/script_bindings/codegen/Bindings.conf +++ b/components/script_bindings/codegen/Bindings.conf @@ -764,6 +764,10 @@ DOMInterfaces = { 'inRealms': ['Abort', 'Close', 'Write'], }, +'TransformStreamDefaultController': { + 'canGc': ['Enqueue', 'Error', 'Terminate'], +}, + 'WorkerNavigator': { 'canGc': ['Languages'], },