Auto merge of #28639 - negator:negator/async, r=jdm

Non-blocking network IO

The current networking strategy uses a fixed sized thread pool to perform blocking requests on a per call basis. This is inefficient, but can be ~easily~ improved by utilizing the existing tokio runtime instead to submit async networking tasks to its executor. However, since servo is currently using an outdated version of the `hyper` http library (`0.12`) we must use the [`tokio_compat`](https://github.com/tokio-rs/tokio-compat) and [`futures_compat`](https://docs.rs/futures/0.3.1/futures/compat/index.html) libraries to integrate with the older version of [`Future` used in `hyper`](https://docs.rs/hyper/0.12.1/hyper/rt/trait.Future.html).

~**NOTE**: This PR is just proof of concept at the moment. In addition to test failures, it appears that large javascript downloads are silently failing to stream entire payloads, and occasionally getting cutoff.~

Tests are passing.

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: -->
- [x] `./mach build -d` does not report any errors
- [x] `./mach test-tidy` does not report any errors
- [x] These changes fix [#22813](https://github.com/servo/servo/issues/22813) (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
This commit is contained in:
bors-servo 2022-01-01 13:14:08 -05:00 committed by GitHub
commit b06eb38f56
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 475 additions and 345 deletions

47
Cargo.lock generated
View file

@ -169,6 +169,17 @@ dependencies = [
"libloading 0.6.1",
]
[[package]]
name = "async-recursion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-tungstenite"
version = "0.7.1"
@ -1909,6 +1920,7 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
dependencies = [
"futures 0.1.31",
"futures-channel",
"futures-core",
"futures-io",
@ -3534,6 +3546,7 @@ dependencies = [
"string_cache",
"thin-slice",
"time",
"tokio 0.2.21",
"url",
"uuid",
"void",
@ -3949,6 +3962,7 @@ checksum = "c44922cb3dbb1c70b5e5f443d63b64363a898564d739ba5198e3a9138442868d"
name = "net"
version = "0.0.1"
dependencies = [
"async-recursion",
"async-tungstenite",
"base64 0.10.1",
"brotli",
@ -3962,6 +3976,7 @@ dependencies = [
"flate2",
"futures 0.1.31",
"futures 0.3.5",
"futures-util",
"headers",
"http 0.1.21",
"hyper",
@ -3994,7 +4009,9 @@ dependencies = [
"time",
"tokio 0.1.22",
"tokio 0.2.21",
"tokio-compat",
"tokio-openssl 0.3.0",
"tokio-test",
"tungstenite",
"url",
"uuid",
@ -6479,11 +6496,13 @@ checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58"
dependencies = [
"bytes 0.5.5",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"mio",
"num_cpus",
"pin-project-lite",
"slab",
"tokio-macros",
]
@ -6509,6 +6528,23 @@ dependencies = [
"tokio-io",
]
[[package]]
name = "tokio-compat"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "107b625135aa7b9297dd2d99ccd6ca6ab124a5d1230778e159b9095adca4c722"
dependencies = [
"futures 0.1.31",
"futures-core",
"futures-util",
"pin-project-lite",
"tokio 0.2.21",
"tokio-current-thread",
"tokio-executor",
"tokio-reactor",
"tokio-timer",
]
[[package]]
name = "tokio-current-thread"
version = "0.1.7"
@ -6626,6 +6662,17 @@ dependencies = [
"tokio-reactor",
]
[[package]]
name = "tokio-test"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d"
dependencies = [
"bytes 0.5.5",
"futures-core",
"tokio 0.2.21",
]
[[package]]
name = "tokio-threadpool"
version = "0.1.18"

View file

@ -46,6 +46,7 @@ smallvec = "1.0"
string_cache = { version = "0.8", optional = true }
thin-slice = "0.1.0"
time = { version = "0.1.41", optional = true }
tokio = "0.2"
url = { version = "2.0", optional = true }
uuid = { version = "0.8", features = ["v4"], optional = true }
void = "1.0.2"

View file

@ -949,6 +949,13 @@ impl<T> MallocSizeOf for crossbeam_channel::Sender<T> {
}
}
#[cfg(feature = "servo")]
impl<T> MallocSizeOf for tokio::sync::mpsc::UnboundedSender<T> {
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
0
}
}
#[cfg(feature = "servo")]
impl MallocSizeOf for hyper::StatusCode {
fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {

View file

@ -15,6 +15,7 @@ test = false
doctest = false
[dependencies]
async-recursion = "0.3.2"
async-tungstenite = { version = "0.7.1", features = ["tokio-openssl"] }
base64 = "0.10.1"
brotli = "3"
@ -28,6 +29,7 @@ embedder_traits = { path = "../embedder_traits" }
flate2 = "1"
futures = "0.1"
futures03 = { version = "0.3", package = "futures" }
futures-util = { version = "0.3", features = ["compat"] }
headers = "0.2"
http = "0.1"
hyper = "0.12"
@ -59,6 +61,7 @@ servo_url = { path = "../url" }
time = "0.1.41"
tokio = "0.1"
tokio2 = { version = "0.2", package = "tokio", features = ["sync", "macros", "rt-threaded"] }
tokio-compat = "0.1"
tungstenite = "0.11"
url = "2.0"
uuid = { version = "0.8", features = ["v4"] }
@ -68,6 +71,7 @@ webrender_api = { git = "https://github.com/servo/webrender" }
futures = "0.1"
std_test_override = { path = "../std_test_override" }
tokio-openssl = "0.3"
tokio-test = "0.2"
[[test]]
name = "main"

View file

@ -10,8 +10,10 @@ use crate::http_loader::{determine_requests_referrer, http_fetch, HttpState};
use crate::http_loader::{set_default_accept, set_default_accept_language};
use crate::subresource_integrity::is_response_integrity_valid;
use content_security_policy as csp;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::Sender;
use devtools_traits::DevtoolsControlMsg;
use futures_util::compat::*;
use futures_util::StreamExt;
use headers::{AccessControlExposeHeaders, ContentType, HeaderMapExt, Range};
use http::header::{self, HeaderMap, HeaderName};
use hyper::Method;
@ -40,6 +42,9 @@ use std::ops::Bound;
use std::str;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use tokio2::sync::mpsc::{
unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender,
};
lazy_static! {
static ref X_CONTENT_TYPE_OPTIONS: HeaderName =
@ -48,7 +53,7 @@ lazy_static! {
pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send);
#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub enum Data {
Payload(Vec<u8>),
Done,
@ -58,8 +63,8 @@ pub enum Data {
pub struct FetchContext {
pub state: Arc<HttpState>,
pub user_agent: Cow<'static, str>,
pub devtools_chan: Option<Sender<DevtoolsControlMsg>>,
pub filemanager: FileManager,
pub devtools_chan: Option<Arc<Mutex<Sender<DevtoolsControlMsg>>>>,
pub filemanager: Arc<Mutex<FileManager>>,
pub file_token: FileTokenCheck,
pub cancellation_listener: Arc<Mutex<CancellationListener>>,
pub timing: ServoArc<Mutex<ResourceFetchTiming>>,
@ -93,10 +98,10 @@ impl CancellationListener {
}
}
}
pub type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;
pub type DoneChannel = Option<(TokioSender<Data>, TokioReceiver<Data>)>;
/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
pub fn fetch(request: &mut Request, target: Target, context: &FetchContext) {
pub async fn fetch(request: &mut Request, target: Target<'_>, context: &FetchContext) {
// Steps 7,4 of https://w3c.github.io/resource-timing/#processing-model
// rev order okay since spec says they're equal - https://w3c.github.io/resource-timing/#dfn-starttime
context
@ -110,13 +115,13 @@ pub fn fetch(request: &mut Request, target: Target, context: &FetchContext) {
.unwrap()
.set_attribute(ResourceAttribute::StartTime(ResourceTimeValue::FetchStart));
fetch_with_cors_cache(request, &mut CorsCache::new(), target, context);
fetch_with_cors_cache(request, &mut CorsCache::new(), target, context).await;
}
pub fn fetch_with_cors_cache(
pub async fn fetch_with_cors_cache(
request: &mut Request,
cache: &mut CorsCache,
target: Target,
target: Target<'_>,
context: &FetchContext,
) {
// Step 1.
@ -150,7 +155,7 @@ pub fn fetch_with_cors_cache(
}
// Step 8.
main_fetch(request, cache, false, false, target, &mut None, &context);
main_fetch(request, cache, false, false, target, &mut None, &context).await;
}
/// https://www.w3.org/TR/CSP/#should-block-request
@ -178,12 +183,12 @@ pub fn should_request_be_blocked_by_csp(request: &Request) -> csp::CheckResult {
}
/// [Main fetch](https://fetch.spec.whatwg.org/#concept-main-fetch)
pub fn main_fetch(
pub async fn main_fetch(
request: &mut Request,
cache: &mut CorsCache,
cors_flag: bool,
recursive_flag: bool,
target: Target,
target: Target<'_>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -266,7 +271,10 @@ pub fn main_fetch(
// Not applicable: see fetch_async.
// Step 12.
let mut response = response.unwrap_or_else(|| {
let mut response = match response {
Some(res) => res,
None => {
let current_url = request.current_url();
let same_origin = if let Origin::Origin(ref origin) = request.origin {
*origin == current_url.origin()
@ -282,7 +290,7 @@ pub fn main_fetch(
request.response_tainting = ResponseTainting::Basic;
// Substep 2.
scheme_fetch(request, cache, target, done_chan, context)
scheme_fetch(request, cache, target, done_chan, context).await
} else if request.mode == RequestMode::SameOrigin {
Response::network_error(NetworkError::Internal("Cross-origin response".into()))
} else if request.mode == RequestMode::NoCors {
@ -290,7 +298,7 @@ pub fn main_fetch(
request.response_tainting = ResponseTainting::Opaque;
// Substep 2.
scheme_fetch(request, cache, target, done_chan, context)
scheme_fetch(request, cache, target, done_chan, context).await
} else if !matches!(current_url.scheme(), "http" | "https") {
Response::network_error(NetworkError::Internal("Non-http scheme".into()))
} else if request.use_cors_preflight ||
@ -305,7 +313,8 @@ pub fn main_fetch(
// Substep 2.
let response = http_fetch(
request, cache, true, true, false, target, done_chan, context,
);
)
.await;
// Substep 3.
if response.is_network_error() {
// TODO clear cache entries using request
@ -319,8 +328,10 @@ pub fn main_fetch(
http_fetch(
request, cache, true, false, false, target, done_chan, context,
)
.await
}
});
},
};
// Step 13.
if recursive_flag {
@ -441,7 +452,7 @@ pub fn main_fetch(
let mut response_loaded = false;
let mut response = if !response.is_network_error() && !request.integrity_metadata.is_empty() {
// Step 19.1.
wait_for_response(&mut response, target, done_chan);
wait_for_response(&mut response, target, done_chan).await;
response_loaded = true;
// Step 19.2.
@ -465,7 +476,7 @@ pub fn main_fetch(
// by sync fetch, but we overload it here for simplicity
target.process_response(&mut response);
if !response_loaded {
wait_for_response(&mut response, target, done_chan);
wait_for_response(&mut response, target, done_chan).await;
}
// overloaded similarly to process_response
target.process_response_eof(&response);
@ -487,7 +498,7 @@ pub fn main_fetch(
// Step 23.
if !response_loaded {
wait_for_response(&mut response, target, done_chan);
wait_for_response(&mut response, target, done_chan).await;
}
// Step 24.
@ -502,22 +513,25 @@ pub fn main_fetch(
response
}
fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut DoneChannel) {
if let Some(ref ch) = *done_chan {
async fn wait_for_response(
response: &mut Response,
target: Target<'_>,
done_chan: &mut DoneChannel,
) {
if let Some(ref mut ch) = *done_chan {
loop {
match ch
.1
.recv()
.expect("fetch worker should always send Done before terminating")
{
Data::Payload(vec) => {
match ch.1.recv().await {
Some(Data::Payload(vec)) => {
target.process_response_chunk(vec);
},
Data::Done => break,
Data::Cancelled => {
Some(Data::Done) => break,
Some(Data::Cancelled) => {
response.aborted.store(true, Ordering::Release);
break;
},
_ => {
panic!("fetch worker should always send Done before terminating");
},
}
}
} else {
@ -613,10 +627,10 @@ fn create_blank_reply(url: ServoUrl, timing_type: ResourceTimingType) -> Respons
}
/// [Scheme fetch](https://fetch.spec.whatwg.org#scheme-fetch)
fn scheme_fetch(
async fn scheme_fetch(
request: &mut Request,
cache: &mut CorsCache,
target: Target,
target: Target<'_>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -628,6 +642,7 @@ fn scheme_fetch(
"chrome" if url.path() == "allowcert" => {
let data = request.body.as_mut().and_then(|body| {
let stream = body.take_stream();
let stream = stream.lock().unwrap();
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = stream.send(BodyChunkRequest::Connect(body_chan));
let _ = stream.send(BodyChunkRequest::Chunk);
@ -653,9 +668,12 @@ fn scheme_fetch(
create_blank_reply(url, request.timing_type())
},
"http" | "https" => http_fetch(
"http" | "https" => {
http_fetch(
request, cache, false, false, false, target, done_chan, context,
),
)
.await
},
"data" => match decode(&url) {
Ok((mime, bytes)) => {
@ -726,12 +744,13 @@ fn scheme_fetch(
// Setup channel to receive cross-thread messages about the file fetch
// operation.
let (done_sender, done_receiver) = unbounded();
let (mut done_sender, done_receiver) = unbounded_channel();
*done_chan = Some((done_sender.clone(), done_receiver));
*response.body.lock().unwrap() = ResponseBody::Receiving(vec![]);
context.filemanager.fetch_file_in_chunks(
done_sender,
context.filemanager.lock().unwrap().fetch_file_in_chunks(
&mut done_sender,
reader,
response.body.clone(),
context.cancellation_listener.clone(),
@ -781,12 +800,12 @@ fn scheme_fetch(
partial_content(&mut response);
}
let (done_sender, done_receiver) = unbounded();
let (mut done_sender, done_receiver) = unbounded_channel();
*done_chan = Some((done_sender.clone(), done_receiver));
*response.body.lock().unwrap() = ResponseBody::Receiving(vec![]);
if let Err(err) = context.filemanager.fetch_file(
&done_sender,
if let Err(err) = context.filemanager.lock().unwrap().fetch_file(
&mut done_sender,
context.cancellation_listener.clone(),
id,
&context.file_token,

View file

@ -4,7 +4,6 @@
use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds};
use crate::resource_thread::CoreResourceThreadPool;
use crossbeam_channel::Sender;
use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern};
use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt};
use http::header::{self, HeaderValue};
@ -28,6 +27,7 @@ use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use tokio2::sync::mpsc::UnboundedSender as TokioSender;
use url::Url;
use uuid::Uuid;
@ -127,7 +127,7 @@ impl FileManager {
// in a separate thread.
pub fn fetch_file(
&self,
done_sender: &Sender<Data>,
done_sender: &mut TokioSender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
id: Uuid,
file_token: &FileTokenCheck,
@ -210,12 +210,13 @@ impl FileManager {
pub fn fetch_file_in_chunks(
&self,
done_sender: Sender<Data>,
done_sender: &mut TokioSender<Data>,
mut reader: BufReader<File>,
res_body: ServoArc<Mutex<ResponseBody>>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
range: RelativePos,
) {
let done_sender = done_sender.clone();
self.thread_pool
.upgrade()
.and_then(|pool| {
@ -282,7 +283,7 @@ impl FileManager {
fn fetch_blob_buf(
&self,
done_sender: &Sender<Data>,
done_sender: &mut TokioSender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
id: &Uuid,
file_token: &FileTokenCheck,
@ -358,7 +359,7 @@ impl FileManager {
);
self.fetch_file_in_chunks(
done_sender.clone(),
&mut done_sender.clone(),
reader,
response.body.clone(),
cancellation_listener,

View file

@ -8,7 +8,6 @@
//! and <http://tools.ietf.org/html/rfc7232>.
use crate::fetch::methods::{Data, DoneChannel};
use crossbeam_channel::{unbounded, Sender};
use headers::{
CacheControl, ContentRange, Expires, HeaderMapExt, LastModified, Pragma, Range, Vary,
};
@ -30,6 +29,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::SystemTime;
use time::{Duration, Timespec, Tm};
use tokio2::sync::mpsc::{unbounded_channel as unbounded, UnboundedSender as TokioSender};
/// The key used to differentiate requests in the cache.
#[derive(Clone, Eq, Hash, MallocSizeOf, PartialEq)]
@ -58,7 +58,7 @@ struct CachedResource {
request_headers: Arc<Mutex<HeaderMap>>,
body: Arc<Mutex<ResponseBody>>,
aborted: Arc<AtomicBool>,
awaiting_body: Arc<Mutex<Vec<Sender<Data>>>>,
awaiting_body: Arc<Mutex<Vec<TokioSender<Data>>>>,
data: Measurable<MeasurableCachedResource>,
}

View file

@ -11,11 +11,13 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
use async_recursion::async_recursion;
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent};
use futures_util::compat::*;
use headers::authorization::Basic;
use headers::{AccessControlAllowCredentials, AccessControlAllowHeaders, HeaderMapExt};
use headers::{
@ -64,11 +66,13 @@ use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};
use tokio2::sync::mpsc::{unbounded_channel, UnboundedSender as Tokio02Sender};
use tokio_compat::runtime::{Builder, Runtime};
lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
pub static ref HANDLE: Mutex<Option<Runtime>> =
Mutex::new(Some(Builder::new().build().unwrap()));
}
/// The various states an entry of the HttpCache can be in.
@ -491,25 +495,21 @@ impl BodySink {
}
}
fn obtain_response(
async fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
body: Option<StdArc<Mutex<IpcSender<BodyChunkRequest>>>>,
source_is_null: bool,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
fetch_terminated: Sender<bool>,
) -> Box<
dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
Error = NetworkError,
>,
> {
let headers = request_headers.clone();
fetch_terminated: Tokio02Sender<bool>,
) -> Result<(HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>), NetworkError> {
{
let mut headers = request_headers.clone();
let devtools_bytes = StdArc::new(Mutex::new(vec![]));
@ -523,10 +523,10 @@ fn obtain_response(
.replace("}", "%7D");
let request = if let Some(chunk_requester) = body {
let (sink, stream) = if source_is_null {
let (mut sink, stream) = if source_is_null {
// Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
// TODO: this should not be set for HTTP/2(currently not supported?).
request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
let (sender, receiver) = channel(1);
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
@ -543,13 +543,16 @@ fn obtain_response(
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));
if let Ok(requester) = chunk_requester.lock() {
let _ = requester.send(BodyChunkRequest::Connect(body_chan));
// https://fetch.spec.whatwg.org/#concept-request-transmit-body
// Request the first chunk, corresponding to Step 3 and 4.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
let _ = requester.send(BodyChunkRequest::Chunk);
}
let devtools_bytes = devtools_bytes.clone();
let chunk_requester2 = chunk_requester.clone();
ROUTER.add_route(
body_port.to_opaque(),
@ -560,6 +563,7 @@ fn obtain_response(
// Step 3, abort these parallel steps.
let _ = fetch_terminated.send(false);
sink.close();
return;
},
BodyChunkResponse::Error => {
@ -568,6 +572,7 @@ fn obtain_response(
// where step 5 requires setting an `aborted` flag on the fetch.
let _ = fetch_terminated.send(true);
sink.close();
return;
},
};
@ -580,7 +585,10 @@ fn obtain_response(
// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
let _ = chunk_requester2
.lock()
.unwrap()
.send(BodyChunkRequest::Chunk);
}),
);
@ -641,7 +649,7 @@ fn obtain_response(
let mut request = match request {
Ok(request) => request,
Err(e) => return Box::new(future::result(Err(NetworkError::from_http_error(&e)))),
Err(e) => return Err(NetworkError::from_http_error(&e)),
};
*request.headers_mut() = headers.clone();
@ -664,7 +672,7 @@ fn obtain_response(
let connection_certs_clone = context.state.connection_certs.clone();
let headers = headers.clone();
Box::new(
client
.request(request)
.and_then(move |res| {
@ -705,18 +713,21 @@ fn obtain_response(
})
.map_err(move |e| {
NetworkError::from_hyper_error(&e, connection_certs_clone.remove(host_clone))
}),
)
})
.compat() // convert from Future01 to Future03
.await
}
}
/// [HTTP fetch](https://fetch.spec.whatwg.org#http-fetch)
pub fn http_fetch(
#[async_recursion]
pub async fn http_fetch(
request: &mut Request,
cache: &mut CorsCache,
cors_flag: bool,
cors_preflight_flag: bool,
authentication_fetch_flag: bool,
target: Target,
target: Target<'async_recursion>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -771,7 +782,7 @@ pub fn http_fetch(
// Sub-substep 1
if method_mismatch || header_mismatch {
let preflight_result = cors_preflight_fetch(&request, cache, context);
let preflight_result = cors_preflight_fetch(&request, cache, context).await;
// Sub-substep 2
if let Some(e) = preflight_result.get_network_error() {
return Response::network_error(e.clone());
@ -799,7 +810,8 @@ pub fn http_fetch(
cors_flag,
done_chan,
context,
);
)
.await;
// Substep 4
if cors_flag && cors_check(&request, &fetch_result).is_err() {
@ -865,6 +877,7 @@ pub fn http_fetch(
http_redirect_fetch(
request, cache, response, cors_flag, target, done_chan, context,
)
.await
},
};
}
@ -907,12 +920,13 @@ impl Drop for RedirectEndTimer {
}
/// [HTTP redirect fetch](https://fetch.spec.whatwg.org#http-redirect-fetch)
pub fn http_redirect_fetch(
#[async_recursion]
pub async fn http_redirect_fetch(
request: &mut Request,
cache: &mut CorsCache,
response: Response,
cors_flag: bool,
target: Target,
target: Target<'async_recursion>,
done_chan: &mut DoneChannel,
context: &FetchContext,
) -> Response {
@ -1071,7 +1085,8 @@ pub fn http_redirect_fetch(
target,
done_chan,
context,
);
)
.await;
// TODO: timing allow check
context
@ -1100,7 +1115,8 @@ fn try_immutable_origin_to_hyper_origin(url_origin: &ImmutableOrigin) -> Option<
}
/// [HTTP network or cache fetch](https://fetch.spec.whatwg.org#http-network-or-cache-fetch)
fn http_network_or_cache_fetch(
#[async_recursion]
async fn http_network_or_cache_fetch(
request: &mut Request,
authentication_fetch_flag: bool,
cors_flag: bool,
@ -1398,26 +1414,27 @@ fn http_network_or_cache_fetch(
}
}
fn wait_for_cached_response(done_chan: &mut DoneChannel, response: &mut Option<Response>) {
if let Some(ref ch) = *done_chan {
async fn wait_for_cached_response(
done_chan: &mut DoneChannel,
response: &mut Option<Response>,
) {
if let Some(ref mut ch) = *done_chan {
// The cache constructed a response with a body of ResponseBody::Receiving.
// We wait for the response in the cache to "finish",
// with a body of either Done or Cancelled.
assert!(response.is_some());
loop {
match ch
.1
.recv()
.expect("HTTP cache should always send Done or Cancelled")
{
Data::Payload(_) => {},
Data::Done => break, // Return the full response as if it was initially cached as such.
Data::Cancelled => {
match ch.1.recv().await {
Some(Data::Payload(_)) => {},
Some(Data::Done) => break, // Return the full response as if it was initially cached as such.
Some(Data::Cancelled) => {
// The response was cancelled while the fetch was ongoing.
// Set response to None, which will trigger a network fetch below.
*response = None;
break;
},
_ => panic!("HTTP cache should always send Done or Cancelled"),
}
}
}
@ -1425,7 +1442,7 @@ fn http_network_or_cache_fetch(
*done_chan = None;
}
wait_for_cached_response(done_chan, &mut response);
wait_for_cached_response(done_chan, &mut response).await;
// Step 6
// TODO: https://infra.spec.whatwg.org/#if-aborted
@ -1446,7 +1463,7 @@ fn http_network_or_cache_fetch(
if response.is_none() {
// Substep 2
let forward_response =
http_network_fetch(http_request, credentials_flag, done_chan, context);
http_network_fetch(http_request, credentials_flag, done_chan, context).await;
// Substep 3
if let Some((200..=399, _)) = forward_response.raw_status {
if !http_request.method.is_safe() {
@ -1467,8 +1484,8 @@ fn http_network_or_cache_fetch(
// since the network response will be replaced by the revalidated stored one.
*done_chan = None;
response = http_cache.refresh(&http_request, forward_response.clone(), done_chan);
wait_for_cached_response(done_chan, &mut response);
}
wait_for_cached_response(done_chan, &mut response).await;
}
// Substep 5
@ -1596,7 +1613,8 @@ fn http_network_or_cache_fetch(
cors_flag,
done_chan,
context,
);
)
.await;
}
// Step 11
@ -1655,7 +1673,7 @@ impl Drop for ResponseEndTimer {
}
/// [HTTP network fetch](https://fetch.spec.whatwg.org/#http-network-fetch)
fn http_network_fetch(
async fn http_network_fetch(
request: &mut Request,
credentials_flag: bool,
done_chan: &mut DoneChannel,
@ -1686,7 +1704,7 @@ fn http_network_fetch(
if log_enabled!(log::Level::Info) {
info!("{:?} request for {}", request.method, url);
for header in request.headers.iter() {
info!(" - {:?}", header);
debug!(" - {:?}", header);
}
}
@ -1696,7 +1714,7 @@ fn http_network_fetch(
let is_xhr = request.destination == Destination::None;
// The receiver will receive true if there has been an error streaming the request body.
let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();
let (fetch_terminated_sender, mut fetch_terminated_receiver) = unbounded_channel();
let body = request.body.as_ref().map(|body| body.take_stream());
@ -1728,32 +1746,28 @@ fn http_network_fetch(
let pipeline_id = request.pipeline_id;
// This will only get the headers, the body is read later
let (res, msg) = match response_future.wait() {
let (res, msg) = match response_future.await {
Ok(wrapped_response) => wrapped_response,
Err(error) => return Response::network_error(error),
};
if log_enabled!(log::Level::Info) {
debug!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
debug!(" - {:?}", header);
}
}
// Check if there was an error while streaming the request body.
//
// It's ok to block on the receiver,
// since we're already blocking on the response future above,
// so we can be sure that the request has already been processed,
// and a message is in the channel(or soon will be).
match fetch_terminated_receiver.recv() {
Ok(true) => {
match fetch_terminated_receiver.recv().await {
Some(true) => {
return Response::network_error(NetworkError::Internal(
"Request body streaming failed.".into(),
));
},
Ok(false) => {},
Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
}
if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
info!(" - {:?}", header);
}
Some(false) => {},
_ => warn!("Failed to receive confirmation request was streamed without error."),
}
let header_strings: Vec<&str> = res
@ -1791,7 +1805,7 @@ fn http_network_fetch(
res.status(),
res.status().canonical_reason().unwrap_or("").into(),
));
debug!("got {:?} response for {:?}", res.status(), request.url());
info!("got {:?} response for {:?}", res.status(), request.url());
response.raw_status = Some((
res.status().as_u16(),
res.status().canonical_reason().unwrap_or("").into(),
@ -1803,7 +1817,7 @@ fn http_network_fetch(
let res_body = response.body.clone();
// We're about to spawn a future to be waited on here
let (done_sender, done_receiver) = unbounded();
let (done_sender, done_receiver) = unbounded_channel();
*done_chan = Some((done_sender.clone(), done_receiver));
let meta = match response
.metadata()
@ -1825,6 +1839,7 @@ fn http_network_fetch(
let res_body2 = res_body.clone();
if let Some(ref sender) = devtools_sender {
let sender = sender.lock().unwrap();
if let Some(m) = msg {
send_request_to_devtools(m, &sender);
}
@ -1848,21 +1863,22 @@ fn http_network_fetch(
let timing_ptr3 = context.timing.clone();
let url1 = request.url();
let url2 = url1.clone();
HANDLE.lock().unwrap().as_mut().unwrap().spawn(
HANDLE.lock().unwrap().as_ref().unwrap().spawn(
res.into_body()
.map_err(|_| ())
.fold(res_body, move |res_body, chunk| {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return future::failed(());
return tokio::prelude::future::failed(());
}
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
let bytes = chunk.into_bytes();
body.extend_from_slice(&*bytes);
let _ = done_sender.send(Data::Payload(bytes.to_vec()));
}
future::ok(res_body)
tokio::prelude::future::ok(res_body)
})
.and_then(move |res_body| {
debug!("successfully finished response for {:?}", url1);
@ -1877,10 +1893,10 @@ fn http_network_fetch(
.unwrap()
.set_attribute(ResourceAttribute::ResponseEnd);
let _ = done_sender2.send(Data::Done);
future::ok(())
tokio::prelude::future::ok(())
})
.map_err(move |_| {
debug!("finished response for {:?} with error", url2);
warn!("finished response for {:?} with error", url2);
let mut body = res_body2.lock().unwrap();
let completed_body = match *body {
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
@ -1956,7 +1972,7 @@ fn http_network_fetch(
}
/// [CORS preflight fetch](https://fetch.spec.whatwg.org#cors-preflight-fetch)
fn cors_preflight_fetch(
async fn cors_preflight_fetch(
request: &Request,
cache: &mut CorsCache,
context: &FetchContext,
@ -2000,7 +2016,8 @@ fn cors_preflight_fetch(
}
// Step 6
let response = http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context);
let response =
http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context).await;
// Step 7
if cors_check(&request, &response).is_ok() &&
response

View file

@ -680,7 +680,12 @@ impl CoreResourceManager {
_ => (FileTokenCheck::NotRequired, None),
};
self.thread_pool.spawn(move || {
HANDLE
.lock()
.unwrap()
.as_ref()
.unwrap()
.spawn_std(async move {
// XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed)
// todo load context / mimesniff in fetch
// todo referrer policy?
@ -688,11 +693,15 @@ impl CoreResourceManager {
let context = FetchContext {
state: http_state,
user_agent: ua,
devtools_chan: dc,
filemanager: filemanager,
devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
filemanager: Arc::new(Mutex::new(filemanager)),
file_token,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))),
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(
cancel_chan,
))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
request.timing_type(),
))),
};
match res_init_ {
@ -706,15 +715,20 @@ impl CoreResourceManager {
&mut sender,
&mut None,
&context,
);
)
.await;
},
None => {
fetch(&mut request, &mut sender, &context).await;
},
None => fetch(&mut request, &mut sender, &context),
};
// Remove token after fetch.
if let Some(id) = blob_url_file_id.as_ref() {
context
.filemanager
.lock()
.unwrap()
.invalidate_token(&context.file_token, id);
}
});

View file

@ -48,6 +48,7 @@ use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, SystemTime};
use tokio_test::block_on;
use uuid::Uuid;
// TODO write a struct that impls Handler for storing test values
@ -191,9 +192,12 @@ fn test_fetch_blob() {
let origin = ServoUrl::parse("http://www.example.org/").unwrap();
let id = Uuid::new_v4();
context
.filemanager
.promote_memory(id.clone(), blob_buf, true, "http://www.example.org".into());
context.filemanager.lock().unwrap().promote_memory(
id.clone(),
blob_buf,
true,
"http://www.example.org".into(),
);
let url = ServoUrl::parse(&format!("blob:{}{}", origin.as_str(), id.to_simple())).unwrap();
let mut request = Request::new(
@ -212,7 +216,7 @@ fn test_fetch_blob() {
expected: bytes.to_vec(),
};
methods::fetch(&mut request, &mut target, &context);
block_on(methods::fetch(&mut request, &mut target, &context));
let fetch_response = receiver.recv().unwrap();
assert!(!fetch_response.is_network_error());
@ -772,7 +776,10 @@ fn test_fetch_with_hsts() {
state: Arc::new(HttpState::new(tls_config)),
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: None,
filemanager: FileManager::new(create_embedder_proxy(), Weak::new()),
filemanager: Arc::new(Mutex::new(FileManager::new(
create_embedder_proxy(),
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
@ -835,7 +842,10 @@ fn test_load_adds_host_to_hsts_list_when_url_is_https() {
state: Arc::new(HttpState::new(tls_config)),
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: None,
filemanager: FileManager::new(create_embedder_proxy(), Weak::new()),
filemanager: Arc::new(Mutex::new(FileManager::new(
create_embedder_proxy(),
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
@ -900,7 +910,10 @@ fn test_fetch_self_signed() {
state: Arc::new(HttpState::new(tls_config)),
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: None,
filemanager: FileManager::new(create_embedder_proxy(), Weak::new()),
filemanager: Arc::new(Mutex::new(FileManager::new(
create_embedder_proxy(),
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(

View file

@ -2,7 +2,6 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crossbeam_channel::unbounded;
use http::header::{HeaderValue, EXPIRES};
use http::StatusCode;
use msg::constellation_msg::TEST_PIPELINE_ID;
@ -11,6 +10,7 @@ use net_traits::request::{Origin, Referrer, Request};
use net_traits::response::{HttpsState, Response, ResponseBody};
use net_traits::{ResourceFetchTiming, ResourceTimingType};
use servo_url::ServoUrl;
use tokio2::sync::mpsc::unbounded_channel as unbounded;
#[test]
fn test_refreshing_resource_sets_done_chan_the_appropriate_value() {
@ -40,7 +40,8 @@ fn test_refreshing_resource_sets_done_chan_the_appropriate_value() {
cache.store(&request, &response);
// Second, mutate the response into a 304 response, and refresh the stored one.
response.status = Some((StatusCode::NOT_MODIFIED, String::from("304")));
let mut done_chan = Some(unbounded());
let (send, recv) = unbounded();
let mut done_chan = Some((send, recv));
let refreshed_response = cache.refresh(&request, response.clone(), &mut done_chan);
// Ensure a resource was found, and refreshed.
assert!(refreshed_response.is_some());

View file

@ -50,6 +50,7 @@ use tokio::net::TcpListener;
use tokio::reactor::Handle;
use tokio::runtime::Runtime;
use tokio_openssl::SslAcceptorExt;
use tokio_test::block_on;
lazy_static! {
pub static ref HANDLE: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
@ -103,8 +104,11 @@ fn new_fetch_context(
FetchContext {
state: Arc::new(HttpState::new(tls_config)),
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: dc,
filemanager: FileManager::new(sender, pool_handle.unwrap_or_else(|| Weak::new())),
devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
filemanager: Arc::new(Mutex::new(FileManager::new(
sender,
pool_handle.unwrap_or_else(|| Weak::new()),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
@ -131,7 +135,7 @@ fn fetch_with_context(request: &mut Request, mut context: &mut FetchContext) ->
let (sender, receiver) = unbounded();
let mut target = FetchResponseCollector { sender: sender };
methods::fetch(request, &mut target, &mut context);
block_on(methods::fetch(request, &mut target, &mut context));
receiver.recv().unwrap()
}
@ -140,12 +144,12 @@ fn fetch_with_cors_cache(request: &mut Request, cache: &mut CorsCache) -> Respon
let (sender, receiver) = unbounded();
let mut target = FetchResponseCollector { sender: sender };
methods::fetch_with_cors_cache(
block_on(methods::fetch_with_cors_cache(
request,
cache,
&mut target,
&mut new_fetch_context(None, None, None),
);
));
receiver.recv().unwrap()
}

View file

@ -13,6 +13,7 @@ use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use mime::Mime;
use msg::constellation_msg::PipelineId;
use servo_url::{ImmutableOrigin, ServoUrl};
use std::sync::{Arc, Mutex};
/// An [initiator](https://fetch.spec.whatwg.org/#concept-request-initiator)
#[derive(Clone, Copy, Debug, Deserialize, MallocSizeOf, PartialEq, Serialize)]
@ -163,7 +164,7 @@ pub enum BodyChunkRequest {
pub struct RequestBody {
/// Net's channel to communicate with script re this body.
#[ignore_malloc_size_of = "Channels are hard"]
chan: IpcSender<BodyChunkRequest>,
chan: Arc<Mutex<IpcSender<BodyChunkRequest>>>,
/// <https://fetch.spec.whatwg.org/#concept-body-source>
source: BodySource,
/// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
@ -177,7 +178,7 @@ impl RequestBody {
total_bytes: Option<usize>,
) -> Self {
RequestBody {
chan,
chan: Arc::new(Mutex::new(chan)),
source,
total_bytes,
}
@ -189,13 +190,14 @@ impl RequestBody {
BodySource::Null => panic!("Null sources should never be re-directed."),
BodySource::Object => {
let (chan, port) = ipc::channel().unwrap();
let _ = self.chan.send(BodyChunkRequest::Extract(port));
self.chan = chan.clone();
let mut selfchan = self.chan.lock().unwrap();
let _ = selfchan.send(BodyChunkRequest::Extract(port));
*selfchan = chan;
},
}
}
pub fn take_stream(&self) -> IpcSender<BodyChunkRequest> {
pub fn take_stream(&self) -> Arc<Mutex<IpcSender<BodyChunkRequest>>> {
self.chan.clone()
}