/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ptr::{self};
use std::rc::Rc;

use base::id::{MessagePortId, MessagePortIndex};
use constellation_traits::MessagePortImpl;
use dom_struct::dom_struct;
use ipc_channel::ipc::IpcSharedMemory;
use js::conversions::ToJSValConvertible;
use js::jsapi::{Heap, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::{
    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
    MutableHandleValue as SafeMutableHandleValue,
};
use js::typedarray::ArrayBufferViewU8;

use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
    ReadableWritablePair, StreamPipeOptions,
};
use script_bindings::str::DOMString;

use crate::dom::domexception::{DOMErrorName, DOMException};
use script_bindings::conversions::{is_array_like, StringificationBehavior};
use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult};
use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
use crate::dom::writablestream::WritableStream;
use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::bindings::utils::get_dictionary_property;
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::{wait_for_all_promise, Promise};
use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
use crate::dom::types::DefaultTeeUnderlyingSource;
use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
use crate::dom::messageport::MessagePort;
use crate::js::conversions::FromJSValConvertible;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::bindings::transferable::Transferable;
use crate::dom::bindings::structuredclone::StructuredData;

use super::bindings::buffer_source::HeapBufferSource;
use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
use super::readablestreambyobreader::ReadIntoRequest;

/// State Machine for `PipeTo`.
#[derive(Clone, Debug, Default, PartialEq)]
enum PipeToState {
    /// The starting state
    #[default]
    Starting,
    /// Waiting for the writer to be ready
    PendingReady,
    /// Waiting for a read to resolve.
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`.
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed.
    Finalized,
}

/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
#[derive(Clone, Debug, PartialEq)]
enum ShutdownAction {
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}

impl js::gc::Rootable for PipeTo {}

/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Pending writes are needed when shutting down(with an action),
    /// because we can only finalize when all writes are finished.
    #[ignore_malloc_size_of = "Rc are hard"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[ignore_malloc_size_of = "Rc are hard"]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    #[ignore_malloc_size_of = "Rc are hard"]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "Rc are hard"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[ignore_malloc_size_of = "Rc are hard"]
    result_promise: Rc<Promise>,
}

impl PipeTo {
    /// Run the `abortAlgorithm` defined at
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    pub(crate) fn abort_with_reason(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        reason: SafeHandleValue,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // Abort should do nothing if we are already shutting down.
        if self.shutting_down.get() {
            return;
        }

        // Let error be signal’s abort reason.
        // Note: storing it because it may need to be kept across a microtask,
        // and see the note below as to why it is kept separately from `shutdown_error`.
        self.abort_reason.set(reason.get());

        // Note: setting the error now,
        // will result in a rejection of the pipe promise, with this error.
        // Unless any shutdown action raise their own error,
        // in which case this error will be overwritten by the shutdown action error.
        {
            let mut error = Some(Heap::default());
            // Setting the value on the heap after it has been moved.
            if let Some(heap) = error.as_mut() {
                heap.set(reason.get())
            }
            *self.shutdown_error.borrow_mut() = error;
        }

        // Let actions be an empty ordered set.
        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.

        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
        // and with error.
        self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
    }
}

impl Callback for PipeTo {
    /// The pipe makes progress one microtask at a time.
    /// Note: we use one struct as the callback for all promises,
    /// and for both of their reactions.
    ///
    /// The context of the callback is determined from:
    /// - the current state.
    /// - the type of `result`.
    /// - the state of a stored promise(in some cases).
    #[allow(unsafe_code)]
    fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
        let global = self.reader.global();

        // Note: we only care about the result of writes when they are rejected,
        // and the error is accessed not through handlers,
        // but directly using `dest.get_stored_error`.
        // So we must mark rejected promises as handled
        // to prevent unhandled rejection errors.
        self.pending_writes.borrow_mut().retain(|p| {
            let pending = p.is_pending();
            if !pending {
                p.set_promise_is_handled();
            }
            pending
        });

        // Note: cloning to prevent re-borrow in methods called below.
        let state_before_checks = self.state.borrow().clone();

        // Note: if we are in a `PendingRead` state,
        // and the source is closed,
        // we try to write chunks before doing any shutdown,
        // which is necessary to implement the
        // "If any chunks have been read but not yet written, write them to dest."
        // part of shutdown.
        if state_before_checks == PipeToState::PendingRead {
            let source = self.reader.get_stream().expect("Source stream must be set");
            if source.is_closed() {
                let dest = self
                    .writer
                    .get_stream()
                    .expect("Destination stream must be set");

                // If dest.[[state]] is "writable",
                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
                if dest.is_writable() && !dest.close_queued_or_in_flight() {
                    let has_done = {
                        if !result.is_object() {
                            false
                        } else {
                            rooted!(in(*cx) let object = result.to_object());
                            rooted!(in(*cx) let mut done = UndefinedValue());
                            unsafe {
                                get_dictionary_property(
                                    *cx,
                                    object.handle(),
                                    "done",
                                    done.handle_mut(),
                                    can_gc,
                                )
                                .unwrap()
                            }
                        }
                    };
                    // If any chunks have been read but not yet written, write them to dest.
                    let contained_bytes = self.write_chunk(cx, &global, result, can_gc);

                    if !contained_bytes && !has_done {
                        // This is the case that the microtask ran in reaction
                        // to the closed promise of the reader,
                        // so we should wait for subsequent chunks,
                        // and skip the shutdown below
                        // (reader is closed, but there are still pending reads).
                        // Shutdown will happen when the last chunk has been received.
                        return;
                    }
                }
            }
        }

        self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
        self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
        self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
        self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);

        // Note: cloning to prevent re-borrow in methods called below.
        let state = self.state.borrow().clone();

        // If we switched to a shutdown state,
        // return.
        // Progress will be made at the next tick.
        if state != state_before_checks {
            return;
        }

        match state {
            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
            PipeToState::PendingReady => {
                // Read a chunk.
                self.read_chunk(&global, realm, can_gc);
            },
            PipeToState::PendingRead => {
                // Write the chunk.
                self.write_chunk(cx, &global, result, can_gc);

                // An early return is necessary if the write algorithm aborted the pipe.
                if self.shutting_down.get() {
                    return;
                }

                // Wait for the writer to be ready again.
                self.wait_for_writer_ready(&global, realm, can_gc);
            },
            PipeToState::ShuttingDownWithPendingWrites(action) => {
                // Wait until every chunk that has been read has been written
                // (i.e. the corresponding promises have settled).
                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
                    self.wait_on_pending_write(&global, write, realm, can_gc);
                    return;
                }

                // Note: error is stored in `self.shutdown_error`.
                if let Some(action) = action {
                    // Let p be the result of performing action.
                    self.perform_action(cx, &global, action, realm, can_gc);
                } else {
                    // Finalize, passing along error if it was given.
                    self.finalize(cx, &global, can_gc);
                }
            },
            PipeToState::ShuttingDownPendingAction => {
                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
                    unreachable!();
                };
                if promise.is_pending() {
                    // While waiting for the action to complete,
                    // we may get callbacks for other promises(closed, ready),
                    // and we should ignore those.
                    return;
                }

                let is_array_like = {
                    if !result.is_object() {
                        false
                    } else {
                        unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
                    }
                };

                // Finalize, passing along error if it was given.
                if !result.is_undefined() && !is_array_like {
                    // Most actions either resolve with undefined,
                    // or reject with an error,
                    // and the error should be used when finalizing.
                    // One exception is the `Abort` action,
                    // which resolves with a list of undefined values.

                    // If `result` isn't undefined or array-like,
                    // then it is an error
                    // and should overwrite the current shutdown error.
                    {
                        let mut error = Some(Heap::default());
                        // Setting the value on the heap after it has been moved.
                        if let Some(heap) = error.as_mut() {
                            heap.set(result.get())
                        }
                        *self.shutdown_error.borrow_mut() = error;
                    }
                }
                self.finalize(cx, &global, can_gc);
            },
            PipeToState::Finalized => {},
        }
    }
}

impl PipeTo {
    /// Wait for the writer to be ready,
    /// which implements the constraint that backpressure must be enforced.
    fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
        {
            let mut state = self.state.borrow_mut();
            *state = PipeToState::PendingReady;
        }

        let ready_promise = self.writer.Ready();
        if ready_promise.is_fulfilled() {
            self.read_chunk(global, realm, can_gc);
        } else {
            let handler = PromiseNativeHandler::new(
                global,
                Some(Box::new(self.clone())),
                Some(Box::new(self.clone())),
                can_gc,
            );
            ready_promise.append_native_handler(&handler, realm, can_gc);

            // Note: if the writer is not ready,
            // in order to ensure progress we must
            // also react to the closure of the source(because source may close empty).
            let closed_promise = self.reader.Closed();
            closed_promise.append_native_handler(&handler, realm, can_gc);
        }
    }

    /// Read a chunk
    fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
        *self.state.borrow_mut() = PipeToState::PendingRead;
        let chunk_promise = self.reader.Read(can_gc);
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            can_gc,
        );
        chunk_promise.append_native_handler(&handler, realm, can_gc);

        // Note: in order to ensure progress we must
        // also react to the closure of the destination.
        let ready_promise = self.writer.Closed();
        ready_promise.append_native_handler(&handler, realm, can_gc);
    }

    /// Try to write a chunk using the jsval, and returns wether it succeeded
    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
    #[allow(unsafe_code)]
    fn write_chunk(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        chunk: SafeHandleValue,
        can_gc: CanGc,
    ) -> bool {
        if chunk.is_object() {
            rooted!(in(*cx) let object = chunk.to_object());
            rooted!(in(*cx) let mut bytes = UndefinedValue());
            let has_value = unsafe {
                get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc)
                    .expect("Chunk should have a value.")
            };
            if !bytes.is_undefined() && has_value {
                // Write the chunk.
                let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
                self.pending_writes.borrow_mut().push_back(write_promise);
                return true;
            }
        }
        false
    }

    /// Only as part of shutting-down do we wait on pending writes
    /// (backpressure is communicated not through pending writes
    /// but through the readiness of the writer).
    fn wait_on_pending_write(
        &self,
        global: &GlobalScope,
        promise: Rc<Promise>,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            can_gc,
        );
        promise.append_native_handler(&handler, realm, can_gc);
    }

    /// Errors must be propagated forward part of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    fn check_and_propagate_errors_forward(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // An early return is necessary if we are shutting down,
        // because in that case the source can already have been set to none.
        if self.shutting_down.get() {
            return;
        }

        // if source.[[state]] is or becomes "errored", then
        let source = self
            .reader
            .get_stream()
            .expect("Reader should still have a stream");
        if source.is_errored() {
            rooted!(in(*cx) let mut source_error = UndefinedValue());
            source.get_stored_error(source_error.handle_mut());
            {
                let mut error = Some(Heap::default());
                // Setting the value on the heap after it has been moved.
                if let Some(heap) = error.as_mut() {
                    heap.set(source_error.get())
                }
                *self.shutdown_error.borrow_mut() = error;
            }

            // If preventAbort is false,
            if !self.prevent_abort {
                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
                // and with source.[[storedError]].
                self.shutdown(
                    cx,
                    global,
                    Some(ShutdownAction::WritableStreamAbort),
                    realm,
                    can_gc,
                )
            } else {
                // Otherwise, shutdown with source.[[storedError]].
                self.shutdown(cx, global, None, realm, can_gc);
            }
        }
    }

    /// Errors must be propagated backward part of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    fn check_and_propagate_errors_backward(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // An early return is necessary if we are shutting down,
        // because in that case the destination can already have been set to none.
        if self.shutting_down.get() {
            return;
        }

        // if dest.[[state]] is or becomes "errored", then
        let dest = self
            .writer
            .get_stream()
            .expect("Writer should still have a stream");
        if dest.is_errored() {
            rooted!(in(*cx) let mut dest_error = UndefinedValue());
            dest.get_stored_error(dest_error.handle_mut());
            {
                let mut error = Some(Heap::default());
                // Setting the value on the heap after it has been moved.
                if let Some(heap) = error.as_mut() {
                    heap.set(dest_error.get())
                }
                *self.shutdown_error.borrow_mut() = error;
            }

            // If preventCancel is false,
            if !self.prevent_cancel {
                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
                // and with dest.[[storedError]].
                self.shutdown(
                    cx,
                    global,
                    Some(ShutdownAction::ReadableStreamCancel),
                    realm,
                    can_gc,
                )
            } else {
                // Otherwise, shutdown with dest.[[storedError]].
                self.shutdown(cx, global, None, realm, can_gc);
            }
        }
    }

    /// Closing must be propagated forward part of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    fn check_and_propagate_closing_forward(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // An early return is necessary if we are shutting down,
        // because in that case the source can already have been set to none.
        if self.shutting_down.get() {
            return;
        }

        // if source.[[state]] is or becomes "closed", then
        let source = self
            .reader
            .get_stream()
            .expect("Reader should still have a stream");
        if source.is_closed() {
            // If preventClose is false,
            if !self.prevent_close {
                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
                // and with source.[[storedError]].
                self.shutdown(
                    cx,
                    global,
                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
                    realm,
                    can_gc,
                )
            } else {
                // Otherwise, shutdown.
                self.shutdown(cx, global, None, realm, can_gc);
            }
        }
    }

    /// Closing must be propagated backward part of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    fn check_and_propagate_closing_backward(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // An early return is necessary if we are shutting down,
        // because in that case the destination can already have been set to none.
        if self.shutting_down.get() {
            return;
        }

        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
        // or dest.[[state]] is "closed"
        let dest = self
            .writer
            .get_stream()
            .expect("Writer should still have a stream");
        if dest.close_queued_or_in_flight() || dest.is_closed() {
            // Assert: no chunks have been read or written.
            // Note: unclear how to perform this assertion.

            // Let destClosed be a new TypeError.
            rooted!(in(*cx) let mut dest_closed = UndefinedValue());
            let error =
                Error::Type("Destination is closed or has closed queued or in flight".to_string());
            error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
            {
                let mut error = Some(Heap::default());
                // Setting the value on the heap after it has been moved.
                if let Some(heap) = error.as_mut() {
                    heap.set(dest_closed.get())
                }
                *self.shutdown_error.borrow_mut() = error;
            }

            // If preventCancel is false,
            if !self.prevent_cancel {
                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
                // and with destClosed.
                self.shutdown(
                    cx,
                    global,
                    Some(ShutdownAction::ReadableStreamCancel),
                    realm,
                    can_gc,
                )
            } else {
                // Otherwise, shutdown with destClosed.
                self.shutdown(cx, global, None, realm, can_gc);
            }
        }
    }

    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
    /// Combined into one method with an optional action.
    fn shutdown(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        action: Option<ShutdownAction>,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        // If shuttingDown is true, abort these substeps.
        // Set shuttingDown to true.
        if !self.shutting_down.replace(true) {
            let dest = self.writer.get_stream().expect("Stream must be set");
            // If dest.[[state]] is "writable",
            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
            if dest.is_writable() && !dest.close_queued_or_in_flight() {
                // If any chunks have been read but not yet written, write them to dest.
                // Done at the top of `Callback`.

                // Wait until every chunk that has been read has been written
                // (i.e. the corresponding promises have settled).
                if let Some(write) = self.pending_writes.borrow_mut().front() {
                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
                    self.wait_on_pending_write(global, write.clone(), realm, can_gc);
                    return;
                }
            }

            // Note: error is stored in `self.shutdown_error`.
            if let Some(action) = action {
                // Let p be the result of performing action.
                self.perform_action(cx, global, action, realm, can_gc);
            } else {
                // Finalize, passing along error if it was given.
                self.finalize(cx, global, can_gc);
            }
        }
    }

    /// The perform action part of
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
    fn perform_action(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        action: ShutdownAction,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        rooted!(in(*cx) let mut error = UndefinedValue());
        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            error.set(shutdown_error.get());
        }

        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;

        // Let p be the result of performing action.
        let promise = match action {
            ShutdownAction::WritableStreamAbort => {
                let dest = self.writer.get_stream().expect("Stream must be set");
                dest.abort(cx, global, error.handle(), realm, can_gc)
            },
            ShutdownAction::ReadableStreamCancel => {
                let source = self
                    .reader
                    .get_stream()
                    .expect("Reader should have a stream.");
                source.cancel(cx, global, error.handle(), can_gc)
            },
            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
                self.writer.close_with_error_propagation(cx, global, can_gc)
            },
            ShutdownAction::Abort => {
                // Note: implementation of the the `abortAlgorithm`
                // of the signal associated with this piping operation.

                // Let error be signal’s abort reason.
                rooted!(in(*cx) let mut error = UndefinedValue());
                error.set(self.abort_reason.get());

                // Let actions be an empty ordered set.
                let mut actions = vec![];

                // If preventAbort is false, append the following action to actions:
                if !self.prevent_abort {
                    let dest = self
                        .writer
                        .get_stream()
                        .expect("Destination stream must be set");

                    // If dest.[[state]] is "writable",
                    let promise = if dest.is_writable() {
                        // return ! WritableStreamAbort(dest, error)
                        dest.abort(cx, global, error.handle(), realm, can_gc)
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx, (), can_gc)
                    };
                    actions.push(promise);
                }

                // If preventCancel is false, append the following action action to actions:
                if !self.prevent_cancel {
                    let source = self.reader.get_stream().expect("Source stream must be set");

                    // If source.[[state]] is "readable",
                    let promise = if source.is_readable() {
                        // return ! ReadableStreamCancel(source, error).
                        source.cancel(cx, global, error.handle(), can_gc)
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx, (), can_gc)
                    };
                    actions.push(promise);
                }

                // Shutdown with an action consisting
                // of getting a promise to wait for all of the actions in actions,
                // and with error.
                wait_for_all_promise(cx, global, actions, realm, can_gc)
            },
        };

        // Upon fulfillment of p, finalize, passing along originalError if it was given.
        // Upon rejection of p with reason newError, finalize with newError.
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            can_gc,
        );
        promise.append_native_handler(&handler, realm, can_gc);
        *self.shutdown_action_promise.borrow_mut() = Some(promise);
    }

    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
        *self.state.borrow_mut() = PipeToState::Finalized;

        // Perform ! WritableStreamDefaultWriterRelease(writer).
        self.writer.release(cx, global, can_gc);

        // If reader implements ReadableStreamBYOBReader,
        // perform ! ReadableStreamBYOBReaderRelease(reader).
        // TODO.

        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
        self.reader
            .release(can_gc)
            .expect("Releasing the reader should not fail");

        // If signal is not undefined, remove abortAlgorithm from signal.
        // Note: since `self.shutdown` is true at this point,
        // the abort algorithm is a no-op,
        // so for now not implementing this step.

        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            rooted!(in(*cx) let mut error = UndefinedValue());
            error.set(shutdown_error.get());
            // If error was given, reject promise with error.
            self.result_promise.reject_native(&error.handle(), can_gc);
        } else {
            // Otherwise, resolve promise with undefined.
            self.result_promise.resolve_native(&(), can_gc);
        }
    }
}

/// The fulfillment handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    #[ignore_malloc_size_of = "Rc are hard"]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseFulfillmentHandler {
    /// The fulfillment handler for the reacting to sourceCancelPromise part of
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
    fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
        self.result.resolve_native(&(), can_gc);
    }
}

/// The rejection handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    #[ignore_malloc_size_of = "Rc are hard"]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseRejectionHandler {
    /// The rejection handler for the reacting to sourceCancelPromise part of
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
    fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
        self.result.reject_native(&v, can_gc);
    }
}

/// <https://streams.spec.whatwg.org/#readablestream-state>
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    #[default]
    Readable,
    Closed,
    Errored,
}

/// <https://streams.spec.whatwg.org/#readablestream-controller>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
    Default(MutNullableDom<ReadableStreamDefaultController>),
}

/// <https://streams.spec.whatwg.org/#readablestream-readerr>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}

impl Eq for ReaderType {}
impl PartialEq for ReaderType {
    fn eq(&self, other: &Self) -> bool {
        matches!(
            (self, other),
            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
                (ReaderType::Default(_), ReaderType::Default(_))
        )
    }
}

/// <https://streams.spec.whatwg.org/#create-readable-stream>
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
pub(crate) fn create_readable_stream(
    global: &GlobalScope,
    underlying_source_type: UnderlyingSourceType,
    queuing_strategy: Option<Rc<QueuingStrategySize>>,
    high_water_mark: Option<f64>,
    can_gc: CanGc,
) -> DomRoot<ReadableStream> {
    // If highWaterMark was not passed, set it to 1.
    let high_water_mark = high_water_mark.unwrap_or(1.0);

    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
    let size_algorithm =
        queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));

    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
    assert!(high_water_mark >= 0.0);

    // Let stream be a new ReadableStream.
    // Perform ! InitializeReadableStream(stream).
    let stream = ReadableStream::new_with_proto(global, None, can_gc);

    // Let controller be a new ReadableStreamDefaultController.
    let controller = ReadableStreamDefaultController::new(
        global,
        underlying_source_type,
        high_water_mark,
        size_algorithm,
        can_gc,
    );

    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
    controller
        .setup(stream.clone(), can_gc)
        .expect("Setup of default controller cannot fail");

    // Return stream.
    stream
}

/// <https://streams.spec.whatwg.org/#rs-class>
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestream-controller>
    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
    /// because it is never unset once set.
    controller: RefCell<Option<ControllerType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    disturbed: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestream-reader>
    reader: RefCell<Option<ReaderType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-state>
    state: Cell<ReadableStreamState>,
}

impl ReadableStream {
    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
    fn new_inherited() -> ReadableStream {
        ReadableStream {
            reflector_: Reflector::new(),
            controller: RefCell::new(None),
            stored_error: Heap::default(),
            disturbed: Default::default(),
            reader: RefCell::new(None),
            state: Cell::new(Default::default()),
        }
    }

    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    pub(crate) fn new_with_proto(
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
    ) -> DomRoot<ReadableStream> {
        reflect_dom_object_with_proto(
            Box::new(ReadableStream::new_inherited()),
            global,
            proto,
            can_gc,
        )
    }

    /// Used as part of
    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
            controller,
        ))));
    }

    /// Used as part of
    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
        *self.controller.borrow_mut() =
            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
    }

    /// Used as part of
    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
    pub(crate) fn assert_no_controller(&self) {
        let has_no_controller = self.controller.borrow().is_none();
        assert!(has_no_controller);
    }

    /// Build a stream backed by a Rust source that has already been read into memory.
    pub(crate) fn new_from_bytes(
        global: &GlobalScope,
        bytes: Vec<u8>,
        can_gc: CanGc,
    ) -> Fallible<DomRoot<ReadableStream>> {
        let stream = ReadableStream::new_with_external_underlying_source(
            global,
            UnderlyingSourceType::Memory(bytes.len()),
            can_gc,
        )?;
        stream.enqueue_native(bytes, can_gc);
        stream.controller_close_native(can_gc);
        Ok(stream)
    }

    /// Build a stream backed by a Rust underlying source.
    /// Note: external sources are always paired with a default controller.
    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    pub(crate) fn new_with_external_underlying_source(
        global: &GlobalScope,
        source: UnderlyingSourceType,
        can_gc: CanGc,
    ) -> Fallible<DomRoot<ReadableStream>> {
        assert!(source.is_native());
        let stream = ReadableStream::new_with_proto(global, None, can_gc);
        let controller = ReadableStreamDefaultController::new(
            global,
            source,
            1.0,
            extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
            can_gc,
        );
        controller.setup(stream.clone(), can_gc)?;
        Ok(stream)
    }

    /// Call into the release steps of the controller,
    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => {
                let controller = controller
                    .get()
                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
                controller.perform_release_steps()
            },
            Some(ControllerType::Byte(controller)) => {
                let controller = controller
                    .get()
                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
                controller.perform_release_steps()
            },
            None => Err(Error::Type("Stream should have controller.".to_string())),
        }
    }

    /// Call into the pull steps of the controller,
    /// as part of
    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
    pub(crate) fn perform_pull_steps(
        &self,
        cx: SafeJSContext,
        read_request: &ReadRequest,
        can_gc: CanGc,
    ) {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_pull_steps(read_request, can_gc),
            Some(ControllerType::Byte(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_pull_steps(cx, read_request, can_gc),
            None => {
                unreachable!("Stream does not have a controller.");
            },
        }
    }

    /// Call into the pull steps of the controller,
    /// as part of
    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
    pub(crate) fn perform_pull_into(
        &self,
        cx: SafeJSContext,
        read_into_request: &ReadIntoRequest,
        view: HeapBufferSource<ArrayBufferViewU8>,
        options: &ReadableStreamBYOBReaderReadOptions,
        can_gc: CanGc,
    ) {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Byte(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_pull_into(cx, read_into_request, view, options, can_gc),
            _ => {
                unreachable!(
                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
                )
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    panic!("Attempt to add a read request without having first acquired a reader.");
                };

                // Assert: stream.[[state]] is "readable".
                assert!(self.is_readable());

                // Append readRequest to stream.[[reader]].[[readRequests]].
                reader.add_read_request(read_request);
            },
            _ => {
                unreachable!("Adding a read request can only be done on a default reader.")
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
        match self.reader.borrow().as_ref() {
            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
            Some(ReaderType::BYOB(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!(
                        "Attempt to add a read into request without having first acquired a reader."
                    );
                };

                // Assert: stream.[[state]] is "readable" or "closed".
                assert!(self.is_readable() || self.is_closed());

                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
                reader.add_read_into_request(read_request);
            },
            _ => {
                unreachable!("Adding a read into request can only be done on a BYOB reader.")
            },
        }
    }

    /// Endpoint to enqueue chunks directly from Rust.
    /// Note: in other use cases this call happens via the controller.
    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .enqueue_native(bytes, can_gc),
            _ => {
                unreachable!(
                    "Enqueueing chunk to a stream from Rust on other than default controller"
                );
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-error>
    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
        // Assert: stream.[[state]] is "readable".
        assert!(self.is_readable());

        // Set stream.[[state]] to "errored".
        self.state.set(ReadableStreamState::Errored);

        // Set stream.[[storedError]] to e.
        self.stored_error.set(e.get());

        // Let reader be stream.[[reader]].

        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    // If reader is undefined, return.
                    return;
                };

                // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
                reader.error(e, can_gc);
            },
            Some(ReaderType::BYOB(reader)) => {
                let Some(reader) = reader.get() else {
                    // If reader is undefined, return.
                    return;
                };

                // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
                reader.error_read_into_requests(e, can_gc);
            },
            None => {
                // If reader is undefined, return.
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
        handle_mut.set(self.stored_error.get());
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-error>
    /// Note: in other use cases this call happens via the controller.
    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
        let cx = GlobalScope::get_cx();
        rooted!(in(*cx) let mut error_val = UndefinedValue());
        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
        self.error(error_val.handle(), can_gc);
    }

    /// Call into the controller's `Close` method.
    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => {
                let _ = controller
                    .get()
                    .expect("Stream should have controller.")
                    .Close(can_gc);
            },
            _ => {
                unreachable!("Native closing is only done on default controllers.")
            },
        }
    }

    /// Returns a boolean reflecting whether the stream has all data in memory.
    /// Useful for native source integration only.
    pub(crate) fn in_memory(&self) -> bool {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .in_memory(),
            _ => {
                unreachable!(
                    "Checking if source is in memory for a stream with a non-default controller"
                )
            },
        }
    }

    /// Return bytes for synchronous use, if the stream has all data in memory.
    /// Useful for native source integration only.
    pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .get_in_memory_bytes()
                .as_deref()
                .map(IpcSharedMemory::from_bytes),
            _ => {
                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
            },
        }
    }

    /// Acquires a reader and locks the stream,
    /// must be done before `read_a_chunk`.
    /// Native call to
    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
    pub(crate) fn acquire_default_reader(
        &self,
        can_gc: CanGc,
    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
        // Let reader be a new ReadableStreamDefaultReader.
        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);

        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
        reader.set_up(self, &self.global(), can_gc)?;

        // Return reader.
        Ok(reader)
    }

    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
    pub(crate) fn acquire_byob_reader(
        &self,
        can_gc: CanGc,
    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
        // Let reader be a new ReadableStreamBYOBReader.
        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
        reader.set_up(self, &self.global(), can_gc)?;

        // Return reader.
        Ok(reader)
    }

    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => {
                controller.get().expect("Stream should have controller.")
            },
            _ => {
                unreachable!(
                    "Getting default controller for a stream with a non-default controller"
                )
            },
        }
    }

    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
            _ => {
                unreachable!("Getting default reader for a stream with a non-default reader")
            },
        }
    }

    /// Read a chunk from the stream,
    /// must be called after `start_reading`,
    /// and before `stop_reading`.
    /// Native call to
    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
    pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!(
                        "Attempt to read stream chunk without having first acquired a reader."
                    );
                };
                reader.Read(can_gc)
            },
            _ => {
                unreachable!("Native reading of a chunk can only be done with a default reader.")
            },
        }
    }

    /// Releases the lock on the reader,
    /// must be done after `start_reading`.
    /// Native call to
    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
        let reader_ref = self.reader.borrow();

        match reader_ref.as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!("Attempt to stop reading without having first acquired a reader.");
                };

                drop(reader_ref);
                reader.release(can_gc).expect("Reader release cannot fail.");
            },
            _ => {
                unreachable!("Native stop reading can only be done with a default reader.")
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
    pub(crate) fn is_locked(&self) -> bool {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => reader.get().is_some(),
            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
            None => false,
        }
    }

    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }

    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }

    pub(crate) fn is_closed(&self) -> bool {
        self.state.get() == ReadableStreamState::Closed
    }

    pub(crate) fn is_errored(&self) -> bool {
        self.state.get() == ReadableStreamState::Errored
    }

    pub(crate) fn is_readable(&self) -> bool {
        self.state.get() == ReadableStreamState::Readable
    }

    pub(crate) fn has_default_reader(&self) -> bool {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => reader.get().is_some(),
            _ => false,
        }
    }

    pub(crate) fn has_byob_reader(&self) -> bool {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
            _ => false,
        }
    }

    pub(crate) fn has_byte_controller(&self) -> bool {
        match self.controller.borrow().as_ref() {
            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
            _ => false,
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
    pub(crate) fn get_num_read_requests(&self) -> usize {
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                let reader = reader
                    .get()
                    .expect("Stream must have a reader when getting the number of read requests.");
                reader.get_num_read_requests()
            },
            _ => unreachable!(
                "Stream must have a default reader when get num read requests is called into."
            ),
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
    pub(crate) fn get_num_read_into_requests(&self) -> usize {
        assert!(self.has_byob_reader());

        match self.reader.borrow().as_ref() {
            Some(ReaderType::BYOB(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!(
                        "Stream must have a reader when get num read into requests is called into."
                    );
                };
                reader.get_num_read_into_requests()
            },
            _ => {
                unreachable!(
                    "Stream must have a BYOB reader when get num read into requests is called into."
                );
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
        assert!(self.has_default_reader());

        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                // step 2 - Let reader be stream.[[reader]].
                let reader = reader
                    .get()
                    .expect("Stream must have a reader when a read request is fulfilled.");
                // step 3 - Assert: reader.[[readRequests]] is not empty.
                assert_ne!(reader.get_num_read_requests(), 0);
                // step 4 & 5
                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
                let request = reader.remove_read_request();

                if done {
                    // step 6 - If done is true, perform readRequest’s close steps.
                    request.close_steps(can_gc);
                } else {
                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
                    let result = RootedTraceableBox::new(Heap::default());
                    result.set(*chunk);
                    request.chunk_steps(result, can_gc);
                }
            },
            _ => {
                unreachable!(
                    "Stream must have a default reader when fulfill read requests is called into."
                );
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
    pub(crate) fn fulfill_read_into_request(
        &self,
        chunk: SafeHandleValue,
        done: bool,
        can_gc: CanGc,
    ) {
        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
        assert!(self.has_byob_reader());

        // Let reader be stream.[[reader]].
        match self.reader.borrow().as_ref() {
            Some(ReaderType::BYOB(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!(
                        "Stream must have a reader when a read into request is fulfilled."
                    );
                };

                // Assert: reader.[[readIntoRequests]] is not empty.
                assert!(reader.get_num_read_into_requests() > 0);

                // Let readIntoRequest be reader.[[readIntoRequests]][0].
                // Remove readIntoRequest from reader.[[readIntoRequests]].
                let read_into_request = reader.remove_read_into_request();

                // If done is true, perform readIntoRequest’s close steps, given chunk.
                let result = RootedTraceableBox::new(Heap::default());
                if done {
                    result.set(*chunk);
                    read_into_request.close_steps(Some(result), can_gc);
                } else {
                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
                    result.set(*chunk);
                    read_into_request.chunk_steps(result, can_gc);
                }
            },
            _ => {
                unreachable!(
                    "Stream must have a BYOB reader when fulfill read into requests is called into."
                );
            },
        };
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-close>
    pub(crate) fn close(&self, can_gc: CanGc) {
        // Assert: stream.[[state]] is "readable".
        assert!(self.is_readable());
        // Set stream.[[state]] to "closed".
        self.state.set(ReadableStreamState::Closed);
        // Let reader be stream.[[reader]].
        match self.reader.borrow().as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    // If reader is undefined, return.
                    return;
                };
                // step 5 & 6
                reader.close(can_gc);
            },
            Some(ReaderType::BYOB(reader)) => {
                let Some(reader) = reader.get() else {
                    // If reader is undefined, return.
                    return;
                };

                reader.close(can_gc)
            },
            None => {
                // If reader is undefined, return.
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    #[allow(unsafe_code)]
    pub(crate) fn cancel(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        reason: SafeHandleValue,
        can_gc: CanGc,
    ) -> Rc<Promise> {
        // Set stream.[[disturbed]] to true.
        self.disturbed.set(true);

        // If stream.[[state]] is "closed", return a promise resolved with undefined.
        if self.is_closed() {
            return Promise::new_resolved(global, cx, (), can_gc);
        }
        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
        if self.is_errored() {
            let promise = Promise::new(global, can_gc);
            unsafe {
                rooted!(in(*cx) let mut rval = UndefinedValue());
                self.stored_error.to_jsval(*cx, rval.handle_mut());
                promise.reject_native(&rval.handle(), can_gc);
                return promise;
            }
        }
        // Perform ! ReadableStreamClose(stream).
        self.close(can_gc);

        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
        if let Some(ReaderType::BYOB(reader)) = self.reader.borrow().as_ref() {
            if let Some(reader) = reader.get() {
                // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
                reader.cancel(can_gc);
            }
        }

        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).

        let source_cancel_promise = match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_cancel_steps(cx, global, reason, can_gc),
            Some(ControllerType::Byte(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_cancel_steps(cx, global, reason, can_gc),
            None => {
                panic!("Stream does not have a controller.");
            },
        };

        // Create a new promise,
        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
        let global = self.global();
        let result_promise = Promise::new(&global, can_gc);
        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
            result: result_promise.clone(),
        });
        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
            result: result_promise.clone(),
        });
        let handler = PromiseNativeHandler::new(
            &global,
            Some(fulfillment_handler),
            Some(rejection_handler),
            can_gc,
        );
        let realm = enter_realm(&*global);
        let comp = InRealm::Entered(&realm);
        source_cancel_promise.append_native_handler(&handler, comp, can_gc);

        // Return the result of reacting to sourceCancelPromise
        // with a fulfillment step that returns undefined.
        result_promise
    }

    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
        *self.reader.borrow_mut() = new_reader;
    }

    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
    fn default_tee(
        &self,
        clone_for_branch_2: bool,
        can_gc: CanGc,
    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Assert: stream implements ReadableStream.

        // Assert: cloneForBranch2 is a boolean.
        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));

        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
        let reader = self.acquire_default_reader(can_gc)?;
        self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some(
            &reader,
        )))));

        // Let reading be false.
        let reading = Rc::new(Cell::new(false));
        // Let readAgain be false.
        let read_again = Rc::new(Cell::new(false));
        // Let canceled1 be false.
        let canceled_1 = Rc::new(Cell::new(false));
        // Let canceled2 be false.
        let canceled_2 = Rc::new(Cell::new(false));

        // Let reason1 be undefined.
        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
        // Let reason2 be undefined.
        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
        // Let cancelPromise be a new promise.
        let cancel_promise = Promise::new(&self.global(), can_gc);

        let tee_source_1 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading.clone(),
            read_again.clone(),
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2.clone(),
            reason_1.clone(),
            reason_2.clone(),
            cancel_promise.clone(),
            TeeCancelAlgorithm::Cancel1Algorithm,
            can_gc,
        );

        let underlying_source_type_branch_1 =
            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));

        let tee_source_2 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading,
            read_again,
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2,
            reason_1,
            reason_2,
            cancel_promise.clone(),
            TeeCancelAlgorithm::Cancel2Algorithm,
            can_gc,
        );

        let underlying_source_type_branch_2 =
            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));

        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
        let branch_1 = create_readable_stream(
            &self.global(),
            underlying_source_type_branch_1,
            None,
            None,
            can_gc,
        );
        tee_source_1.set_branch_1(&branch_1);
        tee_source_2.set_branch_1(&branch_1);

        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
        let branch_2 = create_readable_stream(
            &self.global(),
            underlying_source_type_branch_2,
            None,
            None,
            can_gc,
        );
        tee_source_1.set_branch_2(&branch_2);
        tee_source_2.set_branch_2(&branch_2);

        // Upon rejection of reader.[[closedPromise]] with reason r,
        reader.append_native_handler_to_closed_promise(
            &branch_1,
            &branch_2,
            canceled_1,
            canceled_2,
            cancel_promise,
            can_gc,
        );

        // Return « branch_1, branch_2 ».
        Ok(vec![branch_1, branch_2])
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn pipe_to(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        dest: &WritableStream,
        prevent_close: bool,
        prevent_abort: bool,
        prevent_cancel: bool,
        signal: Option<&AbortSignal>,
        realm: InRealm,
        can_gc: CanGc,
    ) -> Rc<Promise> {
        // Assert: source implements ReadableStream.
        // Assert: dest implements WritableStream.
        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
        // Done with method signature types.

        // If signal was not given, let signal be undefined.
        // Assert: either signal is undefined, or signal implements AbortSignal.
        // Note: done with the `signal` argument.

        // Assert: ! IsReadableStreamLocked(source) is false.
        assert!(!self.is_locked());

        // Assert: ! IsWritableStreamLocked(dest) is false.
        assert!(!dest.is_locked());

        // If source.[[controller]] implements ReadableByteStreamController,
        // let reader be either ! AcquireReadableStreamBYOBReader(source)
        // or ! AcquireReadableStreamDefaultReader(source),
        // at the user agent’s discretion.
        // Note: for now only using default readers.

        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
        let reader = self
            .acquire_default_reader(can_gc)
            .expect("Acquiring a default reader for pipe_to cannot fail");

        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
        let writer = dest
            .aquire_default_writer(cx, global, can_gc)
            .expect("Acquiring a default writer for pipe_to cannot fail");

        // Set source.[[disturbed]] to true.
        self.disturbed.set(true);

        // Let shuttingDown be false.
        // Done below with default.

        // Let promise be a new promise.
        let promise = Promise::new(global, can_gc);

        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
        rooted!(in(*cx) let pipe_to = PipeTo {
            reader: Dom::from_ref(&reader),
            writer: Dom::from_ref(&writer),
            pending_writes: Default::default(),
            state: Default::default(),
            prevent_abort,
            prevent_cancel,
            prevent_close,
            shutting_down: Default::default(),
            abort_reason: Default::default(),
            shutdown_error: Default::default(),
            shutdown_action_promise:  Default::default(),
            result_promise: promise.clone(),
        });

        // If signal is not undefined,
        // Note: moving the steps to here, so that the `PipeTo` is available.
        if let Some(signal) = signal {
            // Let abortAlgorithm be the following steps:
            // Note: steps are implemented at call site.
            rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));

            // If signal is aborted, perform abortAlgorithm and return promise.
            if signal.aborted() {
                signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
                return promise;
            }

            // Add abortAlgorithm to signal.
            signal.add(&abort_algorithm);
        }

        // Note: perfom checks now, since streams can start as closed or errored.
        pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
        pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
        pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
        pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);

        // If we are not closed or errored,
        if *pipe_to.state.borrow() == PipeToState::Starting {
            // Start the pipe, by waiting on the writer being ready for a chunk.
            pipe_to.wait_for_writer_ready(global, realm, can_gc);
        }

        // Return promise.
        promise
    }

    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
    fn tee(
        &self,
        clone_for_branch_2: bool,
        can_gc: CanGc,
    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Assert: stream implements ReadableStream.
        // Assert: cloneForBranch2 is a boolean.

        match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(_)) => {
                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
                self.default_tee(clone_for_branch_2, can_gc)
            },
            Some(ControllerType::Byte(_)) => {
                // If stream.[[controller]] implements ReadableByteStreamController,
                // return ? ReadableByteStreamTee(stream).
                Err(Error::Type(
                    "Teeing is not yet supported for byte streams".to_owned(),
                ))
            },
            None => {
                unreachable!("Stream should have a controller.");
            },
        }
    }

    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
    pub(crate) fn set_up_byte_controller(
        &self,
        global: &GlobalScope,
        underlying_source_dict: JsUnderlyingSource,
        underlying_source_handle: SafeHandleObject,
        stream: DomRoot<ReadableStream>,
        strategy_hwm: f64,
        can_gc: CanGc,
    ) -> Fallible<()> {
        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
        // of invoking underlyingSourceDict["start"] with argument list « controller »
        // and callback this value underlyingSource.
        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
        // of invoking underlyingSourceDict["pull"] with argument list « controller »
        // and callback this value underlyingSource.
        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
        // « reason » and callback this value underlyingSource.

        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
        // if it exists, or undefined otherwise.
        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
            return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
        }

        let controller = ReadableByteStreamController::new(
            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
            strategy_hwm,
            global,
            can_gc,
        );

        // Note: this must be done before `setup`,
        // otherwise `thisOb` is null in the start callback.
        controller.set_underlying_source_this_object(underlying_source_handle);

        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
        controller.setup(global, stream, can_gc)
    }

    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
    pub(crate) fn setup_cross_realm_transform_readable(
        &self,
        cx: SafeJSContext,
        port: &MessagePort,
        can_gc: CanGc,
    ) {
        let port_id = port.message_port_id();
        let global = self.global();

        // Perform ! InitializeReadableStream(stream).
        // Done in `new_inherited`.

        // Let sizeAlgorithm be an algorithm that returns 1.
        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);

        // Note: other algorithms defined in the underlying source container.

        // Let controller be a new ReadableStreamDefaultController.
        let controller = ReadableStreamDefaultController::new(
            &self.global(),
            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
            0.,
            size_algorithm,
            can_gc,
        );

        // Add a handler for port’s message event with the following steps:
        // Add a handler for port’s messageerror event with the following steps:
        rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
            controller: Dom::from_ref(&controller),
        });
        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);

        // Enable port’s port message queue.
        port.Start(can_gc);

        // Perform ! SetUpReadableStreamDefaultController
        controller
            .setup(DomRoot::from_ref(self), can_gc)
            .expect("Setting up controller for transfer cannot fail.");
    }
}

impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
    /// <https://streams.spec.whatwg.org/#rs-constructor>
    fn Constructor(
        cx: SafeJSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
        underlying_source: Option<*mut JSObject>,
        strategy: &QueuingStrategy,
    ) -> Fallible<DomRoot<Self>> {
        // If underlyingSource is missing, set it to null.
        rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
        // Let underlyingSourceDict be underlyingSource,
        // converted to an IDL value of type UnderlyingSource.
        let underlying_source_dict = if !underlying_source_obj.is_null() {
            rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
            match JsUnderlyingSource::new(cx, obj_val.handle()) {
                Ok(ConversionResult::Success(val)) => val,
                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
                _ => {
                    return Err(Error::JSFailed);
                },
            }
        } else {
            JsUnderlyingSource::empty()
        };

        // Perform ! InitializeReadableStream(this).
        let stream = ReadableStream::new_with_proto(global, proto, can_gc);

        if underlying_source_dict.type_.is_some() {
            // If strategy["size"] exists, throw a RangeError exception.
            if strategy.size.is_some() {
                return Err(Error::Range(
                    "size is not supported for byte streams".to_owned(),
                ));
            }

            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;

            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
            // underlyingSource, underlyingSourceDict, highWaterMark).
            stream.set_up_byte_controller(
                global,
                underlying_source_dict,
                underlying_source_obj.handle(),
                stream.clone(),
                strategy_hwm,
                can_gc,
            )?;
        } else {
            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;

            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
            let size_algorithm = extract_size_algorithm(strategy, can_gc);

            let controller = ReadableStreamDefaultController::new(
                global,
                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
                high_water_mark,
                size_algorithm,
                can_gc,
            );

            // Note: this must be done before `setup`,
            // otherwise `thisOb` is null in the start callback.
            controller.set_underlying_source_this_object(underlying_source_obj.handle());

            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
            controller.setup(stream.clone(), can_gc)?;
        };

        Ok(stream)
    }

    /// <https://streams.spec.whatwg.org/#rs-locked>
    fn Locked(&self) -> bool {
        self.is_locked()
    }

    /// <https://streams.spec.whatwg.org/#rs-cancel>
    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
        let global = self.global();
        if self.is_locked() {
            // If ! IsReadableStreamLocked(this) is true,
            // return a promise rejected with a TypeError exception.
            let promise = Promise::new(&global, can_gc);
            promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
            promise
        } else {
            // Return ! ReadableStreamCancel(this, reason).
            self.cancel(cx, &global, reason, can_gc)
        }
    }

    /// <https://streams.spec.whatwg.org/#rs-get-reader>
    fn GetReader(
        &self,
        options: &ReadableStreamGetReaderOptions,
        can_gc: CanGc,
    ) -> Fallible<ReadableStreamReader> {
        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
        if options.mode.is_none() {
            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
                self.acquire_default_reader(can_gc)?,
            ));
        }
        // 2. Assert: options["mode"] is "byob".
        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);

        // 3. Return ? AcquireReadableStreamBYOBReader(this).
        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
            self.acquire_byob_reader(can_gc)?,
        ))
    }

    /// <https://streams.spec.whatwg.org/#rs-tee>
    fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Return ? ReadableStreamTee(this, false).
        self.tee(false, can_gc)
    }

    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
    fn PipeTo(
        &self,
        destination: &WritableStream,
        options: &StreamPipeOptions,
        realm: InRealm,
        can_gc: CanGc,
    ) -> Rc<Promise> {
        let cx = GlobalScope::get_cx();
        let global = self.global();

        // If ! IsReadableStreamLocked(this) is true,
        if self.is_locked() {
            // return a promise rejected with a TypeError exception.
            let promise = Promise::new(&global, can_gc);
            promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
            return promise;
        }

        // If ! IsWritableStreamLocked(destination) is true,
        if destination.is_locked() {
            // return a promise rejected with a TypeError exception.
            let promise = Promise::new(&global, can_gc);
            promise.reject_error(
                Error::Type("Destination stream is locked".to_owned()),
                can_gc,
            );
            return promise;
        }

        // Let signal be options["signal"] if it exists, or undefined otherwise.
        let signal = options.signal.as_deref();

        // Return ! ReadableStreamPipeTo.
        self.pipe_to(
            cx,
            &global,
            destination,
            options.preventClose,
            options.preventAbort,
            options.preventCancel,
            signal,
            realm,
            can_gc,
        )
    }

    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
    fn PipeThrough(
        &self,
        transform: &ReadableWritablePair,
        options: &StreamPipeOptions,
        realm: InRealm,
        can_gc: CanGc,
    ) -> Fallible<DomRoot<ReadableStream>> {
        let global = self.global();
        let cx = GlobalScope::get_cx();

        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
        if self.is_locked() {
            return Err(Error::Type("Source stream is locked".to_owned()));
        }

        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
        if transform.writable.is_locked() {
            return Err(Error::Type("Destination stream is locked".to_owned()));
        }

        // Let signal be options["signal"] if it exists, or undefined otherwise.
        let signal = options.signal.as_deref();

        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
        let promise = self.pipe_to(
            cx,
            &global,
            &transform.writable,
            options.preventClose,
            options.preventAbort,
            options.preventCancel,
            signal,
            realm,
            can_gc,
        );

        // Set promise.[[PromiseIsHandled]] to true.
        promise.set_promise_is_handled();

        // Return transform["readable"].
        Ok(transform.readable.clone())
    }
}

#[allow(unsafe_code)]
/// The initial steps for the message handler for both readable and writable cross realm transforms.
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
pub(crate) unsafe fn get_type_and_value_from_message(
    cx: SafeJSContext,
    data: SafeHandleValue,
    value: SafeMutableHandleValue,
    can_gc: CanGc,
) -> DOMString {
    // Let data be the data of the message.
    // Note: we are passed the data as argument,
    // which originates in the return value of `structuredclone::read`.

    // Assert: data is an Object.
    assert!(data.is_object());
    rooted!(in(*cx) let data_object = data.to_object());

    // Let type be ! Get(data, "type").
    rooted!(in(*cx) let mut type_ = UndefinedValue());
    get_dictionary_property(
        *cx,
        data_object.handle(),
        "type",
        type_.handle_mut(),
        can_gc,
    )
    .expect("Getting the type should not fail.");

    // Let value be ! Get(data, "value").
    get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc)
        .expect("Getting the value should not fail.");

    // Assert: type is a String.
    let result = unsafe {
        DOMString::from_jsval(*cx, type_.handle(), StringificationBehavior::Empty)
            .expect("The type of the message should be a string")
    };
    let ConversionResult::Success(type_string) = result else {
        unreachable!("The type of the message should be a string");
    };

    type_string
}

impl js::gc::Rootable for CrossRealmTransformReadable {}

/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transfered stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm.
    controller: Dom<ReadableStreamDefaultController>,
}

impl CrossRealmTransformReadable {
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
    /// Add a handler for port’s message event with the following steps:
    #[allow(unsafe_code)]
    pub(crate) fn handle_message(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        port: &MessagePort,
        message: SafeHandleValue,
        _realm: InRealm,
        can_gc: CanGc,
    ) {
        rooted!(in(*cx) let mut value = UndefinedValue());
        let type_string =
            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };

        // If type is "chunk",
        if type_string == "chunk" {
            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
            self.controller
                .enqueue(cx, value.handle(), can_gc)
                .expect("Enqueing a chunk should not fail.");
        }

        // Otherwise, if type is "close",
        if type_string == "close" {
            // Perform ! ReadableStreamDefaultControllerClose(controller).
            self.controller.close(can_gc);

            // Disentangle port.
            global.disentangle_port(port, can_gc);
        }

        // Otherwise, if type is "error",
        if type_string == "error" {
            // Perform ! ReadableStreamDefaultControllerError(controller, value).
            self.controller.error(value.handle(), can_gc);

            // Disentangle port.
            global.disentangle_port(port, can_gc);
        }
    }

    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    /// Add a handler for port’s messageerror event with the following steps:
    #[allow(unsafe_code)]
    pub(crate) fn handle_error(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        port: &MessagePort,
        _realm: InRealm,
        can_gc: CanGc,
    ) {
        // Let error be a new "DataCloneError" DOMException.
        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
        unsafe { error.to_jsval(*cx, rooted_error.handle_mut()) };

        // Perform ! CrossRealmTransformSendError(port, error).
        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);

        // Perform ! ReadableStreamDefaultControllerError(controller, error).
        self.controller.error(rooted_error.handle(), can_gc);

        // Disentangle port.
        global.disentangle_port(port, can_gc);
    }
}

#[allow(unsafe_code)]
/// Get the `done` property of an object that a read promise resolved to.
pub(crate) fn get_read_promise_done(
    cx: SafeJSContext,
    v: &SafeHandleValue,
    can_gc: CanGc,
) -> Result<bool, Error> {
    if !v.is_object() {
        return Err(Error::Type("Unknown format for done property.".to_string()));
    }
    unsafe {
        rooted!(in(*cx) let object = v.to_object());
        rooted!(in(*cx) let mut done = UndefinedValue());
        match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut(), can_gc) {
            Ok(true) => match bool::from_jsval(*cx, done.handle(), ()) {
                Ok(ConversionResult::Success(val)) => Ok(val),
                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
                _ => Err(Error::Type("Unknown format for done property.".to_string())),
            },
            Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
            Err(()) => Err(Error::JSFailed),
        }
    }
}

#[allow(unsafe_code)]
/// Get the `value` property of an object that a read promise resolved to.
pub(crate) fn get_read_promise_bytes(
    cx: SafeJSContext,
    v: &SafeHandleValue,
    can_gc: CanGc,
) -> Result<Vec<u8>, Error> {
    if !v.is_object() {
        return Err(Error::Type(
            "Unknown format for for bytes read.".to_string(),
        ));
    }
    unsafe {
        rooted!(in(*cx) let object = v.to_object());
        rooted!(in(*cx) let mut bytes = UndefinedValue());
        match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc) {
            Ok(true) => {
                match Vec::<u8>::from_jsval(*cx, bytes.handle(), ConversionBehavior::EnforceRange) {
                    Ok(ConversionResult::Success(val)) => Ok(val),
                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
                    _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
                }
            },
            Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
            Err(()) => Err(Error::JSFailed),
        }
    }
}

/// <https://streams.spec.whatwg.org/#rs-transfer>
impl Transferable for ReadableStream {
    type Index = MessagePortIndex;
    type Data = MessagePortImpl;

    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0>
    fn transfer(&self) -> Result<(MessagePortId, MessagePortImpl), ()> {
        // If ! IsReadableStreamLocked(value) is true, throw a "DataCloneError" DOMException.
        if self.is_locked() {
            return Err(());
        }

        let global = self.global();
        let realm = enter_realm(&*global);
        let comp = InRealm::Entered(&realm);
        let cx = GlobalScope::get_cx();
        let can_gc = CanGc::note();

        // Let port1 be a new MessagePort in the current Realm.
        let port_1 = MessagePort::new(&global, can_gc);
        global.track_message_port(&port_1, None);

        // Let port2 be a new MessagePort in the current Realm.
        let port_2 = MessagePort::new(&global, can_gc);
        global.track_message_port(&port_2, None);

        // Entangle port1 and port2.
        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());

        // Let writable be a new WritableStream in the current Realm.
        let writable = WritableStream::new_with_proto(&global, None, can_gc);

        // Perform ! SetUpCrossRealmTransformWritable(writable, port1).
        writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);

        // Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
        let promise = self.pipe_to(
            cx, &global, &writable, false, false, false, None, comp, can_gc,
        );

        // Set promise.[[PromiseIsHandled]] to true.
        promise.set_promise_is_handled();

        // Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
        port_2.transfer()
    }

    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A1%E2%91%A0>
    fn transfer_receive(
        owner: &GlobalScope,
        id: MessagePortId,
        port_impl: MessagePortImpl,
    ) -> Result<DomRoot<Self>, ()> {
        let cx = GlobalScope::get_cx();
        let can_gc = CanGc::note();

        // Their transfer-receiving steps, given dataHolder and value, are:
        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
        let value = ReadableStream::new_with_proto(owner, None, can_gc);

        // Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
        // Done with the `Deserialize` derive of `MessagePortImpl`.

        // Let port be deserializedRecord.[[Deserialized]].
        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;

        // Perform ! SetUpCrossRealmTransformReadable(value, port).
        value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
        Ok(value)
    }

    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
    fn serialized_storage<'a>(
        data: StructuredData<'a, '_>,
    ) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
        match data {
            StructuredData::Reader(r) => &mut r.port_impls,
            StructuredData::Writer(w) => &mut w.ports,
        }
    }
}