checkpoint

This commit is contained in:
John Smith
2023-05-22 23:54:25 +01:00
parent f54a6fcf31
commit a1f295da78
19 changed files with 441 additions and 328 deletions

View File

@@ -1,4 +1,3 @@
mod table_db;
use super::*;
#[cfg(target_arch = "wasm32")]
@@ -10,15 +9,5 @@ mod native;
#[cfg(not(target_arch = "wasm32"))]
pub use native::*;
pub static KNOWN_TABLE_NAMES: [&'static str; 7] = [
"crypto_caches",
"RouteSpecStore",
"routing_table",
"local_records",
"local_subkeys",
"remote_records",
"remote_subkeys",
];
pub static KNOWN_PROTECTED_STORE_KEYS: [&'static str; 4] =
["node_id", "node_id_secret", "_test_key", "RouteSpecStore"];

View File

@@ -1,12 +1,10 @@
mod block_store;
mod protected_store;
mod system;
mod table_store;
pub use block_store::*;
pub use protected_store::*;
pub use system::*;
pub use table_store::*;
#[cfg(target_os = "android")]
pub mod android;

View File

@@ -1,148 +0,0 @@
use super::*;
use crate::intf::table_db::TableDBUnlockedInner;
pub use crate::intf::table_db::{TableDB, TableDBTransaction};
use keyvaluedb_sqlite::*;
use std::path::PathBuf;
struct TableStoreInner {
opened: BTreeMap<String, Weak<TableDBUnlockedInner>>,
}
/// Veilid Table Storage
/// Database for storing key value pairs persistently across runs
#[derive(Clone)]
pub struct TableStore {
config: VeilidConfig,
inner: Arc<Mutex<TableStoreInner>>,
}
impl TableStore {
fn new_inner() -> TableStoreInner {
TableStoreInner {
opened: BTreeMap::new(),
}
}
pub(crate) fn new(config: VeilidConfig) -> Self {
Self {
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
}
}
/// Delete all known tables
pub async fn delete_all(&self) {
for ktn in &KNOWN_TABLE_NAMES {
if let Err(e) = self.delete(ktn).await {
error!("failed to delete '{}': {}", ktn, e);
} else {
debug!("deleted table '{}'", ktn);
}
}
}
pub(crate) async fn init(&self) -> EyreResult<()> {
Ok(())
}
pub(crate) async fn terminate(&self) {
let inner = self.inner.lock();
if !inner.opened.is_empty() {
panic!(
"all open databases should have been closed: {:?}",
inner.opened
);
}
}
pub(crate) fn on_table_db_drop(&self, table: String) {
let mut inner = self.inner.lock();
if inner.opened.remove(&table).is_none() {
unreachable!("should have removed an item");
}
}
fn get_dbpath(&self, table: &str) -> EyreResult<PathBuf> {
if !table
.chars()
.all(|c| char::is_alphanumeric(c) || c == '_' || c == '-')
{
bail!("table name '{}' is invalid", table);
}
let c = self.config.get();
let tablestoredir = c.table_store.directory.clone();
std::fs::create_dir_all(&tablestoredir).wrap_err("failed to create tablestore path")?;
let dbpath: PathBuf = [tablestoredir, String::from(table)].iter().collect();
Ok(dbpath)
}
fn get_table_name(&self, table: &str) -> EyreResult<String> {
if !table
.chars()
.all(|c| char::is_alphanumeric(c) || c == '_' || c == '-')
{
bail!("table name '{}' is invalid", table);
}
let c = self.config.get();
let namespace = c.namespace.clone();
Ok(if namespace.is_empty() {
table.to_string()
} else {
format!("_ns_{}_{}", namespace, table)
})
}
/// Get or create a TableDB database table. If the column count is greater than an
/// existing TableDB's column count, the database will be upgraded to add the missing columns
pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> {
let table_name = self.get_table_name(name)?;
let mut inner = self.inner.lock();
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) {
Some(tdb) => {
return Ok(tdb);
}
None => {
inner.opened.remove(&table_name);
}
};
}
let dbpath = self.get_dbpath(&table_name)?;
// Ensure permissions are correct
ensure_file_private_owner(&dbpath)?;
let cfg = DatabaseConfig::with_columns(column_count);
let db = Database::open(&dbpath, cfg).wrap_err("failed to open tabledb")?;
// Ensure permissions are correct
ensure_file_private_owner(&dbpath)?;
trace!(
"opened table store '{}' at path '{:?}' with {} columns",
name,
dbpath,
column_count
);
let table_db = TableDB::new(table_name.clone(), self.clone(), db);
inner.opened.insert(table_name, table_db.weak_inner());
Ok(table_db)
}
/// Delete a TableDB table by name
pub async fn delete(&self, name: &str) -> EyreResult<bool> {
let table_name = self.get_table_name(name)?;
let inner = self.inner.lock();
if inner.opened.contains_key(&table_name) {
bail!("Not deleting table that is still opened");
}
let dbpath = self.get_dbpath(&table_name)?;
let ret = std::fs::remove_file(dbpath).is_ok();
Ok(ret)
}
}

