Upgrade Hyper

This commit is contained in:
Naveen Gattu 2021-12-23 11:15:35 -08:00
parent 5df705a41f
commit a48a111cee
42 changed files with 872 additions and 891 deletions

View file

@ -12,12 +12,13 @@ use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
use async_recursion::async_recursion;
use crossbeam_channel::{unbounded, Receiver, Sender};
use core::convert::Infallible;
use crossbeam_channel::Sender;
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent};
use futures_util::compat::*;
use futures::{future, StreamExt, TryFutureExt, TryStreamExt};
use headers::authorization::Basic;
use headers::{AccessControlAllowCredentials, AccessControlAllowHeaders, HeaderMapExt};
use headers::{
@ -28,12 +29,11 @@ use headers::{AccessControlAllowOrigin, AccessControlMaxAge};
use headers::{CacheControl, ContentEncoding, ContentLength};
use headers::{IfModifiedSince, LastModified, Origin as HyperOrigin, Pragma, Referer, UserAgent};
use http::header::{
self, HeaderName, HeaderValue, ACCEPT, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LOCATION,
CONTENT_TYPE,
self, HeaderValue, ACCEPT, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LOCATION, CONTENT_TYPE,
};
use http::{HeaderMap, Request as HyperRequest};
use hyper::header::TRANSFER_ENCODING;
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use http::{HeaderMap, Method, Request as HyperRequest, StatusCode};
use hyper::header::{HeaderName, TRANSFER_ENCODING};
use hyper::{Body, Client, Response as HyperResponse};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
@ -65,14 +65,15 @@ use std::ops::Deref;
use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
use tokio2::sync::mpsc::{unbounded_channel, UnboundedSender as Tokio02Sender};
use tokio_compat::runtime::{Builder, Runtime};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver as TokioReceiver, Sender as TokioSender,
UnboundedReceiver, UnboundedSender,
};
use tokio_stream::wrappers::ReceiverStream;
lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> =
Mutex::new(Some(Builder::new().build().unwrap()));
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
}
/// The various states an entry of the HttpCache can be in.
@ -110,10 +111,7 @@ impl HttpState {
history_states: RwLock::new(HashMap::new()),
http_cache: RwLock::new(HttpCache::new()),
http_cache_state: Mutex::new(HashMap::new()),
client: create_http_client(
tls_config,
HANDLE.lock().unwrap().as_ref().unwrap().executor(),
),
client: create_http_client(tls_config),
extra_certs: ExtraCerts::new(),
connection_certs: ConnectionCerts::new(),
}
@ -440,7 +438,7 @@ enum BodyStream {
Chunked(TokioReceiver<Vec<u8>>),
/// A body whose bytes are buffered
/// and sent in one chunk over the network.
Buffered(Receiver<BodyChunk>),
Buffered(UnboundedReceiver<BodyChunk>),
}
/// The sink side of the body passed to hyper,
@ -451,7 +449,7 @@ enum BodySink {
/// A Crossbeam sender used to send chunks to the fetch worker,
/// where they will be buffered
/// in order to ensure they are not streamed them over the network.
Buffered(Sender<BodyChunk>),
Buffered(UnboundedSender<BodyChunk>),
}
impl BodySink {
@ -459,12 +457,9 @@ impl BodySink {
match self {
BodySink::Chunked(ref sender) => {
let sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
HANDLE.lock().unwrap().as_mut().unwrap().spawn(async move {
let _ = sender.send(bytes).await;
});
},
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Chunk(bytes));
@ -474,20 +469,7 @@ impl BodySink {
pub fn close(&self) {
match self {
BodySink::Chunked(ref sender) => {
let mut sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(future::lazy(move || {
if sender.close().is_err() {
warn!("Failed to close network request sink.");
}
Ok(())
}));
},
BodySink::Chunked(_) => { /* no need to close sender */ },
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Done);
},
@ -506,7 +488,7 @@ async fn obtain_response(
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
fetch_terminated: Tokio02Sender<bool>,
fetch_terminated: UnboundedSender<bool>,
) -> Result<(HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), NetworkError> {
{
let mut headers = request_headers.clone();
@ -537,7 +519,7 @@ async fn obtain_response(
// However since this doesn't appear documented, and we're using an ancient version,
// for now we buffer manually to ensure we don't stream requests
// to servers that might not know how to handle them.
let (sender, receiver) = unbounded();
let (sender, receiver) = unbounded_channel();
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
};
@ -557,6 +539,7 @@ async fn obtain_response(
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
info!("Received message");
let bytes: Vec<u8> = match message.to().unwrap() {
BodyChunkResponse::Chunk(bytes) => bytes,
BodyChunkResponse::Done => {
@ -593,23 +576,25 @@ async fn obtain_response(
);
let body = match stream {
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
BodyStream::Buffered(receiver) => {
BodyStream::Chunked(receiver) => {
let stream = ReceiverStream::new(receiver);
Body::wrap_stream(stream.map(Ok::<_, Infallible>))
},
BodyStream::Buffered(mut receiver) => {
// Accumulate bytes received over IPC into a vector.
let mut body = vec![];
loop {
match receiver.recv() {
Ok(BodyChunk::Chunk(mut bytes)) => {
match receiver.recv().await {
Some(BodyChunk::Chunk(mut bytes)) => {
body.append(&mut bytes);
},
Ok(BodyChunk::Done) => break,
Err(_) => warn!("Failed to read all chunks from request body."),
Some(BodyChunk::Done) => break,
None => warn!("Failed to read all chunks from request body."),
}
}
body.into()
},
};
HyperRequest::builder()
.method(method)
.uri(encoded_url)
@ -709,12 +694,11 @@ async fn obtain_response(
debug!("Not notifying devtools (no request_id)");
None
};
Ok((Decoder::detect(res), msg))
future::ready(Ok((Decoder::detect(res), msg)))
})
.map_err(move |e| {
NetworkError::from_hyper_error(&e, connection_certs_clone.remove(host_clone))
})
.compat() // convert from Future01 to Future03
.await
}
}
@ -1850,7 +1834,7 @@ async fn http_network_fetch(
send_response_to_devtools(
&sender,
request_id.unwrap(),
meta_headers.map(Serde::into_inner),
meta_headers.map(|hdrs| Serde::into_inner(hdrs)),
meta_status,
pipeline_id,
);
@ -1866,19 +1850,22 @@ async fn http_network_fetch(
HANDLE.lock().unwrap().as_ref().unwrap().spawn(
res.into_body()
.map_err(|_| ())
.fold(res_body, move |res_body, chunk| {
.map_err(|e| {
warn!("Error streaming response body: {:?}", e);
()
})
.try_fold(res_body, move |res_body, chunk| {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return tokio::prelude::future::failed(());
return future::ready(Err(()));
}
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
let bytes = chunk.into_bytes();
let bytes = chunk;
body.extend_from_slice(&*bytes);
let _ = done_sender.send(Data::Payload(bytes.to_vec()));
}
tokio::prelude::future::ok(res_body)
future::ready(Ok(res_body))
})
.and_then(move |res_body| {
debug!("successfully finished response for {:?}", url1);
@ -1893,10 +1880,10 @@ async fn http_network_fetch(
.unwrap()
.set_attribute(ResourceAttribute::ResponseEnd);
let _ = done_sender2.send(Data::Done);
tokio::prelude::future::ok(())
future::ready(Ok(()))
})
.map_err(move |_| {
warn!("finished response for {:?} with error", url2);
debug!("finished response for {:?}", url2);
let mut body = res_body2.lock().unwrap();
let completed_body = match *body {
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),