diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index 60c96897..104dd600 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -132,7 +132,7 @@ impl Crypto { drop(db); table_store.delete("crypto_caches").await?; db = table_store.open("crypto_caches", 1).await?; - db.store(0, b"node_id", &node_id.unwrap().bytes)?; + db.store(0, b"node_id", &node_id.unwrap().bytes).await?; } // Schedule flushing @@ -159,7 +159,7 @@ impl Crypto { }; let db = table_store.open("crypto_caches", 1).await?; - db.store(0, b"dh_cache", &cache_bytes)?; + db.store(0, b"dh_cache", &cache_bytes).await?; Ok(()) } diff --git a/veilid-core/src/intf/native/table_store.rs b/veilid-core/src/intf/native/table_store.rs index fca78232..a09b8e4d 100644 --- a/veilid-core/src/intf/native/table_store.rs +++ b/veilid-core/src/intf/native/table_store.rs @@ -1,4 +1,5 @@ -use crate::intf::table_db::*; +use crate::intf::table_db::TableDBInner; +pub use crate::intf::table_db::{TableDB, TableDBTransaction}; use crate::*; use keyvaluedb_sqlite::*; use std::path::PathBuf; diff --git a/veilid-core/src/intf/table_db.rs b/veilid-core/src/intf/table_db.rs index a6bb6b5d..6c93dcbf 100644 --- a/veilid-core/src/intf/table_db.rs +++ b/veilid-core/src/intf/table_db.rs @@ -83,42 +83,42 @@ impl TableDB { } /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately. - pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> { - let db = &self.inner.lock().database; + pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> { + let db = self.inner.lock().database.clone(); let mut dbt = db.transaction(); dbt.put(col, key, value); - db.write(dbt).wrap_err("failed to store key") + db.write(dbt).await.wrap_err("failed to store key") } /// Store a key in rkyv format with a value in a column in the TableDB. Performs a single transaction immediately. - pub fn store_rkyv(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + pub async fn store_rkyv(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where T: RkyvSerialize>, { let v = to_rkyv(value)?; - let db = &self.inner.lock().database; + let db = self.inner.lock().database.clone(); let mut dbt = db.transaction(); dbt.put(col, key, v.as_slice()); - db.write(dbt).wrap_err("failed to store key") + db.write(dbt).await.wrap_err("failed to store key") } /// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately. - pub fn store_json(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + pub async fn store_json(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where T: serde::Serialize, { let v = serde_json::to_vec(value)?; - let db = &self.inner.lock().database; + let db = self.inner.lock().database.clone(); let mut dbt = db.transaction(); dbt.put(col, key, v.as_slice()); - db.write(dbt).wrap_err("failed to store key") + db.write(dbt).await.wrap_err("failed to store key") } /// Read a key from a column in the TableDB immediately. pub fn load(&self, col: u32, key: &[u8]) -> EyreResult>> { - let db = &self.inner.lock().database; + let db = self.inner.lock().database.clone(); db.get(col, key).wrap_err("failed to get key") } @@ -131,7 +131,7 @@ impl TableDB { ::Archived: RkyvDeserialize, { - let db = &self.inner.lock().database; + let db = self.inner.lock().database.clone(); let out = db.get(col, key).wrap_err("failed to get key")?; let b = match out { Some(v) => v, @@ -148,7 +148,7 @@ impl TableDB { where T: for<'de> serde::Deserialize<'de>, { - let db = &self.inner.lock().database; + let db = self.inner.lock().database.clone(); let out = db.get(col, key).wrap_err("failed to get key")?; let b = match out { Some(v) => v, @@ -161,15 +161,15 @@ impl TableDB { } /// Delete key with from a column in the TableDB - pub fn delete(&self, col: u32, key: &[u8]) -> EyreResult { - let db = &self.inner.lock().database; + pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult { + let db = self.inner.lock().database.clone(); let found = db.get(col, key).wrap_err("failed to get key")?; match found { None => Ok(false), Some(_) => { let mut dbt = db.transaction(); dbt.delete(col, key); - db.write(dbt).wrap_err("failed to delete key")?; + db.write(dbt).await.wrap_err("failed to delete key")?; Ok(true) } } @@ -212,55 +212,58 @@ impl TableDBTransaction { } /// Commit the transaction. Performs all actions atomically. - pub fn commit(mut self) -> EyreResult<()> { + pub async fn commit(self) -> EyreResult<()> { let dbt = { - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); inner .dbt .take() - .ok_or_else(|| Err(eyre!("transaction already completed")))? + .ok_or_else(|| eyre!("transaction already completed"))? }; - self.db - .inner - .lock() - .database - .write(dbt) + let db = self.db.inner.lock().database.clone(); + db.write(dbt) + .await .wrap_err("commit failed, transaction lost") } /// Rollback the transaction. Does nothing to the TableDB. - pub fn rollback(mut self) { - self.dbt = None; + pub fn rollback(self) { + let mut inner = self.inner.lock(); + inner.dbt = None; } /// Store a key with a value in a column in the TableDB - pub fn store(&mut self, col: u32, key: &[u8], value: &[u8]) { - self.dbt.as_mut().unwrap().put(col, key, value); + pub fn store(&self, col: u32, key: &[u8], value: &[u8]) { + let mut inner = self.inner.lock(); + inner.dbt.as_mut().unwrap().put(col, key, value); } /// Store a key in rkyv format with a value in a column in the TableDB - pub fn store_rkyv(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + pub fn store_rkyv(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where T: RkyvSerialize>, { let v = to_rkyv(value)?; - self.dbt.as_mut().unwrap().put(col, key, v.as_slice()); + let mut inner = self.inner.lock(); + inner.dbt.as_mut().unwrap().put(col, key, v.as_slice()); Ok(()) } /// Store a key in rkyv format with a value in a column in the TableDB - pub fn store_json(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + pub fn store_json(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where T: serde::Serialize, { let v = serde_json::to_vec(value)?; - self.dbt.as_mut().unwrap().put(col, key, v.as_slice()); + let mut inner = self.inner.lock(); + inner.dbt.as_mut().unwrap().put(col, key, v.as_slice()); Ok(()) } /// Delete key with from a column in the TableDB - pub fn delete(&mut self, col: u32, key: &[u8]) { - self.dbt.as_mut().unwrap().delete(col, key); + pub fn delete(&self, col: u32, key: &[u8]) { + let mut inner = self.inner.lock(); + inner.dbt.as_mut().unwrap().delete(col, key); } } diff --git a/veilid-core/src/intf/wasm/table_store.rs b/veilid-core/src/intf/wasm/table_store.rs index ba2dfadb..f401220d 100644 --- a/veilid-core/src/intf/wasm/table_store.rs +++ b/veilid-core/src/intf/wasm/table_store.rs @@ -1,4 +1,5 @@ -use crate::intf::table_db::*; +use crate::intf::table_db::TableDBInner; +pub use crate::intf::table_db::{TableDB, TableDBTransaction}; use crate::*; use keyvaluedb_web::*; diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 43aa2ebf..8ddd5c5a 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -241,7 +241,7 @@ impl RoutingTable { let table_store = self.network_manager().table_store(); let tdb = table_store.open("routing_table", 1).await?; let bucket_count = bucketvec.len(); - let mut dbx = tdb.transact(); + let dbx = tdb.transact(); if let Err(e) = dbx.store_rkyv(0, b"bucket_count", &bucket_count) { dbx.rollback(); return Err(e); @@ -250,7 +250,7 @@ impl RoutingTable { for (n, b) in bucketvec.iter().enumerate() { dbx.store(0, format!("bucket_{}", n).as_bytes(), b) } - dbx.commit()?; + dbx.commit().await?; Ok(()) } diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 77832685..4c7d8d5b 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -512,7 +512,7 @@ impl RouteSpecStore { .network_manager() .table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; - rsstdb.store_rkyv(0, b"content", &content)?; + rsstdb.store_rkyv(0, b"content", &content).await?; // // Keep secrets in protected store as well let pstore = self diff --git a/veilid-core/src/tests/common/test_table_store.rs b/veilid-core/src/tests/common/test_table_store.rs index 773f8b3e..659e7527 100644 --- a/veilid-core/src/tests/common/test_table_store.rs +++ b/veilid-core/src/tests/common/test_table_store.rs @@ -63,7 +63,7 @@ pub async fn test_store_delete_load(ts: TableStore) { "should not load missing key" ); assert!( - db.store(1, b"foo", b"1234567890").is_ok(), + db.store(1, b"foo", b"1234567890").await.is_ok(), "should store new key" ); assert_eq!( @@ -74,23 +74,25 @@ pub async fn test_store_delete_load(ts: TableStore) { assert_eq!(db.load(1, b"foo").unwrap(), Some(b"1234567890".to_vec())); assert!( - db.store(1, b"bar", b"FNORD").is_ok(), + db.store(1, b"bar", b"FNORD").await.is_ok(), "should store new key" ); assert!( - db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").is_ok(), + db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ") + .await + .is_ok(), "should store new key" ); assert!( - db.store(2, b"bar", b"FNORD").is_ok(), + db.store(2, b"bar", b"FNORD").await.is_ok(), "should store new key" ); assert!( - db.store(2, b"baz", b"QWERTY").is_ok(), + db.store(2, b"baz", b"QWERTY").await.is_ok(), "should store new key" ); assert!( - db.store(2, b"bar", b"QWERTYUIOP").is_ok(), + db.store(2, b"bar", b"QWERTYUIOP").await.is_ok(), "should store new key" ); @@ -102,10 +104,10 @@ pub async fn test_store_delete_load(ts: TableStore) { assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec())); assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec())); - assert_eq!(db.delete(1, b"bar").unwrap(), true); - assert_eq!(db.delete(1, b"bar").unwrap(), false); + assert_eq!(db.delete(1, b"bar").await.unwrap(), true); + assert_eq!(db.delete(1, b"bar").await.unwrap(), false); assert!( - db.delete(4, b"bar").is_err(), + db.delete(4, b"bar").await.is_err(), "can't delete from column that doesn't exist" ); @@ -128,7 +130,7 @@ pub async fn test_frozen(ts: TableStore) { let db = ts.open("test", 3).await.expect("should have opened"); let (dht_key, _) = generate_secret(); - assert!(db.store_rkyv(0, b"asdf", &dht_key).is_ok()); + assert!(db.store_rkyv(0, b"asdf", &dht_key).await.is_ok()); assert_eq!(db.load_rkyv::(0, b"qwer").unwrap(), None); @@ -141,7 +143,7 @@ pub async fn test_frozen(ts: TableStore) { assert_eq!(d, Some(dht_key), "keys should be equal"); assert!( - db.store(1, b"foo", b"1234567890").is_ok(), + db.store(1, b"foo", b"1234567890").await.is_ok(), "should store new key" ); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index d2d81c45..a63adbb2 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -23,7 +23,7 @@ pub use crypto::Crypto; pub use crypto::{generate_secret, sign, verify, DHTKey, DHTKeySecret, DHTSignature, Nonce}; pub use intf::BlockStore; pub use intf::ProtectedStore; -pub use intf::TableStore; +pub use intf::{TableDB, TableDBTransaction, TableStore}; pub use network_manager::NetworkManager; pub use routing_table::{NodeRef, NodeRefBase}; diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index a83918d3..2bbac86a 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -1849,10 +1849,26 @@ abstract class VeilidRoutingContext { ///////////////////////////////////// /// VeilidTableDB +abstract class VeilidTableDBTransaction { + Future commit(); + Future rollback(); + Future store(int col, Uint8List key, Uint8List value); + Future storeJson(int col, Uint8List key, Object? object, + {Object? Function(Object? nonEncodable) toEncodable}); + Future delete(int col, Uint8List key); +} + abstract class VeilidTableDB { int getColumnCount(); - List getKeys(); - VeilidTableDBTransaction transact() + List getKeys(int col); + VeilidTableDBTransaction transact(); + Future store(int col, Uint8List key, Uint8List value); + Future storeJson(int col, Uint8List key, Object? object, + {Object? Function(Object? nonEncodable) toEncodable}); + Future load(int col, Uint8List key); + Future loadJson(int col, Uint8List key, + {Object? Function(Object? key, Object? value) reviver}); + Future delete(int col, Uint8List key); } ////////////////////////////////////// diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index 2a45485e..0e90ae45 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -2,6 +2,7 @@ use crate::dart_isolate_wrapper::*; use crate::tools::*; use allo_isolate::*; use cfg_if::*; +use data_encoding::BASE64URL_NOPAD; use ffi_support::*; use lazy_static::*; use opentelemetry::sdk::*; @@ -23,6 +24,10 @@ lazy_static! { Mutex::new(BTreeMap::new()); static ref ROUTING_CONTEXTS: Mutex> = Mutex::new(BTreeMap::new()); + static ref TABLE_DBS: Mutex> = + Mutex::new(BTreeMap::new()); + static ref TABLE_DB_TRANSACTIONS: Mutex> = + Mutex::new(BTreeMap::new()); } async fn get_veilid_api() -> Result { @@ -551,6 +556,266 @@ pub extern "C" fn app_call_reply(port: i64, id: FfiStr, message: FfiStr) { }); } +fn add_table_db(table_db: veilid_core::TableDB) -> u32 { + let mut next_id: u32 = 1; + let mut rc = TABLE_DBS.lock(); + while rc.contains_key(&next_id) { + next_id += 1; + } + rc.insert(next_id, table_db); + next_id +} + +#[no_mangle] +pub extern "C" fn open_table_db(port: i64, name: FfiStr, column_count: u32) { + let name = name.into_opt_string().unwrap_or_default(); + DartIsolateWrapper::new(port).spawn_result(async move { + let veilid_api = get_veilid_api().await?; + let tstore = veilid_api.table_store()?; + let table_db = tstore.open(&name, column_count).await.map_err(veilid_core::VeilidAPIError::generic)?; + let new_id = add_table_db(table_db); + APIResult::Ok(new_id) + }); +} + +#[no_mangle] +pub extern "C" fn release_table_db(id: u32) -> i32 { + let mut rc = TABLE_DBS.lock(); + if rc.remove(&id).is_none() { + return 0; + } + return 1; +} + +#[no_mangle] +pub extern "C" fn delete_table_db(port: i64, name: FfiStr) { + let name = name.into_opt_string().unwrap_or_default(); + DartIsolateWrapper::new(port).spawn_result(async move { + let veilid_api = get_veilid_api().await?; + let tstore = veilid_api.table_store()?; + let deleted = tstore.delete(&name).await.map_err(veilid_core::VeilidAPIError::generic)?; + APIResult::Ok(deleted) + }); +} + +#[no_mangle] +pub extern "C" fn table_db_get_column_count(id: u32) -> u32 { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return 0; + }; + let Ok(cc) = table_db.clone().get_column_count() else { + return 0; + }; + return cc; +} + +#[no_mangle] +pub extern "C" fn table_db_get_keys(id: u32, col: u32) -> *mut c_char { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return std::ptr::null_mut(); + }; + let Ok(keys) = table_db.clone().get_keys(col) else { + return std::ptr::null_mut(); + }; + let keys: Vec = keys.into_iter().map(|k| BASE64URL_NOPAD.encode(&k)).collect(); + let out = veilid_core::serialize_json(keys); + out.into_ffi_value() +} + +fn add_table_db_transaction(tdbt: veilid_core::TableDBTransaction) -> u32 { + let mut next_id: u32 = 1; + let mut tdbts = TABLE_DB_TRANSACTIONS.lock(); + while tdbts.contains_key(&next_id) { + next_id += 1; + } + tdbts.insert(next_id, tdbt); + next_id +} + +#[no_mangle] +pub extern "C" fn table_db_transact(id: u32) -> u32 { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return 0; + }; + let tdbt = table_db.clone().transact(); + let tdbtid = add_table_db_transaction(tdbt); + return tdbtid; +} + +#[no_mangle] +pub extern "C" fn release_table_db_transaction(id: u32) -> i32 { + let mut tdbts = TABLE_DB_TRANSACTIONS.lock(); + if tdbts.remove(&id).is_none() { + return 0; + } + return 1; +} + + +#[no_mangle] +pub extern "C" fn table_db_transaction_commit(port: i64, id: u32) { + DartIsolateWrapper::new(port).spawn_result_json(async move { + let tdbt = { + let tdbts = TABLE_DB_TRANSACTIONS.lock(); + let Some(tdbt) = tdbts.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_transaction_commit", "id", id)); + }; + tdbt.clone() + }; + + tdbt.commit().await.map_err(veilid_core::VeilidAPIError::generic)?; + APIRESULT_VOID + }); +} +#[no_mangle] +pub extern "C" fn table_db_transaction_rollback(port: i64, id: u32) { + DartIsolateWrapper::new(port).spawn_result_json(async move { + let tdbt = { + let tdbts = TABLE_DB_TRANSACTIONS.lock(); + let Some(tdbt) = tdbts.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_transaction_rollback", "id", id)); + }; + tdbt.clone() + }; + + tdbt.rollback(); + APIRESULT_VOID + }); +} + +#[no_mangle] +pub extern "C" fn table_db_transaction_store(port: i64, id: u32, col: u32, key: FfiStr, value: FfiStr) { + let key: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(key.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + let value: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(value.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DartIsolateWrapper::new(port).spawn_result_json(async move { + let tdbt = { + let tdbts = TABLE_DB_TRANSACTIONS.lock(); + let Some(tdbt) = tdbts.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_transaction_store", "id", id)); + }; + tdbt.clone() + }; + + tdbt.store(col, &key, &value); + APIRESULT_VOID + }); +} + + +#[no_mangle] +pub extern "C" fn table_db_transaction_delete(port: i64, id: u32, col: u32, key: FfiStr) { + let key: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(key.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DartIsolateWrapper::new(port).spawn_result_json(async move { + let tdbt = { + let tdbts = TABLE_DB_TRANSACTIONS.lock(); + let Some(tdbt) = tdbts.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_transaction_delete", "id", id)); + }; + tdbt.clone() + }; + + let out = tdbt.delete(col, &key); + APIResult::Ok(out) + }); +} + +#[no_mangle] +pub extern "C" fn table_db_store(port: i64, id: u32, col: u32, key: FfiStr, value: FfiStr) { + let key: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(key.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + let value: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(value.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DartIsolateWrapper::new(port).spawn_result_json(async move { + let table_db = { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_store", "id", id)); + }; + table_db.clone() + }; + + table_db.store(col, &key, &value).await.map_err(veilid_core::VeilidAPIError::generic)?; + APIRESULT_VOID + }); +} + +#[no_mangle] +pub extern "C" fn table_db_load(port: i64, id: u32, col: u32, key: FfiStr) { + let key: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(key.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DartIsolateWrapper::new(port).spawn_result_json(async move { + let table_db = { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_load", "id", id)); + }; + table_db.clone() + }; + + let out = table_db.load(col, &key).map_err(veilid_core::VeilidAPIError::generic)?; + APIResult::Ok(out) + }); +} + +#[no_mangle] +pub extern "C" fn table_db_delete(port: i64, id: u32, col: u32, key: FfiStr) { + let key: Vec = data_encoding::BASE64URL_NOPAD + .decode( + veilid_core::deserialize_opt_json::(key.into_opt_string()) + .unwrap() + .as_bytes(), + ) + .unwrap(); + DartIsolateWrapper::new(port).spawn_result_json(async move { + let table_db = { + let table_dbs = TABLE_DBS.lock(); + let Some(table_db) = table_dbs.get(&id) else { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("table_db_delete", "id", id)); + }; + table_db.clone() + }; + + let out = table_db.delete(col, &key).await.map_err(veilid_core::VeilidAPIError::generic)?; + APIResult::Ok(out) + }); +} + #[no_mangle] pub extern "C" fn debug(port: i64, command: FfiStr) { let command = command.into_opt_string().unwrap_or_default();