From a84442864d64242903a2b55c170b7f889ab4ab32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Sat, 10 Nov 2018 11:57:47 +0100 Subject: [PATCH] Add support for byte range requests for blob URLs --- components/net/blob_loader.rs | 41 ------ components/net/fetch/methods.rs | 185 ++++++++++++--------------- components/net/filemanager_thread.rs | 147 +++++++++++++-------- components/net/lib.rs | 1 - 4 files changed, 179 insertions(+), 195 deletions(-) delete mode 100644 components/net/blob_loader.rs diff --git a/components/net/blob_loader.rs b/components/net/blob_loader.rs deleted file mode 100644 index ae8f09879a4..00000000000 --- a/components/net/blob_loader.rs +++ /dev/null @@ -1,41 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -use crate::fetch::methods::{Data, DoneChannel}; -use crate::filemanager_thread::FileManager; -use net_traits::blob_url_store::parse_blob_url; -use net_traits::response::{Response, ResponseBody}; -use net_traits::{NetworkError, ResourceFetchTiming}; -use servo_channel::channel; -use servo_url::ServoUrl; - -// TODO: Check on GET -// https://w3c.github.io/FileAPI/#requestResponseModel - -/// https://fetch.spec.whatwg.org/#concept-basic-fetch (partial) -pub fn load_blob_async( - url: ServoUrl, - filemanager: FileManager, - done_chan: &mut DoneChannel, -) -> Response { - let (id, origin) = match parse_blob_url(&url) { - Ok((id, origin)) => (id, origin), - Err(()) => { - return Response::network_error(NetworkError::Internal("Invalid blob url".into())); - }, - }; - - let mut response = Response::new(url, ResourceFetchTiming::new(request.timing_type())); - let (sender, receiver) = channel(); - *done_chan = Some((sender.clone(), receiver)); - *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); - let check_url_validity = true; - if let 
Err(err) = filemanager.fetch_file(&sender, id, check_url_validity, origin, &mut response) - { - let _ = sender.send(Data::Done); - return Response::network_error(NetworkError::Internal(err)); - }; - - response -} diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index 21ccbad73a0..c5e6e45dde5 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -2,10 +2,9 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crate::blob_loader::load_blob_async; use crate::data_loader::decode; use crate::fetch::cors_cache::CorsCache; -use crate::filemanager_thread::FileManager; +use crate::filemanager_thread::{fetch_file_in_chunks, FILE_CHUNK_SIZE, FileManager}; use crate::http_loader::{determine_request_referrer, http_fetch, HttpState}; use crate::http_loader::{set_default_accept, set_default_accept_language}; use crate::subresource_integrity::is_response_integrity_valid; @@ -19,28 +18,27 @@ use hyper::StatusCode; use ipc_channel::ipc::IpcReceiver; use mime::{self, Mime}; use mime_guess::guess_mime_type; +use net_traits::blob_url_store::parse_blob_url; +use net_traits::filemanager_thread::RelativePos; use net_traits::request::{CredentialsMode, Destination, Referrer, Request, RequestMode}; use net_traits::request::{Origin, ResponseTainting, Window}; use net_traits::response::{Response, ResponseBody, ResponseType}; use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming}; use servo_url::ServoUrl; use std::borrow::Cow; -use std::fs::File; -use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::fs::{File, Metadata}; +use std::io::{BufReader, Seek, SeekFrom}; use std::mem; use std::ops::Bound; use std::str; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; -use std::thread; lazy_static! 
{ static ref X_CONTENT_TYPE_OPTIONS: HeaderName = HeaderName::from_static("x-content-type-options"); } -const FILE_CHUNK_SIZE: usize = 32768; //32 KB - pub type Target<'a> = &'a mut (dyn FetchTaskTarget + Send); pub enum Data { @@ -492,6 +490,36 @@ fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut Do } } +/// Get the range bounds if the `Range` header is present. +fn get_range_bounds(range: Option, metadata: Option) -> RelativePos { + if let Some(ref range) = range + { + let (start, end) = match range + .iter() + .collect::, Bound)>>() + .first() + { + Some(&(Bound::Included(start), Bound::Unbounded)) => (start, None), + Some(&(Bound::Included(start), Bound::Included(end))) => { + // `end` should be less or equal to `start`. + (start, Some(i64::max(start as i64, end as i64))) + }, + Some(&(Bound::Unbounded, Bound::Included(offset))) => { + if let Some(metadata) = metadata { + // `offset` cannot be bigger than the file size. + (metadata.len() - u64::min(metadata.len(), offset), None) + } else { + (0, None) + } + }, + _ => (0, None), + }; + RelativePos::from_opts(Some(start as i64), end) + } else { + RelativePos::from_opts(Some(0), None) + } +} + /// [Scheme fetch](https://fetch.spec.whatwg.org#scheme-fetch) fn scheme_fetch( request: &mut Request, @@ -537,106 +565,35 @@ fn scheme_fetch( } if let Ok(file_path) = url.to_file_path() { if let Ok(file) = File::open(file_path.clone()) { - let mime = guess_mime_type(file_path); + let mut response = Response::new(url, ResourceFetchTiming::new(request.timing_type())); - let mut response = - Response::new(url, ResourceFetchTiming::new(request.timing_type())); + // Get range bounds (if any) and try to seek to the requested offset. + // If seeking fails, bail out with a NetworkError. 
+ let range = get_range_bounds(request.headers.typed_get::(), file.metadata().ok()); + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { + *response.body.lock().unwrap() = ResponseBody::Done(vec![]); + return Response::network_error(NetworkError::Internal( + "Unexpected method for blob".into(), + )); + } + + // Set Content-Type header. + let mime = guess_mime_type(file_path); response.headers.typed_insert(ContentType::from(mime)); - let (done_sender, done_receiver) = unbounded(); + // Setup channel to receive cross-thread messages about the file fetch + // operation. + let (done_sender, done_receiver) = channel(); *done_chan = Some((done_sender.clone(), done_receiver)); *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); - let res_body = response.body.clone(); + fetch_file_in_chunks(done_sender, + reader, + response.body.clone(), + context.cancellation_listener.clone(), + range); - let cancellation_listener = context.cancellation_listener.clone(); - - let (start, end) = if let Some(ref range) = request.headers.typed_get::() - { - match range - .iter() - .collect::, Bound)>>() - .first() - { - Some(&(Bound::Included(start), Bound::Unbounded)) => (start, None), - Some(&(Bound::Included(start), Bound::Included(end))) => { - // `end` should be less or equal to `start`. - (start, Some(u64::max(start, end))) - }, - Some(&(Bound::Unbounded, Bound::Included(offset))) => { - if let Ok(metadata) = file.metadata() { - // `offset` cannot be bigger than the file size. 
- (metadata.len() - u64::min(metadata.len(), offset), None) - } else { - (0, None) - } - }, - _ => (0, None), - } - } else { - (0, None) - }; - - thread::Builder::new() - .name("fetch file worker thread".to_string()) - .spawn(move || { - let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); - if reader.seek(SeekFrom::Start(start)).is_err() { - warn!("Fetch - could not seek to {:?}", start); - } - - loop { - if cancellation_listener.lock().unwrap().cancelled() { - *res_body.lock().unwrap() = ResponseBody::Done(vec![]); - let _ = done_sender.send(Data::Cancelled); - return; - } - let length = { - let buffer = reader.fill_buf().unwrap().to_vec(); - let mut buffer_len = buffer.len(); - if let ResponseBody::Receiving(ref mut body) = - *res_body.lock().unwrap() - { - let offset = usize::min( - { - if let Some(end) = end { - let remaining_bytes = - end as usize - start as usize - body.len(); - if remaining_bytes <= FILE_CHUNK_SIZE { - // This is the last chunk so we set buffer - // len to 0 to break the reading loop. 
- buffer_len = 0; - remaining_bytes - } else { - FILE_CHUNK_SIZE - } - } else { - FILE_CHUNK_SIZE - } - }, - buffer.len(), - ); - body.extend_from_slice(&buffer[0..offset]); - let _ = done_sender.send(Data::Payload(buffer)); - } - buffer_len - }; - if length == 0 { - let mut body = res_body.lock().unwrap(); - let completed_body = match *body { - ResponseBody::Receiving(ref mut body) => { - mem::replace(body, vec![]) - }, - _ => vec![], - }; - *body = ResponseBody::Done(completed_body); - let _ = done_sender.send(Data::Done); - break; - } - reader.consume(length); - } - }) - .expect("Failed to create fetch file worker thread"); response } else { Response::network_error(NetworkError::Internal("Opening file failed".into())) @@ -657,7 +614,33 @@ fn scheme_fetch( )); } - load_blob_async(url.clone(), context.filemanager.clone(), done_chan) + let range = get_range_bounds(request.headers.typed_get::(), None); + + let (id, origin) = match parse_blob_url(&url) { + Ok((id, origin)) => (id, origin), + Err(()) => { + return Response::network_error(NetworkError::Internal("Invalid blob url".into())); + }, + }; + + let mut response = Response::new(url); + let (done_sender, done_receiver) = channel(); + *done_chan = Some((done_sender.clone(), done_receiver)); + *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); + let check_url_validity = true; + if let Err(err) = context.filemanager.fetch_file(&done_sender, + context.cancellation_listener.clone(), + id, + check_url_validity, + origin, + &mut response, + range) + { + let _ = done_sender.send(Data::Done); + return Response::network_error(NetworkError::Internal(err)); + }; + + response }, "ftp" => { diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index 660fe1d8875..032199d3f43 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -2,7 +2,7 @@ * License, v. 2.0. 
If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crate::fetch::methods::Data; +use crate::fetch::methods::{CancellationListener, Data}; use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern}; use headers_ext::{ContentLength, ContentType, HeaderMap, HeaderMapExt}; use http::header::{self, HeaderValue}; @@ -21,7 +21,8 @@ use servo_channel; use servo_config::prefs::PREFS; use std::collections::HashMap; use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::mem; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; @@ -30,6 +31,8 @@ use std::thread; use url::Url; use uuid::Uuid; +pub const FILE_CHUNK_SIZE: usize = 32768; //32 KB + /// FileManagerStore's entry struct FileStoreEntry { /// Origin of the entry's "creator" @@ -104,18 +107,21 @@ impl FileManager { // in a separate thread. 
pub fn fetch_file( &self, - sender: &servo_channel::Sender, + done_sender: &servo_channel::Sender, + cancellation_listener: Arc>, id: Uuid, check_url_validity: bool, origin: FileOrigin, response: &mut Response, + range: RelativePos ) -> Result<(), String> { self.store .fetch_blob_buf( - sender, + done_sender, + cancellation_listener, &id, &origin, - RelativePos::full_range(), + range, check_url_validity, response, ) @@ -483,7 +489,7 @@ impl FileManagerStore { None => "".to_string(), }; - chunked_read(sender, &mut file, range.len(), opt_filename, type_string); + read_file_in_chunks(sender, &mut file, range.len(), opt_filename, type_string); Ok(()) } else { Err(BlobURLStoreError::InvalidEntry) @@ -522,17 +528,18 @@ impl FileManagerStore { fn fetch_blob_buf( &self, - sender: &servo_channel::Sender, + done_sender: &servo_channel::Sender, + cancellation_listener: Arc>, id: &Uuid, origin_in: &FileOrigin, - rel_pos: RelativePos, + range: RelativePos, check_url_validity: bool, response: &mut Response, ) -> Result<(), BlobURLStoreError> { let file_impl = self.get_impl(id, origin_in, check_url_validity)?; match file_impl { FileImpl::Memory(buf) => { - let range = rel_pos.to_abs_range(buf.size as usize); + let range = range.to_abs_range(buf.size as usize); let len = range.len() as u64; set_headers( @@ -545,8 +552,8 @@ impl FileManagerStore { let mut bytes = vec![]; bytes.extend_from_slice(buf.bytes.index(range)); - let _ = sender.send(Data::Payload(bytes)); - let _ = sender.send(Data::Done); + let _ = done_sender.send(Data::Payload(bytes)); + let _ = done_sender.send(Data::Done); Ok(()) }, @@ -557,16 +564,12 @@ impl FileManagerStore { create_entry is called. 
*/ - let mut file = File::open(&metadata.path) + let file = File::open(&metadata.path) .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - let range = rel_pos.to_abs_range(metadata.size as usize); - let range_start = range.start as u64; - let seeked_start = file - .seek(SeekFrom::Start(range_start)) - .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - if seeked_start != range_start { - return Err(BlobURLStoreError::InvalidEntry); + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { + return Err(BlobURLStoreError::External("Unexpected method for blob".into())); } let filename = metadata @@ -582,22 +585,23 @@ impl FileManagerStore { filename, ); - let body = response.body.clone(); - let sender = sender.clone(); - thread::Builder::new() - .name("fetch file".to_owned()) - .spawn(move || chunked_fetch(sender, &mut file, body)) - .expect("Thread spawn failed"); + fetch_file_in_chunks(done_sender.clone(), + reader, + response.body.clone(), + cancellation_listener, + range); + Ok(()) }, FileImpl::Sliced(parent_id, inner_rel_pos) => { // Next time we don't need to check validity since // we have already done that for requesting URL if necessary. 
return self.fetch_blob_buf( - sender, + done_sender, + cancellation_listener, &parent_id, origin_in, - rel_pos.slice_inner(&inner_rel_pos), + range.slice_inner(&inner_rel_pos), false, response, ); @@ -725,9 +729,7 @@ fn select_files_pref_enabled() -> bool { .unwrap_or(false) } -const CHUNK_SIZE: usize = 8192; - -fn chunked_read( +fn read_file_in_chunks( sender: &IpcSender>, file: &mut File, size: usize, @@ -735,7 +737,7 @@ fn chunked_read( type_string: String, ) { // First chunk - let mut buf = vec![0; CHUNK_SIZE]; + let mut buf = vec![0; FILE_CHUNK_SIZE]; match file.read(&mut buf) { Ok(n) => { buf.truncate(n); @@ -755,7 +757,7 @@ fn chunked_read( // Send the remaining chunks loop { - let mut buf = vec![0; CHUNK_SIZE]; + let mut buf = vec![0; FILE_CHUNK_SIZE]; match file.read(&mut buf) { Ok(0) => { let _ = sender.send(Ok(ReadFileProgress::EOF)); @@ -773,27 +775,68 @@ fn chunked_read( } } -fn chunked_fetch( - sender: servo_channel::Sender, - file: &mut File, - response_body: ServoArc>, +pub fn fetch_file_in_chunks( + done_sender: servo_channel::Sender, + mut reader: BufReader, + res_body: ServoArc>, + cancellation_listener: Arc>, + range: RelativePos, ) { - loop { - let mut buf = vec![0; CHUNK_SIZE]; - match file.read(&mut buf) { - Ok(0) | Err(_) => { - *response_body.lock().unwrap() = ResponseBody::Done(vec![]); - let _ = sender.send(Data::Done); - return; - }, - Ok(n) => { - buf.truncate(n); - let mut bytes = vec![]; - bytes.extend_from_slice(&buf); - let _ = sender.send(Data::Payload(buf)); - }, - } - } + thread::Builder::new() + .name("fetch file worker thread".to_string()) + .spawn(move || { + loop { + if cancellation_listener.lock().unwrap().cancelled() { + *res_body.lock().unwrap() = ResponseBody::Done(vec![]); + let _ = done_sender.send(Data::Cancelled); + return; + } + let length = { + let buffer = reader.fill_buf().unwrap().to_vec(); + let mut buffer_len = buffer.len(); + if let ResponseBody::Receiving(ref mut body) = + *res_body.lock().unwrap() + { + let 
offset = usize::min( + { + if let Some(end) = range.end { + let remaining_bytes = + end as usize - range.start as usize - body.len(); + if remaining_bytes <= FILE_CHUNK_SIZE { + // This is the last chunk so we set buffer + // len to 0 to break the reading loop. + buffer_len = 0; + remaining_bytes + } else { + FILE_CHUNK_SIZE + } + } else { + FILE_CHUNK_SIZE + } + }, + buffer.len(), + ); + body.extend_from_slice(&buffer[0..offset]); + let _ = done_sender.send(Data::Payload(buffer)); + } + buffer_len + }; + if length == 0 { + let mut body = res_body.lock().unwrap(); + let completed_body = match *body { + ResponseBody::Receiving(ref mut body) => { + mem::replace(body, vec![]) + }, + _ => vec![], + }; + *body = ResponseBody::Done(completed_body); + let _ = done_sender.send(Data::Done); + break; + } + reader.consume(length); + } + }) + .expect("Failed to create fetch file worker thread"); } fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option) { diff --git a/components/net/lib.rs b/components/net/lib.rs index 04d41dd8d30..d5943fa380c 100644 --- a/components/net/lib.rs +++ b/components/net/lib.rs @@ -17,7 +17,6 @@ extern crate profile_traits; #[macro_use] extern crate serde; -mod blob_loader; pub mod connector; pub mod cookie; pub mod cookie_storage;