From f04c965a9bab46ff6943416baab338032bc526c1 Mon Sep 17 00:00:00 2001 From: Jackson Lewis Date: Tue, 28 Aug 2018 10:15:50 -0700 Subject: [PATCH 1/3] Read file in chunks and send chunks to FetchTaskTarget instead of read_to_end call --- components/net/fetch/methods.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index 667b46090c7..e36f0fcc86a 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -25,7 +25,7 @@ use servo_url::ServoUrl; use std::borrow::Cow; use std::fmt; use std::fs::File; -use std::io::Read; +use std::io::{BufReader, BufRead}; use std::mem; use std::str; use std::sync::{Arc, Mutex}; @@ -33,6 +33,8 @@ use std::sync::atomic::Ordering; use std::sync::mpsc::{Sender, Receiver}; use subresource_integrity::is_response_integrity_valid; +const FILE_CHUNK_SIZE: usize = 32768; //32 KB + pub type Target<'a> = &'a mut (FetchTaskTarget + Send); pub enum Data { @@ -486,8 +488,20 @@ fn scheme_fetch(request: &mut Request, Ok(file_path) => { match File::open(file_path.clone()) { Ok(mut file) => { - let mut bytes = vec![]; - let _ = file.read_to_end(&mut bytes); + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + let mut bytes = Vec::new(); + loop { + let length = { + let mut buffer = reader.fill_buf().unwrap().to_vec(); + let buffer_len = buffer.len(); + bytes.append(&mut buffer); + target.process_response_chunk(buffer); + buffer_len + }; + if length == 0 { break; } + reader.consume(length); + } + let mime = guess_mime_type(file_path); let mut response = Response::new(url); From 04288748ee1806dbf39e9d76e8cf2876dcf06015 Mon Sep 17 00:00:00 2001 From: Jackson Lewis Date: Tue, 4 Sep 2018 14:50:33 -0700 Subject: [PATCH 2/3] Make file fetch asynchronous --- components/net/fetch/methods.rs | 58 ++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index e36f0fcc86a..f81ee094a2e 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -27,10 +27,12 @@ use std::fmt; use std::fs::File; use std::io::{BufReader, BufRead}; use std::mem; +use std::sync::mpsc::channel; use std::str; use std::sync::{Arc, Mutex}; use std::sync::atomic::Ordering; use std::sync::mpsc::{Sender, Receiver}; +use std::thread; use subresource_integrity::is_response_integrity_valid; const FILE_CHUNK_SIZE: usize = 32768; //32 KB @@ -488,25 +490,51 @@ fn scheme_fetch(request: &mut Request, Ok(file_path) => { match File::open(file_path.clone()) { Ok(mut file) => { - let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); - let mut bytes = Vec::new(); - loop { - let length = { - let mut buffer = reader.fill_buf().unwrap().to_vec(); - let buffer_len = buffer.len(); - bytes.append(&mut buffer); - target.process_response_chunk(buffer); - buffer_len - }; - if length == 0 { break; } - reader.consume(length); - } - let mime = guess_mime_type(file_path); let mut response = Response::new(url); - *response.body.lock().unwrap() = ResponseBody::Done(bytes); response.headers.set(ContentType(mime)); + + let (done_sender, done_receiver) = channel(); + *done_chan = Some((done_sender.clone(), done_receiver)); + *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); + + let mut res_body = response.body.clone(); + + let cancellation_listener = context.cancellation_listener.clone(); + + thread::Builder::new().name("fetch file worker thread".to_string()).spawn(move || { + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + loop { + if cancellation_listener.lock().unwrap().cancelled() { + *res_body.lock().unwrap() = ResponseBody::Done(vec![]); + let _ = done_sender.send(Data::Cancelled); + return; + } + let length = { + let mut buffer = reader.fill_buf().unwrap().to_vec(); + let buffer_len = buffer.len(); + if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { + body.extend_from_slice(&buffer); + let _ = done_sender.send(Data::Payload(buffer.clone())); + } + buffer_len + }; + if length == 0 { + let mut body = res_body.lock().unwrap(); + let completed_body = match *body { + ResponseBody::Receiving(ref mut body) => { + mem::replace(body, vec![]) + }, + _ => vec![], + }; + *body = ResponseBody::Done(completed_body); + let _ = done_sender.send(Data::Done); + break; + } + reader.consume(length); + } + }).expect("Failed to create fetch file worker thread"); response }, _ => Response::network_error(NetworkError::Internal("Opening file failed".into())), From 430ba866685bd779626961926dffb0f60dbcf797 Mon Sep 17 00:00:00 2001 From: Jackson Lewis Date: Thu, 6 Sep 2018 16:09:35 -0700 Subject: [PATCH 3/3] Tidy things up --- components/net/fetch/methods.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index f81ee094a2e..2ed528e17fb 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -27,11 +27,10 @@ use std::fmt; use std::fs::File; use std::io::{BufReader, BufRead}; use std::mem; -use std::sync::mpsc::channel; use std::str; use std::sync::{Arc, Mutex}; use std::sync::atomic::Ordering; -use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc::{Sender, Receiver, channel}; use std::thread; use subresource_integrity::is_response_integrity_valid; @@ -516,7 +515,7 @@ fn scheme_fetch(request: &mut Request, let buffer_len = buffer.len(); if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { body.extend_from_slice(&buffer); - let _ = done_sender.send(Data::Payload(buffer.clone())); + let _ = done_sender.send(Data::Payload(buffer)); } buffer_len };