integrate readablestream with fetch and blob

This commit is contained in:
Gregory Terzian 2020-02-29 11:59:10 +08:00
parent 0281acea95
commit bd5796c90b
74 changed files with 2219 additions and 899 deletions

View file

@ -2,19 +2,34 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::dom::bindings::cell::Ref;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::BlobBinding::BlobBinding::BlobMethods;
use crate::dom::bindings::codegen::Bindings::FormDataBinding::FormDataMethods;
use crate::dom::bindings::codegen::Bindings::XMLHttpRequestBinding::BodyInit;
use crate::dom::bindings::error::{Error, Fallible};
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::reflector::DomObject;
use crate::dom::bindings::root::DomRoot;
use crate::dom::bindings::str::USVString;
use crate::dom::bindings::settings_stack::AutoIncumbentScript;
use crate::dom::bindings::str::{DOMString, USVString};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::blob::{normalize_type_string, Blob};
use crate::dom::formdata::FormData;
use crate::dom::globalscope::GlobalScope;
use crate::dom::htmlformelement::{encode_multipart_form_data, generate_boundary};
use crate::dom::promise::Promise;
use crate::realms::{AlreadyInRealm, InRealm};
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::{get_read_promise_bytes, get_read_promise_done, ReadableStream};
use crate::dom::urlsearchparams::URLSearchParams;
use crate::realms::{enter_realm, AlreadyInRealm, InRealm};
use crate::script_runtime::JSContext;
use crate::task::TaskCanceller;
use crate::task_source::networking::NetworkingTaskSource;
use crate::task_source::TaskSource;
use crate::task_source::TaskSourceName;
use encoding_rs::UTF_8;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::Heap;
use js::jsapi::JSObject;
use js::jsapi::JS_ClearPendingException;
@ -23,14 +38,394 @@ use js::jsval::JSVal;
use js::jsval::UndefinedValue;
use js::rust::wrappers::JS_GetPendingException;
use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
use net_traits::request::{BodyChunkRequest, BodySource, RequestBody};
use script_traits::serializable::BlobImpl;
use std::ptr;
use std::rc::Rc;
use std::str;
use url::form_urlencoded;
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
/// and will queue tasks to perform operations
/// on the stream and transmit body chunks over IPC.
struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
canceller: TaskCanceller,
bytes_sender: Option<IpcSender<Vec<u8>>>,
control_sender: IpcSender<BodyChunkRequest>,
}
impl TransmitBodyConnectHandler {
pub fn new(
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
canceller: TaskCanceller,
control_sender: IpcSender<BodyChunkRequest>,
) -> TransmitBodyConnectHandler {
TransmitBodyConnectHandler {
stream: stream,
task_source,
canceller,
bytes_sender: None,
control_sender,
}
}
/// Take the IPC sender sent by `net`, so we can send body chunks with it.
pub fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
self.bytes_sender = Some(sender);
}
/// Drop the IPC sender sent by `net`
pub fn stop_reading(&mut self) {
// Note: this should close the corresponding receiver,
// and terminate the request stream in `net`.
self.bytes_sender = None;
}
/// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
pub fn transmit_body_chunk(&mut self) {
let stream = self.stream.clone();
let control_sender = self.control_sender.clone();
let bytes_sender = self
.bytes_sender
.clone()
.expect("No bytes sender to transmit chunk.");
let _ = self.task_source.queue_with_canceller(
task!(setup_native_body_promise_handler: move || {
// Step 1, Let body be requests body.
//
// TODO: We need the handle the body null case,
// here assuming body is something and we have the corresponding stream.
let rooted_stream = stream.root();
let global = rooted_stream.global();
// TODO: Step 2, If body is null,
// then queue a fetch task on request to process request end-of-body
// for request and abort these steps.
// TODO: queuing those "process request ..." tasks means we also need a handle on Request here.
// Step 3, get a reader for stream.
if rooted_stream.start_reading().is_err() {
// Note: this can happen if script starts consuming request body
// before fetch starts transmitting it.
// Not in the spec.
return;
}
// Step 4, the result of reading a chunk from bodys stream with reader.
let promise = rooted_stream.read_a_chunk();
// Step 5, the parallel steps waiting for and handling the result of the read promise,
// are a combination of the promise native handler here,
// and the corresponding IPC route in `component::net::http_loader`.
let promise_handler = Box::new(TransmitBodyPromiseHandler {
bytes_sender,
stream: rooted_stream.clone(),
control_sender,
});
let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream});
let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
// Enter a realm, and a script,
// before appending the native handler.
let _realm = enter_realm(&*global);
let _ais = AutoIncumbentScript::new(&*global);
promise.append_native_handler(&handler);
}),
&self.canceller,
);
}
}
/// The handler of read promises of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseHandler {
#[ignore_malloc_size_of = "Channels are hard"]
bytes_sender: IpcSender<Vec<u8>>,
stream: DomRoot<ReadableStream>,
#[ignore_malloc_size_of = "Channels are hard"]
control_sender: IpcSender<BodyChunkRequest>,
}
impl Callback for TransmitBodyPromiseHandler {
/// Step 5 of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm) {
let is_done = match get_read_promise_done(cx.clone(), &v) {
Ok(is_done) => is_done,
Err(_) => {
// Step 5.5, the "otherwise" steps.
// TODO: terminate fetch.
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading();
},
};
if is_done {
// Step 5.3, the "done" steps.
// TODO: queue a fetch task on request to process request end-of-body.
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading();
}
let chunk = match get_read_promise_bytes(cx.clone(), &v) {
Ok(chunk) => chunk,
Err(_) => {
// Step 5.5, the "otherwise" steps.
// TODO: terminate fetch.
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading();
},
};
// Step 5.1 and 5.2, transmit chunk.
// Send the chunk to the body transmitter in net::http_loader::obtain_response.
// TODO: queue a fetch task on request to process request body for request.
let _ = self.bytes_sender.send(chunk);
}
}
/// The handler of read promises rejection of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseRejectionHandler {
stream: DomRoot<ReadableStream>,
}
impl Callback for TransmitBodyPromiseRejectionHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
// Step 5.4, the "rejection" steps.
// TODO: terminate fetch.
return self.stream.stop_reading();
}
}
/// The result of https://fetch.spec.whatwg.org/#concept-bodyinit-extract
pub struct ExtractedBody {
pub stream: DomRoot<ReadableStream>,
pub source: BodySource,
pub total_bytes: Option<usize>,
pub content_type: Option<DOMString>,
}
impl ExtractedBody {
/// Build a request body from the extracted body,
/// to be sent over IPC to net to use with `concept-request-transmit-body`,
/// see https://fetch.spec.whatwg.org/#concept-request-transmit-body.
///
/// Also returning the corresponding readable stream,
/// to be stored on the request in script,
/// and potentially used as part of `consume_body`,
/// see https://fetch.spec.whatwg.org/#concept-body-consume-body
///
/// Transmitting a body over fetch, and consuming it in script,
/// are mutually exclusive operations, since each will lock the stream to a reader.
pub fn into_net_request_body(self) -> (RequestBody, DomRoot<ReadableStream>) {
let ExtractedBody {
stream,
total_bytes,
content_type: _,
source,
} = self;
// First, setup some infra to be used to transmit body
// from `components::script` to `components::net`.
let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
let trusted_stream = Trusted::new(&*stream);
let global = stream.global();
let task_source = global.networking_task_source();
let canceller = global.task_canceller(TaskSourceName::Networking);
let mut body_handler = TransmitBodyConnectHandler::new(
trusted_stream,
task_source,
canceller,
chunk_request_sender.clone(),
);
ROUTER.add_route(
chunk_request_receiver.to_opaque(),
Box::new(move |message| {
let request = message.to().unwrap();
match request {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
BodyChunkRequest::Done => body_handler.stop_reading(),
}
}),
);
// Return `components::net` view into this request body,
// which can be used by `net` to transmit it over the network.
let request_body = RequestBody {
stream: Some(chunk_request_sender),
source,
total_bytes,
};
// Also return the stream for this body, which can be used by script to consume it.
(request_body, stream)
}
}
/// <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
pub trait Extractable {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody>;
}
impl Extractable for BodyInit {
// https://fetch.spec.whatwg.org/#concept-bodyinit-extract
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
match self {
BodyInit::String(ref s) => s.extract(global),
BodyInit::URLSearchParams(ref usp) => usp.extract(global),
BodyInit::Blob(ref b) => b.extract(global),
BodyInit::FormData(ref formdata) => formdata.extract(global),
BodyInit::ArrayBuffer(ref typedarray) => {
let bytes = typedarray.to_vec();
let total_bytes = bytes.len();
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
})
},
BodyInit::ArrayBufferView(ref typedarray) => {
let bytes = typedarray.to_vec();
let total_bytes = bytes.len();
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
})
},
BodyInit::ReadableStream(stream) => {
// TODO:
// 1. If the keepalive flag is set, then throw a TypeError.
if stream.is_locked() || stream.is_disturbed() {
return Err(Error::Type(
"The body's stream is disturbed or locked".to_string(),
));
}
Ok(ExtractedBody {
stream: stream.clone(),
total_bytes: None,
content_type: None,
source: BodySource::Null,
})
},
}
}
}
impl Extractable for Vec<u8> {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
let bytes = self.clone();
let total_bytes = self.len();
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type: None,
// A vec is used only in `submit_entity_body`.
source: BodySource::FormData,
})
}
}
impl Extractable for Blob {
fn extract(&self, _global: &GlobalScope) -> Fallible<ExtractedBody> {
let blob_type = self.Type();
let content_type = if blob_type.as_ref().is_empty() {
None
} else {
Some(blob_type)
};
let total_bytes = self.Size() as usize;
Ok(ExtractedBody {
stream: self.get_stream(),
total_bytes: Some(total_bytes),
content_type,
source: BodySource::Blob,
})
}
}
impl Extractable for DOMString {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
let bytes = self.as_bytes().to_owned();
let total_bytes = bytes.len();
let content_type = Some(DOMString::from("text/plain;charset=UTF-8"));
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::USVString,
})
}
}
impl Extractable for FormData {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
let boundary = generate_boundary();
let bytes = encode_multipart_form_data(&mut self.datums(), boundary.clone(), UTF_8);
let total_bytes = bytes.len();
let content_type = Some(DOMString::from(format!(
"multipart/form-data;boundary={}",
boundary
)));
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::FormData,
})
}
}
impl Extractable for URLSearchParams {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
let bytes = self.serialize_utf8().into_bytes();
let total_bytes = bytes.len();
let content_type = Some(DOMString::from(
"application/x-www-form-urlencoded;charset=UTF-8",
));
let stream = ReadableStream::new_from_bytes(&global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::URLSearchParams,
})
}
}
#[derive(Clone, Copy, JSTraceable, MallocSizeOf)]
pub enum BodyType {
Blob,
@ -49,73 +444,212 @@ pub enum FetchedData {
JSException(RootedTraceableBox<Heap<JSVal>>),
}
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct ConsumeBodyPromiseRejectionHandler {
#[ignore_malloc_size_of = "Rc are hard"]
result_promise: Rc<Promise>,
}
impl Callback for ConsumeBodyPromiseRejectionHandler {
/// Continuing Step 4 of <https://fetch.spec.whatwg.org/#concept-body-consume-body>
/// Step 3 of <https://fetch.spec.whatwg.org/#concept-read-all-bytes-from-readablestream>,
// the rejection steps.
fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm) {
self.result_promise.reject(cx, v);
}
}
#[derive(Clone, JSTraceable, MallocSizeOf)]
/// The promise handler used to consume the body,
/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>
struct ConsumeBodyPromiseHandler {
#[ignore_malloc_size_of = "Rc are hard"]
result_promise: Rc<Promise>,
stream: Option<DomRoot<ReadableStream>>,
body_type: DomRefCell<Option<BodyType>>,
mime_type: DomRefCell<Option<Vec<u8>>>,
bytes: DomRefCell<Option<Vec<u8>>>,
}
impl ConsumeBodyPromiseHandler {
/// Step 5 of <https://fetch.spec.whatwg.org/#concept-body-consume-body>
fn resolve_result_promise(&self, cx: JSContext) {
let body_type = self.body_type.borrow_mut().take().unwrap();
let mime_type = self.mime_type.borrow_mut().take().unwrap();
let body = self.bytes.borrow_mut().take().unwrap();
let pkg_data_results = run_package_data_algorithm(cx, body, body_type, mime_type);
match pkg_data_results {
Ok(results) => {
match results {
FetchedData::Text(s) => self.result_promise.resolve_native(&USVString(s)),
FetchedData::Json(j) => self.result_promise.resolve_native(&j),
FetchedData::BlobData(b) => self.result_promise.resolve_native(&b),
FetchedData::FormData(f) => self.result_promise.resolve_native(&f),
FetchedData::ArrayBuffer(a) => self.result_promise.resolve_native(&a),
FetchedData::JSException(e) => self.result_promise.reject_native(&e.handle()),
};
},
Err(err) => self.result_promise.reject_error(err),
}
}
}
impl Callback for ConsumeBodyPromiseHandler {
/// Continuing Step 4 of <https://fetch.spec.whatwg.org/#concept-body-consume-body>
/// Step 3 of <https://fetch.spec.whatwg.org/#concept-read-all-bytes-from-readablestream>.
fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm) {
let stream = self
.stream
.as_ref()
.expect("ConsumeBodyPromiseHandler has no stream in callback.");
let is_done = match get_read_promise_done(cx.clone(), &v) {
Ok(is_done) => is_done,
Err(err) => {
stream.stop_reading();
// When read is fulfilled with a value that doesn't matches with neither of the above patterns.
return self.result_promise.reject_error(err);
},
};
if is_done {
// When read is fulfilled with an object whose done property is true.
self.resolve_result_promise(cx.clone());
} else {
let chunk = match get_read_promise_bytes(cx.clone(), &v) {
Ok(chunk) => chunk,
Err(err) => {
stream.stop_reading();
// When read is fulfilled with a value that matches with neither of the above patterns
return self.result_promise.reject_error(err);
},
};
let mut bytes = self
.bytes
.borrow_mut()
.take()
.expect("No bytes for ConsumeBodyPromiseHandler.");
// Append the value property to bytes.
bytes.extend_from_slice(&*chunk);
let global = stream.global();
// Run the above step again.
let read_promise = stream.read_a_chunk();
let promise_handler = Box::new(ConsumeBodyPromiseHandler {
result_promise: self.result_promise.clone(),
stream: self.stream.clone(),
body_type: DomRefCell::new(self.body_type.borrow_mut().take()),
mime_type: DomRefCell::new(self.mime_type.borrow_mut().take()),
bytes: DomRefCell::new(Some(bytes)),
});
let rejection_handler = Box::new(ConsumeBodyPromiseRejectionHandler {
result_promise: self.result_promise.clone(),
});
let handler =
PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
// Enter a realm, and a script,
// before appending the native handler.
let _realm = enter_realm(&*global);
let _ais = AutoIncumbentScript::new(&*global);
read_promise.append_native_handler(&handler);
}
}
}
// https://fetch.spec.whatwg.org/#concept-body-consume-body
#[allow(unrooted_must_root)]
pub fn consume_body<T: BodyOperations + DomObject>(object: &T, body_type: BodyType) -> Rc<Promise> {
let in_realm_proof = AlreadyInRealm::assert(&object.global());
pub fn consume_body<T: BodyMixin + DomObject>(object: &T, body_type: BodyType) -> Rc<Promise> {
let global = object.global();
let in_realm_proof = AlreadyInRealm::assert(&global);
let promise =
Promise::new_in_current_realm(&object.global(), InRealm::Already(&in_realm_proof));
// Step 1
if object.get_body_used() || object.is_locked() {
if object.is_disturbed() || object.is_locked() {
promise.reject_error(Error::Type(
"The response's stream is disturbed or locked".to_string(),
"The body's stream is disturbed or locked".to_string(),
));
return promise;
}
object.set_body_promise(&promise, body_type);
// Steps 2-4
// TODO: Body does not yet have a stream.
consume_body_with_promise(object, body_type, &promise);
consume_body_with_promise(object, body_type, promise.clone());
promise
}
// https://fetch.spec.whatwg.org/#concept-body-consume-body
#[allow(unrooted_must_root)]
pub fn consume_body_with_promise<T: BodyOperations + DomObject>(
fn consume_body_with_promise<T: BodyMixin + DomObject>(
object: &T,
body_type: BodyType,
promise: &Promise,
promise: Rc<Promise>,
) {
// Step 5
let body = match object.take_body() {
Some(body) => body,
None => return,
let global = object.global();
// Step 2.
let stream = match object.body() {
Some(stream) => stream,
None => {
let stream = ReadableStream::new_from_bytes(&global, Vec::with_capacity(0));
stream
},
};
let pkg_data_results =
run_package_data_algorithm(object, body, body_type, object.get_mime_type());
match pkg_data_results {
Ok(results) => {
match results {
FetchedData::Text(s) => promise.resolve_native(&USVString(s)),
FetchedData::Json(j) => promise.resolve_native(&j),
FetchedData::BlobData(b) => promise.resolve_native(&b),
FetchedData::FormData(f) => promise.resolve_native(&f),
FetchedData::ArrayBuffer(a) => promise.resolve_native(&a),
FetchedData::JSException(e) => promise.reject_native(&e.handle()),
};
},
Err(err) => promise.reject_error(err),
// Step 3.
if stream.start_reading().is_err() {
return promise.reject_error(Error::Type(
"The response's stream is disturbed or locked".to_string(),
));
}
// Step 4, read all the bytes.
// Starts here, continues in the promise handler.
// Step 1 of
// https://fetch.spec.whatwg.org/#concept-read-all-bytes-from-readablestream
let read_promise = stream.read_a_chunk();
let promise_handler = Box::new(ConsumeBodyPromiseHandler {
result_promise: promise.clone(),
stream: Some(stream),
body_type: DomRefCell::new(Some(body_type)),
mime_type: DomRefCell::new(Some(object.get_mime_type())),
// Step 2.
bytes: DomRefCell::new(Some(vec![])),
});
let rejection_handler = Box::new(ConsumeBodyPromiseRejectionHandler {
result_promise: promise,
});
let handler = PromiseNativeHandler::new(
&object.global(),
Some(promise_handler),
Some(rejection_handler),
);
// We are already in a realm and a script.
read_promise.append_native_handler(&handler);
}
// https://fetch.spec.whatwg.org/#concept-body-package-data
#[allow(unsafe_code)]
fn run_package_data_algorithm<T: BodyOperations + DomObject>(
object: &T,
fn run_package_data_algorithm(
cx: JSContext,
bytes: Vec<u8>,
body_type: BodyType,
mime_type: Ref<Vec<u8>>,
mime_type: Vec<u8>,
) -> Fallible<FetchedData> {
let global = object.global();
let cx = global.get_cx();
let mime = &*mime_type;
let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
let global = GlobalScope::from_safe_context(cx, InRealm::Already(&in_realm_proof));
match body_type {
BodyType::Text => run_text_data_algorithm(bytes),
BodyType::Json => run_json_data_algorithm(cx, bytes),
@ -218,12 +752,14 @@ pub fn run_array_buffer_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallibl
Ok(FetchedData::ArrayBuffer(rooted_heap))
}
pub trait BodyOperations {
fn get_body_used(&self) -> bool;
fn set_body_promise(&self, p: &Rc<Promise>, body_type: BodyType);
/// Returns `Some(_)` if the body is complete, `None` if there is more to
/// come.
fn take_body(&self) -> Option<Vec<u8>>;
/// <https://fetch.spec.whatwg.org/#body>
pub trait BodyMixin {
/// <https://fetch.spec.whatwg.org/#concept-body-disturbed>
fn is_disturbed(&self) -> bool;
/// <https://fetch.spec.whatwg.org/#dom-body-body>
fn body(&self) -> Option<DomRoot<ReadableStream>>;
/// <https://fetch.spec.whatwg.org/#concept-body-locked>
fn is_locked(&self) -> bool;
fn get_mime_type(&self) -> Ref<Vec<u8>>;
/// <https://fetch.spec.whatwg.org/#concept-body-mime-type>
fn get_mime_type(&self) -> Vec<u8>;
}