Improved performance and reduced crashes and hangs

Switch the idb backend to heed, add support for more key value types, use better concurrency measures, and update the interface between the backend and scripts.
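For context, a minimal sketch of the concurrency pattern this change adopts, assuming heed 0.20's API and LMDB's many-readers/single-writer model; the path, store name, max_dbs budget, and thread counts below are illustrative, not taken from this commit:

use std::path::Path;

use heed::types::Bytes;
use heed::{Database, EnvOpenOptions};

// Illustrative sketch only: LMDB (and therefore heed) allows many concurrent
// read transactions but only one write transaction at a time, so read-only
// work is dispatched to a multi-threaded pool while read-write work is
// funneled through a single-threaded pool.
fn sketch(db_dir: &Path) -> heed::Result<()> {
    // The directory must already exist; the engine creates it with
    // std::fs::create_dir_all before opening the environment.
    #[allow(unsafe_code)]
    let env = unsafe {
        EnvOpenOptions::new()
            // Named databases need a max_dbs budget (illustrative value).
            .max_dbs(16)
            .open(db_dir)?
    };

    let read_pool = rayon::ThreadPoolBuilder::new().num_threads(3).build().unwrap();
    let write_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    // Keys and values are stored as raw bytes; callers serialize keys
    // (e.g. with bincode) before handing them to the engine.
    let mut wtxn = env.write_txn()?;
    let store: Database<Bytes, Bytes> = env.create_database(&mut wtxn, Some("example_store"))?;
    wtxn.commit()?;

    write_pool.install(|| -> heed::Result<()> {
        let mut wtxn = env.write_txn()?;
        store.put(&mut wtxn, b"key".as_slice(), b"value".as_slice())?;
        wtxn.commit()
    })?;

    read_pool.install(|| -> heed::Result<()> {
        let rtxn = env.read_txn()?;
        assert_eq!(store.get(&rtxn, b"key".as_slice())?, Some(b"value".as_slice()));
        Ok(())
    })?;

    Ok(())
}

Reserving writes for a single thread keeps a stalled writer from blocking the rest of the pool, which the previous single 16-thread pool could not guarantee.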

Signed-off-by: Ashwin Naren <arihant2math@gmail.com>
Ashwin Naren 2024-08-15 15:15:36 -07:00
parent 0a79918849
commit dcca64d034
No known key found for this signature in database
GPG key ID: D96D7DE56FBCB6B6
8 changed files with 156 additions and 136 deletions

View file

@ -35,6 +35,7 @@ futures-core = { version = "0.3.30", default-features = false }
futures-util = { version = "0.3.30", default-features = false }
generic-array = "0.14"
headers = { workspace = true }
heed = "0.20"
http = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["client", "http1", "http2"] }
@ -53,7 +54,6 @@ net_traits = { workspace = true }
pixels = { path = "../pixels" }
profile_traits = { workspace = true }
rayon = { workspace = true }
rkv = "0.10"
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rustls-pki-types = { workspace = true }

View file

@ -1,63 +1,76 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use heed::types::*;
use heed::{Database, Env, EnvOpenOptions};
use log::warn;
use net_traits::indexeddb_thread::{AsyncOperation, IndexedDBKeyType, IndexedDBTxnMode};
use rkv::{Manager, Rkv, SingleStore, StoreOptions, Value};
use net_traits::indexeddb_thread::{AsyncOperation, IndexedDBTxnMode};
use tokio::sync::oneshot;
use super::{KvsEngine, KvsTransaction, SanitizedName};
type HeedDatabase = Database<Bytes, Bytes>;
// A simple store that also has a key generator that can be used if no key
// is provided for the stored objects
#[derive(Clone)]
struct Store {
inner: SingleStore,
key_generator: Option<u64>, // https://www.w3.org/TR/IndexedDB-2/#key-generator
inner: HeedDatabase,
// https://www.w3.org/TR/IndexedDB-2/#key-generator
key_generator: Option<u64>,
}
pub struct RkvEngine {
rkv_handle: Arc<RwLock<Rkv>>,
pub struct HeedEngine {
heed_env: Arc<Env>,
open_stores: Arc<RwLock<HashMap<SanitizedName, Store>>>,
pool: rayon::ThreadPool,
read_pool: rayon::ThreadPool,
write_pool: rayon::ThreadPool,
}
impl RkvEngine {
impl HeedEngine {
pub fn new(base_dir: &Path, db_file_name: &Path) -> Self {
let mut db_dir = PathBuf::new();
db_dir.push(base_dir);
db_dir.push(db_file_name);
std::fs::create_dir_all(&db_dir).expect("Could not create OS directory for idb");
let rkv_handle = Manager::singleton()
.write()
.expect("Could not get write lock")
.get_or_create(db_dir.as_path(), Rkv::new)
.expect("Could not create database with this origin");
// FIXME:(rasviitanen) What is a reasonable number of threads?
RkvEngine {
rkv_handle,
#[allow(unsafe_code)]
let env = unsafe {
EnvOpenOptions::new()
.open(db_dir)
.expect("Failed to open db_dir")
};
// FIXME:(arihant2math) What is a reasonable number of threads?
let threads = 4;
Self {
heed_env: Arc::new(env),
open_stores: Arc::new(RwLock::new(HashMap::new())),
pool: rayon::ThreadPoolBuilder::new()
.num_threads(16)
read_pool: rayon::ThreadPoolBuilder::new()
.num_threads(threads - 1)
.build()
.expect("Could not create IDBTransaction thread pool"),
.expect("Could not create IDBTransaction read thread pool"),
write_pool: rayon::ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("Could not create IDBTransaction write thread pool"),
}
}
}
impl KvsEngine for RkvEngine {
impl KvsEngine for HeedEngine {
fn create_store(&self, store_name: SanitizedName, auto_increment: bool) {
let rkv = self.rkv_handle.read().unwrap();
let new_store = rkv
.open_single(&*store_name.to_string(), StoreOptions::create())
.unwrap();
let mut write_txn = self
.heed_env
.write_txn()
.expect("Could not create idb store writer");
let new_store: HeedDatabase = self
.heed_env
.create_database(&mut write_txn, Some(&*store_name.to_string()))
.expect("Failed to create idb store");
let key_generator = {
if auto_increment {
@ -74,49 +87,43 @@ impl KvsEngine for RkvEngine {
self.open_stores
.write()
.expect("Could not aquire lock")
.expect("Could not acquire lock on stores")
.insert(store_name, store);
}
fn has_key_generator(&self, store_name: SanitizedName) -> bool {
let stores = self
.open_stores
.read()
.expect("Could not aquire read lock on stores");
stores
.get(&store_name)
.expect("Store not found")
.key_generator
.is_some()
}
// Starts a transaction, processes all operations for that transaction,
// and commits the changes.
fn process_transaction(
&self,
transaction: KvsTransaction,
) -> oneshot::Receiver<Option<Vec<u8>>> {
let db_handle = self.rkv_handle.clone();
// This executes in a thread pool, and `readwrite` transactions
// will block their thread if the writer is occupied, so we can
// probably do some smart things here in order to optimize.
// Queueing 8 writers will for example block 7 threads,
// so write operations are reserved for just one thread,
// so that the rest of the threads can work in parallel with read txns.
let heed_env = self.heed_env.clone();
let stores = self.open_stores.clone();
let (tx, rx) = oneshot::channel();
self.pool.spawn(move || {
let db = db_handle
.read()
.expect("Could not aquire read lock on idb handle");
let stores = stores.read().expect("Could not aquire read lock on stores");
if let IndexedDBTxnMode::Readonly = transaction.mode {
let reader = db.read().expect("Could not create reader for idb");
self.read_pool.spawn(move || {
let env = heed_env;
let rtxn = env.read_txn().expect("Could not create idb store reader");
for request in transaction.requests {
match request.operation {
AsyncOperation::GetItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.read()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store.inner.get(&reader, key).expect("Could not get item");
let result = store.inner.get(&rtxn, &key).expect("Could not get item");
if let Some(Value::Blob(blob)) = result {
if let Some(blob) = result {
request.sender.send(Some(blob.to_vec())).unwrap();
} else {
request.sender.send(None).unwrap();
@ -131,39 +138,40 @@ impl KvsEngine for RkvEngine {
},
}
}
if tx.send(None).is_err() {
warn!("IDBTransaction's execution channel is dropped");
};
});
} else {
// Aquiring a writer will block the thread if another `readwrite` transaction is active
let mut writer = db.write().expect("Could not create writer for idb");
self.write_pool.spawn(move || {
// Acquiring a writer will block the thread if another `readwrite` transaction is active
let env = heed_env;
let mut wtxn = env.write_txn().expect("Could not create idb store writer");
for request in transaction.requests {
match request.operation {
AsyncOperation::PutItem(key, value, overwrite) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.write()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let key = match key {
IndexedDBKeyType::String(inner) => inner,
IndexedDBKeyType::Number(inner) => inner,
IndexedDBKeyType::Binary(inner) => inner,
};
if overwrite {
let result = store
.inner
.put(&mut writer, key.clone(), &Value::Blob(&value))
.ok()
.and(Some(key));
let result =
store.inner.put(&mut wtxn, &key, &value).ok().and(Some(key));
request.sender.send(result).unwrap();
} else {
// FIXME:(rasviitanen) We should be able to set some flags in
// `rkv` in order to do this without running a get request first
if store
.inner
.get(&writer, key.clone())
.get(&mut wtxn, &key)
.expect("Could not get item")
.is_none()
{
let result = store
.inner
.put(&mut writer, key.clone(), &Value::Blob(&value))
.put(&mut wtxn, &key, &value)
.ok()
.and(Some(key));
request.sender.send(result).unwrap();
@ -173,39 +181,50 @@ impl KvsEngine for RkvEngine {
}
},
AsyncOperation::GetItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.read()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store.inner.get(&writer, key).expect("Could not get item");
let result = store.inner.get(&wtxn, &key).expect("Could not get item");
if let Some(Value::Blob(blob)) = result {
if let Some(blob) = result {
request.sender.send(Some(blob.to_vec())).unwrap();
} else {
request.sender.send(None).unwrap();
}
},
AsyncOperation::RemoveItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.write()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store
.inner
.delete(&mut writer, key.clone())
.ok()
.and(Some(key));
let result = store.inner.delete(&mut wtxn, &key).ok().and(Some(key));
request.sender.send(result).unwrap();
},
}
}
writer.commit().expect("Failed to commit to database");
wtxn.commit().expect("Failed to commit to database");
})
}
if tx.send(None).is_err() {
warn!("IDBTransaction's execution channel is dropped");
};
});
rx
}
fn has_key_generator(&self, store_name: SanitizedName) -> bool {
let has_generator = self
.open_stores
.read()
.expect("Could not acquire read lock on stores")
.get(&store_name)
.expect("Store not found")
.key_generator
.is_some();
has_generator
}
}

View file

@ -8,9 +8,9 @@ use ipc_channel::ipc::IpcSender;
use net_traits::indexeddb_thread::{AsyncOperation, IndexedDBTxnMode};
use tokio::sync::oneshot;
pub use self::rkv::RkvEngine;
pub use self::heed::HeedEngine;
mod rkv;
mod heed;
#[derive(Eq, Hash, PartialEq)]
pub struct SanitizedName {
@ -21,7 +21,7 @@ impl SanitizedName {
pub fn new(name: String) -> SanitizedName {
let name = name.replace("https://", "");
let name = name.replace("http://", "");
// FIXME:(rasviitanen) Disallowing special characters might be a big problem,
// FIXME:(arihant2math) Disallowing special characters might be a big problem,
// but better safe than sorry. E.g. the db name '../other_origin/db',
// would let us access databases from another origin.
let name = name

View file

@ -7,8 +7,8 @@ use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::thread;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use log::warn;
use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
use log::{error, warn};
use net_traits::indexeddb_thread::{
AsyncOperation, IndexedDBThreadMsg, IndexedDBThreadReturnType, IndexedDBTxnMode, SyncOperation,
};
@ -16,7 +16,7 @@ use servo_config::pref;
use servo_url::origin::ImmutableOrigin;
use crate::indexeddb::engines::{
KvsEngine, KvsOperation, KvsTransaction, RkvEngine, SanitizedName,
HeedEngine, KvsEngine, KvsOperation, KvsTransaction, SanitizedName,
};
pub trait IndexedDBThreadFactory {
@ -106,13 +106,7 @@ impl<E: KvsEngine> IndexedDBEnvironment<E> {
// Executes all requests for a transaction (without committing)
fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<Result<(), ()>>>) {
// FIXME:(rasviitanen)
// This executes in a thread pool, and `readwrite` transactions
// will block their thread if the writer is occupied, so we can
// probably do some smart things here in order to optimize.
// Queueuing 8 writers will for example block 7 threads,
// so we should probably reserve write operations for just one thread,
// so that the rest of the threads can work in parallel with read txns.
// FIXME:(arihant2math) Find a way to apply optimizations in this function rather than in the engine-level code (less repetition)
self.transactions.remove(&txn).map(|txn| {
self.engine.process_transaction(txn).blocking_recv();
});
@ -145,14 +139,14 @@ impl<E: KvsEngine> IndexedDBEnvironment<E> {
struct IndexedDBManager {
port: IpcReceiver<IndexedDBThreadMsg>,
idb_base_dir: PathBuf,
databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<RkvEngine>>,
databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<HeedEngine>>,
}
impl IndexedDBManager {
fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
IndexedDBManager {
port: port,
idb_base_dir: idb_base_dir,
port,
idb_base_dir,
databases: HashMap::new(),
}
}
@ -162,7 +156,18 @@ impl IndexedDBManager {
fn start(&mut self) {
if pref!(dom.indexeddb.enabled) {
loop {
let message = self.port.recv().expect("No message");
// FIXME:(arihant2math) No message *most likely* means that
// the ipc sender has been dropped, so we break the loop
let message = match self.port.recv() {
Ok(msg) => msg,
Err(e) => match e {
IpcError::Disconnected => {
error!("indexeddb ipc channel has been dropped, breaking loop");
break;
},
other => Err(other).unwrap(),
},
};
match message {
IndexedDBThreadMsg::Sync(operation) => {
self.handle_sync_operation(operation);
@ -180,7 +185,7 @@ impl IndexedDBManager {
self.get_database_mut(origin, db_name).map(|db| {
// Queues an operation for a transaction without starting it
db.queue_operation(sender, store_name, txn, mode, operation);
// FIXME:(rasviitanen) Schedule transactions properly:
// FIXME:(arihant2math) Schedule transactions properly:
// for now, we start them directly.
db.start_transaction(txn, None);
});
@ -194,9 +199,9 @@ impl IndexedDBManager {
&self,
origin: ImmutableOrigin,
db_name: String,
) -> Option<&IndexedDBEnvironment<RkvEngine>> {
) -> Option<&IndexedDBEnvironment<HeedEngine>> {
let idb_description = IndexedDBDescription {
origin: origin,
origin,
name: db_name,
};
@ -207,9 +212,9 @@ impl IndexedDBManager {
&mut self,
origin: ImmutableOrigin,
db_name: String,
) -> Option<&mut IndexedDBEnvironment<RkvEngine>> {
) -> Option<&mut IndexedDBEnvironment<HeedEngine>> {
let idb_description = IndexedDBDescription {
origin: origin,
origin,
name: db_name,
};
@ -220,7 +225,7 @@ impl IndexedDBManager {
match operation {
SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
let idb_description = IndexedDBDescription {
origin: origin,
origin,
name: db_name,
};
@ -229,7 +234,7 @@ impl IndexedDBManager {
match self.databases.entry(idb_description.clone()) {
Entry::Vacant(e) => {
let db = IndexedDBEnvironment::new(
RkvEngine::new(idb_base_dir, &idb_description.as_path()),
HeedEngine::new(idb_base_dir, &idb_description.as_path()),
version.unwrap_or(0),
);
sender.send(db.version).unwrap();
@ -242,12 +247,12 @@ impl IndexedDBManager {
},
SyncOperation::DeleteDatabase(sender, origin, db_name) => {
let idb_description = IndexedDBDescription {
origin: origin,
origin,
name: db_name,
};
self.databases.remove(&idb_description);
// FIXME:(rasviitanen) Possible sercurity issue?
// FIXME:(rasviitanen) Possible security issue?
let mut db_dir = self.idb_base_dir.clone();
db_dir.push(&idb_description.as_path());
if std::fs::remove_dir_all(&db_dir).is_err() {
@ -265,7 +270,7 @@ impl IndexedDBManager {
sender.send(result).expect("Could not send generator info");
},
SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
// FIXME:(rasviitanen) This does nothing at the moment
// FIXME:(arihant2math) This does nothing at the moment
sender
.send(IndexedDBThreadReturnType::Commit(Err(())))
.expect("Could not send commit status");
@ -275,7 +280,7 @@ impl IndexedDBManager {
db.version = version;
});
// FIXME:(rasviitanen) Get the version from the database instead
// FIXME:(arihant2math) Get the version from the database instead
// We never fail as of now, so we can just return it like this
// for now...
sender

View file

@ -181,20 +181,21 @@ impl IDBObjectStore {
}
if let ESClass::Date = built_in_class {
// FIXME:(rasviitanen)
unimplemented!("Dates as keys is currently unsupported");
let key =
structuredclone::write(cx, input, None).expect("Could not serialize key");
return Ok(IndexedDBKeyType::Date(key.serialized.clone()));
}
if IsArrayBufferObject(*object) || JS_IsArrayBufferViewObject(*object) {
let key =
structuredclone::write(cx, input, None).expect("Could not serialize key");
// FIXME:(rasviitanen) Return the correct type here
// FIXME:(arihant2math) Return the correct type here
// it doesn't really matter at the moment...
return Ok(IndexedDBKeyType::Number(key.serialized.clone()));
}
if let ESClass::Array = built_in_class {
// FIXME:(rasviitanen)
// FIXME:(arihant2math)
unimplemented!("Arrays as keys is currently unsupported");
}
}
@ -482,14 +483,11 @@ impl IDBObjectStoreMethods for IDBObjectStore {
// https://www.w3.org/TR/IndexedDB-2/#dom-idbobjectstore-delete
fn Delete(&self, cx: SafeJSContext, query: HandleValue) -> Fallible<DomRoot<IDBRequest>> {
let serialized_query =
structuredclone::write(cx, query, None).expect("Could not serialize value");
IDBRequest::execute_async(
&*self,
AsyncOperation::RemoveItem(serialized_query.serialized.clone()),
None,
)
let serialized_query = IDBObjectStore::convert_value_to_key(cx, query, None);
match serialized_query {
Ok(q) => IDBRequest::execute_async(&*self, AsyncOperation::RemoveItem(q), None),
Err(e) => Err(e),
}
}
// https://www.w3.org/TR/IndexedDB-2/#dom-idbobjectstore-clear
@ -499,14 +497,11 @@ impl IDBObjectStoreMethods for IDBObjectStore {
// https://www.w3.org/TR/IndexedDB-2/#dom-idbobjectstore-get
fn Get(&self, cx: SafeJSContext, query: HandleValue) -> Fallible<DomRoot<IDBRequest>> {
let serialized_query =
structuredclone::write(cx, query, None).expect("Could not serialize value");
IDBRequest::execute_async(
&*self,
AsyncOperation::GetItem(serialized_query.serialized.clone()),
None,
)
let serialized_query = IDBObjectStore::convert_value_to_key(cx, query, None);
match serialized_query {
Ok(q) => IDBRequest::execute_async(&*self, AsyncOperation::GetItem(q), None),
Err(e) => Err(e),
}
}
// https://www.w3.org/TR/IndexedDB-2/#dom-idbobjectstore-getkey

View file

@ -76,6 +76,7 @@ impl IDBVersionChangeEvent {
ev
}
#[allow(non_snake_case)]
pub fn Constructor(
window: &Window,
proto: Option<HandleObject>,

View file

@ -27,12 +27,12 @@ pub enum IndexedDBTxnMode {
}
// https://www.w3.org/TR/IndexedDB-2/#key-type
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum IndexedDBKeyType {
Number(Vec<u8>),
String(Vec<u8>),
Binary(Vec<u8>),
// FIXME:(arihant2math) implement Date(),
Date(Vec<u8>),
// FIXME:(arihant2math) implement Array(),
}
@ -42,7 +42,7 @@ pub enum IndexedDBKeyType {
pub enum AsyncOperation {
/// Gets the value associated with the given key in the associated idb data
GetItem(
Vec<u8>, // Key
IndexedDBKeyType, // Key
),
/// Sets the value of the given key in the associated idb data
@ -54,7 +54,7 @@ pub enum AsyncOperation {
/// Removes the key/value pair for the given key in the associated idb data
RemoveItem(
Vec<u8>, // Key
IndexedDBKeyType, // Key
),
}