net: use connection pooling

This commit is contained in:
Sean McArthur 2015-08-05 10:46:13 -07:00 committed by Josh Matthews
parent b7c88dd547
commit a1a9db8ffd
10 changed files with 185 additions and 133 deletions

View file

@ -8,14 +8,15 @@ use file_loader;
use flate2::read::{DeflateDecoder, GzDecoder};
use hsts::secure_url;
use hyper::Error as HttpError;
use hyper::client::{Request, Response};
use hyper::client::{Request, Response, Pool};
use hyper::error::Result as HttpResult;
use hyper::header::{AcceptEncoding, Accept, ContentLength, ContentType, Host};
use hyper::header::{Location, qitem, StrictTransportSecurity};
use hyper::header::{Quality, QualityItem, Headers, ContentEncoding, Encoding};
use hyper::http::RawStatus;
use hyper::method::Method;
use hyper::mime::{Mime, TopLevel, SubLevel};
use hyper::net::{Fresh, HttpsConnector, Openssl};
use hyper::net::{Fresh, HttpsConnector, HttpStream, Ssl, Openssl};
use hyper::status::{StatusCode, StatusClass};
use ipc_channel::ipc::{self, IpcSender};
use log;
@ -23,6 +24,7 @@ use mime_classifier::MIMEClassifier;
use net_traits::ProgressMsg::{Payload, Done};
use net_traits::hosts::replace_hosts;
use net_traits::{ControlMsg, CookieSource, LoadData, Metadata, LoadConsumer, IncludeSubdomains};
use openssl::ssl::SslStream as OpensslStream;
use openssl::ssl::{SslContext, SslMethod, SSL_VERIFY_PEER};
use resource_task::{start_sending_opt, start_sending_sniffed_opt};
use std::borrow::ToOwned;
@ -37,12 +39,49 @@ use util::resource_files::resources_dir_path;
use util::task::spawn_named;
use uuid;
pub type Connector = HttpsConnector<SslProvider>;
pub enum SslProvider {
None,
Openssl(Openssl)
}
impl Ssl for SslProvider {
type Stream = OpensslStream<HttpStream>;
fn wrap_client(&self, stream: HttpStream, host: &str) -> HttpResult<Self::Stream> {
match *self {
SslProvider::None => Err(HttpError::Ssl("ssl disabled".into())),
SslProvider::Openssl(ref s) => s.wrap_client(stream, host)
}
}
fn wrap_server(&self, _: HttpStream) -> HttpResult<Self::Stream> {
unimplemented!()
}
}
pub fn create_http_connector() -> Arc<Pool<Connector>> {
let mut context = SslContext::new(SslMethod::Sslv23).unwrap();
context.set_verify(SSL_VERIFY_PEER, None);
context.set_CA_file(&resources_dir_path().join("certs")).unwrap();
let connector = HttpsConnector::new(SslProvider::Openssl(Openssl {
context: Arc::new(context)
}));
Arc::new(Pool::with_connector(Default::default(), connector))
}
pub fn factory(resource_mgr_chan: IpcSender<ControlMsg>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>)
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
connector: Arc<Pool<Connector>>)
-> Box<FnBox(LoadData, LoadConsumer, Arc<MIMEClassifier>) + Send> {
box move |load_data: LoadData, senders, classifier| {
spawn_named(format!("http_loader for {}", load_data.url.serialize()),
move || load_for_consumer(load_data, senders, classifier, resource_mgr_chan, devtools_chan))
spawn_named(format!("http_loader for {}", load_data.url.serialize()), move || {
load_for_consumer(load_data, senders, classifier, connector, resource_mgr_chan, devtools_chan)
})
}
}
@ -82,10 +121,14 @@ fn inner_url(url: &Url) -> Url {
fn load_for_consumer(load_data: LoadData,
start_chan: LoadConsumer,
classifier: Arc<MIMEClassifier>,
connector: Arc<Pool<Connector>>,
resource_mgr_chan: IpcSender<ControlMsg>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>) {
match load::<WrappedHttpRequest>(load_data, resource_mgr_chan, devtools_chan, &NetworkHttpRequestFactory) {
let factory = NetworkHttpRequestFactory {
connector: connector,
};
match load::<WrappedHttpRequest>(load_data, resource_mgr_chan, devtools_chan, &factory) {
Err(LoadError::UnsupportedScheme(url)) => {
let s = format!("{} request, but we don't support that scheme", &*url.scheme);
send_error(url, s, start_chan)
@ -111,6 +154,7 @@ fn load_for_consumer(load_data: LoadData,
file_loader::factory(load_data, start_chan, classifier)
}
Err(LoadError::ConnectionAborted(_)) => unreachable!(),
Ok(mut load_response) => {
let metadata = load_response.metadata.clone();
send_data(&mut load_response, start_chan, metadata, classifier)
@ -172,18 +216,15 @@ pub trait HttpRequestFactory {
fn create(&self, url: Url, method: Method) -> Result<Self::R, LoadError>;
}
struct NetworkHttpRequestFactory;
struct NetworkHttpRequestFactory {
connector: Arc<Pool<Connector>>,
}
impl HttpRequestFactory for NetworkHttpRequestFactory {
type R = WrappedHttpRequest;
fn create(&self, url: Url, method: Method) -> Result<WrappedHttpRequest, LoadError> {
let mut context = SslContext::new(SslMethod::Sslv23).unwrap();
context.set_verify(SSL_VERIFY_PEER, None);
context.set_CA_file(&resources_dir_path().join("certs")).unwrap();
let connector = HttpsConnector::new(Openssl { context: Arc::new(context) });
let connection = Request::with_connector(method, url.clone(), &connector);
let connection = Request::with_connector(method, url.clone(), &*self.connector);
let ssl_err_string = "Some(OpenSslErrors([UnknownError { library: \"SSL routines\", \
function: \"SSL3_GET_SERVER_CERTIFICATE\", \
@ -253,6 +294,9 @@ impl HttpRequest for WrappedHttpRequest {
let response = match request_writer.send() {
Ok(w) => w,
Err(HttpError::Io(ref io_error)) if io_error.kind() == io::ErrorKind::ConnectionAborted => {
return Err(LoadError::ConnectionAborted(io_error.description().to_string()));
},
Err(e) => return Err(LoadError::Connection(url, e.description().to_string()))
};
@ -268,7 +312,8 @@ pub enum LoadError {
Ssl(Url, String),
InvalidRedirect(Url, String),
Decoding(Url, String),
MaxRedirects(Url)
MaxRedirects(Url),
ConnectionAborted(String),
}
fn set_default_accept_encoding(headers: &mut Headers) {
@ -428,11 +473,12 @@ fn send_response_to_devtools(devtools_chan: Option<Sender<DevtoolsControlMsg>>,
chan.send(DevtoolsControlMsg::FromChrome(msg)).unwrap();
}
}
pub fn load<A>(load_data: LoadData,
resource_mgr_chan: IpcSender<ControlMsg>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
request_factory: &HttpRequestFactory<R=A>)
-> Result<StreamedResponse<A::R>, LoadError> where A: HttpRequest + 'static {
resource_mgr_chan: IpcSender<ControlMsg>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
request_factory: &HttpRequestFactory<R=A>)
-> Result<StreamedResponse<A::R>, LoadError> where A: HttpRequest + 'static {
// FIXME: At the time of writing this FIXME, servo didn't have any central
// location for configuration. If you're reading this and such a
// repository DOES exist, please update this constant to use it.
@ -499,53 +545,74 @@ pub fn load<A>(load_data: LoadData,
set_default_accept_encoding(&mut request_headers);
set_request_cookies(doc_url.clone(), &mut request_headers, &resource_mgr_chan);
let mut req = try!(request_factory.create(url.clone(), method.clone()));
*req.headers_mut() = request_headers;
if log_enabled!(log::LogLevel::Info) {
info!("{}", method);
for header in req.headers_mut().iter() {
info!(" - {}", header);
}
info!("{:?}", load_data.data);
}
// Avoid automatically sending request body if a redirect has occurred.
//
// TODO - This is the wrong behaviour according to the RFC. However, I'm not
// sure how much "correctness" vs. real-world is important in this case.
//
// https://tools.ietf.org/html/rfc7231#section-6.4
let is_redirected_request = iters != 1;
let request_id = uuid::Uuid::new_v4().to_simple_string();
let response = match load_data.data {
Some(ref data) if !is_redirected_request => {
req.headers_mut().set(ContentLength(data.len() as u64));
// TODO: Do this only if load_data has some pipeline_id, and send the pipeline_id
// in the message
send_request_to_devtools(
devtools_chan.clone(), request_id.clone(), url.clone(),
method.clone(), load_data.headers.clone(),
load_data.data.clone()
);
let response;
try!(req.send(&load_data.data))
}
_ => {
if load_data.method != Method::Get && load_data.method != Method::Head {
req.headers_mut().set(ContentLength(0))
// loop trying connections in connection pool
// they may have grown stale (disconnected), in which case we'll get
// a ConnectionAborted error. this loop tries again with a new
// connection.
loop {
let mut req = try!(request_factory.create(url.clone(), method.clone()));
*req.headers_mut() = request_headers.clone();
if log_enabled!(log::LogLevel::Info) {
info!("{}", method);
for header in req.headers_mut().iter() {
info!(" - {}", header);
}
send_request_to_devtools(
devtools_chan.clone(), request_id.clone(), url.clone(),
method.clone(), load_data.headers.clone(),
None
);
try!(req.send(&None))
info!("{:?}", load_data.data);
}
};
// Avoid automatically sending request body if a redirect has occurred.
//
// TODO - This is the wrong behaviour according to the RFC. However, I'm not
// sure how much "correctness" vs. real-world is important in this case.
//
// https://tools.ietf.org/html/rfc7231#section-6.4
let is_redirected_request = iters != 1;
let maybe_response = match load_data.data {
Some(ref data) if !is_redirected_request => {
req.headers_mut().set(ContentLength(data.len() as u64));
// TODO: Do this only if load_data has some pipeline_id, and send the pipeline_id
// in the message
send_request_to_devtools(
devtools_chan.clone(), request_id.clone(), url.clone(),
method.clone(), load_data.headers.clone(),
load_data.data.clone()
);
req.send(&load_data.data)
}
_ => {
if load_data.method != Method::Get && load_data.method != Method::Head {
req.headers_mut().set(ContentLength(0))
}
send_request_to_devtools(
devtools_chan.clone(), request_id.clone(), url.clone(),
method.clone(), load_data.headers.clone(),
None
);
req.send(&None)
}
};
response = match maybe_response {
Ok(r) => r,
Err(LoadError::ConnectionAborted(reason)) => {
debug!("connection aborted ({:?}), possibly stale, trying new connection", reason);
continue;
}
Err(e) => return Err(e),
};
// if no ConnectionAborted, break the loop
break;
}
info!("got HTTP response {}, headers:", response.status());
if log_enabled!(log::LogLevel::Info) {