Auto merge of #26906 - gterzian:fix_streams_and_re-direct, r=Manishearth

Fix streaming request bodies

<!-- Please describe your changes on the following line: -->

FIX #26904

At this point I'm not sure if the hyper `Body::wrap_stream` integration by way of the IPC route is broken, or if perhaps there is a problem with the WPT server not handling the requests properly, as I am getting a bunch of:

```
 0:04.54 INFO STDERR: 127.0.0.1 - - [13/Jun/2020 18:22:08] code 400, message Bad request syntax ('4')
 0:04.54 INFO STDERR: 127.0.0.1 - - [13/Jun/2020 18:22:08] "4" 400 -
```

In any case, the `start_reading` call in `body.rs` had to be put into the other function, since it should not be called for each chunk(this was actually failing before).

I've also added the chunked header in case the source is a stream.

Last thing to figure out, why is the request data not actually reaching the server, or not being handled properly by it.

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: -->
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #___ (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
This commit is contained in:
bors-servo 2020-06-16 13:01:20 -04:00 committed by GitHub
commit 16a99daa22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 282 additions and 109 deletions

View file

@ -23,7 +23,8 @@ use net_traits::request::{
is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window, is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window,
}; };
use net_traits::request::{ use net_traits::request::{
BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode, BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request,
RequestMode,
}; };
use net_traits::response::{Response, ResponseBody, ResponseType}; use net_traits::response::{Response, ResponseBody, ResponseType};
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming}; use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming};
@ -641,7 +642,10 @@ fn scheme_fetch(
let (body_chan, body_port) = ipc::channel().unwrap(); let (body_chan, body_port) = ipc::channel().unwrap();
let _ = stream.send(BodyChunkRequest::Connect(body_chan)); let _ = stream.send(BodyChunkRequest::Connect(body_chan));
let _ = stream.send(BodyChunkRequest::Chunk); let _ = stream.send(BodyChunkRequest::Chunk);
body_port.recv().ok() match body_port.recv().ok() {
Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes),
_ => panic!("cert should be sent in a single chunk."),
}
}); });
let data = data.as_ref().and_then(|b| { let data = data.as_ref().and_then(|b| {
let idx = b.iter().position(|b| *b == b'&')?; let idx = b.iter().position(|b| *b == b'&')?;

View file

@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList; use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache}; use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache; use crate::resource_thread::AuthCache;
use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{ use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest, ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
}; };
@ -30,6 +30,7 @@ use http::header::{
CONTENT_TYPE, CONTENT_TYPE,
}; };
use http::{HeaderMap, Request as HyperRequest}; use http::{HeaderMap, Request as HyperRequest};
use hyper::header::TRANSFER_ENCODING;
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode}; use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use hyper_serde::Serde; use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::ipc::{self, IpcSender};
@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem};
use net_traits::request::Origin::Origin as SpecificOrigin; use net_traits::request::Origin::Origin as SpecificOrigin;
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header}; use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
use net_traits::request::{ use net_traits::request::{
BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode, BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder,
RequestMode,
}; };
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin}; use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
use net_traits::request::{ResponseTainting, ServiceWorkersMode}; use net_traits::request::{ResponseTainting, ServiceWorkersMode};
@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime};
use time::{self, Tm}; use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream}; use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
lazy_static! { lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap())); pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
@ -405,16 +407,89 @@ fn auth_from_cache(
} }
} }
/// Messages from the IPC route to the fetch worker,
/// used to fill the body with bytes coming-in over IPC.
enum BodyChunk {
/// A chunk of bytes.
Chunk(Vec<u8>),
/// Body is done.
Done,
}
/// The stream side of the body passed to hyper.
enum BodyStream {
/// A receiver that can be used in Body::wrap_stream,
/// for streaming the request over the network.
Chunked(TokioReceiver<Vec<u8>>),
/// A body whose bytes are buffered
/// and sent in one chunk over the network.
Buffered(Receiver<BodyChunk>),
}
/// The sink side of the body passed to hyper,
/// used to enqueue chunks.
enum BodySink {
/// A Tokio sender used to feed chunks to the network stream.
Chunked(TokioSender<Vec<u8>>),
/// A Crossbeam sender used to send chunks to the fetch worker,
/// where they will be buffered
/// in order to ensure they are not streamed them over the network.
Buffered(Sender<BodyChunk>),
}
impl BodySink {
pub fn transmit_bytes(&self, bytes: Vec<u8>) {
match self {
BodySink::Chunked(ref sender) => {
let sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
},
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Chunk(bytes));
},
}
}
pub fn close(&self) {
match self {
BodySink::Chunked(ref sender) => {
let mut sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(future::lazy(move || {
if sender.close().is_err() {
warn!("Failed to close network request sink.");
}
Ok(())
}));
},
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Done);
},
}
}
}
fn obtain_response( fn obtain_response(
client: &Client<Connector, Body>, client: &Client<Connector, Body>,
url: &ServoUrl, url: &ServoUrl,
method: &Method, method: &Method,
request_headers: &HeaderMap, request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>, body: Option<IpcSender<BodyChunkRequest>>,
source_is_null: bool,
pipeline_id: &Option<PipelineId>, pipeline_id: &Option<PipelineId>,
request_id: Option<&str>, request_id: Option<&str>,
is_xhr: bool, is_xhr: bool,
context: &FetchContext, context: &FetchContext,
fetch_terminated: Sender<bool>,
) -> Box< ) -> Box<
dyn Future< dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
@ -435,12 +510,25 @@ fn obtain_response(
.replace("}", "%7D"); .replace("}", "%7D");
let request = if let Some(chunk_requester) = body { let request = if let Some(chunk_requester) = body {
// TODO: If body is a stream, append `Transfer-Encoding`/`chunked`, let (sink, stream) = if source_is_null {
// see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
// TODO: this should not be set for HTTP/2(currently not supported?).
let (body_chan, body_port) = ipc::channel().unwrap(); request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
let (sender, receiver) = channel(1); let (sender, receiver) = channel(1);
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
} else {
// Note: Hyper seems to already buffer bytes when the request appears not stream-able,
// see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
//
// However since this doesn't appear documented, and we're using an ancient version,
// for now we buffer manually to ensure we don't stream requests
// to servers that might not know how to handle them.
let (sender, receiver) = unbounded();
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
};
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan)); let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
@ -453,32 +541,58 @@ fn obtain_response(
ROUTER.add_route( ROUTER.add_route(
body_port.to_opaque(), body_port.to_opaque(),
Box::new(move |message| { Box::new(move |message| {
let bytes: Vec<u8> = message.to().unwrap(); let bytes: Vec<u8> = match message.to().unwrap() {
let chunk_requester = chunk_requester.clone(); BodyChunkResponse::Chunk(bytes) => bytes,
let sender = sender.clone(); BodyChunkResponse::Done => {
// Step 3, abort these parallel steps.
let _ = fetch_terminated.send(false);
sink.close();
return;
},
BodyChunkResponse::Error => {
// Step 4 and/or 5.
// TODO: differentiate between the two steps,
// where step 5 requires setting an `aborted` flag on the fetch.
let _ = fetch_terminated.send(true);
sink.close();
return;
},
};
devtools_bytes.lock().unwrap().append(&mut bytes.clone()); devtools_bytes.lock().unwrap().append(&mut bytes.clone());
HANDLE.lock().unwrap().as_mut().unwrap().spawn( // Step 5.1.2.2, transmit chunk over the network,
// Step 5.1.2.2 // currently implemented by sending the bytes to the fetch worker.
// Transmit a chunk over the network(and blocking until this is done). sink.transmit_bytes(bytes);
sender
.send(bytes)
.map(move |_| {
// Step 5.1.2.3 // Step 5.1.2.3
// Request the next chunk. // Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk); let _ = chunk_requester.send(BodyChunkRequest::Chunk);
()
})
.map_err(|_| ()),
);
}), }),
); );
let body = match stream {
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
BodyStream::Buffered(receiver) => {
// Accumulate bytes received over IPC into a vector.
let mut body = vec![];
loop {
match receiver.recv() {
Ok(BodyChunk::Chunk(mut bytes)) => {
body.append(&mut bytes);
},
Ok(BodyChunk::Done) => break,
Err(_) => warn!("Failed to read all chunks from request body."),
}
}
body.into()
},
};
HyperRequest::builder() HyperRequest::builder()
.method(method) .method(method)
.uri(encoded_url) .uri(encoded_url)
.body(Body::wrap_stream(receiver)) .body(body)
} else { } else {
HyperRequest::builder() HyperRequest::builder()
.method(method) .method(method)
@ -1566,16 +1680,26 @@ fn http_network_fetch(
// do not. Once we support other kinds of fetches we'll need to be more fine grained here // do not. Once we support other kinds of fetches we'll need to be more fine grained here
// since things like image fetches are classified differently by devtools // since things like image fetches are classified differently by devtools
let is_xhr = request.destination == Destination::None; let is_xhr = request.destination == Destination::None;
// The receiver will receive true if there has been an error streaming the request body.
let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
let response_future = obtain_response( let response_future = obtain_response(
&context.state.client, &context.state.client,
&url, &url,
&request.method, &request.method,
&request.headers, &mut request.headers,
request.body.as_mut().map(|body| body.take_stream()), request.body.as_ref().map(|body| body.take_stream()),
request
.body
.as_ref()
.map(|body| body.source_is_null())
.unwrap_or(false),
&request.pipeline_id, &request.pipeline_id,
request_id.as_ref().map(Deref::deref), request_id.as_ref().map(Deref::deref),
is_xhr, is_xhr,
context, context,
fetch_terminated_sender,
); );
let pipeline_id = request.pipeline_id; let pipeline_id = request.pipeline_id;
@ -1585,6 +1709,22 @@ fn http_network_fetch(
Err(error) => return Response::network_error(error), Err(error) => return Response::network_error(error),
}; };
// Check if there was an error while streaming the request body.
//
// It's ok to block on the receiver,
// since we're already blocking on the response future above,
// so we can be sure that the request has already been processed,
// and a message is in the channel(or soon will be).
match fetch_terminated_receiver.recv() {
Ok(true) => {
return Response::network_error(NetworkError::Internal(
"Request body streaming failed.".into(),
));
},
Ok(false) => {},
Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
}
if log_enabled!(log::Level::Info) { if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url); info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() { for header in res.headers().iter() {

View file

@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer;
use net::resource_thread::AuthCacheEntry; use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table; use net::test::replace_host_table;
use net_traits::request::{ use net_traits::request::{
BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder, BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody,
RequestMode, RequestBuilder, RequestMode,
}; };
use net_traits::response::{HttpsState, ResponseBody}; use net_traits::response::{HttpsState, ResponseBody};
use net_traits::{CookieSource, NetworkError, ReferrerPolicy}; use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
Box::new(move |message| { Box::new(move |message| {
let request = message.to().unwrap(); let request = message.to().unwrap();
if let BodyChunkRequest::Connect(sender) = request { if let BodyChunkRequest::Connect(sender) = request {
let _ = sender.send(content.clone()); let _ = sender.send(BodyChunkResponse::Chunk(content.clone()));
let _ = sender.send(BodyChunkResponse::Done);
} }
}), }),
); );

View file

@ -124,16 +124,33 @@ pub enum BodySource {
} }
/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body> /// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
/// which are sent from script to net.
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkResponse {
/// A chunk of bytes.
Chunk(Vec<u8>),
/// The body is done.
Done,
/// There was an error streaming the body,
/// terminate fetch.
Error,
}
/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
/// which are sent from net to script
/// (with the exception of Done, which is sent from script to script).
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest { pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`. /// Connect a fetch in `net`, with a stream of bytes from `script`.
Connect(IpcSender<Vec<u8>>), Connect(IpcSender<BodyChunkResponse>),
/// Re-extract a new stream from the source, following a redirect. /// Re-extract a new stream from the source, following a redirect.
Extract(IpcReceiver<BodyChunkRequest>), Extract(IpcReceiver<BodyChunkRequest>),
/// Ask for another chunk. /// Ask for another chunk.
Chunk, Chunk,
/// Signal the stream is done. /// Signal the stream is done(sent from script to script).
Done, Done,
/// Signal the stream has errored(sent from script to script).
Error,
} }
/// The net component's view into <https://fetch.spec.whatwg.org/#bodies> /// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
@ -173,7 +190,7 @@ impl RequestBody {
} }
} }
pub fn take_stream(&mut self) -> IpcSender<BodyChunkRequest> { pub fn take_stream(&self) -> IpcSender<BodyChunkRequest> {
self.chan.clone() self.chan.clone()
} }

View file

@ -40,7 +40,9 @@ use js::rust::wrappers::JS_ParseJSON;
use js::rust::HandleValue; use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith}; use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime}; use mime::{self, Mime};
use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody}; use net_traits::request::{
BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
};
use script_traits::serializable::BlobImpl; use script_traits::serializable::BlobImpl;
use std::ptr; use std::ptr;
use std::rc::Rc; use std::rc::Rc;
@ -49,7 +51,7 @@ use url::form_urlencoded;
/// The Dom object, or ReadableStream, that is the source of a body. /// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source> /// <https://fetch.spec.whatwg.org/#concept-body-source>
#[derive(Clone)] #[derive(Clone, PartialEq)]
pub enum BodySource { pub enum BodySource {
/// A ReadableStream comes with a null-source. /// A ReadableStream comes with a null-source.
Null, Null,
@ -59,6 +61,14 @@ pub enum BodySource {
Object, Object,
} }
/// The reason to stop reading from the body.
enum StopReading {
/// The stream has errored.
Error,
/// The stream is done.
Done,
}
/// The IPC route handler /// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>. /// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process, /// This route runs in the script process,
@ -69,7 +79,7 @@ struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>, stream: Trusted<ReadableStream>,
task_source: NetworkingTaskSource, task_source: NetworkingTaskSource,
canceller: TaskCanceller, canceller: TaskCanceller,
bytes_sender: Option<IpcSender<Vec<u8>>>, bytes_sender: Option<IpcSender<BodyChunkResponse>>,
control_sender: IpcSender<BodyChunkRequest>, control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>, in_memory: Option<Vec<u8>>,
in_memory_done: bool, in_memory_done: bool,
@ -123,7 +133,14 @@ impl TransmitBodyConnectHandler {
BodyChunkRequest::Chunk => body_handler.transmit_source(), BodyChunkRequest::Chunk => body_handler.transmit_source(),
// Note: this is actually sent from this process // Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops. // by the TransmitBodyPromiseHandler when reading stops.
BodyChunkRequest::Done => body_handler.stop_reading(), BodyChunkRequest::Done => {
body_handler.stop_reading(StopReading::Done);
},
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when the stream errors.
BodyChunkRequest::Error => {
body_handler.stop_reading(StopReading::Error);
},
} }
}), }),
); );
@ -138,7 +155,7 @@ impl TransmitBodyConnectHandler {
fn transmit_source(&mut self) { fn transmit_source(&mut self) {
if self.in_memory_done { if self.in_memory_done {
// Step 5.1.3 // Step 5.1.3
self.stop_reading(); self.stop_reading(StopReading::Done);
return; return;
} }
@ -153,29 +170,58 @@ impl TransmitBodyConnectHandler {
.bytes_sender .bytes_sender
.as_ref() .as_ref()
.expect("No bytes sender to transmit source.") .expect("No bytes sender to transmit source.")
.send(bytes.clone()); .send(BodyChunkResponse::Chunk(bytes.clone()));
return; return;
} }
warn!("Re-directs for file-based Blobs not supported yet."); warn!("Re-directs for file-based Blobs not supported yet.");
} }
/// Take the IPC sender sent by `net`, so we can send body chunks with it. /// Take the IPC sender sent by `net`, so we can send body chunks with it.
fn start_reading(&mut self, sender: IpcSender<Vec<u8>>) { /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
self.bytes_sender = Some(sender); self.bytes_sender = Some(sender);
// If we're using an actual ReadableStream, acquire a reader for it.
if self.source == BodySource::Null {
let stream = self.stream.clone();
let _ = self.task_source.queue_with_canceller(
task!(start_reading_request_body_stream: move || {
// Step 1, Let body be requests body.
let rooted_stream = stream.root();
// TODO: Step 2, If body is null.
// Step 3, get a reader for stream.
rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream.");
// Note: this algorithm continues when the first chunk is requested by `net`.
}),
&self.canceller,
);
}
} }
/// Drop the IPC sender sent by `net` /// Drop the IPC sender sent by `net`
fn stop_reading(&mut self) { fn stop_reading(&mut self, reason: StopReading) {
// Note: this should close the corresponding receiver, let bytes_sender = self
// and terminate the request stream in `net`. .bytes_sender
self.bytes_sender = None; .take()
.expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
match reason {
StopReading::Error => {
let _ = bytes_sender.send(BodyChunkResponse::Error);
},
StopReading::Done => {
let _ = bytes_sender.send(BodyChunkResponse::Done);
},
}
} }
/// The entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body> /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn transmit_body_chunk(&mut self) { fn transmit_body_chunk(&mut self) {
if self.in_memory_done { if self.in_memory_done {
// Step 5.1.3 // Step 5.1.3
self.stop_reading(); self.stop_reading(StopReading::Done);
return; return;
} }
@ -188,7 +234,7 @@ impl TransmitBodyConnectHandler {
// In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey. // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
if let Some(bytes) = self.in_memory.clone() { if let Some(bytes) = self.in_memory.clone() {
let _ = bytes_sender.send(bytes); let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
// Mark this body as `done` so that we can stop reading in the next tick, // Mark this body as `done` so that we can stop reading in the next tick,
// matching the behavior of the promise-based flow // matching the behavior of the promise-based flow
self.in_memory_done = true; self.in_memory_done = true;
@ -197,27 +243,9 @@ impl TransmitBodyConnectHandler {
let _ = self.task_source.queue_with_canceller( let _ = self.task_source.queue_with_canceller(
task!(setup_native_body_promise_handler: move || { task!(setup_native_body_promise_handler: move || {
// Step 1, Let body be requests body.
//
// TODO: We need the handle the body null case,
// here assuming body is something and we have the corresponding stream.
let rooted_stream = stream.root(); let rooted_stream = stream.root();
let global = rooted_stream.global(); let global = rooted_stream.global();
// TODO: Step 2, If body is null,
// then queue a fetch task on request to process request end-of-body
// for request and abort these steps.
// TODO: queuing those "process request ..." tasks means we also need a handle on Request here.
// Step 3, get a reader for stream.
if rooted_stream.start_reading().is_err() {
// Note: this can happen if script starts consuming request body
// before fetch starts transmitting it.
// Not in the spec.
return;
}
// Step 4, the result of reading a chunk from bodys stream with reader. // Step 4, the result of reading a chunk from bodys stream with reader.
let promise = rooted_stream.read_a_chunk(); let promise = rooted_stream.read_a_chunk();
@ -225,14 +253,19 @@ impl TransmitBodyConnectHandler {
// are a combination of the promise native handler here, // are a combination of the promise native handler here,
// and the corresponding IPC route in `component::net::http_loader`. // and the corresponding IPC route in `component::net::http_loader`.
let promise_handler = Box::new(TransmitBodyPromiseHandler { let promise_handler = Box::new(TransmitBodyPromiseHandler {
bytes_sender, bytes_sender: bytes_sender.clone(),
stream: rooted_stream.clone(), stream: rooted_stream.clone(),
control_sender: control_sender.clone(),
});
let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {
bytes_sender,
stream: rooted_stream,
control_sender, control_sender,
}); });
let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream}); let handler =
PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
let realm = enter_realm(&*global); let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm); let comp = InRealm::Entered(&realm);
@ -248,7 +281,7 @@ impl TransmitBodyConnectHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)] #[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseHandler { struct TransmitBodyPromiseHandler {
#[ignore_malloc_size_of = "Channels are hard"] #[ignore_malloc_size_of = "Channels are hard"]
bytes_sender: IpcSender<Vec<u8>>, bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>, stream: DomRoot<ReadableStream>,
#[ignore_malloc_size_of = "Channels are hard"] #[ignore_malloc_size_of = "Channels are hard"]
control_sender: IpcSender<BodyChunkRequest>, control_sender: IpcSender<BodyChunkRequest>,
@ -278,8 +311,7 @@ impl Callback for TransmitBodyPromiseHandler {
Ok(chunk) => chunk, Ok(chunk) => chunk,
Err(_) => { Err(_) => {
// Step 5.5, the "otherwise" steps. // Step 5.5, the "otherwise" steps.
// TODO: terminate fetch. let _ = self.control_sender.send(BodyChunkRequest::Error);
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading(); return self.stream.stop_reading();
}, },
}; };
@ -287,7 +319,7 @@ impl Callback for TransmitBodyPromiseHandler {
// Step 5.1 and 5.2, transmit chunk. // Step 5.1 and 5.2, transmit chunk.
// Send the chunk to the body transmitter in net::http_loader::obtain_response. // Send the chunk to the body transmitter in net::http_loader::obtain_response.
// TODO: queue a fetch task on request to process request body for request. // TODO: queue a fetch task on request to process request body for request.
let _ = self.bytes_sender.send(chunk); let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk));
} }
} }
@ -295,14 +327,18 @@ impl Callback for TransmitBodyPromiseHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>. /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)] #[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseRejectionHandler { struct TransmitBodyPromiseRejectionHandler {
#[ignore_malloc_size_of = "Channels are hard"]
bytes_sender: IpcSender<BodyChunkResponse>,
stream: DomRoot<ReadableStream>, stream: DomRoot<ReadableStream>,
#[ignore_malloc_size_of = "Channels are hard"]
control_sender: IpcSender<BodyChunkRequest>,
} }
impl Callback for TransmitBodyPromiseRejectionHandler { impl Callback for TransmitBodyPromiseRejectionHandler {
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body> /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) { fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
// Step 5.4, the "rejection" steps. // Step 5.4, the "rejection" steps.
// TODO: terminate fetch. let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading(); return self.stream.stop_reading();
} }
} }
@ -376,7 +412,14 @@ impl ExtractedBody {
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(), BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
// Note: this is actually sent from this process // Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when reading stops. // by the TransmitBodyPromiseHandler when reading stops.
BodyChunkRequest::Done => body_handler.stop_reading(), BodyChunkRequest::Done => {
body_handler.stop_reading(StopReading::Done);
},
// Note: this is actually sent from this process
// by the TransmitBodyPromiseHandler when the stream errors.
BodyChunkRequest::Error => {
body_handler.stop_reading(StopReading::Error);
},
} }
}), }),
); );

View file

@ -1,41 +1,9 @@
[request-upload.any.html] [request-upload.any.html]
type: testharness
[Fetch with POST with ReadableStream] [Fetch with POST with ReadableStream]
expected: FAIL expected: FAIL
[Fetch with POST with ReadableStream containing String]
expected: FAIL
[Fetch with POST with ReadableStream containing null]
expected: FAIL
[Fetch with POST with ReadableStream containing number]
expected: FAIL
[Fetch with POST with ReadableStream containing ArrayBuffer]
expected: FAIL
[Fetch with POST with ReadableStream containing Blob]
expected: FAIL
[request-upload.any.worker.html] [request-upload.any.worker.html]
type: testharness
[Fetch with POST with ReadableStream] [Fetch with POST with ReadableStream]
expected: FAIL expected: FAIL
[Fetch with POST with ReadableStream containing String]
expected: FAIL
[Fetch with POST with ReadableStream containing null]
expected: FAIL
[Fetch with POST with ReadableStream containing number]
expected: FAIL
[Fetch with POST with ReadableStream containing ArrayBuffer]
expected: FAIL
[Fetch with POST with ReadableStream containing Blob]
expected: FAIL