View File

@@ -1,283 +0,0 @@
use crate::*;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use keyvaluedb_web::*;
use keyvaluedb::*;
} else {
use keyvaluedb_sqlite::*;
use keyvaluedb::*;
}
}
pub struct TableDBUnlockedInner {
table: String,
table_store: TableStore,
database: Database,
}
impl fmt::Debug for TableDBUnlockedInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TableDBInner(table={})", self.table)
}
}
impl Drop for TableDBUnlockedInner {
fn drop(&mut self) {
self.table_store.on_table_db_drop(self.table.clone());
}
}
#[derive(Debug, Clone)]
pub struct TableDB {
unlocked_inner: Arc<TableDBUnlockedInner>,
}
impl TableDB {
pub(super) fn new(table: String, table_store: TableStore, database: Database) -> Self {
Self {
unlocked_inner: Arc::new(TableDBUnlockedInner {
table,
table_store,
database,
}),
}
}
pub(super) fn try_new_from_weak_inner(weak_inner: Weak<TableDBUnlockedInner>) -> Option<Self> {
weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
unlocked_inner: table_db_unlocked_inner,
})
}
pub(super) fn weak_inner(&self) -> Weak<TableDBUnlockedInner> {
Arc::downgrade(&self.unlocked_inner)
}
/// Get the total number of columns in the TableDB
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
let db = &self.unlocked_inner.database;
db.num_columns().map_err(VeilidAPIError::from)
}
/// Get the list of keys in a column of the TableDB
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Box<[u8]>>> {
let db = self.unlocked_inner.database.clone();
let mut out: Vec<Box<[u8]>> = Vec::new();
db.iter(col, None, |kv| {
out.push(kv.0.clone().into_boxed_slice());
Ok(Option::<()>::None)
})
.await
.map_err(VeilidAPIError::from)?;
Ok(out)
}
/// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping.
pub fn transact(&self) -> TableDBTransaction {
let dbt = self.unlocked_inner.database.transaction();
TableDBTransaction::new(self.clone(), dbt)
}
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction();
dbt.put(col, key, value);
db.write(dbt).await.map_err(VeilidAPIError::generic)
}
/// Store a key in rkyv format with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let v = to_rkyv(value)?;
let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction();
dbt.put(col, key, v.as_slice());
db.write(dbt).await.map_err(VeilidAPIError::generic)
}
/// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: serde::Serialize,
{
let v = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction();
dbt.put(col, key, v.as_slice());
db.write(dbt).await.map_err(VeilidAPIError::generic)
}
/// Read a key from a column in the TableDB immediately.
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
let db = self.unlocked_inner.database.clone();
db.get(col, key).await.map_err(VeilidAPIError::from)
}
/// Read an rkyv key from a column in the TableDB immediately
pub async fn load_rkyv<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{
let out = match self.load(col, key).await? {
Some(v) => Some(from_rkyv(v)?),
None => None,
};
Ok(out)
}
/// Read an serde-json key from a column in the TableDB immediately
pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: for<'de> serde::Deserialize<'de>,
{
let out = match self.load(col, key).await? {
Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
None => None,
};
Ok(out)
}
/// Delete key with from a column in the TableDB
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
let db = self.unlocked_inner.database.clone();
let old_value = db.delete(col, key).await.map_err(VeilidAPIError::from)?;
Ok(old_value)
}
/// Delete rkyv key with from a column in the TableDB
pub async fn delete_rkyv<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{
let db = self.unlocked_inner.database.clone();
let old_value = match db.delete(col, key).await.map_err(VeilidAPIError::from)? {
Some(v) => Some(from_rkyv(v)?),
None => None,
};
Ok(old_value)
}
/// Delete serde-json key with from a column in the TableDB
pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
T: for<'de> serde::Deserialize<'de>,
{
let db = self.unlocked_inner.database.clone();
let old_value = match db.delete(col, key).await.map_err(VeilidAPIError::from)? {
Some(v) => Some(serde_json::from_slice(&v).map_err(VeilidAPIError::internal)?),
None => None,
};
Ok(old_value)
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct TableDBTransactionInner {
dbt: Option<DBTransaction>,
}
impl fmt::Debug for TableDBTransactionInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TableDBTransactionInner({})",
match &self.dbt {
Some(dbt) => format!("len={}", dbt.ops.len()),
None => "".to_owned(),
}
)
}
}
/// A TableDB transaction
/// Atomically commits a group of writes or deletes to the TableDB
#[derive(Debug, Clone)]
pub struct TableDBTransaction {
db: TableDB,
inner: Arc<Mutex<TableDBTransactionInner>>,
}
impl TableDBTransaction {
fn new(db: TableDB, dbt: DBTransaction) -> Self {
Self {
db,
inner: Arc::new(Mutex::new(TableDBTransactionInner { dbt: Some(dbt) })),
}
}
/// Commit the transaction. Performs all actions atomically.
pub async fn commit(self) -> VeilidAPIResult<()> {
let dbt = {
let mut inner = self.inner.lock();
inner
.dbt
.take()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
let db = self.db.unlocked_inner.database.clone();
db.write(dbt)
.await
.map_err(|e| VeilidAPIError::generic(format!("commit failed, transaction lost: {}", e)))
}
/// Rollback the transaction. Does nothing to the TableDB.
pub fn rollback(self) {
let mut inner = self.inner.lock();
inner.dbt = None;
}
/// Store a key with a value in a column in the TableDB
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) {
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put(col, key, value);
}
/// Store a key in rkyv format with a value in a column in the TableDB
pub fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let v = to_rkyv(value)?;
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put(col, key, v.as_slice());
Ok(())
}
/// Store a key in rkyv format with a value in a column in the TableDB
pub fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
T: serde::Serialize,
{
let v = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put(col, key, v.as_slice());
Ok(())
}
/// Delete key with from a column in the TableDB
pub fn delete(&self, col: u32, key: &[u8]) {
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().delete(col, key);
}
}
impl Drop for TableDBTransactionInner {
fn drop(&mut self) {
if self.dbt.is_some() {
warn!("Dropped transaction without commit or rollback");
}
}
}

