partially integrate streaming request bodies with http re-direct

This commit is contained in:
Gregory Terzian 2020-05-31 12:00:55 +08:00
parent ad4dea7d84
commit c1b76533fa
6 changed files with 132 additions and 44 deletions

View file

@ -973,8 +973,7 @@ fn http_network_or_cache_fetch(
let http_request = if request_has_no_window && request.redirect_mode == RedirectMode::Error {
request
} else {
// Step 5.2
// TODO Implement body source
// Step 5.2.1, .2.2 and .2.3 and 2.4
http_request = request.clone();
&mut http_request
};

View file

@ -113,11 +113,7 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
}),
);
RequestBody {
stream: Some(chunk_request_sender),
source: BodySource::USVString,
total_bytes: Some(content_len),
}
RequestBody::new(chunk_request_sender, BodySource::Object, Some(content_len))
}
#[test]

View file

@ -8,7 +8,7 @@ use crate::ResourceTimingType;
use content_security_policy::{self as csp, CspList};
use http::HeaderMap;
use hyper::Method;
use ipc_channel::ipc::IpcSender;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use mime::Mime;
use msg::constellation_msg::PipelineId;
use servo_url::{ImmutableOrigin, ServoUrl};
@ -120,18 +120,16 @@ pub enum ParserMetadata {
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub enum BodySource {
Null,
Blob,
BufferSource,
FormData,
URLSearchParams,
USVString,
Object,
}
/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`.
Connect(IpcSender<Vec<u8>>),
/// Re-extract a new stream from the source, following a redirect.
Extract(IpcReceiver<BodyChunkRequest>),
/// Ask for another chunk.
Chunk,
/// Signal the stream is done.
@ -141,18 +139,47 @@ pub enum BodyChunkRequest {
/// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct RequestBody {
/// Net's view into a <https://fetch.spec.whatwg.org/#concept-body-stream>
/// Net's channel to communicate with script re this body.
#[ignore_malloc_size_of = "Channels are hard"]
pub stream: Option<IpcSender<BodyChunkRequest>>,
chan: IpcSender<BodyChunkRequest>,
/// Has the stream been read from already?
read_from: bool,
/// <https://fetch.spec.whatwg.org/#concept-body-source>
pub source: BodySource,
source: BodySource,
/// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
pub total_bytes: Option<usize>,
total_bytes: Option<usize>,
}
impl RequestBody {
pub fn new(
chan: IpcSender<BodyChunkRequest>,
source: BodySource,
total_bytes: Option<usize>,
) -> Self {
RequestBody {
chan,
source,
total_bytes,
read_from: false,
}
}
pub fn take_stream(&mut self) -> Option<IpcSender<BodyChunkRequest>> {
self.stream.take()
if self.read_from {
match self.source {
BodySource::Null => panic!(
"Null sources should never be read more than once(no re-direct allowed)."
),
BodySource::Object => {
let (chan, port) = ipc::channel().unwrap();
let _ = self.chan.send(BodyChunkRequest::Extract(port));
self.chan = chan.clone();
return Some(chan);
},
}
}
self.read_from = true;
Some(self.chan.clone())
}
pub fn source_is_null(&self) -> bool {

View file

@ -27,7 +27,7 @@ use crate::task_source::networking::NetworkingTaskSource;
use crate::task_source::TaskSource;
use crate::task_source::TaskSourceName;
use encoding_rs::UTF_8;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::Heap;
use js::jsapi::JSObject;
@ -40,18 +40,31 @@ use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
use net_traits::request::{BodyChunkRequest, BodySource, RequestBody};
use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody};
use script_traits::serializable::BlobImpl;
use std::ptr;
use std::rc::Rc;
use std::str;
use url::form_urlencoded;
/// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
#[derive(Clone)]
pub enum BodySource {
/// A ReadableStream comes with a null-source.
Null,
/// Another Dom object as source,
/// TODO: store the actual object
/// and re-exctact a stream on re-direct.
Object,
}
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
/// and will queue tasks to perform operations
/// on the stream and transmit body chunks over IPC.
#[derive(Clone)]
struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource,
@ -59,6 +72,7 @@ struct TransmitBodyConnectHandler {
bytes_sender: Option<IpcSender<Vec<u8>>>,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
source: BodySource,
}
impl TransmitBodyConnectHandler {
@ -68,6 +82,7 @@ impl TransmitBodyConnectHandler {
canceller: TaskCanceller,
control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>,
source: BodySource,
) -> TransmitBodyConnectHandler {
TransmitBodyConnectHandler {
stream: stream,
@ -76,23 +91,70 @@ impl TransmitBodyConnectHandler {
bytes_sender: None,
control_sender,
in_memory,
source,
}
}
/// Re-extract the source to support streaming it again for a re-direct.
/// TODO: actually re-extract the source, instead of just cloning data, to support Blob.
fn re_extract(&self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
let mut body_handler = self.clone();
ROUTER.add_route(
chunk_request_receiver.to_opaque(),
Box::new(move |message| {
let request = message.to().unwrap();
match request {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Extract(receiver) => {
body_handler.re_extract(receiver);
},
BodyChunkRequest::Chunk => body_handler.transmit_source(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
BodyChunkRequest::Done => body_handler.stop_reading(),
}
}),
);
}
/// In case of re-direct, and of a source available in memory,
/// send it all in one chunk.
///
/// TODO: this method should be deprecated
/// in favor of making `re_extract` actually re-extract a stream from the source.
/// See #26686
fn transmit_source(&self) {
if let BodySource::Null = self.source {
panic!("ReadableStream(Null) sources should not re-direct.");
}
if let Some(bytes) = self.in_memory.clone() {
let _ = self
.bytes_sender
.as_ref()
.expect("No bytes sender to transmit source.")
.send(bytes.clone());
return;
}
warn!("Re-directs for file-based Blobs not supported yet.");
}
/// Take the IPC sender sent by `net`, so we can send body chunks with it.
pub fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) {
self.bytes_sender = Some(sender);
}
/// Drop the IPC sender sent by `net`
pub fn stop_reading(&mut self) {
fn stop_reading(&mut self) {
// Note: this should close the corresponding receiver,
// and terminate the request stream in `net`.
self.bytes_sender = None;
}
/// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
pub fn transmit_body_chunk(&mut self) {
fn transmit_body_chunk(&mut self) {
let stream = self.stream.clone();
let control_sender = self.control_sender.clone();
let bytes_sender = self
@ -101,7 +163,7 @@ impl TransmitBodyConnectHandler {
.expect("No bytes sender to transmit chunk.");
// In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
if let Some(bytes) = self.in_memory.take() {
if let Some(bytes) = self.in_memory.clone() {
let _ = bytes_sender.send(bytes);
return;
}
@ -259,12 +321,18 @@ impl ExtractedBody {
// In case of the data being in-memory, send everything in one chunk, by-passing SM.
let in_memory = stream.get_in_memory_bytes();
let net_source = match source {
BodySource::Null => NetBodySource::Null,
_ => NetBodySource::Object,
};
let mut body_handler = TransmitBodyConnectHandler::new(
trusted_stream,
task_source,
canceller,
chunk_request_sender.clone(),
in_memory,
source,
);
ROUTER.add_route(
@ -275,6 +343,9 @@ impl ExtractedBody {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Extract(receiver) => {
body_handler.re_extract(receiver);
},
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops.
@ -285,11 +356,7 @@ impl ExtractedBody {
// Return `components::net` view into this request body,
// which can be used by `net` to transmit it over the network.
let request_body = RequestBody {
stream: Some(chunk_request_sender),
source,
total_bytes,
};
let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);
// Also return the stream for this body, which can be used by script to consume it.
(request_body, stream)
@ -322,7 +389,7 @@ impl Extractable for BodyInit {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
BodyInit::ArrayBufferView(ref typedarray) => {
@ -333,7 +400,7 @@ impl Extractable for BodyInit {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
BodyInit::ReadableStream(stream) => {
@ -367,7 +434,7 @@ impl Extractable for Vec<u8> {
total_bytes: Some(total_bytes),
content_type: None,
// A vec is used only in `submit_entity_body`.
source: BodySource::FormData,
source: BodySource::Object,
})
}
}
@ -385,7 +452,7 @@ impl Extractable for Blob {
stream: self.get_stream(),
total_bytes: Some(total_bytes),
content_type,
source: BodySource::Blob,
source: BodySource::Object,
})
}
}
@ -400,7 +467,7 @@ impl Extractable for DOMString {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::USVString,
source: BodySource::Object,
})
}
}
@ -419,7 +486,7 @@ impl Extractable for FormData {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::FormData,
source: BodySource::Object,
})
}
}
@ -436,7 +503,7 @@ impl Extractable for URLSearchParams {
stream,
total_bytes: Some(total_bytes),
content_type,
source: BodySource::URLSearchParams,
source: BodySource::Object,
})
}
}

View file

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::body::{Extractable, ExtractedBody};
use crate::body::{BodySource, Extractable, ExtractedBody};
use crate::document_loader::DocumentLoader;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::WindowBinding::WindowMethods;
@ -55,9 +55,7 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};
use js::rust::wrappers::JS_ParseJSON;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime, Name};
use net_traits::request::{
BodySource, CredentialsMode, Destination, Referrer, RequestBuilder, RequestMode,
};
use net_traits::request::{CredentialsMode, Destination, Referrer, RequestBuilder, RequestMode};
use net_traits::trim_http_whitespace;
use net_traits::CoreResourceMsg::Fetch;
use net_traits::{FetchChannels, FetchMetadata, FilteredMetadata};
@ -576,7 +574,7 @@ impl XMLHttpRequestMethods for XMLHttpRequest {
stream,
total_bytes: Some(total_bytes),
content_type: Some(DOMString::from(content_type)),
source: BodySource::Null,
source: BodySource::Object,
})
},
Some(DocumentOrBodyInit::Blob(ref b)) => {
@ -610,7 +608,7 @@ impl XMLHttpRequestMethods for XMLHttpRequest {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
Some(DocumentOrBodyInit::ArrayBufferView(ref typedarray)) => {
@ -622,7 +620,7 @@ impl XMLHttpRequestMethods for XMLHttpRequest {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::BufferSource,
source: BodySource::Object,
})
},
Some(DocumentOrBodyInit::ReadableStream(ref stream)) => {

View file

@ -67,6 +67,7 @@ impl fmt::Debug for dyn TaskBox {
}
/// Encapsulated state required to create cancellable tasks from non-script threads.
#[derive(Clone)]
pub struct TaskCanceller {
pub cancelled: Option<Arc<AtomicBool>>,
}