test work

This commit is contained in:
John Smith
2023-06-18 18:47:39 -04:00
parent b8e5039251
commit b6e055e47d
21 changed files with 664 additions and 171 deletions

View File

@@ -44,7 +44,14 @@ impl Network {
// Spawn a local async task for each socket
let mut protocol_handlers_unordered = FuturesUnordered::new();
let network_manager = this.network_manager();
let stop_token = this.inner.lock().stop_source.as_ref().unwrap().token();
let stop_token = {
let inner = this.inner.lock();
if inner.stop_source.is_none() {
log_net!(debug "exiting UDP listener before it starts because we encountered an error");
return;
}
inner.stop_source.as_ref().unwrap().token()
};
for ph in protocol_handlers {
let network_manager = network_manager.clone();

View File

@@ -1234,9 +1234,10 @@ impl RoutingTableInner {
let kind = node_id.kind;
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
.iter()
.filter_map(|x| {
if x.node_ids().kinds().contains(&kind) {
Some(x.locked(self))
.filter_map(|nr| {
let nr_locked = nr.locked(self);
if nr_locked.node_ids().kinds().contains(&kind) {
Some(nr_locked)
} else {
None
}

View File

@@ -184,17 +184,22 @@ where
}
// Delete record
rt_xact.delete(0, &k.bytes());
if let Err(e) = rt_xact.delete(0, &k.bytes()) {
log_stor!(error "record could not be deleted: {}", e);
}
// Delete subkeys
let subkey_count = v.subkey_count() as u32;
for sk in 0..subkey_count {
let stored_subkeys = v.stored_subkeys();
for sk in stored_subkeys.iter() {
// From table
let stk = SubkeyTableKey {
key: k.key,
subkey: sk,
};
st_xact.delete(0, &stk.bytes());
let stkb = stk.bytes();
if let Err(e) = st_xact.delete(0, &stkb) {
log_stor!(error "subkey could not be deleted: {}", e);
}
// From cache
self.remove_from_subkey_cache(stk);
@@ -355,8 +360,8 @@ where
want_descriptor: bool,
) -> VeilidAPIResult<Option<SubkeyResult>> {
// record from index
let Some((subkey_count, opt_descriptor)) = self.with_record(key, |record| {
(record.subkey_count(), if want_descriptor {
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
Some(record.descriptor().clone())
} else {
None
@@ -371,6 +376,15 @@ where
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
// See if we have this subkey stored
if !has_subkey {
// If not, return no value but maybe with descriptor
return Ok(Some(SubkeyResult {
value: None,
descriptor: opt_descriptor,
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
@@ -386,28 +400,23 @@ where
descriptor: opt_descriptor,
}));
}
// If not in cache, try to pull from table store
if let Some(record_data) = subkey_table
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = subkey_table
.load_rkyv::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
{
let out = record_data.signed_value_data().clone();
.map_err(VeilidAPIError::internal)? else {
apibail_internal!("failed to get subkey that was stored");
};
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
let out = record_data.signed_value_data().clone();
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}));
};
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
// Record was available, but subkey was not found, maybe descriptor gets returned
Ok(Some(SubkeyResult {
value: None,
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}))
}));
}
pub async fn set_subkey(
@@ -492,6 +501,7 @@ where
// Update record
self.with_record_mut(key, |record| {
record.store_subkey(subkey);
record.set_record_data_size(new_record_data_size);
})
.expect("record should still be here");
@@ -522,10 +532,11 @@ where
out += "Record Index:\n";
for (rik, rec) in &self.record_index {
out += &format!(
" {} @ {} len={}\n",
" {} @ {} len={} subkeys={}\n",
rik.key.to_string(),
rec.last_touched().as_u64(),
rec.record_data_size()
rec.record_data_size(),
rec.stored_subkeys(),
);
}
out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len());

View File

@@ -12,6 +12,7 @@ where
{
descriptor: SignedValueDescriptor,
subkey_count: usize,
stored_subkeys: ValueSubkeyRangeSet,
last_touched_ts: Timestamp,
record_data_size: usize,
detail: D,
@@ -33,6 +34,7 @@ where
Ok(Self {
descriptor,
subkey_count,
stored_subkeys: ValueSubkeyRangeSet::new(),
last_touched_ts: cur_ts,
record_data_size: 0,
detail,
@@ -50,6 +52,13 @@ where
self.subkey_count
}
pub fn stored_subkeys(&self) -> &ValueSubkeyRangeSet {
&self.stored_subkeys
}
pub fn store_subkey(&mut self, subkey: ValueSubkey) {
self.stored_subkeys.insert(subkey);
}
pub fn touch(&mut self, cur_ts: Timestamp) {
self.last_touched_ts = cur_ts
}

View File

@@ -45,6 +45,7 @@ impl Drop for TableDBUnlockedInner {
#[derive(Debug, Clone)]
pub struct TableDB {
opened_column_count: u32,
unlocked_inner: Arc<TableDBUnlockedInner>,
}
@@ -56,11 +57,13 @@ impl TableDB {
database: Database,
encryption_key: Option<TypedSharedSecret>,
decryption_key: Option<TypedSharedSecret>,
opened_column_count: u32,
) -> Self {
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek));
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk));
Self {
opened_column_count,
unlocked_inner: Arc::new(TableDBUnlockedInner {
table,
table_store,
@@ -71,8 +74,12 @@ impl TableDB {
}
}
pub(super) fn try_new_from_weak_inner(weak_inner: Weak<TableDBUnlockedInner>) -> Option<Self> {
pub(super) fn try_new_from_weak_inner(
weak_inner: Weak<TableDBUnlockedInner>,
opened_column_count: u32,
) -> Option<Self> {
weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
opened_column_count,
unlocked_inner: table_db_unlocked_inner,
})
}
@@ -82,6 +89,7 @@ impl TableDB {
}
/// Get the total number of columns in the TableDB
/// Not the number of columns that were opened, rather the total number that could be opened
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
let db = &self.unlocked_inner.database;
db.num_columns().map_err(VeilidAPIError::from)
@@ -144,8 +152,14 @@ impl TableDB {
}
}
/// Get the list of keys in a column of the TableDB
/// Get the list of keys in a column of the TableDAB
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
}
let db = self.unlocked_inner.database.clone();
let mut out = Vec::new();
db.iter_keys(col, None, |k| {
@@ -165,6 +179,12 @@ impl TableDB {
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
}
let db = self.unlocked_inner.database.clone();
let mut dbt = db.transaction();
dbt.put(
@@ -195,6 +215,12 @@ impl TableDB {
/// Read a key from a column in the TableDB immediately.
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
}
let db = self.unlocked_inner.database.clone();
let key = self.maybe_encrypt(key, true);
Ok(db
@@ -233,6 +259,12 @@ impl TableDB {
/// Delete key with from a column in the TableDB
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
if col >= self.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.opened_column_count
));
}
let key = self.maybe_encrypt(key, true);
let db = self.unlocked_inner.database.clone();
@@ -330,11 +362,19 @@ impl TableDBTransaction {
}
/// Store a key with a value in a column in the TableDB
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) {
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.db.opened_column_count
));
}
let key = self.db.maybe_encrypt(key, true);
let value = self.db.maybe_encrypt(value, false);
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
Ok(())
}
/// Store a key in rkyv format with a value in a column in the TableDB
@@ -343,12 +383,7 @@ impl TableDBTransaction {
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let value = to_rkyv(value)?;
let key = self.db.maybe_encrypt(key, true);
let value = self.db.maybe_encrypt(&value, false);
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
Ok(())
self.store(col, key, &value)
}
/// Store a key in rkyv format with a value in a column in the TableDB
@@ -357,19 +392,22 @@ impl TableDBTransaction {
T: serde::Serialize,
{
let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
let key = self.db.maybe_encrypt(key, true);
let value = self.db.maybe_encrypt(&value, false);
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
Ok(())
self.store(col, key, &value)
}
/// Delete key with from a column in the TableDB
pub fn delete(&self, col: u32, key: &[u8]) {
pub fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
if col >= self.db.opened_column_count {
apibail_generic!(format!(
"Column exceeds opened column count {} >= {}",
col, self.db.opened_column_count
));
}
let key = self.db.maybe_encrypt(key, true);
let mut inner = self.inner.lock();
inner.dbt.as_mut().unwrap().delete_owned(col, key);
Ok(())
}
}

View File

@@ -428,6 +428,7 @@ impl TableStore {
}
pub(crate) fn on_table_db_drop(&self, table: String) {
log_rtab!(debug "dropping table db: {}", table);
let mut inner = self.inner.lock();
if inner.opened.remove(&table).is_none() {
unreachable!("should have removed an item");
@@ -449,12 +450,21 @@ impl TableStore {
let table_name = self.name_get_or_create(name).await?;
// See if this table is already opened
// See if this table is already opened, if so the column count must be the same
{
let mut inner = self.inner.lock();
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) {
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone(), column_count) {
Some(tdb) => {
// Ensure column count isnt bigger
let existing_col_count = tdb.get_column_count()?;
if column_count > existing_col_count {
return Err(VeilidAPIError::generic(format!(
"database must be closed before increasing column count {} -> {}",
existing_col_count, column_count,
)));
}
return Ok(tdb);
}
None => {
@@ -465,7 +475,7 @@ impl TableStore {
}
// Open table db using platform-specific driver
let db = match self
let mut db = match self
.table_store_driver
.open(&table_name, column_count)
.await
@@ -481,6 +491,24 @@ impl TableStore {
// Flush table names to disk
self.flush().await;
// If more columns are available, open the low level db with the max column count but restrict the tabledb object to the number requested
let existing_col_count = db.num_columns().map_err(VeilidAPIError::from)?;
if existing_col_count > column_count {
drop(db);
db = match self
.table_store_driver
.open(&table_name, existing_col_count)
.await
{
Ok(db) => db,
Err(e) => {
self.name_delete(name).await.expect("cleanup failed");
self.flush().await;
return Err(e);
}
};
}
// Wrap low-level Database in TableDB object
let mut inner = self.inner.lock();
let table_db = TableDB::new(
@@ -490,6 +518,7 @@ impl TableStore {
db,
inner.encryption_key.clone(),
inner.encryption_key.clone(),
column_count,
);
// Keep track of opened DBs

View File

@@ -132,6 +132,41 @@ pub async fn test_store_delete_load(ts: TableStore) {
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
}
pub async fn test_transaction(ts: TableStore) {
trace!("test_transaction");
let _ = ts.delete("test");
let db = ts.open("test", 3).await.expect("should have opened");
assert!(
ts.delete("test").await.is_err(),
"should fail because file is opened"
);
let tx = db.transact();
assert!(tx.store(0, b"aaa", b"a-value").is_ok());
assert!(tx.store_json(1, b"bbb", &"b-value".to_owned()).is_ok());
assert!(tx.store_rkyv(2, b"ccc", &"c-value".to_owned()).is_ok());
assert!(tx.store(3, b"ddd", b"d-value").is_err());
assert!(tx.store(0, b"ddd", b"d-value").is_ok());
assert!(tx.delete(0, b"ddd").is_ok());
assert!(tx.commit().await.is_ok());
let tx = db.transact();
assert!(tx.delete(2, b"ccc").is_ok());
tx.rollback();
assert_eq!(db.load(0, b"aaa").await, Ok(Some(b"a-value".to_vec())));
assert_eq!(
db.load_json::<String>(1, b"bbb").await,
Ok(Some("b-value".to_owned()))
);
assert_eq!(
db.load_rkyv::<String>(2, b"ccc").await,
Ok(Some("c-value".to_owned()))
);
assert_eq!(db.load(0, b"ddd").await, Ok(None));
}
pub async fn test_rkyv(vcrypto: CryptoSystemVersion, ts: TableStore) {
trace!("test_rkyv");
@@ -268,6 +303,7 @@ pub async fn test_all() {
test_protect_unprotect(vcrypto.clone(), ts.clone()).await;
test_delete_open_delete(ts.clone()).await;
test_store_delete_load(ts.clone()).await;
test_transaction(ts.clone()).await;
test_rkyv(vcrypto.clone(), ts.clone()).await;
test_json(vcrypto, ts.clone()).await;
let _ = ts.delete("test").await;

View File

@@ -405,12 +405,15 @@ impl JsonRequestProcessor {
TableDbTransactionResponseOp::Rollback {}
}
TableDbTransactionRequestOp::Store { col, key, value } => {
table_db_transaction.store(col, &key, &value);
TableDbTransactionResponseOp::Store {}
TableDbTransactionResponseOp::Store {
result: to_json_api_result(table_db_transaction.store(col, &key, &value)),
}
}
TableDbTransactionRequestOp::Delete { col, key } => {
table_db_transaction.delete(col, &key);
TableDbTransactionResponseOp::Delete {}
TableDbTransactionResponseOp::Delete {
result: to_json_api_result(table_db_transaction.delete(col, &key)),
}
}
};
TableDbTransactionResponse {

View File

@@ -124,6 +124,12 @@ pub enum TableDbTransactionResponseOp {
result: ApiResult<()>,
},
Rollback {},
Store {},
Delete {},
Store {
#[serde(flatten)]
result: ApiResult<()>,
},
Delete {
#[serde(flatten)]
result: ApiResult<()>,
},
}

View File

@@ -4,7 +4,6 @@ use range_set_blaze::*;
#[derive(
Clone,
Debug,
Default,
PartialOrd,
PartialEq,
@@ -55,3 +54,15 @@ impl DerefMut for ValueSubkeyRangeSet {
&mut self.data
}
}
impl fmt::Debug for ValueSubkeyRangeSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.data)
}
}
impl fmt::Display for ValueSubkeyRangeSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.data)
}
}