View File

@@ -1,11 +1,9 @@
mod block_store;
mod protected_store;
mod system;
mod table_store;
pub use block_store::*;
pub use protected_store::*;
pub use system::*;
pub use table_store::*;
use super::*;

View File

@@ -1,147 +0,0 @@
use super::*;
use crate::intf::table_db::TableDBUnlockedInner;
pub use crate::intf::table_db::{TableDB, TableDBTransaction};
use keyvaluedb_web::*;
struct TableStoreInner {
opened: BTreeMap<String, Weak<TableDBUnlockedInner>>,
}
#[derive(Clone)]
pub struct TableStore {
config: VeilidConfig,
inner: Arc<Mutex<TableStoreInner>>,
async_lock: Arc<AsyncMutex<()>>,
}
impl TableStore {
fn new_inner() -> TableStoreInner {
TableStoreInner {
opened: BTreeMap::new(),
}
}
pub(crate) fn new(config: VeilidConfig) -> Self {
Self {
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
async_lock: Arc::new(AsyncMutex::new(())),
}
}
/// Delete all known tables
pub async fn delete_all(&self) {
for ktn in &KNOWN_TABLE_NAMES {
if let Err(e) = self.delete(ktn).await {
error!("failed to delete '{}': {}", ktn, e);
}
}
}
pub(crate) async fn init(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await;
Ok(())
}
pub(crate) async fn terminate(&self) {
let _async_guard = self.async_lock.lock().await;
assert!(
self.inner.lock().opened.len() == 0,
"all open databases should have been closed"
);
}
pub(crate) fn on_table_db_drop(&self, table: String) {
let mut inner = self.inner.lock();
match inner.opened.remove(&table) {
Some(_) => (),
None => {
assert!(false, "should have removed an item");
}
}
}
fn get_table_name(&self, table: &str) -> EyreResult<String> {
if !table
.chars()
.all(|c| char::is_alphanumeric(c) || c == '_' || c == '-')
{
bail!("table name '{}' is invalid", table);
}
let c = self.config.get();
let namespace = c.namespace.clone();
Ok(if namespace.len() == 0 {
format!("{}", table)
} else {
format!("_ns_{}_{}", namespace, table)
})
}
/// Get or create a TableDB database table. If the column count is greater than an
/// existing TableDB's column count, the database will be upgraded to add the missing columns
pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> {
let _async_guard = self.async_lock.lock().await;
let table_name = self.get_table_name(name)?;
{
let mut inner = self.inner.lock();
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) {
Some(tdb) => {
return Ok(tdb);
}
None => {
inner.opened.remove(&table_name);
}
};
}
}
let db = Database::open(table_name.clone(), column_count, false)
.await
.wrap_err("failed to open tabledb")?;
trace!(
"opened table store '{}' with table name '{:?}' with {} columns",
name,
table_name,
column_count
);
let table_db = TableDB::new(table_name.clone(), self.clone(), db);
{
let mut inner = self.inner.lock();
inner.opened.insert(table_name, table_db.weak_inner());
}
Ok(table_db)
}
/// Delete a TableDB table by name
pub async fn delete(&self, name: &str) -> EyreResult<bool> {
let _async_guard = self.async_lock.lock().await;
trace!("TableStore::delete {}", name);
let table_name = self.get_table_name(name)?;
{
let inner = self.inner.lock();
if inner.opened.contains_key(&table_name) {
trace!(
"TableStore::delete {}: Not deleting, still open.",
table_name
);
bail!("Not deleting table that is still opened");
}
}
if is_browser() {
let out = match Database::delete(table_name.clone()).await {
Ok(_) => true,
Err(_) => false,
};
//.map_err(|e| format!("failed to delete tabledb at: {} ({})", table_name, e))?;
trace!("TableStore::deleted {}", table_name);
Ok(out)
} else {
unimplemented!();
}
}
}