Initial IndexedDB Support (#33044)

Adds initial IndexedDB support to Servo. At the moment heed is used as the
storage backend, although it can be swapped out by implementing `KvsEngine`.
This PR adds a dedicated thread plus a thread pool for IndexedDB-related
operations, and also adds a `database_access_task_source` for IndexedDB-related
work.

This is a partial rewrite of #25214, reopened due to a branching issue.

Fixes #6963
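
Since the backend is meant to be swappable behind `KvsEngine`, here is a minimal, self-contained sketch of what an alternative store could look like. Everything below (`StoreName`, `MemoryEngine`, the trimmed-down trait) is a simplified stand-in invented for this example; the real `KvsEngine` in the `engines` module works over `SanitizedName` and `KvsTransaction` and reports results through a `tokio` oneshot channel.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins for this PR's SanitizedName and key/value blobs.
type StoreName = String;
type Key = Vec<u8>;
type Value = Vec<u8>;

// Trimmed-down version of the backend trait: the real one also handles
// transactions and key generators.
trait KvsEngine {
    fn create_store(&self, store_name: StoreName, auto_increment: bool);
    fn delete_store(&self, store_name: StoreName);
    fn get(&self, store_name: &str, key: &[u8]) -> Option<Value>;
    fn put(&self, store_name: &str, key: Key, value: Value);
}

// A toy in-memory backend; replacing heed would mean providing an impl like
// this (plus the transaction plumbing) and constructing it in the manager.
#[derive(Clone, Default)]
struct MemoryEngine {
    stores: Arc<RwLock<HashMap<StoreName, HashMap<Key, Value>>>>,
}

impl KvsEngine for MemoryEngine {
    fn create_store(&self, store_name: StoreName, _auto_increment: bool) {
        self.stores.write().unwrap().entry(store_name).or_default();
    }

    fn delete_store(&self, store_name: StoreName) {
        self.stores.write().unwrap().remove(&store_name);
    }

    fn get(&self, store_name: &str, key: &[u8]) -> Option<Value> {
        self.stores.read().unwrap().get(store_name)?.get(key).cloned()
    }

    fn put(&self, store_name: &str, key: Key, value: Value) {
        if let Some(store) = self.stores.write().unwrap().get_mut(store_name) {
            store.insert(key, value);
        }
    }
}

fn main() {
    let engine = MemoryEngine::default();
    engine.create_store("notes".to_owned(), false);
    engine.put("notes", b"id-1".to_vec(), b"hello".to_vec());
    assert_eq!(engine.get("notes", b"id-1"), Some(b"hello".to_vec()));
}
```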

---------

Signed-off-by: Ashwin Naren <arihant2math@gmail.com>
Signed-off-by: Josh Matthews <josh@joshmatthews.net>
Co-authored-by: Rasmus Viitanen <rasviitanen@gmail.com>
Co-authored-by: Josh Matthews <josh@joshmatthews.net>

@@ -0,0 +1,251 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use heed::types::*;
use heed::{Database, Env, EnvOpenOptions};
use log::warn;
use net_traits::indexeddb_thread::{AsyncOperation, IndexedDBTxnMode};
use tokio::sync::oneshot;
use super::{KvsEngine, KvsTransaction, SanitizedName};
use crate::resource_thread::CoreResourceThreadPool;
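// Keys and values are stored as raw byte blobs; keys are bincode-serialized
// before any lookup or insert.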
type HeedDatabase = Database<Bytes, Bytes>;
// A simple store that also has a key generator that can be used if no key
// is provided for the stored objects
#[derive(Clone)]
struct Store {
inner: HeedDatabase,
// https://www.w3.org/TR/IndexedDB-2/#key-generator
key_generator: Option<u64>,
}
pub struct HeedEngine {
heed_env: Arc<Env>,
open_stores: Arc<RwLock<HashMap<SanitizedName, Store>>>,
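// Separate handles for read and write work; `new` currently points both at
// the same underlying pool.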
read_pool: Arc<CoreResourceThreadPool>,
write_pool: Arc<CoreResourceThreadPool>,
}
impl HeedEngine {
pub fn new(
base_dir: &Path,
db_file_name: &Path,
thread_pool: Arc<CoreResourceThreadPool>,
) -> Self {
let mut db_dir = PathBuf::new();
db_dir.push(base_dir);
db_dir.push(db_file_name);
std::fs::create_dir_all(&db_dir).expect("Could not create OS directory for idb");
// FIXME:(arihant2math) gracefully handle errors like hitting max dbs
#[allow(unsafe_code)]
let env = unsafe {
EnvOpenOptions::new()
.max_dbs(1024)
.open(db_dir)
.expect("Failed to open db_dir")
};
Self {
heed_env: Arc::new(env),
open_stores: Arc::new(RwLock::new(HashMap::new())),
read_pool: thread_pool.clone(),
write_pool: thread_pool,
}
}
}
impl KvsEngine for HeedEngine {
fn create_store(&self, store_name: SanitizedName, auto_increment: bool) {
let mut write_txn = self
.heed_env
.write_txn()
.expect("Could not create idb store writer");
let _ = self.heed_env.clear_stale_readers();
let new_store: HeedDatabase = self
.heed_env
.create_database(&mut write_txn, Some(&*store_name.to_string()))
.expect("Failed to create idb store");
let key_generator = if auto_increment { Some(0) } else { None };
let store = Store {
inner: new_store,
key_generator,
};
self.open_stores
.write()
.expect("Could not acquire lock on stores")
.insert(store_name, store);
}
fn delete_store(&self, store_name: SanitizedName) {
// TODO: Actually delete store instead of just clearing it
let mut write_txn = self
.heed_env
.write_txn()
.expect("Could not create idb store writer");
let store: HeedDatabase = self
.heed_env
.create_database(&mut write_txn, Some(&*store_name.to_string()))
.expect("Failed to create idb store");
store.clear(&mut write_txn).expect("Could not clear store");
let mut open_stores = self.open_stores.write().unwrap();
open_stores.retain(|key, _| key != &store_name);
}
fn close_store(&self, store_name: SanitizedName) {
// FIXME: (arihant2math) unused
// FIXME:(arihant2math) return error if no store ...
let mut open_stores = self.open_stores.write().unwrap();
open_stores.retain(|key, _| key != &store_name);
}
// Starts a transaction, processes all operations for that transaction,
// and commits the changes.
fn process_transaction(
&self,
transaction: KvsTransaction,
) -> oneshot::Receiver<Option<Vec<u8>>> {
// This executes in a thread pool, and `readwrite` transactions
// will block their thread if the writer is occupied, so we can
// probably do some smart things here in order to optimize.
// Queueing 8 writers will for example block 7 threads,
// so write operations are reserved for just one thread,
// so that the rest of the threads can work in parallel with read txns.
let heed_env = self.heed_env.clone();
let stores = self.open_stores.clone();
let (tx, rx) = oneshot::channel();
if let IndexedDBTxnMode::Readonly = transaction.mode {
self.read_pool.spawn(move || {
let env = heed_env;
let rtxn = env.read_txn().expect("Could not create idb store reader");
for request in transaction.requests {
match request.operation {
AsyncOperation::GetItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.read()
.expect("Could not acquire read lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store.inner.get(&rtxn, &key).expect("Could not get item");
if let Some(blob) = result {
let _ = request.sender.send(Some(blob.to_vec()));
} else {
let _ = request.sender.send(None);
}
},
_ => {
// We cannot reach this, as checks are made earlier so that
// no modifying requests are executed on readonly transactions
unreachable!(
"Cannot execute modifying request with readonly transactions"
);
},
}
}
if tx.send(None).is_err() {
warn!("IDBTransaction's execution channel is dropped");
};
});
} else {
self.write_pool.spawn(move || {
// Acquiring a writer will block the thread if another `readwrite` transaction is active
let env = heed_env;
let mut wtxn = env.write_txn().expect("Could not create idb store writer");
for request in transaction.requests {
match request.operation {
AsyncOperation::PutItem(key, value, overwrite) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.write()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
if overwrite {
let result =
store.inner.put(&mut wtxn, &key, &value).ok().and(Some(key));
request.sender.send(result).unwrap();
} else if store
.inner
.get(&wtxn, &key)
.expect("Could not get item")
.is_none()
{
let result =
store.inner.put(&mut wtxn, &key, &value).ok().and(Some(key));
let _ = request.sender.send(result);
} else {
let _ = request.sender.send(None);
}
},
AsyncOperation::GetItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.read()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store.inner.get(&wtxn, &key).expect("Could not get item");
if let Some(blob) = result {
let _ = request.sender.send(Some(blob.to_vec()));
} else {
let _ = request.sender.send(None);
}
},
AsyncOperation::RemoveItem(key) => {
let key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.write()
.expect("Could not acquire write lock on stores");
let store = stores
.get(&request.store_name)
.expect("Could not get store");
let result = store.inner.delete(&mut wtxn, &key).ok().and(Some(key));
let _ = request.sender.send(result);
},
AsyncOperation::Count(key) => {
let _key: Vec<u8> = bincode::serialize(&key).unwrap();
let stores = stores
.read()
.expect("Could not acquire read lock on stores");
let _store = stores
.get(&request.store_name)
.expect("Could not get store");
// FIXME:(arihant2math) Return count with sender
},
}
}
wtxn.commit().expect("Failed to commit to database");
})
}
rx
}
fn has_key_generator(&self, store_name: SanitizedName) -> bool {
let has_generator = self
.open_stores
.read()
.expect("Could not acquire read lock on stores")
.get(&store_name)
.expect("Store not found")
.key_generator
.is_some();
has_generator
}
}

@@ -0,0 +1,73 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::collections::VecDeque;
use ipc_channel::ipc::IpcSender;
use net_traits::indexeddb_thread::{AsyncOperation, IndexedDBTxnMode};
use tokio::sync::oneshot;
pub use self::heed::HeedEngine;
mod heed;
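// A database/store name reduced to a filesystem-safe character set, so it can
// double as a directory name and a heed database name.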
#[derive(Eq, Hash, PartialEq)]
pub struct SanitizedName {
name: String,
}
impl SanitizedName {
pub fn new(name: String) -> SanitizedName {
let name = name.replace("https://", "");
let name = name.replace("http://", "");
// FIXME:(arihant2math) Disallowing special characters might be a big problem,
// but better safe than sorry. E.g. the db name '../other_origin/db'
// would let us access databases from another origin.
let name = name
.chars()
.map(|c| match c {
'A'..='Z' => c,
'a'..='z' => c,
'0'..='9' => c,
'-' => c,
'_' => c,
_ => '-',
})
.collect();
SanitizedName { name }
}
}
impl std::fmt::Display for SanitizedName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name)
}
}
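// A single queued request: where to send the result, which store it targets,
// and the operation itself.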
pub struct KvsOperation {
pub sender: IpcSender<Option<Vec<u8>>>,
pub store_name: SanitizedName,
pub operation: AsyncOperation,
}
pub struct KvsTransaction {
pub mode: IndexedDBTxnMode,
pub requests: VecDeque<KvsOperation>,
}
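// The storage backend abstraction; HeedEngine is the only implementation in
// this PR.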
pub trait KvsEngine {
fn create_store(&self, store_name: SanitizedName, auto_increment: bool);
fn delete_store(&self, store_name: SanitizedName);
#[expect(dead_code)]
fn close_store(&self, store_name: SanitizedName);
fn process_transaction(
&self,
transaction: KvsTransaction,
) -> oneshot::Receiver<Option<Vec<u8>>>;
fn has_key_generator(&self, store_name: SanitizedName) -> bool;
}

@@ -0,0 +1,374 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::borrow::ToOwned;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use ipc_channel::ipc::{self, IpcError, IpcReceiver, IpcSender};
use log::{debug, warn};
use net_traits::indexeddb_thread::{
AsyncOperation, IndexedDBThreadMsg, IndexedDBThreadReturnType, IndexedDBTxnMode, SyncOperation,
};
use servo_config::pref;
use servo_url::origin::ImmutableOrigin;
use crate::indexeddb::engines::{
HeedEngine, KvsEngine, KvsOperation, KvsTransaction, SanitizedName,
};
use crate::resource_thread::CoreResourceThreadPool;
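// Constructs the IPC endpoint for IndexedDB, spawning the manager thread
// behind it.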
pub trait IndexedDBThreadFactory {
fn new(config_dir: Option<PathBuf>) -> Self;
}
impl IndexedDBThreadFactory for IpcSender<IndexedDBThreadMsg> {
fn new(config_dir: Option<PathBuf>) -> IpcSender<IndexedDBThreadMsg> {
let (chan, port) = ipc::channel().unwrap();
let mut idb_base_dir = PathBuf::new();
if let Some(p) = config_dir {
idb_base_dir.push(p);
}
idb_base_dir.push("IndexedDB");
thread::Builder::new()
.name("IndexedDBManager".to_owned())
.spawn(move || {
IndexedDBManager::new(port, idb_base_dir).start();
})
.expect("Thread spawning failed");
chan
}
}
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct IndexedDBDescription {
origin: ImmutableOrigin,
name: String,
}
impl IndexedDBDescription {
// Converts the database description to a folder name where all
// data for this database is stored
fn as_path(&self) -> PathBuf {
let mut path = PathBuf::new();
let sanitized_origin = SanitizedName::new(self.origin.ascii_serialization());
let sanitized_name = SanitizedName::new(self.name.clone());
path.push(sanitized_origin.to_string());
path.push(sanitized_name.to_string());
path
}
}
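// One open database: its storage engine, current version, and any
// transactions that have been queued but not yet run.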
struct IndexedDBEnvironment<E: KvsEngine> {
engine: E,
version: u64,
transactions: HashMap<u64, KvsTransaction>,
serial_number_counter: u64,
}
impl<E: KvsEngine> IndexedDBEnvironment<E> {
fn new(engine: E, version: u64) -> IndexedDBEnvironment<E> {
IndexedDBEnvironment {
engine,
version,
transactions: HashMap::new(),
serial_number_counter: 0,
}
}
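// Appends an operation to the queue for the given transaction, creating the
// queue on first use.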
fn queue_operation(
&mut self,
sender: IpcSender<Option<Vec<u8>>>,
store_name: SanitizedName,
serial_number: u64,
mode: IndexedDBTxnMode,
operation: AsyncOperation,
) {
self.transactions
.entry(serial_number)
.or_insert_with(|| KvsTransaction {
requests: VecDeque::new(),
mode,
})
.requests
.push_back(KvsOperation {
sender,
operation,
store_name,
});
}
// Executes all requests for a transaction (without committing)
fn start_transaction(&mut self, txn: u64, sender: Option<IpcSender<Result<(), ()>>>) {
// FIXME:(arihant2math) find a way to do optimizations in this function
// rather than in the engine-level code (less repetition)
if let Some(txn) = self.transactions.remove(&txn) {
let _ = self.engine.process_transaction(txn).blocking_recv();
}
// We have a sender if the transaction is started manually, and they
// probably want to know when it is finished
if let Some(sender) = sender {
if sender.send(Ok(())).is_err() {
warn!("IDBTransaction starter dropped its channel");
}
}
}
fn has_key_generator(&self, store_name: SanitizedName) -> bool {
self.engine.has_key_generator(store_name)
}
fn create_object_store(
&mut self,
sender: IpcSender<Result<(), ()>>,
store_name: SanitizedName,
auto_increment: bool,
) {
self.engine.create_store(store_name, auto_increment);
let _ = sender.send(Ok(()));
}
fn delete_object_store(
&mut self,
sender: IpcSender<Result<(), ()>>,
store_name: SanitizedName,
) {
self.engine.delete_store(store_name);
let _ = sender.send(Ok(()));
}
}
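// Runs on its own thread and owns one IndexedDBEnvironment per (origin, name)
// pair, servicing sync and async IPC messages.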
struct IndexedDBManager {
port: IpcReceiver<IndexedDBThreadMsg>,
idb_base_dir: PathBuf,
databases: HashMap<IndexedDBDescription, IndexedDBEnvironment<HeedEngine>>,
thread_pool: Arc<CoreResourceThreadPool>,
}
impl IndexedDBManager {
fn new(port: IpcReceiver<IndexedDBThreadMsg>, idb_base_dir: PathBuf) -> IndexedDBManager {
debug!("New indexedDBManager");
let thread_count = thread::available_parallelism()
.map(|i| i.get())
.unwrap_or(pref!(threadpools_fallback_worker_num) as usize)
.min(pref!(threadpools_async_runtime_workers_max).max(1) as usize);
IndexedDBManager {
port,
idb_base_dir,
databases: HashMap::new(),
thread_pool: Arc::new(CoreResourceThreadPool::new(
thread_count,
"ImageCache".to_string(),
)),
}
}
}
impl IndexedDBManager {
fn start(&mut self) {
if !pref!(dom_indexeddb_enabled) {
return;
}
loop {
// FIXME:(arihant2math) No message *most likely* means that
// the ipc sender has been dropped, so we break the loop
let message = match self.port.recv() {
Ok(msg) => msg,
Err(e) => match e {
IpcError::Disconnected => {
break;
},
other => {
warn!("Error in IndexedDB thread: {:?}", other);
continue;
},
},
};
match message {
IndexedDBThreadMsg::Sync(operation) => {
self.handle_sync_operation(operation);
},
IndexedDBThreadMsg::Async(
sender,
origin,
db_name,
store_name,
txn,
mode,
operation,
) => {
let store_name = SanitizedName::new(store_name);
if let Some(db) = self.get_database_mut(origin, db_name) {
// Queues an operation for a transaction without starting it
db.queue_operation(sender, store_name, txn, mode, operation);
// FIXME:(arihant2math) Schedule transactions properly:
// for now, we start them directly.
db.start_transaction(txn, None);
}
},
}
}
}
fn get_database(
&self,
origin: ImmutableOrigin,
db_name: String,
) -> Option<&IndexedDBEnvironment<HeedEngine>> {
let idb_description = IndexedDBDescription {
origin,
name: db_name,
};
self.databases.get(&idb_description)
}
fn get_database_mut(
&mut self,
origin: ImmutableOrigin,
db_name: String,
) -> Option<&mut IndexedDBEnvironment<HeedEngine>> {
let idb_description = IndexedDBDescription {
origin,
name: db_name,
};
self.databases.get_mut(&idb_description)
}
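// Handles database lifecycle and metadata requests (open/close/delete,
// versioning, store management) that complete synchronously from the
// caller's point of view.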
fn handle_sync_operation(&mut self, operation: SyncOperation) {
match operation {
SyncOperation::CloseDatabase(sender, origin, db_name) => {
let idb_description = IndexedDBDescription {
origin,
name: db_name,
};
if let Some(_db) = self.databases.remove(&idb_description) {
// TODO: maybe close store here?
}
let _ = sender.send(Ok(()));
},
SyncOperation::OpenDatabase(sender, origin, db_name, version) => {
let idb_description = IndexedDBDescription {
origin,
name: db_name,
};
let idb_base_dir = self.idb_base_dir.as_path();
match self.databases.entry(idb_description.clone()) {
Entry::Vacant(e) => {
let db = IndexedDBEnvironment::new(
HeedEngine::new(
idb_base_dir,
&idb_description.as_path(),
self.thread_pool.clone(),
),
version.unwrap_or(0),
);
let _ = sender.send(db.version);
e.insert(db);
},
Entry::Occupied(db) => {
let _ = sender.send(db.get().version);
},
}
},
SyncOperation::DeleteDatabase(sender, origin, db_name) => {
let idb_description = IndexedDBDescription {
origin,
name: db_name,
};
self.databases.remove(&idb_description);
// FIXME:(rasviitanen) Possible security issue?
// FIXME:(arihant2math) using remove_dir_all with arbitrary input ...
let mut db_dir = self.idb_base_dir.clone();
db_dir.push(idb_description.as_path());
if std::fs::remove_dir_all(&db_dir).is_err() {
let _ = sender.send(Err(()));
} else {
let _ = sender.send(Ok(()));
}
},
SyncOperation::HasKeyGenerator(sender, origin, db_name, store_name) => {
let store_name = SanitizedName::new(store_name);
let result = self
.get_database(origin, db_name)
.map(|db| db.has_key_generator(store_name))
.expect("No Database");
sender.send(result).expect("Could not send generator info");
},
SyncOperation::Commit(sender, _origin, _db_name, _txn) => {
// FIXME:(arihant2math) This does nothing at the moment
sender
.send(IndexedDBThreadReturnType::Commit(Err(())))
.expect("Could not send commit status");
},
SyncOperation::UpgradeVersion(sender, origin, db_name, _txn, version) => {
if let Some(db) = self.get_database_mut(origin, db_name) {
db.version = version;
};
// FIXME:(arihant2math) Get the version from the database instead
// We never fail as of now, so we can just return it like this
// for now...
sender
.send(IndexedDBThreadReturnType::UpgradeVersion(Ok(version)))
.expect("Could not upgrade version");
},
SyncOperation::CreateObjectStore(
sender,
origin,
db_name,
store_name,
auto_increment,
) => {
let store_name = SanitizedName::new(store_name);
if let Some(db) = self.get_database_mut(origin, db_name) {
db.create_object_store(sender, store_name, auto_increment);
}
},
SyncOperation::DeleteObjectStore(sender, origin, db_name, store_name) => {
let store_name = SanitizedName::new(store_name);
if let Some(db) = self.get_database_mut(origin, db_name) {
db.delete_object_store(sender, store_name);
}
},
SyncOperation::StartTransaction(sender, origin, db_name, txn) => {
if let Some(db) = self.get_database_mut(origin, db_name) {
db.start_transaction(txn, Some(sender));
};
},
SyncOperation::Version(sender, origin, db_name) => {
if let Some(db) = self.get_database(origin, db_name) {
let _ = sender.send(db.version);
};
},
SyncOperation::RegisterNewTxn(sender, origin, db_name) => {
if let Some(db) = self.get_database_mut(origin, db_name) {
db.serial_number_counter += 1;
let _ = sender.send(db.serial_number_counter);
}
},
SyncOperation::Exit(sender) => {
// FIXME:(rasviitanen) Nothing to do?
let _ = sender.send(IndexedDBThreadReturnType::Exit);
},
}
}
}

@@ -0,0 +1,9 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
pub use self::idb_thread::IndexedDBThreadFactory;
mod engines;
pub mod idb_thread;