mirror of
https://github.com/servo/servo.git
synced 2025-09-27 15:20:09 +01:00
storage: Move shared functionality to base (#39419)
Part of #39418. See that PR for a full description. Moves: - `read_json_from_file` - `write_json_to_file` - `IpcSendResult` - `IpcSend` Renames: - `CoreResourceThreadPool` to `ThreadPool` (shorter and more descriptive, as we use it for more than the core resource thread now) Signed-off-by: Ashwin Naren <arihant2math@gmail.com>
This commit is contained in:
parent
ba208b19cc
commit
f766b66a97
25 changed files with 254 additions and 98 deletions
|
@ -19,10 +19,13 @@ ipc-channel = { workspace = true }
|
|||
malloc_size_of = { workspace = true }
|
||||
malloc_size_of_derive = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
servo_config = { path = "../../config" }
|
||||
time = { workspace = true }
|
||||
webrender_api = { workspace = true }
|
||||
log = { workspace = true }
|
||||
|
||||
[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
|
||||
mach2 = { workspace = true }
|
||||
|
|
|
@ -14,12 +14,62 @@ pub mod generic_channel;
|
|||
pub mod id;
|
||||
pub mod print_tree;
|
||||
pub mod text;
|
||||
pub mod threadpool;
|
||||
mod unicode_block;
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Read};
|
||||
use std::path::Path;
|
||||
|
||||
use ipc_channel::ipc::{IpcError, IpcSender};
|
||||
use log::{trace, warn};
|
||||
use malloc_size_of_derive::MallocSizeOf;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use webrender_api::Epoch as WebRenderEpoch;
|
||||
|
||||
pub fn read_json_from_file<T>(data: &mut T, config_dir: &Path, filename: &str)
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
let path = config_dir.join(filename);
|
||||
let display = path.display();
|
||||
|
||||
let mut file = match File::open(&path) {
|
||||
Err(why) => {
|
||||
warn!("couldn't open {}: {}", display, why);
|
||||
return;
|
||||
},
|
||||
Ok(file) => file,
|
||||
};
|
||||
|
||||
let mut string_buffer: String = String::new();
|
||||
match file.read_to_string(&mut string_buffer) {
|
||||
Err(why) => panic!("couldn't read from {}: {}", display, why),
|
||||
Ok(_) => trace!("successfully read from {}", display),
|
||||
}
|
||||
|
||||
match serde_json::from_str(&string_buffer) {
|
||||
Ok(decoded_buffer) => *data = decoded_buffer,
|
||||
Err(why) => warn!("Could not decode buffer{}", why),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_json_to_file<T>(data: &T, config_dir: &Path, filename: &str)
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let path = config_dir.join(filename);
|
||||
let display = path.display();
|
||||
|
||||
let mut file = match File::create(&path) {
|
||||
Err(why) => panic!("couldn't create {}: {}", display, why),
|
||||
Ok(file) => file,
|
||||
};
|
||||
let mut writer = BufWriter::new(&mut file);
|
||||
serde_json::to_writer_pretty(&mut writer, data).expect("Could not serialize to file");
|
||||
trace!("successfully wrote to {display}");
|
||||
}
|
||||
|
||||
/// A struct for denoting the age of messages; prevents race conditions.
|
||||
#[derive(
|
||||
Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, MallocSizeOf,
|
||||
|
@ -50,3 +100,18 @@ impl WebRenderEpochToU16 for WebRenderEpoch {
|
|||
(self.0 % u16::MAX as u32) as u16
|
||||
}
|
||||
}
|
||||
|
||||
pub type IpcSendResult = Result<(), IpcError>;
|
||||
|
||||
/// Abstraction of the ability to send a particular type of message,
|
||||
/// used by net_traits::ResourceThreads to ease the use its IpcSender sub-fields
|
||||
/// XXX: If this trait will be used more in future, some auto derive might be appealing
|
||||
pub trait IpcSend<T>
|
||||
where
|
||||
T: serde::Serialize + for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
/// send message T
|
||||
fn send(&self, _: T) -> IpcSendResult;
|
||||
/// get underlying sender
|
||||
fn sender(&self) -> IpcSender<T>;
|
||||
}
|
||||
|
|
139
components/shared/base/threadpool.rs
Normal file
139
components/shared/base/threadpool.rs
Normal file
|
@ -0,0 +1,139 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use log::debug;
|
||||
|
||||
/// The state of the thread-pool used by CoreResource.
|
||||
struct ThreadPoolState {
|
||||
/// The number of active workers.
|
||||
active_workers: u32,
|
||||
/// Whether the pool can spawn additional work.
|
||||
active: bool,
|
||||
}
|
||||
|
||||
impl ThreadPoolState {
|
||||
pub fn new() -> ThreadPoolState {
|
||||
ThreadPoolState {
|
||||
active_workers: 0,
|
||||
active: true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Is the pool still able to spawn new work?
|
||||
pub fn is_active(&self) -> bool {
|
||||
self.active
|
||||
}
|
||||
|
||||
/// How many workers are currently active?
|
||||
pub fn active_workers(&self) -> u32 {
|
||||
self.active_workers
|
||||
}
|
||||
|
||||
/// Prevent additional work from being spawned.
|
||||
pub fn switch_to_inactive(&mut self) {
|
||||
self.active = false;
|
||||
}
|
||||
|
||||
/// Add to the count of active workers.
|
||||
pub fn increment_active(&mut self) {
|
||||
self.active_workers += 1;
|
||||
}
|
||||
|
||||
/// Substract from the count of active workers.
|
||||
pub fn decrement_active(&mut self) {
|
||||
self.active_workers -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Threadpool used by Fetch and file operations.
|
||||
pub struct ThreadPool {
|
||||
pool: rayon::ThreadPool,
|
||||
state: Arc<Mutex<ThreadPoolState>>,
|
||||
}
|
||||
|
||||
impl ThreadPool {
|
||||
pub fn new(num_threads: usize, pool_name: String) -> Self {
|
||||
debug!("Creating new ThreadPool with {num_threads} threads!");
|
||||
let pool = rayon::ThreadPoolBuilder::new()
|
||||
.thread_name(move |i| format!("{pool_name}#{i}"))
|
||||
.num_threads(num_threads)
|
||||
.build()
|
||||
.unwrap();
|
||||
let state = Arc::new(Mutex::new(ThreadPoolState::new()));
|
||||
Self { pool, state }
|
||||
}
|
||||
|
||||
/// Spawn work on the thread-pool, if still active.
|
||||
///
|
||||
/// There is no need to give feedback to the caller,
|
||||
/// because if we do not perform work,
|
||||
/// it is because the system as a whole is exiting.
|
||||
pub fn spawn<OP>(&self, work: OP)
|
||||
where
|
||||
OP: FnOnce() + Send + 'static,
|
||||
{
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if state.is_active() {
|
||||
state.increment_active();
|
||||
} else {
|
||||
// Don't spawn any work.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let state = self.state.clone();
|
||||
|
||||
self.pool.spawn(move || {
|
||||
{
|
||||
let mut state = state.lock().unwrap();
|
||||
if !state.is_active() {
|
||||
// Decrement number of active workers and return,
|
||||
// without doing any work.
|
||||
return state.decrement_active();
|
||||
}
|
||||
}
|
||||
// Perform work.
|
||||
work();
|
||||
{
|
||||
// Decrement number of active workers.
|
||||
let mut state = state.lock().unwrap();
|
||||
state.decrement_active();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Prevent further work from being spawned,
|
||||
/// and wait until all workers are done,
|
||||
/// or a timeout of roughly one second has been reached.
|
||||
pub fn exit(&self) {
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.switch_to_inactive();
|
||||
}
|
||||
let mut rounds = 0;
|
||||
loop {
|
||||
rounds += 1;
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
let still_active = state.active_workers();
|
||||
|
||||
if still_active == 0 || rounds == 10 {
|
||||
if still_active > 0 {
|
||||
debug!(
|
||||
"Exiting ThreadPool with {:?} still working(should be zero)",
|
||||
still_active
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ use std::thread::{self, JoinHandle};
|
|||
use base::cross_process_instant::CrossProcessInstant;
|
||||
use base::generic_channel::{GenericSend, GenericSender, SendResult};
|
||||
use base::id::{CookieStoreId, HistoryStateId};
|
||||
use base::{IpcSend, IpcSendResult};
|
||||
use content_security_policy::{self as csp};
|
||||
use cookie::Cookie;
|
||||
use crossbeam_channel::{Receiver, Sender, unbounded};
|
||||
|
@ -18,8 +19,7 @@ use headers::{ContentType, HeaderMapExt, ReferrerPolicy as ReferrerPolicyHeader}
|
|||
use http::{Error as HttpError, HeaderMap, HeaderValue, StatusCode, header};
|
||||
use hyper_serde::Serde;
|
||||
use hyper_util::client::legacy::Error as HyperError;
|
||||
use ipc_channel::Error as IpcError;
|
||||
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
|
||||
use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
|
||||
use ipc_channel::router::ROUTER;
|
||||
use malloc_size_of::malloc_size_of_is_0;
|
||||
use malloc_size_of_derive::MallocSizeOf;
|
||||
|
@ -415,21 +415,6 @@ pub trait AsyncRuntime: Send {
|
|||
/// Handle to a resource thread
|
||||
pub type CoreResourceThread = IpcSender<CoreResourceMsg>;
|
||||
|
||||
pub type IpcSendResult = Result<(), IpcError>;
|
||||
|
||||
/// Abstraction of the ability to send a particular type of message,
|
||||
/// used by net_traits::ResourceThreads to ease the use its IpcSender sub-fields
|
||||
/// XXX: If this trait will be used more in future, some auto derive might be appealing
|
||||
pub trait IpcSend<T>
|
||||
where
|
||||
T: serde::Serialize + for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
/// send message T
|
||||
fn send(&self, _: T) -> IpcSendResult;
|
||||
/// get underlying sender
|
||||
fn sender(&self) -> IpcSender<T>;
|
||||
}
|
||||
|
||||
// FIXME: Originally we will construct an Arc<ResourceThread> from ResourceThread
|
||||
// in script_thread to avoid some performance pitfall. Now we decide to deal with
|
||||
// the "Arc" hack implicitly in future.
|
||||
|
@ -462,7 +447,7 @@ impl ResourceThreads {
|
|||
|
||||
impl IpcSend<CoreResourceMsg> for ResourceThreads {
|
||||
fn send(&self, msg: CoreResourceMsg) -> IpcSendResult {
|
||||
self.core_thread.send(msg)
|
||||
self.core_thread.send(msg).map_err(IpcError::Bincode)
|
||||
}
|
||||
|
||||
fn sender(&self) -> IpcSender<CoreResourceMsg> {
|
||||
|
@ -472,7 +457,7 @@ impl IpcSend<CoreResourceMsg> for ResourceThreads {
|
|||
|
||||
impl IpcSend<IndexedDBThreadMsg> for ResourceThreads {
|
||||
fn send(&self, msg: IndexedDBThreadMsg) -> IpcSendResult {
|
||||
self.idb_thread.send(msg)
|
||||
self.idb_thread.send(msg).map_err(IpcError::Bincode)
|
||||
}
|
||||
|
||||
fn sender(&self) -> IpcSender<IndexedDBThreadMsg> {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue