From 903e0cd8570353b0ecef33d094464166869b9eb3 Mon Sep 17 00:00:00 2001 From: Naveen Gattu Date: Sun, 5 Dec 2021 23:14:49 -0800 Subject: [PATCH] Non-blocking network IO --- Cargo.lock | 47 +++ components/malloc_size_of/Cargo.toml | 1 + components/malloc_size_of/lib.rs | 7 + components/net/Cargo.toml | 4 + components/net/fetch/methods.rs | 193 +++++++------ components/net/filemanager_thread.rs | 11 +- components/net/http_cache.rs | 4 +- components/net/http_loader.rs | 409 ++++++++++++++------------- components/net/resource_thread.rs | 86 +++--- components/net/tests/fetch.rs | 27 +- components/net/tests/http_cache.rs | 5 +- components/net/tests/main.rs | 14 +- components/net_traits/request.rs | 12 +- 13 files changed, 475 insertions(+), 345 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85b3f55a297..39d01c15ee1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,6 +169,17 @@ dependencies = [ "libloading 0.6.1", ] +[[package]] +name = "async-recursion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-tungstenite" version = "0.7.1" @@ -1909,6 +1920,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" dependencies = [ + "futures 0.1.31", "futures-channel", "futures-core", "futures-io", @@ -3534,6 +3546,7 @@ dependencies = [ "string_cache", "thin-slice", "time", + "tokio 0.2.21", "url", "uuid", "void", @@ -3949,6 +3962,7 @@ checksum = "c44922cb3dbb1c70b5e5f443d63b64363a898564d739ba5198e3a9138442868d" name = "net" version = "0.0.1" dependencies = [ + "async-recursion", "async-tungstenite", "base64 0.10.1", "brotli", @@ -3962,6 +3976,7 @@ dependencies = [ "flate2", "futures 0.1.31", "futures 0.3.5", + "futures-util", "headers", "http 0.1.21", "hyper", @@ -3994,7 +4009,9 @@ dependencies = [ "time", "tokio 0.1.22", "tokio 0.2.21", + "tokio-compat", "tokio-openssl 0.3.0", + "tokio-test", "tungstenite", "url", "uuid", @@ -6479,11 +6496,13 @@ checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes 0.5.5", "fnv", + "futures-core", "iovec", "lazy_static", "mio", "num_cpus", "pin-project-lite", + "slab", "tokio-macros", ] @@ -6509,6 +6528,23 @@ dependencies = [ "tokio-io", ] +[[package]] +name = "tokio-compat" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "107b625135aa7b9297dd2d99ccd6ca6ab124a5d1230778e159b9095adca4c722" +dependencies = [ + "futures 0.1.31", + "futures-core", + "futures-util", + "pin-project-lite", + "tokio 0.2.21", + "tokio-current-thread", + "tokio-executor", + "tokio-reactor", + "tokio-timer", +] + [[package]] name = "tokio-current-thread" version = "0.1.7" @@ -6626,6 +6662,17 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tokio-test" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" +dependencies = [ + "bytes 0.5.5", + "futures-core", + "tokio 0.2.21", +] + [[package]] name = "tokio-threadpool" version = "0.1.18" diff --git a/components/malloc_size_of/Cargo.toml b/components/malloc_size_of/Cargo.toml index 8ee174d013f..040d377aeb6 100644 --- a/components/malloc_size_of/Cargo.toml +++ b/components/malloc_size_of/Cargo.toml @@ -46,6 +46,7 @@ smallvec = "1.0" string_cache = { version = "0.8", optional = true } thin-slice = "0.1.0" time = { version = "0.1.41", optional = true } +tokio = "0.2" url = { version = "2.0", optional = true } uuid = { version = "0.8", features = ["v4"], optional = true } void = "1.0.2" diff --git a/components/malloc_size_of/lib.rs b/components/malloc_size_of/lib.rs index 040d88ec6eb..046b6d2fc5d 100644 --- a/components/malloc_size_of/lib.rs +++ b/components/malloc_size_of/lib.rs @@ -949,6 +949,13 @@ impl MallocSizeOf for crossbeam_channel::Sender { } } +#[cfg(feature = "servo")] +impl MallocSizeOf for tokio::sync::mpsc::UnboundedSender { + fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { + 0 + } +} + #[cfg(feature = "servo")] impl MallocSizeOf for hyper::StatusCode { fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml index 66678844bb2..dbdeffbbd97 100644 --- a/components/net/Cargo.toml +++ b/components/net/Cargo.toml @@ -15,6 +15,7 @@ test = false doctest = false [dependencies] +async-recursion = "0.3.2" async-tungstenite = { version = "0.7.1", features = ["tokio-openssl"] } base64 = "0.10.1" brotli = "3" @@ -28,6 +29,7 @@ embedder_traits = { path = "../embedder_traits" } flate2 = "1" futures = "0.1" futures03 = { version = "0.3", package = "futures" } +futures-util = { version = "0.3", features = ["compat"] } headers = "0.2" http = "0.1" hyper = "0.12" @@ -59,6 +61,7 @@ servo_url = { path = "../url" } time = "0.1.41" tokio = "0.1" tokio2 = { version = "0.2", package = "tokio", features = ["sync", "macros", "rt-threaded"] } +tokio-compat = "0.1" tungstenite = "0.11" url = "2.0" uuid = { version = "0.8", features = ["v4"] } @@ -68,6 +71,7 @@ webrender_api = { git = "https://github.com/servo/webrender" } futures = "0.1" std_test_override = { path = "../std_test_override" } tokio-openssl = "0.3" +tokio-test = "0.2" [[test]] name = "main" diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index 20c5cd87cc1..2d9818a0770 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -10,8 +10,10 @@ use crate::http_loader::{determine_requests_referrer, http_fetch, HttpState}; use crate::http_loader::{set_default_accept, set_default_accept_language}; use crate::subresource_integrity::is_response_integrity_valid; use content_security_policy as csp; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::Sender; use devtools_traits::DevtoolsControlMsg; +use futures_util::compat::*; +use futures_util::StreamExt; use headers::{AccessControlExposeHeaders, ContentType, HeaderMapExt, Range}; use http::header::{self, HeaderMap, HeaderName}; use hyper::Method; @@ -40,6 +42,9 @@ use std::ops::Bound; use std::str; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; +use tokio2::sync::mpsc::{ + unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender, +}; lazy_static! { static ref X_CONTENT_TYPE_OPTIONS: HeaderName = @@ -48,7 +53,7 @@ lazy_static! { pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send); -#[derive(Clone)] +#[derive(Clone, Deserialize, Serialize)] pub enum Data { Payload(Vec), Done, @@ -58,8 +63,8 @@ pub enum Data { pub struct FetchContext { pub state: Arc, pub user_agent: Cow<'static, str>, - pub devtools_chan: Option>, - pub filemanager: FileManager, + pub devtools_chan: Option>>>, + pub filemanager: Arc>, pub file_token: FileTokenCheck, pub cancellation_listener: Arc>, pub timing: ServoArc>, @@ -93,10 +98,10 @@ impl CancellationListener { } } } -pub type DoneChannel = Option<(Sender, Receiver)>; +pub type DoneChannel = Option<(TokioSender, TokioReceiver)>; /// [Fetch](https://fetch.spec.whatwg.org#concept-fetch) -pub fn fetch(request: &mut Request, target: Target, context: &FetchContext) { +pub async fn fetch(request: &mut Request, target: Target<'_>, context: &FetchContext) { // Steps 7,4 of https://w3c.github.io/resource-timing/#processing-model // rev order okay since spec says they're equal - https://w3c.github.io/resource-timing/#dfn-starttime context @@ -110,13 +115,13 @@ pub fn fetch(request: &mut Request, target: Target, context: &FetchContext) { .unwrap() .set_attribute(ResourceAttribute::StartTime(ResourceTimeValue::FetchStart)); - fetch_with_cors_cache(request, &mut CorsCache::new(), target, context); + fetch_with_cors_cache(request, &mut CorsCache::new(), target, context).await; } -pub fn fetch_with_cors_cache( +pub async fn fetch_with_cors_cache( request: &mut Request, cache: &mut CorsCache, - target: Target, + target: Target<'_>, context: &FetchContext, ) { // Step 1. @@ -150,7 +155,7 @@ pub fn fetch_with_cors_cache( } // Step 8. - main_fetch(request, cache, false, false, target, &mut None, &context); + main_fetch(request, cache, false, false, target, &mut None, &context).await; } /// https://www.w3.org/TR/CSP/#should-block-request @@ -178,12 +183,12 @@ pub fn should_request_be_blocked_by_csp(request: &Request) -> csp::CheckResult { } /// [Main fetch](https://fetch.spec.whatwg.org/#concept-main-fetch) -pub fn main_fetch( +pub async fn main_fetch( request: &mut Request, cache: &mut CorsCache, cors_flag: bool, recursive_flag: bool, - target: Target, + target: Target<'_>, done_chan: &mut DoneChannel, context: &FetchContext, ) -> Response { @@ -266,61 +271,67 @@ pub fn main_fetch( // Not applicable: see fetch_async. // Step 12. - let mut response = response.unwrap_or_else(|| { - let current_url = request.current_url(); - let same_origin = if let Origin::Origin(ref origin) = request.origin { - *origin == current_url.origin() - } else { - false - }; - if (same_origin && !cors_flag) || - current_url.scheme() == "data" || - current_url.scheme() == "chrome" - { - // Substep 1. - request.response_tainting = ResponseTainting::Basic; + let mut response = match response { + Some(res) => res, + None => { + let current_url = request.current_url(); + let same_origin = if let Origin::Origin(ref origin) = request.origin { + *origin == current_url.origin() + } else { + false + }; - // Substep 2. - scheme_fetch(request, cache, target, done_chan, context) - } else if request.mode == RequestMode::SameOrigin { - Response::network_error(NetworkError::Internal("Cross-origin response".into())) - } else if request.mode == RequestMode::NoCors { - // Substep 1. - request.response_tainting = ResponseTainting::Opaque; + if (same_origin && !cors_flag) || + current_url.scheme() == "data" || + current_url.scheme() == "chrome" + { + // Substep 1. + request.response_tainting = ResponseTainting::Basic; - // Substep 2. - scheme_fetch(request, cache, target, done_chan, context) - } else if !matches!(current_url.scheme(), "http" | "https") { - Response::network_error(NetworkError::Internal("Non-http scheme".into())) - } else if request.use_cors_preflight || - (request.unsafe_request && - (!is_cors_safelisted_method(&request.method) || - request.headers.iter().any(|(name, value)| { - !is_cors_safelisted_request_header(&name, &value) - }))) - { - // Substep 1. - request.response_tainting = ResponseTainting::CorsTainting; - // Substep 2. - let response = http_fetch( - request, cache, true, true, false, target, done_chan, context, - ); - // Substep 3. - if response.is_network_error() { - // TODO clear cache entries using request + // Substep 2. + scheme_fetch(request, cache, target, done_chan, context).await + } else if request.mode == RequestMode::SameOrigin { + Response::network_error(NetworkError::Internal("Cross-origin response".into())) + } else if request.mode == RequestMode::NoCors { + // Substep 1. + request.response_tainting = ResponseTainting::Opaque; + + // Substep 2. + scheme_fetch(request, cache, target, done_chan, context).await + } else if !matches!(current_url.scheme(), "http" | "https") { + Response::network_error(NetworkError::Internal("Non-http scheme".into())) + } else if request.use_cors_preflight || + (request.unsafe_request && + (!is_cors_safelisted_method(&request.method) || + request.headers.iter().any(|(name, value)| { + !is_cors_safelisted_request_header(&name, &value) + }))) + { + // Substep 1. + request.response_tainting = ResponseTainting::CorsTainting; + // Substep 2. + let response = http_fetch( + request, cache, true, true, false, target, done_chan, context, + ) + .await; + // Substep 3. + if response.is_network_error() { + // TODO clear cache entries using request + } + // Substep 4. + response + } else { + // Substep 1. + request.response_tainting = ResponseTainting::CorsTainting; + // Substep 2. + http_fetch( + request, cache, true, false, false, target, done_chan, context, + ) + .await } - // Substep 4. - response - } else { - // Substep 1. - request.response_tainting = ResponseTainting::CorsTainting; - // Substep 2. - http_fetch( - request, cache, true, false, false, target, done_chan, context, - ) - } - }); + }, + }; // Step 13. if recursive_flag { @@ -441,7 +452,7 @@ pub fn main_fetch( let mut response_loaded = false; let mut response = if !response.is_network_error() && !request.integrity_metadata.is_empty() { // Step 19.1. - wait_for_response(&mut response, target, done_chan); + wait_for_response(&mut response, target, done_chan).await; response_loaded = true; // Step 19.2. @@ -465,7 +476,7 @@ pub fn main_fetch( // by sync fetch, but we overload it here for simplicity target.process_response(&mut response); if !response_loaded { - wait_for_response(&mut response, target, done_chan); + wait_for_response(&mut response, target, done_chan).await; } // overloaded similarly to process_response target.process_response_eof(&response); @@ -487,7 +498,7 @@ pub fn main_fetch( // Step 23. if !response_loaded { - wait_for_response(&mut response, target, done_chan); + wait_for_response(&mut response, target, done_chan).await; } // Step 24. @@ -502,22 +513,25 @@ pub fn main_fetch( response } -fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut DoneChannel) { - if let Some(ref ch) = *done_chan { +async fn wait_for_response( + response: &mut Response, + target: Target<'_>, + done_chan: &mut DoneChannel, +) { + if let Some(ref mut ch) = *done_chan { loop { - match ch - .1 - .recv() - .expect("fetch worker should always send Done before terminating") - { - Data::Payload(vec) => { + match ch.1.recv().await { + Some(Data::Payload(vec)) => { target.process_response_chunk(vec); }, - Data::Done => break, - Data::Cancelled => { + Some(Data::Done) => break, + Some(Data::Cancelled) => { response.aborted.store(true, Ordering::Release); break; }, + _ => { + panic!("fetch worker should always send Done before terminating"); + }, } } } else { @@ -613,10 +627,10 @@ fn create_blank_reply(url: ServoUrl, timing_type: ResourceTimingType) -> Respons } /// [Scheme fetch](https://fetch.spec.whatwg.org#scheme-fetch) -fn scheme_fetch( +async fn scheme_fetch( request: &mut Request, cache: &mut CorsCache, - target: Target, + target: Target<'_>, done_chan: &mut DoneChannel, context: &FetchContext, ) -> Response { @@ -628,6 +642,7 @@ fn scheme_fetch( "chrome" if url.path() == "allowcert" => { let data = request.body.as_mut().and_then(|body| { let stream = body.take_stream(); + let stream = stream.lock().unwrap(); let (body_chan, body_port) = ipc::channel().unwrap(); let _ = stream.send(BodyChunkRequest::Connect(body_chan)); let _ = stream.send(BodyChunkRequest::Chunk); @@ -653,9 +668,12 @@ fn scheme_fetch( create_blank_reply(url, request.timing_type()) }, - "http" | "https" => http_fetch( - request, cache, false, false, false, target, done_chan, context, - ), + "http" | "https" => { + http_fetch( + request, cache, false, false, false, target, done_chan, context, + ) + .await + }, "data" => match decode(&url) { Ok((mime, bytes)) => { @@ -726,12 +744,13 @@ fn scheme_fetch( // Setup channel to receive cross-thread messages about the file fetch // operation. - let (done_sender, done_receiver) = unbounded(); + let (mut done_sender, done_receiver) = unbounded_channel(); *done_chan = Some((done_sender.clone(), done_receiver)); + *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); - context.filemanager.fetch_file_in_chunks( - done_sender, + context.filemanager.lock().unwrap().fetch_file_in_chunks( + &mut done_sender, reader, response.body.clone(), context.cancellation_listener.clone(), @@ -781,12 +800,12 @@ fn scheme_fetch( partial_content(&mut response); } - let (done_sender, done_receiver) = unbounded(); + let (mut done_sender, done_receiver) = unbounded_channel(); *done_chan = Some((done_sender.clone(), done_receiver)); *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); - if let Err(err) = context.filemanager.fetch_file( - &done_sender, + if let Err(err) = context.filemanager.lock().unwrap().fetch_file( + &mut done_sender, context.cancellation_listener.clone(), id, &context.file_token, diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index 6d9da285154..d56bdd54668 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -4,7 +4,6 @@ use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds}; use crate::resource_thread::CoreResourceThreadPool; -use crossbeam_channel::Sender; use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern}; use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt}; use http::header::{self, HeaderValue}; @@ -28,6 +27,7 @@ use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock, Weak}; +use tokio2::sync::mpsc::UnboundedSender as TokioSender; use url::Url; use uuid::Uuid; @@ -127,7 +127,7 @@ impl FileManager { // in a separate thread. pub fn fetch_file( &self, - done_sender: &Sender, + done_sender: &mut TokioSender, cancellation_listener: Arc>, id: Uuid, file_token: &FileTokenCheck, @@ -210,12 +210,13 @@ impl FileManager { pub fn fetch_file_in_chunks( &self, - done_sender: Sender, + done_sender: &mut TokioSender, mut reader: BufReader, res_body: ServoArc>, cancellation_listener: Arc>, range: RelativePos, ) { + let done_sender = done_sender.clone(); self.thread_pool .upgrade() .and_then(|pool| { @@ -282,7 +283,7 @@ impl FileManager { fn fetch_blob_buf( &self, - done_sender: &Sender, + done_sender: &mut TokioSender, cancellation_listener: Arc>, id: &Uuid, file_token: &FileTokenCheck, @@ -358,7 +359,7 @@ impl FileManager { ); self.fetch_file_in_chunks( - done_sender.clone(), + &mut done_sender.clone(), reader, response.body.clone(), cancellation_listener, diff --git a/components/net/http_cache.rs b/components/net/http_cache.rs index a743a05cdde..fe97ed13c8a 100644 --- a/components/net/http_cache.rs +++ b/components/net/http_cache.rs @@ -8,7 +8,6 @@ //! and . use crate::fetch::methods::{Data, DoneChannel}; -use crossbeam_channel::{unbounded, Sender}; use headers::{ CacheControl, ContentRange, Expires, HeaderMapExt, LastModified, Pragma, Range, Vary, }; @@ -30,6 +29,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::time::SystemTime; use time::{Duration, Timespec, Tm}; +use tokio2::sync::mpsc::{unbounded_channel as unbounded, UnboundedSender as TokioSender}; /// The key used to differentiate requests in the cache. #[derive(Clone, Eq, Hash, MallocSizeOf, PartialEq)] @@ -58,7 +58,7 @@ struct CachedResource { request_headers: Arc>, body: Arc>, aborted: Arc, - awaiting_body: Arc>>>, + awaiting_body: Arc>>>, data: Measurable, } diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index db91c15c3cc..7950f711e48 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -11,11 +11,13 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target} use crate::hsts::HstsList; use crate::http_cache::{CacheKey, HttpCache}; use crate::resource_thread::AuthCache; +use async_recursion::async_recursion; use crossbeam_channel::{unbounded, Receiver, Sender}; use devtools_traits::{ ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest, }; use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent}; +use futures_util::compat::*; use headers::authorization::Basic; use headers::{AccessControlAllowCredentials, AccessControlAllowHeaders, HeaderMapExt}; use headers::{ @@ -64,11 +66,13 @@ use std::sync::{Arc as StdArc, Condvar, Mutex, RwLock}; use std::time::{Duration, SystemTime}; use time::{self, Tm}; use tokio::prelude::{future, Future, Sink, Stream}; -use tokio::runtime::Runtime; use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender}; +use tokio2::sync::mpsc::{unbounded_channel, UnboundedSender as Tokio02Sender}; +use tokio_compat::runtime::{Builder, Runtime}; lazy_static! { - pub static ref HANDLE: Mutex> = Mutex::new(Some(Runtime::new().unwrap())); + pub static ref HANDLE: Mutex> = + Mutex::new(Some(Builder::new().build().unwrap())); } /// The various states an entry of the HttpCache can be in. @@ -491,180 +495,184 @@ impl BodySink { } } -fn obtain_response( +async fn obtain_response( client: &Client, url: &ServoUrl, method: &Method, request_headers: &mut HeaderMap, - body: Option>, + body: Option>>>, source_is_null: bool, pipeline_id: &Option, request_id: Option<&str>, is_xhr: bool, context: &FetchContext, - fetch_terminated: Sender, -) -> Box< - dyn Future< - Item = (HyperResponse, Option), - Error = NetworkError, - >, -> { - let headers = request_headers.clone(); + fetch_terminated: Tokio02Sender, +) -> Result<(HyperResponse, Option), NetworkError> { + { + let mut headers = request_headers.clone(); - let devtools_bytes = StdArc::new(Mutex::new(vec![])); + let devtools_bytes = StdArc::new(Mutex::new(vec![])); - // https://url.spec.whatwg.org/#percent-encoded-bytes - let encoded_url = url - .clone() - .into_url() - .as_ref() - .replace("|", "%7C") - .replace("{", "%7B") - .replace("}", "%7D"); + // https://url.spec.whatwg.org/#percent-encoded-bytes + let encoded_url = url + .clone() + .into_url() + .as_ref() + .replace("|", "%7C") + .replace("{", "%7B") + .replace("}", "%7D"); - let request = if let Some(chunk_requester) = body { - let (sink, stream) = if source_is_null { - // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch - // TODO: this should not be set for HTTP/2(currently not supported?). - request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); + let request = if let Some(chunk_requester) = body { + let (mut sink, stream) = if source_is_null { + // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch + // TODO: this should not be set for HTTP/2(currently not supported?). + headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - let (sender, receiver) = channel(1); - (BodySink::Chunked(sender), BodyStream::Chunked(receiver)) - } else { - // Note: Hyper seems to already buffer bytes when the request appears not stream-able, - // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104 - // - // However since this doesn't appear documented, and we're using an ancient version, - // for now we buffer manually to ensure we don't stream requests - // to servers that might not know how to handle them. - let (sender, receiver) = unbounded(); - (BodySink::Buffered(sender), BodyStream::Buffered(receiver)) - }; + let (sender, receiver) = channel(1); + (BodySink::Chunked(sender), BodyStream::Chunked(receiver)) + } else { + // Note: Hyper seems to already buffer bytes when the request appears not stream-able, + // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104 + // + // However since this doesn't appear documented, and we're using an ancient version, + // for now we buffer manually to ensure we don't stream requests + // to servers that might not know how to handle them. + let (sender, receiver) = unbounded(); + (BodySink::Buffered(sender), BodyStream::Buffered(receiver)) + }; - let (body_chan, body_port) = ipc::channel().unwrap(); + let (body_chan, body_port) = ipc::channel().unwrap(); - let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan)); + if let Ok(requester) = chunk_requester.lock() { + let _ = requester.send(BodyChunkRequest::Connect(body_chan)); - // https://fetch.spec.whatwg.org/#concept-request-transmit-body - // Request the first chunk, corresponding to Step 3 and 4. - let _ = chunk_requester.send(BodyChunkRequest::Chunk); + // https://fetch.spec.whatwg.org/#concept-request-transmit-body + // Request the first chunk, corresponding to Step 3 and 4. + let _ = requester.send(BodyChunkRequest::Chunk); + } - let devtools_bytes = devtools_bytes.clone(); + let devtools_bytes = devtools_bytes.clone(); + let chunk_requester2 = chunk_requester.clone(); - ROUTER.add_route( - body_port.to_opaque(), - Box::new(move |message| { - let bytes: Vec = match message.to().unwrap() { - BodyChunkResponse::Chunk(bytes) => bytes, - BodyChunkResponse::Done => { - // Step 3, abort these parallel steps. - let _ = fetch_terminated.send(false); - sink.close(); - return; - }, - BodyChunkResponse::Error => { - // Step 4 and/or 5. - // TODO: differentiate between the two steps, - // where step 5 requires setting an `aborted` flag on the fetch. - let _ = fetch_terminated.send(true); - sink.close(); - return; - }, - }; + ROUTER.add_route( + body_port.to_opaque(), + Box::new(move |message| { + let bytes: Vec = match message.to().unwrap() { + BodyChunkResponse::Chunk(bytes) => bytes, + BodyChunkResponse::Done => { + // Step 3, abort these parallel steps. + let _ = fetch_terminated.send(false); + sink.close(); - devtools_bytes.lock().unwrap().append(&mut bytes.clone()); - - // Step 5.1.2.2, transmit chunk over the network, - // currently implemented by sending the bytes to the fetch worker. - sink.transmit_bytes(bytes); - - // Step 5.1.2.3 - // Request the next chunk. - let _ = chunk_requester.send(BodyChunkRequest::Chunk); - }), - ); - - let body = match stream { - BodyStream::Chunked(receiver) => Body::wrap_stream(receiver), - BodyStream::Buffered(receiver) => { - // Accumulate bytes received over IPC into a vector. - let mut body = vec![]; - loop { - match receiver.recv() { - Ok(BodyChunk::Chunk(mut bytes)) => { - body.append(&mut bytes); + return; }, - Ok(BodyChunk::Done) => break, - Err(_) => warn!("Failed to read all chunks from request body."), + BodyChunkResponse::Error => { + // Step 4 and/or 5. + // TODO: differentiate between the two steps, + // where step 5 requires setting an `aborted` flag on the fetch. + let _ = fetch_terminated.send(true); + sink.close(); + + return; + }, + }; + + devtools_bytes.lock().unwrap().append(&mut bytes.clone()); + + // Step 5.1.2.2, transmit chunk over the network, + // currently implemented by sending the bytes to the fetch worker. + sink.transmit_bytes(bytes); + + // Step 5.1.2.3 + // Request the next chunk. + let _ = chunk_requester2 + .lock() + .unwrap() + .send(BodyChunkRequest::Chunk); + }), + ); + + let body = match stream { + BodyStream::Chunked(receiver) => Body::wrap_stream(receiver), + BodyStream::Buffered(receiver) => { + // Accumulate bytes received over IPC into a vector. + let mut body = vec![]; + loop { + match receiver.recv() { + Ok(BodyChunk::Chunk(mut bytes)) => { + body.append(&mut bytes); + }, + Ok(BodyChunk::Done) => break, + Err(_) => warn!("Failed to read all chunks from request body."), + } } - } - body.into() - }, + body.into() + }, + }; + + HyperRequest::builder() + .method(method) + .uri(encoded_url) + .body(body) + } else { + HyperRequest::builder() + .method(method) + .uri(encoded_url) + .body(Body::empty()) }; - HyperRequest::builder() - .method(method) - .uri(encoded_url) - .body(body) - } else { - HyperRequest::builder() - .method(method) - .uri(encoded_url) - .body(Body::empty()) - }; - - context - .timing - .lock() - .unwrap() - .set_attribute(ResourceAttribute::DomainLookupStart); - - // TODO(#21261) connect_start: set if a persistent connection is *not* used and the last non-redirected - // fetch passes the timing allow check - let connect_start = precise_time_ms(); - context - .timing - .lock() - .unwrap() - .set_attribute(ResourceAttribute::ConnectStart(connect_start)); - - // TODO: We currently don't know when the handhhake before the connection is done - // so our best bet would be to set `secure_connection_start` here when we are currently - // fetching on a HTTPS url. - if url.scheme() == "https" { context .timing .lock() .unwrap() - .set_attribute(ResourceAttribute::SecureConnectionStart); - } + .set_attribute(ResourceAttribute::DomainLookupStart); - let mut request = match request { - Ok(request) => request, - Err(e) => return Box::new(future::result(Err(NetworkError::from_http_error(&e)))), - }; - *request.headers_mut() = headers.clone(); + // TODO(#21261) connect_start: set if a persistent connection is *not* used and the last non-redirected + // fetch passes the timing allow check + let connect_start = precise_time_ms(); + context + .timing + .lock() + .unwrap() + .set_attribute(ResourceAttribute::ConnectStart(connect_start)); - let connect_end = precise_time_ms(); - context - .timing - .lock() - .unwrap() - .set_attribute(ResourceAttribute::ConnectEnd(connect_end)); + // TODO: We currently don't know when the handhhake before the connection is done + // so our best bet would be to set `secure_connection_start` here when we are currently + // fetching on a HTTPS url. + if url.scheme() == "https" { + context + .timing + .lock() + .unwrap() + .set_attribute(ResourceAttribute::SecureConnectionStart); + } - let request_id = request_id.map(|v| v.to_owned()); - let pipeline_id = pipeline_id.clone(); - let closure_url = url.clone(); - let method = method.clone(); - let send_start = precise_time_ms(); + let mut request = match request { + Ok(request) => request, + Err(e) => return Err(NetworkError::from_http_error(&e)), + }; + *request.headers_mut() = headers.clone(); - let host = request.uri().host().unwrap_or("").to_owned(); - let host_clone = request.uri().host().unwrap_or("").to_owned(); - let connection_certs = context.state.connection_certs.clone(); - let connection_certs_clone = context.state.connection_certs.clone(); + let connect_end = precise_time_ms(); + context + .timing + .lock() + .unwrap() + .set_attribute(ResourceAttribute::ConnectEnd(connect_end)); + + let request_id = request_id.map(|v| v.to_owned()); + let pipeline_id = pipeline_id.clone(); + let closure_url = url.clone(); + let method = method.clone(); + let send_start = precise_time_ms(); + + let host = request.uri().host().unwrap_or("").to_owned(); + let host_clone = request.uri().host().unwrap_or("").to_owned(); + let connection_certs = context.state.connection_certs.clone(); + let connection_certs_clone = context.state.connection_certs.clone(); + + let headers = headers.clone(); - let headers = headers.clone(); - Box::new( client .request(request) .and_then(move |res| { @@ -705,18 +713,21 @@ fn obtain_response( }) .map_err(move |e| { NetworkError::from_hyper_error(&e, connection_certs_clone.remove(host_clone)) - }), - ) + }) + .compat() // convert from Future01 to Future03 + .await + } } /// [HTTP fetch](https://fetch.spec.whatwg.org#http-fetch) -pub fn http_fetch( +#[async_recursion] +pub async fn http_fetch( request: &mut Request, cache: &mut CorsCache, cors_flag: bool, cors_preflight_flag: bool, authentication_fetch_flag: bool, - target: Target, + target: Target<'async_recursion>, done_chan: &mut DoneChannel, context: &FetchContext, ) -> Response { @@ -771,7 +782,7 @@ pub fn http_fetch( // Sub-substep 1 if method_mismatch || header_mismatch { - let preflight_result = cors_preflight_fetch(&request, cache, context); + let preflight_result = cors_preflight_fetch(&request, cache, context).await; // Sub-substep 2 if let Some(e) = preflight_result.get_network_error() { return Response::network_error(e.clone()); @@ -799,7 +810,8 @@ pub fn http_fetch( cors_flag, done_chan, context, - ); + ) + .await; // Substep 4 if cors_flag && cors_check(&request, &fetch_result).is_err() { @@ -865,6 +877,7 @@ pub fn http_fetch( http_redirect_fetch( request, cache, response, cors_flag, target, done_chan, context, ) + .await }, }; } @@ -907,12 +920,13 @@ impl Drop for RedirectEndTimer { } /// [HTTP redirect fetch](https://fetch.spec.whatwg.org#http-redirect-fetch) -pub fn http_redirect_fetch( +#[async_recursion] +pub async fn http_redirect_fetch( request: &mut Request, cache: &mut CorsCache, response: Response, cors_flag: bool, - target: Target, + target: Target<'async_recursion>, done_chan: &mut DoneChannel, context: &FetchContext, ) -> Response { @@ -1071,7 +1085,8 @@ pub fn http_redirect_fetch( target, done_chan, context, - ); + ) + .await; // TODO: timing allow check context @@ -1100,7 +1115,8 @@ fn try_immutable_origin_to_hyper_origin(url_origin: &ImmutableOrigin) -> Option< } /// [HTTP network or cache fetch](https://fetch.spec.whatwg.org#http-network-or-cache-fetch) -fn http_network_or_cache_fetch( +#[async_recursion] +async fn http_network_or_cache_fetch( request: &mut Request, authentication_fetch_flag: bool, cors_flag: bool, @@ -1398,26 +1414,27 @@ fn http_network_or_cache_fetch( } } - fn wait_for_cached_response(done_chan: &mut DoneChannel, response: &mut Option) { - if let Some(ref ch) = *done_chan { + async fn wait_for_cached_response( + done_chan: &mut DoneChannel, + response: &mut Option, + ) { + if let Some(ref mut ch) = *done_chan { // The cache constructed a response with a body of ResponseBody::Receiving. // We wait for the response in the cache to "finish", // with a body of either Done or Cancelled. assert!(response.is_some()); + loop { - match ch - .1 - .recv() - .expect("HTTP cache should always send Done or Cancelled") - { - Data::Payload(_) => {}, - Data::Done => break, // Return the full response as if it was initially cached as such. - Data::Cancelled => { + match ch.1.recv().await { + Some(Data::Payload(_)) => {}, + Some(Data::Done) => break, // Return the full response as if it was initially cached as such. + Some(Data::Cancelled) => { // The response was cancelled while the fetch was ongoing. // Set response to None, which will trigger a network fetch below. *response = None; break; }, + _ => panic!("HTTP cache should always send Done or Cancelled"), } } } @@ -1425,7 +1442,7 @@ fn http_network_or_cache_fetch( *done_chan = None; } - wait_for_cached_response(done_chan, &mut response); + wait_for_cached_response(done_chan, &mut response).await; // Step 6 // TODO: https://infra.spec.whatwg.org/#if-aborted @@ -1446,7 +1463,7 @@ fn http_network_or_cache_fetch( if response.is_none() { // Substep 2 let forward_response = - http_network_fetch(http_request, credentials_flag, done_chan, context); + http_network_fetch(http_request, credentials_flag, done_chan, context).await; // Substep 3 if let Some((200..=399, _)) = forward_response.raw_status { if !http_request.method.is_safe() { @@ -1467,8 +1484,8 @@ fn http_network_or_cache_fetch( // since the network response will be replaced by the revalidated stored one. *done_chan = None; response = http_cache.refresh(&http_request, forward_response.clone(), done_chan); - wait_for_cached_response(done_chan, &mut response); } + wait_for_cached_response(done_chan, &mut response).await; } // Substep 5 @@ -1596,7 +1613,8 @@ fn http_network_or_cache_fetch( cors_flag, done_chan, context, - ); + ) + .await; } // Step 11 @@ -1655,7 +1673,7 @@ impl Drop for ResponseEndTimer { } /// [HTTP network fetch](https://fetch.spec.whatwg.org/#http-network-fetch) -fn http_network_fetch( +async fn http_network_fetch( request: &mut Request, credentials_flag: bool, done_chan: &mut DoneChannel, @@ -1686,7 +1704,7 @@ fn http_network_fetch( if log_enabled!(log::Level::Info) { info!("{:?} request for {}", request.method, url); for header in request.headers.iter() { - info!(" - {:?}", header); + debug!(" - {:?}", header); } } @@ -1696,7 +1714,7 @@ fn http_network_fetch( let is_xhr = request.destination == Destination::None; // The receiver will receive true if there has been an error streaming the request body. - let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded(); + let (fetch_terminated_sender, mut fetch_terminated_receiver) = unbounded_channel(); let body = request.body.as_ref().map(|body| body.take_stream()); @@ -1728,32 +1746,28 @@ fn http_network_fetch( let pipeline_id = request.pipeline_id; // This will only get the headers, the body is read later - let (res, msg) = match response_future.wait() { + let (res, msg) = match response_future.await { Ok(wrapped_response) => wrapped_response, Err(error) => return Response::network_error(error), }; + if log_enabled!(log::Level::Info) { + debug!("{:?} response for {}", res.version(), url); + for header in res.headers().iter() { + debug!(" - {:?}", header); + } + } + // Check if there was an error while streaming the request body. // - // It's ok to block on the receiver, - // since we're already blocking on the response future above, - // so we can be sure that the request has already been processed, - // and a message is in the channel(or soon will be). - match fetch_terminated_receiver.recv() { - Ok(true) => { + match fetch_terminated_receiver.recv().await { + Some(true) => { return Response::network_error(NetworkError::Internal( "Request body streaming failed.".into(), )); }, - Ok(false) => {}, - Err(_) => warn!("Failed to receive confirmation request was streamed without error."), - } - - if log_enabled!(log::Level::Info) { - info!("{:?} response for {}", res.version(), url); - for header in res.headers().iter() { - info!(" - {:?}", header); - } + Some(false) => {}, + _ => warn!("Failed to receive confirmation request was streamed without error."), } let header_strings: Vec<&str> = res @@ -1791,7 +1805,7 @@ fn http_network_fetch( res.status(), res.status().canonical_reason().unwrap_or("").into(), )); - debug!("got {:?} response for {:?}", res.status(), request.url()); + info!("got {:?} response for {:?}", res.status(), request.url()); response.raw_status = Some(( res.status().as_u16(), res.status().canonical_reason().unwrap_or("").into(), @@ -1803,7 +1817,7 @@ fn http_network_fetch( let res_body = response.body.clone(); // We're about to spawn a future to be waited on here - let (done_sender, done_receiver) = unbounded(); + let (done_sender, done_receiver) = unbounded_channel(); *done_chan = Some((done_sender.clone(), done_receiver)); let meta = match response .metadata() @@ -1825,6 +1839,7 @@ fn http_network_fetch( let res_body2 = res_body.clone(); if let Some(ref sender) = devtools_sender { + let sender = sender.lock().unwrap(); if let Some(m) = msg { send_request_to_devtools(m, &sender); } @@ -1848,21 +1863,22 @@ fn http_network_fetch( let timing_ptr3 = context.timing.clone(); let url1 = request.url(); let url2 = url1.clone(); - HANDLE.lock().unwrap().as_mut().unwrap().spawn( + + HANDLE.lock().unwrap().as_ref().unwrap().spawn( res.into_body() .map_err(|_| ()) .fold(res_body, move |res_body, chunk| { if cancellation_listener.lock().unwrap().cancelled() { *res_body.lock().unwrap() = ResponseBody::Done(vec![]); let _ = done_sender.send(Data::Cancelled); - return future::failed(()); + return tokio::prelude::future::failed(()); } if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { let bytes = chunk.into_bytes(); body.extend_from_slice(&*bytes); let _ = done_sender.send(Data::Payload(bytes.to_vec())); } - future::ok(res_body) + tokio::prelude::future::ok(res_body) }) .and_then(move |res_body| { debug!("successfully finished response for {:?}", url1); @@ -1877,10 +1893,10 @@ fn http_network_fetch( .unwrap() .set_attribute(ResourceAttribute::ResponseEnd); let _ = done_sender2.send(Data::Done); - future::ok(()) + tokio::prelude::future::ok(()) }) .map_err(move |_| { - debug!("finished response for {:?} with error", url2); + warn!("finished response for {:?} with error", url2); let mut body = res_body2.lock().unwrap(); let completed_body = match *body { ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), @@ -1956,7 +1972,7 @@ fn http_network_fetch( } /// [CORS preflight fetch](https://fetch.spec.whatwg.org#cors-preflight-fetch) -fn cors_preflight_fetch( +async fn cors_preflight_fetch( request: &Request, cache: &mut CorsCache, context: &FetchContext, @@ -2000,7 +2016,8 @@ fn cors_preflight_fetch( } // Step 6 - let response = http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context); + let response = + http_network_or_cache_fetch(&mut preflight, false, false, &mut None, context).await; // Step 7 if cors_check(&request, &response).is_ok() && response diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index a1d6c849098..e59aca10fcf 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -680,44 +680,58 @@ impl CoreResourceManager { _ => (FileTokenCheck::NotRequired, None), }; - self.thread_pool.spawn(move || { - // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) - // todo load context / mimesniff in fetch - // todo referrer policy? - // todo service worker stuff - let context = FetchContext { - state: http_state, - user_agent: ua, - devtools_chan: dc, - filemanager: filemanager, - file_token, - cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))), - timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))), - }; + HANDLE + .lock() + .unwrap() + .as_ref() + .unwrap() + .spawn_std(async move { + // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) + // todo load context / mimesniff in fetch + // todo referrer policy? + // todo service worker stuff + let context = FetchContext { + state: http_state, + user_agent: ua, + devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))), + filemanager: Arc::new(Mutex::new(filemanager)), + file_token, + cancellation_listener: Arc::new(Mutex::new(CancellationListener::new( + cancel_chan, + ))), + timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( + request.timing_type(), + ))), + }; - match res_init_ { - Some(res_init) => { - let response = Response::from_init(res_init, timing_type); - http_redirect_fetch( - &mut request, - &mut CorsCache::new(), - response, - true, - &mut sender, - &mut None, - &context, - ); - }, - None => fetch(&mut request, &mut sender, &context), - }; + match res_init_ { + Some(res_init) => { + let response = Response::from_init(res_init, timing_type); + http_redirect_fetch( + &mut request, + &mut CorsCache::new(), + response, + true, + &mut sender, + &mut None, + &context, + ) + .await; + }, + None => { + fetch(&mut request, &mut sender, &context).await; + }, + }; - // Remove token after fetch. - if let Some(id) = blob_url_file_id.as_ref() { - context - .filemanager - .invalidate_token(&context.file_token, id); - } - }); + // Remove token after fetch. + if let Some(id) = blob_url_file_id.as_ref() { + context + .filemanager + .lock() + .unwrap() + .invalidate_token(&context.file_token, id); + } + }); } fn websocket_connect( diff --git a/components/net/tests/fetch.rs b/components/net/tests/fetch.rs index 653526adccb..9416a81e8de 100644 --- a/components/net/tests/fetch.rs +++ b/components/net/tests/fetch.rs @@ -48,6 +48,7 @@ use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::{Duration, SystemTime}; +use tokio_test::block_on; use uuid::Uuid; // TODO write a struct that impls Handler for storing test values @@ -191,9 +192,12 @@ fn test_fetch_blob() { let origin = ServoUrl::parse("http://www.example.org/").unwrap(); let id = Uuid::new_v4(); - context - .filemanager - .promote_memory(id.clone(), blob_buf, true, "http://www.example.org".into()); + context.filemanager.lock().unwrap().promote_memory( + id.clone(), + blob_buf, + true, + "http://www.example.org".into(), + ); let url = ServoUrl::parse(&format!("blob:{}{}", origin.as_str(), id.to_simple())).unwrap(); let mut request = Request::new( @@ -212,7 +216,7 @@ fn test_fetch_blob() { expected: bytes.to_vec(), }; - methods::fetch(&mut request, &mut target, &context); + block_on(methods::fetch(&mut request, &mut target, &context)); let fetch_response = receiver.recv().unwrap(); assert!(!fetch_response.is_network_error()); @@ -772,7 +776,10 @@ fn test_fetch_with_hsts() { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: None, - filemanager: FileManager::new(create_embedder_proxy(), Weak::new()), + filemanager: Arc::new(Mutex::new(FileManager::new( + create_embedder_proxy(), + Weak::new(), + ))), file_token: FileTokenCheck::NotRequired, cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( @@ -835,7 +842,10 @@ fn test_load_adds_host_to_hsts_list_when_url_is_https() { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: None, - filemanager: FileManager::new(create_embedder_proxy(), Weak::new()), + filemanager: Arc::new(Mutex::new(FileManager::new( + create_embedder_proxy(), + Weak::new(), + ))), file_token: FileTokenCheck::NotRequired, cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( @@ -900,7 +910,10 @@ fn test_fetch_self_signed() { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: None, - filemanager: FileManager::new(create_embedder_proxy(), Weak::new()), + filemanager: Arc::new(Mutex::new(FileManager::new( + create_embedder_proxy(), + Weak::new(), + ))), file_token: FileTokenCheck::NotRequired, cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( diff --git a/components/net/tests/http_cache.rs b/components/net/tests/http_cache.rs index 6d2cd50ce47..8dd09b650da 100644 --- a/components/net/tests/http_cache.rs +++ b/components/net/tests/http_cache.rs @@ -2,7 +2,6 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crossbeam_channel::unbounded; use http::header::{HeaderValue, EXPIRES}; use http::StatusCode; use msg::constellation_msg::TEST_PIPELINE_ID; @@ -11,6 +10,7 @@ use net_traits::request::{Origin, Referrer, Request}; use net_traits::response::{HttpsState, Response, ResponseBody}; use net_traits::{ResourceFetchTiming, ResourceTimingType}; use servo_url::ServoUrl; +use tokio2::sync::mpsc::unbounded_channel as unbounded; #[test] fn test_refreshing_resource_sets_done_chan_the_appropriate_value() { @@ -40,7 +40,8 @@ fn test_refreshing_resource_sets_done_chan_the_appropriate_value() { cache.store(&request, &response); // Second, mutate the response into a 304 response, and refresh the stored one. response.status = Some((StatusCode::NOT_MODIFIED, String::from("304"))); - let mut done_chan = Some(unbounded()); + let (send, recv) = unbounded(); + let mut done_chan = Some((send, recv)); let refreshed_response = cache.refresh(&request, response.clone(), &mut done_chan); // Ensure a resource was found, and refreshed. assert!(refreshed_response.is_some()); diff --git a/components/net/tests/main.rs b/components/net/tests/main.rs index be43c23b0ca..f814980f4aa 100644 --- a/components/net/tests/main.rs +++ b/components/net/tests/main.rs @@ -50,6 +50,7 @@ use tokio::net::TcpListener; use tokio::reactor::Handle; use tokio::runtime::Runtime; use tokio_openssl::SslAcceptorExt; +use tokio_test::block_on; lazy_static! { pub static ref HANDLE: Mutex = Mutex::new(Runtime::new().unwrap()); @@ -103,8 +104,11 @@ fn new_fetch_context( FetchContext { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), - devtools_chan: dc, - filemanager: FileManager::new(sender, pool_handle.unwrap_or_else(|| Weak::new())), + devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))), + filemanager: Arc::new(Mutex::new(FileManager::new( + sender, + pool_handle.unwrap_or_else(|| Weak::new()), + ))), file_token: FileTokenCheck::NotRequired, cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( @@ -131,7 +135,7 @@ fn fetch_with_context(request: &mut Request, mut context: &mut FetchContext) -> let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - methods::fetch(request, &mut target, &mut context); + block_on(methods::fetch(request, &mut target, &mut context)); receiver.recv().unwrap() } @@ -140,12 +144,12 @@ fn fetch_with_cors_cache(request: &mut Request, cache: &mut CorsCache) -> Respon let (sender, receiver) = unbounded(); let mut target = FetchResponseCollector { sender: sender }; - methods::fetch_with_cors_cache( + block_on(methods::fetch_with_cors_cache( request, cache, &mut target, &mut new_fetch_context(None, None, None), - ); + )); receiver.recv().unwrap() } diff --git a/components/net_traits/request.rs b/components/net_traits/request.rs index 49cd548e093..cfdd8edf399 100644 --- a/components/net_traits/request.rs +++ b/components/net_traits/request.rs @@ -13,6 +13,7 @@ use ipc_channel::ipc::{self, IpcReceiver, IpcSender}; use mime::Mime; use msg::constellation_msg::PipelineId; use servo_url::{ImmutableOrigin, ServoUrl}; +use std::sync::{Arc, Mutex}; /// An [initiator](https://fetch.spec.whatwg.org/#concept-request-initiator) #[derive(Clone, Copy, Debug, Deserialize, MallocSizeOf, PartialEq, Serialize)] @@ -163,7 +164,7 @@ pub enum BodyChunkRequest { pub struct RequestBody { /// Net's channel to communicate with script re this body. #[ignore_malloc_size_of = "Channels are hard"] - chan: IpcSender, + chan: Arc>>, /// source: BodySource, /// @@ -177,7 +178,7 @@ impl RequestBody { total_bytes: Option, ) -> Self { RequestBody { - chan, + chan: Arc::new(Mutex::new(chan)), source, total_bytes, } @@ -189,13 +190,14 @@ impl RequestBody { BodySource::Null => panic!("Null sources should never be re-directed."), BodySource::Object => { let (chan, port) = ipc::channel().unwrap(); - let _ = self.chan.send(BodyChunkRequest::Extract(port)); - self.chan = chan.clone(); + let mut selfchan = self.chan.lock().unwrap(); + let _ = selfchan.send(BodyChunkRequest::Extract(port)); + *selfchan = chan; }, } } - pub fn take_stream(&self) -> IpcSender { + pub fn take_stream(&self) -> Arc>> { self.chan.clone() }