mirror of
https://github.com/servo/servo.git
synced 2025-08-06 06:00:15 +01:00
fix streaming request bodies, terminate fetch if the body stream errors
This commit is contained in:
parent
581ade575e
commit
719b395c40
6 changed files with 282 additions and 109 deletions
|
@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
|
|||
use crate::hsts::HstsList;
|
||||
use crate::http_cache::{CacheKey, HttpCache};
|
||||
use crate::resource_thread::AuthCache;
|
||||
use crossbeam_channel::{unbounded, Sender};
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
use devtools_traits::{
|
||||
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
|
||||
};
|
||||
|
@ -30,6 +30,7 @@ use http::header::{
|
|||
CONTENT_TYPE,
|
||||
};
|
||||
use http::{HeaderMap, Request as HyperRequest};
|
||||
use hyper::header::TRANSFER_ENCODING;
|
||||
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
|
||||
use hyper_serde::Serde;
|
||||
use ipc_channel::ipc::{self, IpcSender};
|
||||
|
@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem};
|
|||
use net_traits::request::Origin::Origin as SpecificOrigin;
|
||||
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
|
||||
use net_traits::request::{
|
||||
BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode,
|
||||
BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder,
|
||||
RequestMode,
|
||||
};
|
||||
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
|
||||
use net_traits::request::{ResponseTainting, ServiceWorkersMode};
|
||||
|
@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime};
|
|||
use time::{self, Tm};
|
||||
use tokio::prelude::{future, Future, Sink, Stream};
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
|
||||
|
||||
lazy_static! {
|
||||
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
|
||||
|
@ -405,16 +407,89 @@ fn auth_from_cache(
|
|||
}
|
||||
}
|
||||
|
||||
/// Messages from the IPC route to the fetch worker,
|
||||
/// used to fill the body with bytes coming-in over IPC.
|
||||
enum BodyChunk {
|
||||
/// A chunk of bytes.
|
||||
Chunk(Vec<u8>),
|
||||
/// Body is done.
|
||||
Done,
|
||||
}
|
||||
|
||||
/// The stream side of the body passed to hyper.
|
||||
enum BodyStream {
|
||||
/// A receiver that can be used in Body::wrap_stream,
|
||||
/// for streaming the request over the network.
|
||||
Chunked(TokioReceiver<Vec<u8>>),
|
||||
/// A body whose bytes are buffered
|
||||
/// and sent in one chunk over the network.
|
||||
Buffered(Receiver<BodyChunk>),
|
||||
}
|
||||
|
||||
/// The sink side of the body passed to hyper,
|
||||
/// used to enqueue chunks.
|
||||
enum BodySink {
|
||||
/// A Tokio sender used to feed chunks to the network stream.
|
||||
Chunked(TokioSender<Vec<u8>>),
|
||||
/// A Crossbeam sender used to send chunks to the fetch worker,
|
||||
/// where they will be buffered
|
||||
/// in order to ensure they are not streamed them over the network.
|
||||
Buffered(Sender<BodyChunk>),
|
||||
}
|
||||
|
||||
impl BodySink {
|
||||
pub fn transmit_bytes(&self, bytes: Vec<u8>) {
|
||||
match self {
|
||||
BodySink::Chunked(ref sender) => {
|
||||
let sender = sender.clone();
|
||||
HANDLE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
|
||||
},
|
||||
BodySink::Buffered(ref sender) => {
|
||||
let _ = sender.send(BodyChunk::Chunk(bytes));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
match self {
|
||||
BodySink::Chunked(ref sender) => {
|
||||
let mut sender = sender.clone();
|
||||
HANDLE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.spawn(future::lazy(move || {
|
||||
if sender.close().is_err() {
|
||||
warn!("Failed to close network request sink.");
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
},
|
||||
BodySink::Buffered(ref sender) => {
|
||||
let _ = sender.send(BodyChunk::Done);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn obtain_response(
|
||||
client: &Client<Connector, Body>,
|
||||
url: &ServoUrl,
|
||||
method: &Method,
|
||||
request_headers: &HeaderMap,
|
||||
request_headers: &mut HeaderMap,
|
||||
body: Option<IpcSender<BodyChunkRequest>>,
|
||||
source_is_null: bool,
|
||||
pipeline_id: &Option<PipelineId>,
|
||||
request_id: Option<&str>,
|
||||
is_xhr: bool,
|
||||
context: &FetchContext,
|
||||
fetch_terminated: Sender<bool>,
|
||||
) -> Box<
|
||||
dyn Future<
|
||||
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
|
||||
|
@ -435,13 +510,26 @@ fn obtain_response(
|
|||
.replace("}", "%7D");
|
||||
|
||||
let request = if let Some(chunk_requester) = body {
|
||||
// TODO: If body is a stream, append `Transfer-Encoding`/`chunked`,
|
||||
// see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
|
||||
let (sink, stream) = if source_is_null {
|
||||
// Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
|
||||
// TODO: this should not be set for HTTP/2(currently not supported?).
|
||||
request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
|
||||
|
||||
let (sender, receiver) = channel(1);
|
||||
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
|
||||
} else {
|
||||
// Note: Hyper seems to already buffer bytes when the request appears not stream-able,
|
||||
// see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
|
||||
//
|
||||
// However since this doesn't appear documented, and we're using an ancient version,
|
||||
// for now we buffer manually to ensure we don't stream requests
|
||||
// to servers that might not know how to handle them.
|
||||
let (sender, receiver) = unbounded();
|
||||
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
|
||||
};
|
||||
|
||||
let (body_chan, body_port) = ipc::channel().unwrap();
|
||||
|
||||
let (sender, receiver) = channel(1);
|
||||
|
||||
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
|
||||
|
||||
// https://fetch.spec.whatwg.org/#concept-request-transmit-body
|
||||
|
@ -453,32 +541,58 @@ fn obtain_response(
|
|||
ROUTER.add_route(
|
||||
body_port.to_opaque(),
|
||||
Box::new(move |message| {
|
||||
let bytes: Vec<u8> = message.to().unwrap();
|
||||
let chunk_requester = chunk_requester.clone();
|
||||
let sender = sender.clone();
|
||||
let bytes: Vec<u8> = match message.to().unwrap() {
|
||||
BodyChunkResponse::Chunk(bytes) => bytes,
|
||||
BodyChunkResponse::Done => {
|
||||
// Step 3, abort these parallel steps.
|
||||
let _ = fetch_terminated.send(false);
|
||||
sink.close();
|
||||
return;
|
||||
},
|
||||
BodyChunkResponse::Error => {
|
||||
// Step 4 and/or 5.
|
||||
// TODO: differentiate between the two steps,
|
||||
// where step 5 requires setting an `aborted` flag on the fetch.
|
||||
let _ = fetch_terminated.send(true);
|
||||
sink.close();
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
devtools_bytes.lock().unwrap().append(&mut bytes.clone());
|
||||
|
||||
HANDLE.lock().unwrap().as_mut().unwrap().spawn(
|
||||
// Step 5.1.2.2
|
||||
// Transmit a chunk over the network(and blocking until this is done).
|
||||
sender
|
||||
.send(bytes)
|
||||
.map(move |_| {
|
||||
// Step 5.1.2.3
|
||||
// Request the next chunk.
|
||||
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
|
||||
()
|
||||
})
|
||||
.map_err(|_| ()),
|
||||
);
|
||||
// Step 5.1.2.2, transmit chunk over the network,
|
||||
// currently implemented by sending the bytes to the fetch worker.
|
||||
sink.transmit_bytes(bytes);
|
||||
|
||||
// Step 5.1.2.3
|
||||
// Request the next chunk.
|
||||
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
|
||||
}),
|
||||
);
|
||||
|
||||
let body = match stream {
|
||||
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
|
||||
BodyStream::Buffered(receiver) => {
|
||||
// Accumulate bytes received over IPC into a vector.
|
||||
let mut body = vec![];
|
||||
loop {
|
||||
match receiver.recv() {
|
||||
Ok(BodyChunk::Chunk(mut bytes)) => {
|
||||
body.append(&mut bytes);
|
||||
},
|
||||
Ok(BodyChunk::Done) => break,
|
||||
Err(_) => warn!("Failed to read all chunks from request body."),
|
||||
}
|
||||
}
|
||||
body.into()
|
||||
},
|
||||
};
|
||||
|
||||
HyperRequest::builder()
|
||||
.method(method)
|
||||
.uri(encoded_url)
|
||||
.body(Body::wrap_stream(receiver))
|
||||
.body(body)
|
||||
} else {
|
||||
HyperRequest::builder()
|
||||
.method(method)
|
||||
|
@ -1566,16 +1680,26 @@ fn http_network_fetch(
|
|||
// do not. Once we support other kinds of fetches we'll need to be more fine grained here
|
||||
// since things like image fetches are classified differently by devtools
|
||||
let is_xhr = request.destination == Destination::None;
|
||||
|
||||
// The receiver will receive true if there has been an error streaming the request body.
|
||||
let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
|
||||
|
||||
let response_future = obtain_response(
|
||||
&context.state.client,
|
||||
&url,
|
||||
&request.method,
|
||||
&request.headers,
|
||||
request.body.as_mut().map(|body| body.take_stream()),
|
||||
&mut request.headers,
|
||||
request.body.as_ref().map(|body| body.take_stream()),
|
||||
request
|
||||
.body
|
||||
.as_ref()
|
||||
.map(|body| body.source_is_null())
|
||||
.unwrap_or(false),
|
||||
&request.pipeline_id,
|
||||
request_id.as_ref().map(Deref::deref),
|
||||
is_xhr,
|
||||
context,
|
||||
fetch_terminated_sender,
|
||||
);
|
||||
|
||||
let pipeline_id = request.pipeline_id;
|
||||
|
@ -1585,6 +1709,22 @@ fn http_network_fetch(
|
|||
Err(error) => return Response::network_error(error),
|
||||
};
|
||||
|
||||
// Check if there was an error while streaming the request body.
|
||||
//
|
||||
// It's ok to block on the receiver,
|
||||
// since we're already blocking on the response future above,
|
||||
// so we can be sure that the request has already been processed,
|
||||
// and a message is in the channel(or soon will be).
|
||||
match fetch_terminated_receiver.recv() {
|
||||
Ok(true) => {
|
||||
return Response::network_error(NetworkError::Internal(
|
||||
"Request body streaming failed.".into(),
|
||||
));
|
||||
},
|
||||
Ok(false) => {},
|
||||
Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
|
||||
}
|
||||
|
||||
if log_enabled!(log::Level::Info) {
|
||||
info!("{:?} response for {}", res.version(), url);
|
||||
for header in res.headers().iter() {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue