add a core resource thread-pool

This commit is contained in:
Gregory Terzian 2020-02-11 15:00:27 +08:00
parent baac1e2c69
commit 780a1bd6cb
7 changed files with 399 additions and 219 deletions

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds};
use crate::resource_thread::CoreResourceThreadPool;
use crossbeam_channel::Sender;
use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern};
use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt};
@ -24,8 +25,7 @@ use std::mem;
use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::sync::{Arc, Mutex, RwLock, Weak};
use url::Url;
use uuid::Uuid;
@ -72,13 +72,18 @@ enum FileImpl {
pub struct FileManager {
embedder_proxy: EmbedderProxy,
store: Arc<FileManagerStore>,
thread_pool: Weak<CoreResourceThreadPool>,
}
impl FileManager {
pub fn new(embedder_proxy: EmbedderProxy) -> FileManager {
pub fn new(
embedder_proxy: EmbedderProxy,
pool_handle: Weak<CoreResourceThreadPool>,
) -> FileManager {
FileManager {
embedder_proxy: embedder_proxy,
store: Arc::new(FileManagerStore::new()),
thread_pool: pool_handle,
}
}
@ -90,14 +95,19 @@ impl FileManager {
origin: FileOrigin,
) {
let store = self.store.clone();
thread::Builder::new()
.name("read file".to_owned())
.spawn(move || {
if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) {
let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
}
self.thread_pool
.upgrade()
.and_then(|pool| {
pool.spawn(move || {
if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) {
let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
}
});
Some(())
})
.expect("Thread spawning failed");
.unwrap_or_else(|| {
warn!("FileManager tried to read a file after CoreResourceManager has exited.");
});
}
// Read a file for the Fetch implementation.
@ -113,7 +123,7 @@ impl FileManager {
response: &mut Response,
range: RangeRequestBounds,
) -> Result<(), BlobURLStoreError> {
self.store.fetch_blob_buf(
self.fetch_blob_buf(
done_sender,
cancellation_listener,
&id,
@ -134,22 +144,36 @@ impl FileManager {
FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => {
let store = self.store.clone();
let embedder = self.embedder_proxy.clone();
thread::Builder::new()
.name("select file".to_owned())
.spawn(move || {
store.select_file(filter, sender, origin, opt_test_path, embedder);
self.thread_pool
.upgrade()
.and_then(|pool| {
pool.spawn(move || {
store.select_file(filter, sender, origin, opt_test_path, embedder);
});
Some(())
})
.expect("Thread spawning failed");
.unwrap_or_else(|| {
warn!(
"FileManager tried to select a file after CoreResourceManager has exited."
);
});
},
FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => {
let store = self.store.clone();
let embedder = self.embedder_proxy.clone();
thread::Builder::new()
.name("select files".to_owned())
.spawn(move || {
store.select_files(filter, sender, origin, opt_test_paths, embedder);
self.thread_pool
.upgrade()
.and_then(|pool| {
pool.spawn(move || {
store.select_files(filter, sender, origin, opt_test_paths, embedder);
});
Some(())
})
.expect("Thread spawning failed");
.unwrap_or_else(|| {
warn!(
"FileManager tried to select multiple files after CoreResourceManager has exited."
);
});
},
FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => {
self.read_file(sender, id, check_url_validity, origin);
@ -171,6 +195,183 @@ impl FileManager {
},
}
}
pub fn fetch_file_in_chunks(
&self,
done_sender: Sender<Data>,
mut reader: BufReader<File>,
res_body: ServoArc<Mutex<ResponseBody>>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
range: RelativePos,
) {
self.thread_pool
.upgrade()
.and_then(|pool| {
pool.spawn(move || {
loop {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return;
}
let length = {
let buffer = reader.fill_buf().unwrap().to_vec();
let mut buffer_len = buffer.len();
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap()
{
let offset = usize::min(
{
if let Some(end) = range.end {
// HTTP Range requests are specified with closed ranges,
// while Rust uses half-open ranges. We add +1 here so
// we don't skip the last requested byte.
let remaining_bytes =
end as usize - range.start as usize - body.len() +
1;
if remaining_bytes <= FILE_CHUNK_SIZE {
// This is the last chunk so we set buffer
// len to 0 to break the reading loop.
buffer_len = 0;
remaining_bytes
} else {
FILE_CHUNK_SIZE
}
} else {
FILE_CHUNK_SIZE
}
},
buffer.len(),
);
let chunk = &buffer[0..offset];
body.extend_from_slice(chunk);
let _ = done_sender.send(Data::Payload(chunk.to_vec()));
}
buffer_len
};
if length == 0 {
let mut body = res_body.lock().unwrap();
let completed_body = match *body {
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
_ => vec![],
};
*body = ResponseBody::Done(completed_body);
let _ = done_sender.send(Data::Done);
break;
}
reader.consume(length);
}
});
Some(())
})
.unwrap_or_else(|| {
warn!("FileManager tried to fetch a file in chunks after CoreResourceManager has exited.");
});
}
fn fetch_blob_buf(
&self,
done_sender: &Sender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
id: &Uuid,
origin_in: &FileOrigin,
range: RangeRequestBounds,
check_url_validity: bool,
response: &mut Response,
) -> Result<(), BlobURLStoreError> {
let file_impl = self.store.get_impl(id, origin_in, check_url_validity)?;
match file_impl {
FileImpl::Memory(buf) => {
let range = match range.get_final(Some(buf.size)) {
Ok(range) => range,
Err(_) => {
return Err(BlobURLStoreError::InvalidRange);
},
};
let range = range.to_abs_range(buf.size as usize);
let len = range.len() as u64;
set_headers(
&mut response.headers,
len,
buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN),
/* filename */ None,
);
let mut bytes = vec![];
bytes.extend_from_slice(buf.bytes.index(range));
let _ = done_sender.send(Data::Payload(bytes));
let _ = done_sender.send(Data::Done);
Ok(())
},
FileImpl::MetaDataOnly(metadata) => {
/* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state.
Concretely, here we create another file, and this file might not
has the same underlying file state (meta-info plus content) as the time
create_entry is called.
*/
let file = File::open(&metadata.path)
.map_err(|e| BlobURLStoreError::External(e.to_string()))?;
let range = match range.get_final(Some(metadata.size)) {
Ok(range) => range,
Err(_) => {
return Err(BlobURLStoreError::InvalidRange);
},
};
let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file);
if reader.seek(SeekFrom::Start(range.start as u64)).is_err() {
return Err(BlobURLStoreError::External(
"Unexpected method for blob".into(),
));
}
let filename = metadata
.path
.file_name()
.and_then(|osstr| osstr.to_str())
.map(|s| s.to_string());
set_headers(
&mut response.headers,
metadata.size,
mime_guess::from_path(metadata.path)
.first()
.unwrap_or(mime::TEXT_PLAIN),
filename,
);
self.fetch_file_in_chunks(
done_sender.clone(),
reader,
response.body.clone(),
cancellation_listener,
range,
);
Ok(())
},
FileImpl::Sliced(parent_id, inner_rel_pos) => {
// Next time we don't need to check validity since
// we have already done that for requesting URL if necessary.
return self.fetch_blob_buf(
done_sender,
cancellation_listener,
&parent_id,
origin_in,
RangeRequestBounds::Final(
RelativePos::full_range().slice_inner(&inner_rel_pos),
),
false,
response,
);
},
}
}
}
/// File manager's data store. It maintains a thread-safe mapping
@ -188,7 +389,7 @@ impl FileManagerStore {
}
/// Copy out the file backend implementation content
fn get_impl(
pub fn get_impl(
&self,
id: &Uuid,
origin_in: &FileOrigin,
@ -510,111 +711,6 @@ impl FileManagerStore {
)
}
fn fetch_blob_buf(
&self,
done_sender: &Sender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
id: &Uuid,
origin_in: &FileOrigin,
range: RangeRequestBounds,
check_url_validity: bool,
response: &mut Response,
) -> Result<(), BlobURLStoreError> {
let file_impl = self.get_impl(id, origin_in, check_url_validity)?;
match file_impl {
FileImpl::Memory(buf) => {
let range = match range.get_final(Some(buf.size)) {
Ok(range) => range,
Err(_) => {
return Err(BlobURLStoreError::InvalidRange);
},
};
let range = range.to_abs_range(buf.size as usize);
let len = range.len() as u64;
set_headers(
&mut response.headers,
len,
buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN),
/* filename */ None,
);
let mut bytes = vec![];
bytes.extend_from_slice(buf.bytes.index(range));
let _ = done_sender.send(Data::Payload(bytes));
let _ = done_sender.send(Data::Done);
Ok(())
},
FileImpl::MetaDataOnly(metadata) => {
/* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state.
Concretely, here we create another file, and this file might not
has the same underlying file state (meta-info plus content) as the time
create_entry is called.
*/
let file = File::open(&metadata.path)
.map_err(|e| BlobURLStoreError::External(e.to_string()))?;
let range = match range.get_final(Some(metadata.size)) {
Ok(range) => range,
Err(_) => {
return Err(BlobURLStoreError::InvalidRange);
},
};
let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file);
if reader.seek(SeekFrom::Start(range.start as u64)).is_err() {
return Err(BlobURLStoreError::External(
"Unexpected method for blob".into(),
));
}
let filename = metadata
.path
.file_name()
.and_then(|osstr| osstr.to_str())
.map(|s| s.to_string());
set_headers(
&mut response.headers,
metadata.size,
mime_guess::from_path(metadata.path)
.first()
.unwrap_or(mime::TEXT_PLAIN),
filename,
);
fetch_file_in_chunks(
done_sender.clone(),
reader,
response.body.clone(),
cancellation_listener,
range,
);
Ok(())
},
FileImpl::Sliced(parent_id, inner_rel_pos) => {
// Next time we don't need to check validity since
// we have already done that for requesting URL if necessary.
return self.fetch_blob_buf(
done_sender,
cancellation_listener,
&parent_id,
origin_in,
RangeRequestBounds::Final(
RelativePos::full_range().slice_inner(&inner_rel_pos),
),
false,
response,
);
},
}
}
fn dec_ref(&self, id: &Uuid, origin_in: &FileOrigin) -> Result<(), BlobURLStoreError> {
let (do_remove, opt_parent_id) = match self.entries.read().unwrap().get(id) {
Some(entry) => {
@ -763,70 +859,6 @@ fn read_file_in_chunks(
}
}
pub fn fetch_file_in_chunks(
done_sender: Sender<Data>,
mut reader: BufReader<File>,
res_body: ServoArc<Mutex<ResponseBody>>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
range: RelativePos,
) {
thread::Builder::new()
.name("fetch file worker thread".to_string())
.spawn(move || {
loop {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return;
}
let length = {
let buffer = reader.fill_buf().unwrap().to_vec();
let mut buffer_len = buffer.len();
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
let offset = usize::min(
{
if let Some(end) = range.end {
// HTTP Range requests are specified with closed ranges,
// while Rust uses half-open ranges. We add +1 here so
// we don't skip the last requested byte.
let remaining_bytes =
end as usize - range.start as usize - body.len() + 1;
if remaining_bytes <= FILE_CHUNK_SIZE {
// This is the last chunk so we set buffer
// len to 0 to break the reading loop.
buffer_len = 0;
remaining_bytes
} else {
FILE_CHUNK_SIZE
}
} else {
FILE_CHUNK_SIZE
}
},
buffer.len(),
);
let chunk = &buffer[0..offset];
body.extend_from_slice(chunk);
let _ = done_sender.send(Data::Payload(chunk.to_vec()));
}
buffer_len
};
if length == 0 {
let mut body = res_body.lock().unwrap();
let completed_body = match *body {
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
_ => vec![],
};
*body = ResponseBody::Done(completed_body);
let _ = done_sender.send(Data::Done);
break;
}
reader.consume(length);
}
})
.expect("Failed to create fetch file worker thread");
}
fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option<String>) {
headers.typed_insert(ContentLength(content_length));
headers.typed_insert(ContentType::from(mime.clone()));