Non-blocking network IO

This commit is contained in:
Naveen Gattu 2021-12-05 23:14:49 -08:00
parent f77e66bbf8
commit 903e0cd857
13 changed files with 475 additions and 345 deletions

View file

@ -11,11 +11,13 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
use async_recursion::async_recursion;
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent};
use futures_util::compat::*;
use headers::authorization::Basic;
use headers::{AccessControlAllowCredentials, AccessControlAllowHeaders, HeaderMapExt};
use headers::{
@ -64,11 +66,13 @@ use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
use tokio2::sync::mpsc::{unbounded_channel, UnboundedSender as Tokio02Sender};
use tokio_compat::runtime::{Builder, Runtime};
lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
pub static ref HANDLE: Mutex<Option<Runtime>> =
Mutex::new(Some(Builder::new().build().unwrap()));
}
/// The various states an entry of the HttpCache can be in.
@ -491,180 +495,184 @@ impl BodySink {
}
}
fn obtain_response(
async fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
body: Option<StdArc<Mutex<IpcSender<BodyChunkRequest>>>>,
source_is_null: bool,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
fetch_terminated: Sender<bool>,
) -> Box<
dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
Error = NetworkError,
>,
> {
let headers = request_headers.clone();
fetch_terminated: Tokio02Sender<bool>,
) -> Result<(HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), NetworkError> {
{
let mut headers = request_headers.clone();
let devtools_bytes = StdArc::new(Mutex::new(vec![]));
let devtools_bytes = StdArc::new(Mutex::new(vec![]));
// https://url.spec.whatwg.org/#percent-encoded-bytes
let encoded_url = url
.clone()
.into_url()
.as_ref()
.replace("|", "%7C")
.replace("{", "%7B")
.replace("}", "%7D");
// https://url.spec.whatwg.org/#percent-encoded-bytes
let encoded_url = url
.clone()
.into_url()
.as_ref()
.replace("|", "%7C")
.replace("{", "%7B")
.replace("}", "%7D");
let request = if let Some(chunk_requester) = body {
let (sink, stream) = if source_is_null {
// Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
// TODO: this should not be set for HTTP/2(currently not supported?).
request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
let request = if let Some(chunk_requester) = body {
let (mut sink, stream) = if source_is_null {
// Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
// TODO: this should not be set for HTTP/2(currently not supported?).
headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
let (sender, receiver) = channel(1);
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
} else {
// Note: Hyper seems to already buffer bytes when the request appears not stream-able,
// see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
//
// However since this doesn't appear documented, and we're using an ancient version,
// for now we buffer manually to ensure we don't stream requests
// to servers that might not know how to handle them.
let (sender, receiver) = unbounded();
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
};
let (sender, receiver) = channel(1);
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
} else {
// Note: Hyper seems to already buffer bytes when the request appears not stream-able,
// see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
//
// However since this doesn't appear documented, and we're using an ancient version,
// for now we buffer manually to ensure we don't stream requests
// to servers that might not know how to handle them.
let (sender, receiver) = unbounded();
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
};
let (body_chan, body_port) = ipc::channel().unwrap();
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
if let Ok(requester) = chunk_requester.lock() {
let _ = requester.send(BodyChunkRequest::Connect(body_chan));
// https://fetch.spec.whatwg.org/#concept-request-transmit-body
// Request the first chunk, corresponding to Step 3 and 4.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
// https://fetch.spec.whatwg.org/#concept-request-transmit-body
// Request the first chunk, corresponding to Step 3 and 4.
let _ = requester.send(BodyChunkRequest::Chunk);
}
let devtools_bytes = devtools_bytes.clone();
let devtools_bytes = devtools_bytes.clone();
let chunk_requester2 = chunk_requester.clone();
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
let bytes: Vec<u8> = match message.to().unwrap() {
BodyChunkResponse::Chunk(bytes) => bytes,
BodyChunkResponse::Done => {
// Step 3, abort these parallel steps.
let _ = fetch_terminated.send(false);
sink.close();
return;
},
BodyChunkResponse::Error => {
// Step 4 and/or 5.
// TODO: differentiate between the two steps,
// where step 5 requires setting an `aborted` flag on the fetch.
let _ = fetch_terminated.send(true);
sink.close();
return;
},
};
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
let bytes: Vec<u8> = match message.to().unwrap() {
BodyChunkResponse::Chunk(bytes) => bytes,
BodyChunkResponse::Done => {
// Step 3, abort these parallel steps.
let _ = fetch_terminated.send(false);
sink.close();
devtools_bytes.lock().unwrap().append(&mut bytes.clone());
// Step 5.1.2.2, transmit chunk over the network,
// currently implemented by sending the bytes to the fetch worker.
sink.transmit_bytes(bytes);
// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
}),
);
let body = match stream {
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
BodyStream::Buffered(receiver) => {
// Accumulate bytes received over IPC into a vector.
let mut body = vec![];
loop {
match receiver.recv() {
Ok(BodyChunk::Chunk(mut bytes)) => {
body.append(&mut bytes);
return;
},
Ok(BodyChunk::Done) => break,
Err(_) => warn!("Failed to read all chunks from request body."),
BodyChunkResponse::Error => {
// Step 4 and/or 5.
// TODO: differentiate between the two steps,
// where step 5 requires setting an `aborted` flag on the fetch.
let _ = fetch_terminated.send(true);
sink.close();
return;
},
};
devtools_bytes.lock().unwrap().append(&mut bytes.clone());
// Step 5.1.2.2, transmit chunk over the network,
// currently implemented by sending the bytes to the fetch worker.
sink.transmit_bytes(bytes);
// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester2
.lock()
.unwrap()
.send(BodyChunkRequest::Chunk);
}),
);
let body = match stream {
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
BodyStream::Buffered(receiver) => {
// Accumulate bytes received over IPC into a vector.
let mut body = vec![];
loop {
match receiver.recv() {
Ok(BodyChunk::Chunk(mut bytes)) => {
body.append(&mut bytes);
},
Ok(BodyChunk::Done) => break,
Err(_) => warn!("Failed to read all chunks from request body."),
}
}
}
body.into()
},
body.into()
},
};
HyperRequest::builder()
.method(method)
.uri(encoded_url)
.body(body)
} else {
HyperRequest::builder()
.method(method)
.uri(encoded_url)
.body(Body::empty())
};
HyperRequest::builder()
.method(method)
.uri(encoded_url)
.body(body)
} else {
HyperRequest::builder()
.method(method)
.uri(encoded_url)
.body(Body::empty())
};
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::DomainLookupStart);
// TODO(#21261) connect_start: set if a persistent connection is *not* used and the last non-redirected
// fetch passes the timing allow check
let connect_start = precise_time_ms();
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::ConnectStart(connect_start));
// TODO: We currently don't know when the handhhake before the connection is done
// so our best bet would be to set `secure_connection_start` here when we are currently
// fetching on a HTTPS url.
if url.scheme() == "https" {
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::SecureConnectionStart);
}
.set_attribute(ResourceAttribute::DomainLookupStart);
let mut request = match request {
Ok(request) => request,
Err(e) => return Box::new(future::result(Err(NetworkError::from_http_error(&e)))),
};
*request.headers_mut() = headers.clone();
// TODO(#21261) connect_start: set if a persistent connection is *not* used and the last non-redirected
// fetch passes the timing allow check
let connect_start = precise_time_ms();
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::ConnectStart(connect_start));
let connect_end = precise_time_ms();
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::ConnectEnd(connect_end));
// TODO: We currently don't know when the handhhake before the connection is done
// so our best bet would be to set `secure_connection_start` here when we are currently
// fetching on a HTTPS url.
if url.scheme() == "https" {
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::SecureConnectionStart);
}
let request_id = request_id.map(|v| v.to_owned());
let pipeline_id = pipeline_id.clone();
let closure_url = url.clone();
let method = method.clone();
let send_start = precise_time_ms();
let mut request = match request {
Ok(request) => request,
Err(e) => return Err(NetworkError::from_http_error(&e)),
};
*request.headers_mut() = headers.clone();
let host = request.uri().host().unwrap_or("").to_owned();
let host_clone = request.uri().host().unwrap_or("").to_owned();
let connection_certs = context.state.connection_certs.clone();
let connection_certs_clone = context.state.connection_certs.clone();
let connect_end = precise_time_ms();
context
.timing
.lock()
.unwrap()
.set_attribute(ResourceAttribute::ConnectEnd(connect_end));
let request_id = request_id.map(|v| v.to_owned());
let pipeline_id = pipeline_id.clone();
let closure_url = url.clone();
let method = method.clone();
let send_start = precise_time_ms();
let host = request.uri().host().unwrap_or("").to_owned();
let host_clone = request.uri().host().unwrap_or("").to_owned();
let connection_certs = context.state.connection_certs.clone();
let connection_certs_clone = context.state.connection_certs.clone();
let headers = headers.clone();
let headers = headers.clone();
Box::new(
client
.request(request)
.and_then(move |res| {
@ -705,18 +713,21 @@ fn obtain_response(
})
.map_err(move |e| {
NetworkError::from_hyper_error(&e, connection_certs_clone.remove(host_clone))
}),
)
})
.compat() // convert from Future01 to Future03
.await
}
}
/// [HTTP fetch](https://fetch.spec.whatwg.org#http-fetch)
pub fn http_fetch(
#[async_recursion]
pub async fn http_fetch(
request: &mut Request,
cache: &mut CorsCache,
cors_flag: bool,
cors_preflight_flag: bool,
authentication_fetch_flag: bool,
target: Target,
target: Target<'async_recursion>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -771,7 +782,7 @@ pub fn http_fetch(
// Sub-substep 1
if method_mismatch || header_mismatch {
let preflight_result = cors_preflight_fetch(&request, cache, context);
let preflight_result = cors_preflight_fetch(&request, cache, context).await;
// Sub-substep 2
if let Some(e) = preflight_result.get_network_error() {
return Response::network_error(e.clone());
@ -799,7 +810,8 @@ pub fn http_fetch(
cors_flag,
done_chan,
context,
);
)
.await;
// Substep 4
if cors_flag && cors_check(&request, &fetch_result).is_err() {
@ -865,6 +877,7 @@ pub fn http_fetch(
http_redirect_fetch(
request, cache, response, cors_flag, target, done_chan, context,
)
.await
},
};
}
@ -907,12 +920,13 @@ impl Drop for RedirectEndTimer {
}
/// [HTTP redirect fetch](https://fetch.spec.whatwg.org#http-redirect-fetch)
pub fn http_redirect_fetch(
#[async_recursion]
pub async fn http_redirect_fetch(
request: &mut Request,
cache: &mut CorsCache,
response: Response,
cors_flag: bool,
target: Target,
target: Target<'async_recursion>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -1071,7 +1085,8 @@ pub fn http_redirect_fetch(
target,
done_chan,
context,
);
)
.await;
// TODO: timing allow check
context
@ -1100,7 +1115,8 @@ fn try_immutable_origin_to_hyper_origin(url_origin: &ImmutableOrigin) -> Option<
}
/// [HTTP network or cache fetch](https://fetch.spec.whatwg.org#http-network-or-cache-fetch)
fn http_network_or_cache_fetch(
#[async_recursion]
async fn http_network_or_cache_fetch(
request: &mut Request,
authentication_fetch_flag: bool,
cors_flag: bool,
@ -1398,26 +1414,27 @@ fn http_network_or_cache_fetch(
}
}
fn wait_for_cached_response(done_chan: &mut DoneChannel, response: &mut Option<Response>) {
if let Some(ref ch) = *done_chan {
async fn wait_for_cached_response(
done_chan: &mut DoneChannel,
response: &mut Option<Response>,
) {
if let Some(ref mut ch) = *done_chan {
// The cache constructed a response with a body of ResponseBody::Receiving.
// We wait for the response in the cache to "finish",
// with a body of either Done or Cancelled.
assert!(response.is_some());
loop {
match ch
.1
.recv()
.expect("HTTP cache should always send Done or Cancelled")
{
Data::Payload(_) => {},
Data::Done => break, // Return the full response as if it was initially cached as such.
Data::Cancelled => {
match ch.1.recv().await {
Some(Data::Payload(_)) => {},
Some(Data::Done) => break, // Return the full response as if it was initially cached as such.
Some(Data::Cancelled) => {
// The response was cancelled while the fetch was ongoing.
// Set response to None, which will trigger a network fetch below.
*response = None;
break;
},
_ => panic!("HTTP cache should always send Done or Cancelled"),
}
}
}
@ -1425,7 +1442,7 @@ fn http_network_or_cache_fetch(
*done_chan = None;
}
wait_for_cached_response(done_chan, &mut response);
wait_for_cached_response(done_chan, &mut response).await;
// Step 6
// TODO: https://infra.spec.whatwg.org/#if-aborted
@ -1446,7 +1463,7 @@ fn http_network_or_cache_fetch(
if response.is_none() {
// Substep 2
let forward_response =
http_network_fetch(http_request, credentials_flag, done_chan, context);
http_network_fetch(http_request, credentials_flag, done_chan, context).await;
// Substep 3
if let Some((200..=399, _)) = forward_response.raw_status {
if !http_request.method.is_safe() {
@ -1467,8 +1484,8 @@ fn http_network_or_cache_fetch(
// since the network response will be replaced by the revalidated stored one.
*done_chan = None;
response = http_cache.refresh(&http_request, forward_response.clone(), done_chan);
wait_for_cached_response(done_chan, &mut response);
}
wait_for_cached_response(done_chan, &mut response).await;
}
// Substep 5
@ -1596,7 +1613,8 @@ fn http_network_or_cache_fetch(
cors_flag,
done_chan,
context,
);
)
.await;
}
// Step 11
@ -1655,7 +1673,7 @@ impl Drop for ResponseEndTimer {
}
/// [HTTP network fetch](https://fetch.spec.whatwg.org/#http-network-fetch)
fn http_network_fetch(
async fn http_network_fetch(
request: &mut Request,
credentials_flag: bool,
done_chan: &mut DoneChannel,
@ -1686,7 +1704,7 @@ fn http_network_fetch(
if log_enabled!(log::Level::Info) {
info!("{:?} request for {}", request.method, url);
for header in request.headers.iter() {
info!(" - {:?}", header);
debug!(" - {:?}", header);
}
}
@ -1696,7 +1714,7 @@ fn http_network_fetch(
let is_xhr = request.destination == Destination::None;
// The receiver will receive true if there has been an error streaming the request body.
let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
let (fetch_terminated_sender, mut fetch_terminated_receiver) = unbounded_channel();
let body = request.body.as_ref().map(|body| body.take_stream());
@ -1728,32 +1746,28 @@ fn http_network_fetch(
let pipeline_id = request.pipeline_id;
// This will only get the headers, the body is read later
let (res, msg) = match response_future.wait() {
let (res, msg) = match response_future.await {
Ok(wrapped_response) => wrapped_response,
Err(error) => return Response::network_error(error),
};
if log_enabled!(log::Level::Info) {
debug!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
debug!(" - {:?}", header);
}
}
// Check if there was an error while streaming the request body.
//
// It's ok to block on the receiver,
// since we're already blocking on the response future above,
// so we can be sure that the request has already been processed,
// and a message is in the channel(or soon will be).
match fetch_terminated_receiver.recv() {
Ok(true) => {
match fetch_terminated_receiver.recv().await {
Some(true) => {
return Response::network_error(NetworkError::Internal(
"Request body streaming failed.".into(),
));
},
Ok(false) => {},
Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
}
if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
info!(" - {:?}", header);
}
Some(false) => {},
_ => warn!("Failed to receive confirmation request was streamed without error."),
}
let header_strings: Vec<&str> = res
@ -1791,7 +1805,7 @@ fn http_network_fetch(
res.status(),
res.status().canonical_reason().unwrap_or("").into(),
));
debug!("got {:?} response for {:?}", res.status(), request.url());
info!("got {:?} response for {:?}", res.status(), request.url());
response.raw_status = Some((
res.status().as_u16(),
res.status().canonical_reason().unwrap_or("").into(),
@ -1803,7 +1817,7 @@ fn http_network_fetch(
let res_body = response.body.clone();
// We're about to spawn a future to be waited on here
let (done_sender, done_receiver) = unbounded();
let (done_sender, done_receiver) = unbounded_channel();
*done_chan = Some((done_sender.clone(), done_receiver));
let meta = match response
.metadata()
@ -1825,6 +1839,7 @@ fn http_network_fetch(
let res_body2 = res_body.clone();
if let Some(ref sender) = devtools_sender {
let sender = sender.lock().unwrap();
if let Some(m) = msg {
send_request_to_devtools(m, &sender);
}
@ -1848,21 +1863,22 @@ fn http_network_fetch(
let timing_ptr3 = context.timing.clone();
let url1 = request.url();
let url2 = url1.clone();
HANDLE.lock().unwrap().as_mut().unwrap().spawn(
HANDLE.lock().unwrap().as_ref().unwrap().spawn(
res.into_body()
.map_err(|_| ())
.fold(res_body, move |res_body, chunk| {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return future::failed(());
return tokio::prelude::future::failed(());
}
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
let bytes = chunk.into_bytes();
body.extend_from_slice(&*bytes);
let _ = done_sender.send(Data::Payload(bytes.to_vec()));
}
future::ok(res_body)
tokio::prelude::future::ok(res_body)
})
.and_then(move |res_body| {
debug!("successfully finished response for {:?}", url1);
@ -1877,10 +1893,10 @@ fn http_network_fetch(
.unwrap()
.set_attribute(ResourceAttribute::ResponseEnd);
let _ = done_sender2.send(Data::Done);
future::ok(())
tokio::prelude::future::ok(())
})
.map_err(move |_| {
debug!("finished response for {:?} with error", url2);
warn!("finished response for {:?} with error", url2);
let mut body = res_body2.lock().unwrap();
let completed_body = match *body {
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
@ -1956,7 +1972,7 @@ fn http_network_fetch(
}
/// [CORS preflight fetch](https://fetch.spec.whatwg.org#cors-preflight-fetch)
fn cors_preflight_fetch(
async fn cors_preflight_fetch(
request: &Request,
cache: &mut CorsCache,
context: &FetchContext,
@ -2000,7 +2016,8 @@ fn cors_preflight_fetch(
}
// Step 6
let response = http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context);
let response =
http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context).await;
// Step 7
if cors_check(&request, &response).is_ok() &&
response