checkpoint
This commit is contained in:
@@ -132,7 +132,7 @@ impl Crypto {
|
||||
drop(db);
|
||||
table_store.delete("crypto_caches").await?;
|
||||
db = table_store.open("crypto_caches", 1).await?;
|
||||
db.store(0, b"node_id", &node_id.unwrap().bytes)?;
|
||||
db.store(0, b"node_id", &node_id.unwrap().bytes).await?;
|
||||
}
|
||||
|
||||
// Schedule flushing
|
||||
@@ -159,7 +159,7 @@ impl Crypto {
|
||||
};
|
||||
|
||||
let db = table_store.open("crypto_caches", 1).await?;
|
||||
db.store(0, b"dh_cache", &cache_bytes)?;
|
||||
db.store(0, b"dh_cache", &cache_bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::intf::table_db::*;
|
||||
use crate::intf::table_db::TableDBInner;
|
||||
pub use crate::intf::table_db::{TableDB, TableDBTransaction};
|
||||
use crate::*;
|
||||
use keyvaluedb_sqlite::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@@ -83,42 +83,42 @@ impl TableDB {
|
||||
}
|
||||
|
||||
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
|
||||
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> {
|
||||
let db = &self.inner.lock().database;
|
||||
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> {
|
||||
let db = self.inner.lock().database.clone();
|
||||
let mut dbt = db.transaction();
|
||||
dbt.put(col, key, value);
|
||||
db.write(dbt).wrap_err("failed to store key")
|
||||
db.write(dbt).await.wrap_err("failed to store key")
|
||||
}
|
||||
|
||||
/// Store a key in rkyv format with a value in a column in the TableDB. Performs a single transaction immediately.
|
||||
pub fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
pub async fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
where
|
||||
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
|
||||
{
|
||||
let v = to_rkyv(value)?;
|
||||
|
||||
let db = &self.inner.lock().database;
|
||||
let db = self.inner.lock().database.clone();
|
||||
let mut dbt = db.transaction();
|
||||
dbt.put(col, key, v.as_slice());
|
||||
db.write(dbt).wrap_err("failed to store key")
|
||||
db.write(dbt).await.wrap_err("failed to store key")
|
||||
}
|
||||
|
||||
/// Store a key in json format with a value in a column in the TableDB. Performs a single transaction immediately.
|
||||
pub fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
where
|
||||
T: serde::Serialize,
|
||||
{
|
||||
let v = serde_json::to_vec(value)?;
|
||||
|
||||
let db = &self.inner.lock().database;
|
||||
let db = self.inner.lock().database.clone();
|
||||
let mut dbt = db.transaction();
|
||||
dbt.put(col, key, v.as_slice());
|
||||
db.write(dbt).wrap_err("failed to store key")
|
||||
db.write(dbt).await.wrap_err("failed to store key")
|
||||
}
|
||||
|
||||
/// Read a key from a column in the TableDB immediately.
|
||||
pub fn load(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> {
|
||||
let db = &self.inner.lock().database;
|
||||
let db = self.inner.lock().database.clone();
|
||||
db.get(col, key).wrap_err("failed to get key")
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ impl TableDB {
|
||||
<T as RkyvArchive>::Archived:
|
||||
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
||||
{
|
||||
let db = &self.inner.lock().database;
|
||||
let db = self.inner.lock().database.clone();
|
||||
let out = db.get(col, key).wrap_err("failed to get key")?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
@@ -148,7 +148,7 @@ impl TableDB {
|
||||
where
|
||||
T: for<'de> serde::Deserialize<'de>,
|
||||
{
|
||||
let db = &self.inner.lock().database;
|
||||
let db = self.inner.lock().database.clone();
|
||||
let out = db.get(col, key).wrap_err("failed to get key")?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
@@ -161,15 +161,15 @@ impl TableDB {
|
||||
}
|
||||
|
||||
/// Delete key with from a column in the TableDB
|
||||
pub fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
|
||||
let db = &self.inner.lock().database;
|
||||
pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
|
||||
let db = self.inner.lock().database.clone();
|
||||
let found = db.get(col, key).wrap_err("failed to get key")?;
|
||||
match found {
|
||||
None => Ok(false),
|
||||
Some(_) => {
|
||||
let mut dbt = db.transaction();
|
||||
dbt.delete(col, key);
|
||||
db.write(dbt).wrap_err("failed to delete key")?;
|
||||
db.write(dbt).await.wrap_err("failed to delete key")?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
@@ -212,55 +212,58 @@ impl TableDBTransaction {
|
||||
}
|
||||
|
||||
/// Commit the transaction. Performs all actions atomically.
|
||||
pub fn commit(mut self) -> EyreResult<()> {
|
||||
pub async fn commit(self) -> EyreResult<()> {
|
||||
let dbt = {
|
||||
let inner = self.inner.lock();
|
||||
let mut inner = self.inner.lock();
|
||||
inner
|
||||
.dbt
|
||||
.take()
|
||||
.ok_or_else(|| Err(eyre!("transaction already completed")))?
|
||||
.ok_or_else(|| eyre!("transaction already completed"))?
|
||||
};
|
||||
self.db
|
||||
.inner
|
||||
.lock()
|
||||
.database
|
||||
.write(dbt)
|
||||
let db = self.db.inner.lock().database.clone();
|
||||
db.write(dbt)
|
||||
.await
|
||||
.wrap_err("commit failed, transaction lost")
|
||||
}
|
||||
|
||||
/// Rollback the transaction. Does nothing to the TableDB.
|
||||
pub fn rollback(mut self) {
|
||||
self.dbt = None;
|
||||
pub fn rollback(self) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dbt = None;
|
||||
}
|
||||
|
||||
/// Store a key with a value in a column in the TableDB
|
||||
pub fn store(&mut self, col: u32, key: &[u8], value: &[u8]) {
|
||||
self.dbt.as_mut().unwrap().put(col, key, value);
|
||||
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dbt.as_mut().unwrap().put(col, key, value);
|
||||
}
|
||||
|
||||
/// Store a key in rkyv format with a value in a column in the TableDB
|
||||
pub fn store_rkyv<T>(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
pub fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
where
|
||||
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
|
||||
{
|
||||
let v = to_rkyv(value)?;
|
||||
self.dbt.as_mut().unwrap().put(col, key, v.as_slice());
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dbt.as_mut().unwrap().put(col, key, v.as_slice());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Store a key in rkyv format with a value in a column in the TableDB
|
||||
pub fn store_json<T>(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
pub fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
|
||||
where
|
||||
T: serde::Serialize,
|
||||
{
|
||||
let v = serde_json::to_vec(value)?;
|
||||
self.dbt.as_mut().unwrap().put(col, key, v.as_slice());
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dbt.as_mut().unwrap().put(col, key, v.as_slice());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete key with from a column in the TableDB
|
||||
pub fn delete(&mut self, col: u32, key: &[u8]) {
|
||||
self.dbt.as_mut().unwrap().delete(col, key);
|
||||
pub fn delete(&self, col: u32, key: &[u8]) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.dbt.as_mut().unwrap().delete(col, key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::intf::table_db::*;
|
||||
use crate::intf::table_db::TableDBInner;
|
||||
pub use crate::intf::table_db::{TableDB, TableDBTransaction};
|
||||
use crate::*;
|
||||
use keyvaluedb_web::*;
|
||||
|
||||
|
||||
@@ -241,7 +241,7 @@ impl RoutingTable {
|
||||
let table_store = self.network_manager().table_store();
|
||||
let tdb = table_store.open("routing_table", 1).await?;
|
||||
let bucket_count = bucketvec.len();
|
||||
let mut dbx = tdb.transact();
|
||||
let dbx = tdb.transact();
|
||||
if let Err(e) = dbx.store_rkyv(0, b"bucket_count", &bucket_count) {
|
||||
dbx.rollback();
|
||||
return Err(e);
|
||||
@@ -250,7 +250,7 @@ impl RoutingTable {
|
||||
for (n, b) in bucketvec.iter().enumerate() {
|
||||
dbx.store(0, format!("bucket_{}", n).as_bytes(), b)
|
||||
}
|
||||
dbx.commit()?;
|
||||
dbx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -512,7 +512,7 @@ impl RouteSpecStore {
|
||||
.network_manager()
|
||||
.table_store();
|
||||
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
|
||||
rsstdb.store_rkyv(0, b"content", &content)?;
|
||||
rsstdb.store_rkyv(0, b"content", &content).await?;
|
||||
|
||||
// // Keep secrets in protected store as well
|
||||
let pstore = self
|
||||
|
||||
@@ -63,7 +63,7 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
"should not load missing key"
|
||||
);
|
||||
assert!(
|
||||
db.store(1, b"foo", b"1234567890").is_ok(),
|
||||
db.store(1, b"foo", b"1234567890").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -74,23 +74,25 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
assert_eq!(db.load(1, b"foo").unwrap(), Some(b"1234567890".to_vec()));
|
||||
|
||||
assert!(
|
||||
db.store(1, b"bar", b"FNORD").is_ok(),
|
||||
db.store(1, b"bar", b"FNORD").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
assert!(
|
||||
db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").is_ok(),
|
||||
db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
.await
|
||||
.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
assert!(
|
||||
db.store(2, b"bar", b"FNORD").is_ok(),
|
||||
db.store(2, b"bar", b"FNORD").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
assert!(
|
||||
db.store(2, b"baz", b"QWERTY").is_ok(),
|
||||
db.store(2, b"baz", b"QWERTY").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
assert!(
|
||||
db.store(2, b"bar", b"QWERTYUIOP").is_ok(),
|
||||
db.store(2, b"bar", b"QWERTYUIOP").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
|
||||
@@ -102,10 +104,10 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec()));
|
||||
assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec()));
|
||||
|
||||
assert_eq!(db.delete(1, b"bar").unwrap(), true);
|
||||
assert_eq!(db.delete(1, b"bar").unwrap(), false);
|
||||
assert_eq!(db.delete(1, b"bar").await.unwrap(), true);
|
||||
assert_eq!(db.delete(1, b"bar").await.unwrap(), false);
|
||||
assert!(
|
||||
db.delete(4, b"bar").is_err(),
|
||||
db.delete(4, b"bar").await.is_err(),
|
||||
"can't delete from column that doesn't exist"
|
||||
);
|
||||
|
||||
@@ -128,7 +130,7 @@ pub async fn test_frozen(ts: TableStore) {
|
||||
let db = ts.open("test", 3).await.expect("should have opened");
|
||||
let (dht_key, _) = generate_secret();
|
||||
|
||||
assert!(db.store_rkyv(0, b"asdf", &dht_key).is_ok());
|
||||
assert!(db.store_rkyv(0, b"asdf", &dht_key).await.is_ok());
|
||||
|
||||
assert_eq!(db.load_rkyv::<DHTKey>(0, b"qwer").unwrap(), None);
|
||||
|
||||
@@ -141,7 +143,7 @@ pub async fn test_frozen(ts: TableStore) {
|
||||
assert_eq!(d, Some(dht_key), "keys should be equal");
|
||||
|
||||
assert!(
|
||||
db.store(1, b"foo", b"1234567890").is_ok(),
|
||||
db.store(1, b"foo", b"1234567890").await.is_ok(),
|
||||
"should store new key"
|
||||
);
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ pub use crypto::Crypto;
|
||||
pub use crypto::{generate_secret, sign, verify, DHTKey, DHTKeySecret, DHTSignature, Nonce};
|
||||
pub use intf::BlockStore;
|
||||
pub use intf::ProtectedStore;
|
||||
pub use intf::TableStore;
|
||||
pub use intf::{TableDB, TableDBTransaction, TableStore};
|
||||
pub use network_manager::NetworkManager;
|
||||
pub use routing_table::{NodeRef, NodeRefBase};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user