http-cache: wait on pending stores

and various small improvements
Gregory Terzian 2019-09-29 22:55:47 +08:00
parent e05f0b41ba
commit 4f3ba70704
4 changed files with 239 additions and 92 deletions
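The heart of the change is a per-cache-key entry state (ReadyToConstruct vs. PendingStore(n)) guarded by a Mutex and paired with a Condvar: a fetch that misses the cache registers a pending store, and later fetches for the same key wait (with a timeout) for that store to finish before reading the cache. The following is a minimal, self-contained sketch of that synchronization pattern, not the patch itself; `CacheStateMap`, `wait_until_ready`, `start_store`, and `finish_store` are illustrative stand-ins for the `http_cache_state` map and `update_http_cache_state` introduced below, and the map is keyed by `String` rather than `CacheKey` to keep the example standalone.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Mirrors the `HttpCacheEntryState` enum added in this commit.
#[derive(Clone, Debug, Eq, PartialEq)]
enum HttpCacheEntryState {
    ReadyToConstruct,
    PendingStore(usize),
}

type EntryState = Arc<(Mutex<HttpCacheEntryState>, Condvar)>;

/// Illustrative stand-in for the `http_cache_state` map stored on `HttpState`.
struct CacheStateMap {
    entries: Mutex<HashMap<String, EntryState>>,
}

impl CacheStateMap {
    /// Get (or lazily create) the state for a cache key.
    fn entry(&self, key: &str) -> EntryState {
        self.entries
            .lock()
            .unwrap()
            .entry(key.to_owned())
            .or_insert_with(|| {
                Arc::new((
                    Mutex::new(HttpCacheEntryState::ReadyToConstruct),
                    Condvar::new(),
                ))
            })
            .clone()
    }

    /// Block while another fetch has a store pending for this key,
    /// giving up after a timeout so a stuck producer cannot stall everyone.
    fn wait_until_ready(&self, key: &str) {
        let entry = self.entry(key);
        let (lock, cvar) = &*entry;
        let mut state = lock.lock().unwrap();
        while let HttpCacheEntryState::PendingStore(_) = *state {
            let (next, timeout) = cvar
                .wait_timeout(state, Duration::from_millis(500))
                .unwrap();
            state = next;
            if timeout.timed_out() {
                break;
            }
        }
    }

    /// Register that this fetch intends to store a response for the key.
    fn start_store(&self, key: &str) {
        let entry = self.entry(key);
        let mut state = entry.0.lock().unwrap();
        let next = match *state {
            HttpCacheEntryState::PendingStore(n) => HttpCacheEntryState::PendingStore(n + 1),
            HttpCacheEntryState::ReadyToConstruct => HttpCacheEntryState::PendingStore(1),
        };
        *state = next;
    }

    /// Called once the store has completed (or been abandoned);
    /// wakes the next waiter when no stores remain pending.
    fn finish_store(&self, key: &str) {
        let entry = self.entry(key);
        let (lock, cvar) = &*entry;
        let mut state = lock.lock().unwrap();
        if let HttpCacheEntryState::PendingStore(n) = *state {
            *state = if n <= 1 {
                HttpCacheEntryState::ReadyToConstruct
            } else {
                HttpCacheEntryState::PendingStore(n - 1)
            };
        }
        if *state == HttpCacheEntryState::ReadyToConstruct {
            cvar.notify_one();
        }
    }
}
```

In the patch below, the wait loop corresponds to the critical section added at the top of the cache check in `http_network_or_cache_fetch`, incrementing the pending-store count happens when the cache misses, and the decrement-and-notify step is `update_http_cache_state`, called once the fetch knows whether the cache has been (or will not be) updated.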

components/net/fetch/methods.rs

@@ -41,6 +41,7 @@ lazy_static! {
 pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send);
 
+#[derive(Clone)]
 pub enum Data {
     Payload(Vec<u8>),
     Done,
@@ -456,7 +457,7 @@ pub fn main_fetch(
     // Step 24.
     target.process_response_eof(&response);
 
-    if let Ok(mut http_cache) = context.state.http_cache.write() {
+    if let Ok(http_cache) = context.state.http_cache.write() {
         http_cache.update_awaiting_consumers(&request, &response);
     }
@@ -478,7 +479,7 @@ fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut Do
             },
             Data::Done => break,
             Data::Cancelled => {
-                response.aborted.store(true, Ordering::Relaxed);
+                response.aborted.store(true, Ordering::Release);
                 break;
             },
         }

components/net/http_cache.rs

@@ -38,7 +38,8 @@ pub struct CacheKey {
 }
 
 impl CacheKey {
-    fn new(request: &Request) -> CacheKey {
+    /// Create a cache-key from a request.
+    pub(crate) fn new(request: &Request) -> CacheKey {
         CacheKey {
             url: request.current_url(),
         }
@@ -127,7 +128,15 @@ pub struct HttpCache {
     entries: HashMap<CacheKey, Vec<CachedResource>>,
 }
 
-/// Determine if a given response is cacheable based on the initial metadata received.
+/// Determine if a response is cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
+fn is_cacheable_by_default(status_code: u16) -> bool {
+    match status_code {
+        200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => true,
+        _ => false,
+    }
+}
+
+/// Determine if a given response is cacheable.
 /// Based on <https://tools.ietf.org/html/rfc7234#section-3>
 fn response_is_cacheable(metadata: &Metadata) -> bool {
     // TODO: if we determine that this cache should be considered shared:
@@ -239,19 +248,16 @@ fn get_response_expiry(response: &Response) -> Duration {
         } else {
             max_heuristic
         };
-        match *code {
-            200 | 203 | 204 | 206 | 300 | 301 | 404 | 405 | 410 | 414 | 501 => {
-                // Status codes that are cacheable by default <https://tools.ietf.org/html/rfc7231#section-6.1>
-                return heuristic_freshness;
-            },
-            _ => {
-                // Other status codes can only use heuristic freshness if the public cache directive is present.
-                if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
-                    if directives.public() {
-                        return heuristic_freshness;
-                    }
-                }
-            },
-        }
+        if is_cacheable_by_default(*code) {
+            // Status codes that are cacheable by default can use heuristics to determine freshness.
+            return heuristic_freshness;
+        } else {
+            // Other status codes can only use heuristic freshness if the public cache directive is present.
+            if let Some(ref directives) = response.headers.typed_get::<CacheControl>() {
+                if directives.public() {
+                    return heuristic_freshness;
+                }
+            }
+        }
     }
     // Requires validation upon first use as default.
@@ -296,8 +302,11 @@ fn create_cached_response(
     cached_resource: &CachedResource,
     cached_headers: &HeaderMap,
     done_chan: &mut DoneChannel,
-) -> CachedResponse {
+) -> Option<CachedResponse> {
     debug!("creating a cached response for {:?}", request.url());
+    if cached_resource.aborted.load(Ordering::Acquire) {
+        return None;
+    }
     let resource_timing = ResourceFetchTiming::new(request.timing_type());
     let mut response = Response::new(
         cached_resource.data.metadata.data.final_url.clone(),
@@ -333,10 +342,11 @@ fn create_cached_response(
     // <https://tools.ietf.org/html/rfc7234#section-5.2.2.7>
     let has_expired =
         (adjusted_expires < time_since_validated) || (adjusted_expires == time_since_validated);
-    CachedResponse {
+    let cached_response = CachedResponse {
         response: response,
         needs_validation: has_expired,
-    }
+    };
+    Some(cached_response)
 }
 
 /// Create a new resource, based on the bytes requested, and an existing resource,
@@ -366,7 +376,7 @@ fn create_resource_with_bytes_from_resource(
 /// Support for range requests <https://tools.ietf.org/html/rfc7233>.
 fn handle_range_request(
     request: &Request,
-    candidates: Vec<&CachedResource>,
+    candidates: &[&CachedResource],
     range_spec: Vec<(Bound<u64>, Bound<u64>)>,
     done_chan: &mut DoneChannel,
 ) -> Option<CachedResponse> {
@@ -411,7 +421,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -444,7 +456,9 @@ fn handle_range_request(
                     create_resource_with_bytes_from_resource(&bytes, partial_resource);
                 let cached_response =
                     create_cached_response(request, &new_resource, &*headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     }
@@ -459,7 +473,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -493,7 +509,9 @@ fn handle_range_request(
                     create_resource_with_bytes_from_resource(&bytes, partial_resource);
                 let cached_response =
                     create_cached_response(request, &new_resource, &*headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     }
@@ -508,7 +526,9 @@ fn handle_range_request(
                 let cached_headers = new_resource.data.metadata.headers.lock().unwrap();
                 let cached_response =
                     create_cached_response(request, &new_resource, &*cached_headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     },
@@ -546,7 +566,9 @@ fn handle_range_request(
                     create_resource_with_bytes_from_resource(&bytes, partial_resource);
                 let cached_response =
                     create_cached_response(request, &new_resource, &*headers, done_chan);
-                return Some(cached_response);
+                if let Some(cached_response) = cached_response {
+                    return Some(cached_response);
+                }
             }
         }
     }
@@ -638,7 +660,7 @@ impl HttpCache {
         if let Some(range_spec) = request.headers.typed_get::<Range>() {
             return handle_range_request(
                 request,
-                candidates,
+                candidates.as_slice(),
                 range_spec.iter().collect(),
                 done_chan,
             );
@@ -669,7 +691,9 @@ impl HttpCache {
             let cached_headers = cached_resource.data.metadata.headers.lock().unwrap();
             let cached_response =
                 create_cached_response(request, cached_resource, &*cached_headers, done_chan);
-            return Some(cached_response);
+            if let Some(cached_response) = cached_response {
+                return Some(cached_response);
+            }
         }
     }
     debug!("couldn't find an appropriate response, not caching");
@@ -677,35 +701,47 @@ impl HttpCache {
         None
     }
 
-    /// Updating consumers who received a response constructed with a ResponseBody::Receiving.
-    pub fn update_awaiting_consumers(&mut self, request: &Request, response: &Response) {
-        if let ResponseBody::Done(ref completed_body) =
-            *response.actual_response().body.lock().unwrap()
-        {
-            let entry_key = CacheKey::new(&request);
-            if let Some(cached_resources) = self.entries.get(&entry_key) {
-                // Ensure we only wake-up consumers of relevant resources,
-                // ie we don't want to wake-up 200 awaiting consumers with a 206.
-                let relevant_cached_resources = cached_resources
-                    .iter()
-                    .filter(|resource| resource.data.raw_status == response.raw_status);
-                for cached_resource in relevant_cached_resources {
-                    let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
-                    for done_sender in awaiting_consumers.drain(..) {
-                        if cached_resource.aborted.load(Ordering::Relaxed) ||
-                            response.is_network_error()
-                        {
-                            // In the case of an aborted fetch or a network errror,
-                            // wake-up all awaiting consumers.
-                            // Each will then start a new network request.
-                            // TODO: Wake-up only one consumer, and make it the producer on which others wait.
-                            let _ = done_sender.send(Data::Cancelled);
-                        } else {
-                            let _ = done_sender.send(Data::Payload(completed_body.clone()));
-                            let _ = done_sender.send(Data::Done);
-                        }
-                    }
-                }
-            }
-        }
-    }
+    /// Wake-up consumers of cached resources
+    /// whose response body was still receiving data when the resource was constructed,
+    /// and whose response has now either been completed or cancelled.
+    pub fn update_awaiting_consumers(&self, request: &Request, response: &Response) {
+        let entry_key = CacheKey::new(&request);
+
+        let cached_resources = match self.entries.get(&entry_key) {
+            None => return,
+            Some(resources) => resources,
+        };
+
+        // Ensure we only wake-up consumers of relevant resources,
+        // ie we don't want to wake-up 200 awaiting consumers with a 206.
+        let relevant_cached_resources = cached_resources.iter().filter(|resource| {
+            if response.actual_response().is_network_error() {
+                return *resource.body.lock().unwrap() == ResponseBody::Empty;
+            }
+            resource.data.raw_status == response.raw_status
+        });
+
+        for cached_resource in relevant_cached_resources {
+            let mut awaiting_consumers = cached_resource.awaiting_body.lock().unwrap();
+            if awaiting_consumers.is_empty() {
+                continue;
+            }
+            let to_send = if cached_resource.aborted.load(Ordering::Acquire) {
+                // In the case of an aborted fetch,
+                // wake-up all awaiting consumers.
+                // Each will then start a new network request.
+                // TODO: Wake-up only one consumer, and make it the producer on which others wait.
+                Data::Cancelled
+            } else {
+                match *cached_resource.body.lock().unwrap() {
+                    ResponseBody::Done(_) | ResponseBody::Empty => Data::Done,
+                    ResponseBody::Receiving(_) => {
+                        continue;
+                    },
+                }
+            };
+            for done_sender in awaiting_consumers.drain(..) {
+                let _ = done_sender.send(to_send.clone());
+            }
+        }
+    }
@@ -857,7 +893,7 @@ impl HttpCache {
                 last_validated: time::now(),
             }),
         };
-        let entry = self.entries.entry(entry_key).or_insert(vec![]);
+        let entry = self.entries.entry(entry_key).or_insert_with(|| vec![]);
         entry.push(entry_resource);
         // TODO: Complete incomplete responses, including 206 response, when stored here.
         // See A cache MAY complete a stored incomplete response by making a subsequent range request

components/net/http_loader.rs

@@ -12,7 +12,7 @@ use crate::fetch::methods::{
 };
 use crate::fetch::methods::{Data, DoneChannel, FetchContext, Target};
 use crate::hsts::HstsList;
-use crate::http_cache::HttpCache;
+use crate::http_cache::{CacheKey, HttpCache};
 use crate::resource_thread::AuthCache;
 use crossbeam_channel::{unbounded, Sender};
 use devtools_traits::{
@@ -53,7 +53,7 @@ use std::iter::FromIterator;
 use std::mem;
 use std::ops::Deref;
 use std::str::FromStr;
-use std::sync::{Mutex, RwLock};
+use std::sync::{Condvar, Mutex, RwLock};
 use std::time::{Duration, SystemTime};
 use time::{self, Tm};
 use tokio::prelude::{future, Future, Stream};
@@ -63,10 +63,25 @@ lazy_static! {
     pub static ref HANDLE: Mutex<Runtime> = { Mutex::new(Runtime::new().unwrap()) };
 }
 
+/// The various states an entry of the HttpCache can be in.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum HttpCacheEntryState {
+    /// The entry is fully up-to-date,
+    /// there are no pending concurrent stores,
+    /// and it is ready to construct cached responses.
+    ReadyToConstruct,
+    /// The entry is pending a number of concurrent stores.
+    PendingStore(usize),
+}
+
 pub struct HttpState {
     pub hsts_list: RwLock<HstsList>,
     pub cookie_jar: RwLock<CookieStorage>,
     pub http_cache: RwLock<HttpCache>,
+    /// A map of cache key to entry state,
+    /// reflecting whether the cache entry is ready to read from,
+    /// or whether a concurrent pending store should be awaited.
+    pub http_cache_state: Mutex<HashMap<CacheKey, Arc<(Mutex<HttpCacheEntryState>, Condvar)>>>,
     pub auth_cache: RwLock<AuthCache>,
     pub history_states: RwLock<HashMap<HistoryStateId, Vec<u8>>>,
     pub client: Client<Connector, Body>,
@@ -80,6 +95,7 @@ impl HttpState {
             auth_cache: RwLock::new(AuthCache::new()),
             history_states: RwLock::new(HashMap::new()),
             http_cache: RwLock::new(HttpCache::new()),
+            http_cache_state: Mutex::new(HashMap::new()),
             client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),
         }
     }
@@ -1020,42 +1036,124 @@ fn http_network_or_cache_fetch(
     // Step 5.18
     // TODO If there's a proxy-authentication entry, use it as appropriate.
 
-    // Step 5.19
-    if let Ok(http_cache) = context.state.http_cache.read() {
-        if let Some(response_from_cache) = http_cache.construct_response(&http_request, done_chan) {
-            let response_headers = response_from_cache.response.headers.clone();
-            // Substep 1, 2, 3, 4
-            let (cached_response, needs_revalidation) =
-                match (http_request.cache_mode, &http_request.mode) {
-                    (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
-                    (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
-                        (Some(response_from_cache.response), false)
-                    },
-                    (CacheMode::OnlyIfCached, _) |
-                    (CacheMode::NoStore, _) |
-                    (CacheMode::Reload, _) => (None, false),
-                    (_, _) => (
-                        Some(response_from_cache.response),
-                        response_from_cache.needs_validation,
-                    ),
-                };
-            if needs_revalidation {
-                revalidating_flag = true;
-                // Substep 5
-                if let Some(http_date) = response_headers.typed_get::<LastModified>() {
-                    let http_date: SystemTime = http_date.into();
-                    http_request
-                        .headers
-                        .typed_insert(IfModifiedSince::from(http_date));
-                }
-                if let Some(entity_tag) = response_headers.get(header::ETAG) {
-                    http_request
-                        .headers
-                        .insert(header::IF_NONE_MATCH, entity_tag.clone());
-                }
-            } else {
-                // Substep 6
-                response = cached_response;
-            }
-        }
-    }
+    // If the cache is not ready to construct a response, wait.
+    //
+    // The cache is not ready if a previous fetch checked the cache, found nothing,
+    // and moved on to a network fetch, and hasn't updated the cache yet with a pending resource.
+    //
+    // Note that this is a different workflow from the one involving `wait_for_cached_response`.
+    // That one happens when a fetch gets a cache hit, and the resource is pending completion from the network.
+    {
+        let (lock, cvar) = {
+            let entry_key = CacheKey::new(&http_request);
+            let mut state_map = context.state.http_cache_state.lock().unwrap();
+            &*state_map
+                .entry(entry_key)
+                .or_insert_with(|| {
+                    Arc::new((
+                        Mutex::new(HttpCacheEntryState::ReadyToConstruct),
+                        Condvar::new(),
+                    ))
+                })
+                .clone()
+        };
+
+        // Start of critical section on http-cache state.
+        let mut state = lock.lock().unwrap();
+        while let HttpCacheEntryState::PendingStore(_) = *state {
+            let (current_state, time_out) = cvar
+                .wait_timeout(state, Duration::from_millis(500))
+                .unwrap();
+            state = current_state;
+            if time_out.timed_out() {
+                // After a timeout, ignore the pending store.
+                break;
+            }
+        }
+
+        // Step 5.19
+        if let Ok(http_cache) = context.state.http_cache.read() {
+            if let Some(response_from_cache) =
+                http_cache.construct_response(&http_request, done_chan)
+            {
+                let response_headers = response_from_cache.response.headers.clone();
+                // Substep 1, 2, 3, 4
+                let (cached_response, needs_revalidation) =
+                    match (http_request.cache_mode, &http_request.mode) {
+                        (CacheMode::ForceCache, _) => (Some(response_from_cache.response), false),
+                        (CacheMode::OnlyIfCached, &RequestMode::SameOrigin) => {
+                            (Some(response_from_cache.response), false)
+                        },
+                        (CacheMode::OnlyIfCached, _) |
+                        (CacheMode::NoStore, _) |
+                        (CacheMode::Reload, _) => (None, false),
+                        (_, _) => (
+                            Some(response_from_cache.response),
+                            response_from_cache.needs_validation,
+                        ),
+                    };
+                if cached_response.is_none() {
+                    // Ensure the done chan is not set if we're not using the cached response,
+                    // as the cache might have set it to Some if it constructed a pending response.
+                    *done_chan = None;
+                    // Update the cache state, incrementing the pending store count,
+                    // or starting the count.
+                    if let HttpCacheEntryState::PendingStore(i) = *state {
+                        let new = i + 1;
+                        *state = HttpCacheEntryState::PendingStore(new);
+                    } else {
+                        *state = HttpCacheEntryState::PendingStore(1);
+                    }
+                }
+                if needs_revalidation {
+                    revalidating_flag = true;
+                    // Substep 5
+                    if let Some(http_date) = response_headers.typed_get::<LastModified>() {
+                        let http_date: SystemTime = http_date.into();
+                        http_request
+                            .headers
+                            .typed_insert(IfModifiedSince::from(http_date));
+                    }
+                    if let Some(entity_tag) = response_headers.get(header::ETAG) {
+                        http_request
+                            .headers
+                            .insert(header::IF_NONE_MATCH, entity_tag.clone());
+                    }
+                } else {
+                    // Substep 6
+                    response = cached_response;
+                }
+            }
+        }
+
+        // Notify the next thread waiting in line, if there is any.
+        if *state == HttpCacheEntryState::ReadyToConstruct {
+            cvar.notify_one();
+        }
+        // End of critical section on http-cache state.
+    }
+
+    // Decrement the number of pending stores,
+    // and set the state to ready to construct,
+    // if no stores are pending.
+    fn update_http_cache_state(context: &FetchContext, http_request: &Request) {
+        let (lock, cvar) = {
+            let entry_key = CacheKey::new(&http_request);
+            let mut state_map = context.state.http_cache_state.lock().unwrap();
+            &*state_map
+                .get_mut(&entry_key)
+                .expect("Entry in http-cache state to have been previously inserted")
+                .clone()
+        };
+        let mut state = lock.lock().unwrap();
+        if let HttpCacheEntryState::PendingStore(i) = *state {
+            let new = i - 1;
+            if new == 0 {
+                *state = HttpCacheEntryState::ReadyToConstruct;
+                // Notify the next thread waiting in line, if there is any.
+                cvar.notify_one();
+            } else {
+                *state = HttpCacheEntryState::PendingStore(new);
+            }
+        }
+    }
@@ -1065,6 +1163,7 @@ fn http_network_or_cache_fetch(
         // The cache constructed a response with a body of ResponseBody::Receiving.
         // We wait for the response in the cache to "finish",
         // with a body of either Done or Cancelled.
+        assert!(response.is_some());
         loop {
             match ch
                 .1
@@ -1095,6 +1194,9 @@ fn http_network_or_cache_fetch(
     if response.is_none() {
         // Substep 1
         if http_request.cache_mode == CacheMode::OnlyIfCached {
+            // The cache will not be updated,
+            // set its state to ready to construct.
+            update_http_cache_state(context, &http_request);
             return Response::network_error(NetworkError::Internal(
                 "Couldn't find response in cache".into(),
            ));
@@ -1141,6 +1243,9 @@ fn http_network_or_cache_fetch(
     let mut response = response.unwrap();
 
+    // The cache has been updated, set its state to ready to construct.
+    update_http_cache_state(context, &http_request);
+
     // Step 8
     // TODO: if necessary set response's range-requested flag
@@ -1170,6 +1275,10 @@ fn http_network_or_cache_fetch(
             return response;
         }
 
+        // Make sure this is set to None,
+        // since we're about to start a new `http_network_or_cache_fetch`.
+        *done_chan = None;
+
         // Substep 4
         response = http_network_or_cache_fetch(
             http_request,

components/net/resource_thread.rs

@@ -146,6 +146,7 @@ fn create_http_states(
         cookie_jar: RwLock::new(cookie_jar),
         auth_cache: RwLock::new(auth_cache),
         http_cache: RwLock::new(http_cache),
+        http_cache_state: Mutex::new(HashMap::new()),
         hsts_list: RwLock::new(hsts_list),
         history_states: RwLock::new(HashMap::new()),
         client: create_http_client(ssl_connector_builder, HANDLE.lock().unwrap().executor()),