From 7eded89b1123476e4a7adfc4a2f1ce3fc7abbfc4 Mon Sep 17 00:00:00 2001
From: John Smith
Date: Mon, 3 Apr 2023 20:58:10 -0400
Subject: [PATCH] checkpoint

---
 veilid-core/src/crypto/byte_array_types.rs    |  26 +-
 veilid-core/src/intf/native/table_store.rs    |  12 +
 veilid-core/src/storage_manager/mod.rs        |  39 ++-
 .../src/storage_manager/record_store.rs       | 314 +++++++++++++-----
 .../storage_manager/record_store_limits.rs    |   7 +-
 .../src/storage_manager/value_record.rs       |  23 +-
 6 files changed, 316 insertions(+), 105 deletions(-)

diff --git a/veilid-core/src/crypto/byte_array_types.rs b/veilid-core/src/crypto/byte_array_types.rs
index 331376d3..3a404698 100644
--- a/veilid-core/src/crypto/byte_array_types.rs
+++ b/veilid-core/src/crypto/byte_array_types.rs
@@ -126,18 +126,6 @@ macro_rules! byte_array_type {
             Self { bytes }
         }
 
-        pub fn try_from_vec(v: Vec<u8>) -> Result<Self, VeilidAPIError> {
-            let vl = v.len();
-            Ok(Self {
-                bytes: v.try_into().map_err(|_| {
-                    VeilidAPIError::generic(format!(
-                        "Expected a Vec of length {} but it was {}",
-                        $size, vl
-                    ))
-                })?,
-            })
-        }
-
         pub fn bit(&self, index: usize) -> bool {
             assert!(index < ($size * 8));
             let bi = index / 8;
@@ -250,6 +238,20 @@ macro_rules! byte_array_type {
                 Self::try_decode(value)
             }
         }
+        impl TryFrom<&[u8]> for $name {
+            type Error = VeilidAPIError;
+            fn try_from(v: &[u8]) -> Result<Self, Self::Error> {
+                let vl = v.len();
+                Ok(Self {
+                    bytes: v.try_into().map_err(|_| {
+                        VeilidAPIError::generic(format!(
+                            "Expected a slice of length {} but it was {}",
+                            $size, vl
+                        ))
+                    })?,
+                })
+            }
+        }
     };
 }
diff --git a/veilid-core/src/intf/native/table_store.rs b/veilid-core/src/intf/native/table_store.rs
index a09b8e4d..cd9ebff9 100644
--- a/veilid-core/src/intf/native/table_store.rs
+++ b/veilid-core/src/intf/native/table_store.rs
@@ -40,6 +40,18 @@ impl TableStore {
         if let Err(e) = self.delete("routing_table").await {
             error!("failed to delete 'routing_table': {}", e);
         }
+        if let Err(e) = self.delete("local_records").await {
+            error!("failed to delete 'local_records': {}", e);
+        }
+        if let Err(e) = self.delete("local_subkeys").await {
+            error!("failed to delete 'local_subkeys': {}", e);
+        }
+        if let Err(e) = self.delete("remote_records").await {
+            error!("failed to delete 'remote_records': {}", e);
+        }
+        if let Err(e) = self.delete("remote_subkeys").await {
+            error!("failed to delete 'remote_subkeys': {}", e);
+        }
     }
 
     pub(crate) async fn init(&self) -> EyreResult<()> {
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index da7bc588..9fe53969 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -8,8 +8,12 @@ use value_record::*;
 use super::*;
 use crate::rpc_processor::*;
 
+/// Locked structure for storage manager
 struct StorageManagerInner {
-    record_store: RecordStore,
+    /// Records that have been 'created' or 'opened' by this node
+    local_record_store: Option<RecordStore>,
+    /// Records that have been pushed to this node for distribution by other nodes
+    remote_record_store: Option<RecordStore>,
 }
 
 struct StorageManagerUnlockedInner {
@@ -47,7 +51,28 @@ impl StorageManager {
     }
     fn new_inner() -> StorageManagerInner {
         StorageManagerInner {
-            record_store: RecordStore::new(table_store),
+            local_record_store: None,
+            remote_record_store: None,
+        }
+    }
+
+    fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
+        RecordStoreLimits {
+            record_cache_size: todo!(),
+            subkey_cache_size: todo!(),
+            max_records: None,
+            max_subkey_cache_memory_mb: Some(xxx),
+            max_disk_space_mb: None,
+        }
+    }
+
+    fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
+        RecordStoreLimits {
+            record_cache_size: todo!(),
+            subkey_cache_size: todo!(),
+            max_records: Some(xxx),
+            max_subkey_cache_memory_mb: Some(xxx),
+            max_disk_space_mb: Some(xxx),
         }
     }
 
@@ -59,6 +84,7 @@ impl StorageManager {
         block_store: BlockStore,
         rpc_processor: RPCProcessor,
     ) -> StorageManager {
+
         StorageManager {
             unlocked_inner: Arc::new(Self::new_unlocked_inner(
                 config,
@@ -68,7 +94,7 @@ impl StorageManager {
                 block_store,
                 rpc_processor,
             )),
-            inner: Arc::new(Mutex::new(Self::new_inner())),
+            inner: Arc::new(Mutex::new(Self::new_inner()))
         }
     }
 
@@ -76,7 +102,12 @@ impl StorageManager {
     pub async fn init(&self) -> EyreResult<()> {
         debug!("startup storage manager");
         let mut inner = self.inner.lock();
-        // xxx
+
+        let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone());
+        let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone());
+        inner.local_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "local", local_limits));
+        inner.remote_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "remote", remote_limits));
+
         Ok(())
     }
 
diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs
index bb59c871..f7db44d7 100644
--- a/veilid-core/src/storage_manager/record_store.rs
+++ b/veilid-core/src/storage_manager/record_store.rs
@@ -4,14 +4,62 @@
 use hashlink::LruCache;
 
 pub type RecordIndex = u32;
 
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
-struct RecordCacheKey {
-    record_idx: RecordIndex,
+struct RecordIndexKey {
+    pub key: TypedKey,
+}
+impl RecordIndexKey {
+    pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] {
+        let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
+        bytes[0..4].copy_from_slice(&self.key.kind.0);
+        bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
+        bytes
+    }
+}
+
+impl TryFrom<&[u8]> for RecordIndexKey {
+    type Error = EyreReport;
+    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
+        if bytes.len() != PUBLIC_KEY_LENGTH + 4 {
+            bail!("invalid bytes length");
+        }
+        let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
+        let value =
+            PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
+        let key = TypedKey::new(kind, value);
+        Ok(RecordIndexKey { key })
+    }
 }
 
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 struct SubkeyCacheKey {
-    record_idx: RecordIndex,
-    subkey: ValueSubkey,
+    pub key: TypedKey,
+    pub subkey: ValueSubkey,
+}
+impl SubkeyCacheKey {
+    pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] {
+        let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4];
+        bytes[0..4].copy_from_slice(&self.key.kind.0);
+        bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
+        bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4].copy_from_slice(&self.subkey.to_le_bytes());
+        bytes
+    }
+}
+impl TryFrom<&[u8]> for SubkeyCacheKey {
+    type Error = EyreReport;
+    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
+        if bytes.len() != PUBLIC_KEY_LENGTH + 4 + 4 {
+            bail!("invalid bytes length");
+        }
+        let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
+        let value =
+            PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
+        let subkey = ValueSubkey::from_le_bytes(
+            bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4].try_into().wrap_err("invalid subkey")?,
+        );
+
+        let key = TypedKey::new(kind, value);
+        Ok(SubkeyCacheKey { key, subkey })
+    }
 }
 
 pub struct RecordStore {
@@ -21,15 +69,12 @@ pub struct RecordStore {
     record_table: Option<TableDB>,
     subkey_table: Option<TableDB>,
-    record_index: HashMap<TypedKey, RecordIndex>,
-    free_record_index_list: Vec<RecordIndex>,
-    record_cache: LruCache<RecordCacheKey, ValueRecord>,
+    record_index: LruCache<RecordIndexKey, ValueRecord>,
     subkey_cache: LruCache<SubkeyCacheKey, ValueRecordData>,
 }
 
 impl RecordStore {
     pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
-        let record_cache_size = limits.record_cache_size as usize;
         let subkey_cache_size = limits.subkey_cache_size as usize;
         Self {
             table_store,
             name: name.to_owned(),
             limits,
             record_table: None,
             subkey_table: None,
-            record_index: HashMap::new(),
-            free_record_index_list: Vec::new(), // xxx can this be auto-recovered? should we ever compact the allocated indexes?
-            record_cache: LruCache::new(record_cache_size),
+            record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
             subkey_cache: LruCache::new(subkey_cache_size),
         }
     }
@@ -54,82 +97,55 @@ impl RecordStore {
             .open(&&format!("{}_subkeys", self.name), 1)
             .await?;
 
-        // xxx get record index and free record index list
+        // Pull record index from table into a vector to ensure we sort them
+        let record_table_keys = record_table.get_keys(0)?;
+        let mut record_index_saved: Vec<(RecordIndexKey, ValueRecord)> =
+            Vec::with_capacity(record_table_keys.len());
+        for rtk in record_table_keys {
+            if let Some(vr) = record_table.load_rkyv::<ValueRecord>(0, &rtk)? {
+                let rik = RecordIndexKey::try_from(rtk.as_slice())?;
+                record_index_saved.push((rik, vr));
+            }
+        }
+
+        // Sort the record index by last touched time and insert in sorted order
+        record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
+        let mut dead_records = Vec::new();
+        for (rik, vr) in record_index_saved {
+            self.record_index.insert(rik, vr, |k, v| {
+                // If the configuration changed, we only want to keep the newest 'limits.max_records' records
+                dead_records.push((k, v));
+            });
+        }
+
+        // Delete dead keys
+        if !dead_records.is_empty() {
+            let rt_xact = record_table.transact();
+            let st_xact = subkey_table.transact();
+            for (k, v) in dead_records {
+                // Delete record
+                rt_xact.delete(0, &k.bytes());
+
+                // Delete subkeys
+                let subkey_count = v.subkey_count() as u32;
+                for sk in 0..subkey_count {
+                    let sck = SubkeyCacheKey {
+                        key: k.key,
+                        subkey: sk,
+                    };
+                    st_xact.delete(0, &sck.bytes())?;
+                }
+            }
+            rt_xact.commit().await?;
+            st_xact.commit().await?;
+        }
 
         self.record_table = Some(record_table);
         self.subkey_table = Some(subkey_table);
         Ok(())
     }
 
-    fn key_bytes(key: TypedKey) -> [u8; PUBLIC_KEY_LENGTH + 4] {
-        let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
-        bytes[0..4] = key.kind.0;
-        bytes[4..PUBLIC_KEY_LENGTH + 4] = key.value.bytes;
-        bytes
-    }
-
-    pub fn with_record<R, F: FnOnce(TypedKey, &ValueRecord) -> R>(
-        &mut self,
-        key: TypedKey,
-        f: F,
-    ) -> EyreResult<Option<R>> {
-        // Get record table
-        let Some(record_table) = self.record_table.clone() else {
-            bail!("record store not initialized");
-        };
-
-        // If record exists in cache, use that
-        if let Some(r) = self.record_cache.get(&key) {
-            // Callback
-            return Ok(Some(f(key, r)));
-        }
-        // If not in cache, try to pull from table store
-        let k = Self::key_bytes(key);
-        if let Some(r) = record_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(key, &r);
-
-            // Add to cache, do nothing with lru out
-            self.record_cache.insert(key, r, |_| {});
-
-            return Ok(Some(out));
-        };
-
-        return Ok(None);
-    }
-
-    pub fn with_record_mut<R, F: FnOnce(TypedKey, &mut ValueRecord) -> R>(
-        &mut self,
-        key: TypedKey,
-        f: F,
-    ) -> EyreResult<Option<R>> {
-        // Get record table
-        let Some(record_table) = self.record_table.clone() else {
-            bail!("record store not initialized");
-        };
-
-        // If record exists in cache, use that
-        if let Some(r) = self.record_cache.get_mut(&key) {
-            // Callback
-            return Ok(Some(f(key, r)));
-        }
-        // If not in cache, try to pull from table store
-        let k = Self::key_bytes(key);
-        if let Some(r) = record_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(key, &mut r);
-
-            // Save changes back to record table
-            record_table.store_rkyv(0, &k, &r).await?;
-
-            // Add to cache, do nothing with lru out
-            self.record_cache.insert(key, r, |_| {});
-
-            return Ok(Some(out));
-        };
-
-        Ok(None)
-    }
+
+    // xxx fix up new_record
 
     pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
         if self.with_record(key, |_| {})?.is_some() {
@@ -149,4 +165,140 @@ impl RecordStore {
         Ok(())
     }
+
+    pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
+    where
+        F: FnOnce(&mut RecordStore, TypedKey, &ValueRecord) -> R,
+    {
+        // Get record table
+        let Some(record_table) = self.record_table.clone() else {
+            bail!("record store not initialized");
+        };
+
+        // If record exists in cache, use that
+        let rck = RecordIndexKey { key };
+        if let Some(r) = self.record_index.get(&rck) {
+            // Callback
+            return Ok(Some(f(self, key, r)));
+        }
+        // If not in cache, try to pull from table store
+        let k = rck.bytes();
+        if let Some(r) = record_table.load_rkyv(0, &k)? {
+            // Callback
+            let out = f(self, key, &r);
+
+            // Add to cache, do nothing with lru out
+            self.record_index.insert(rck, r, |_| {});
+
+            return Ok(Some(out));
+        };
+
+        return Ok(None);
+    }
+
+    pub fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
+    where
+        F: FnOnce(&mut RecordStore, TypedKey, &mut ValueRecord) -> R,
+    {
+        // Get record table
+        let Some(record_table) = self.record_table.clone() else {
+            bail!("record store not initialized");
+        };
+
+        // If record exists in cache, use that
+        let rck = RecordIndexKey { key };
+        if let Some(r) = self.record_index.get_mut(&rck) {
+            // Callback
+            return Ok(Some(f(self, key, r)));
+        }
+        // If not in cache, try to pull from table store
+        let k = rck.bytes();
+        if let Some(mut r) = record_table.load_rkyv(0, &k)? {
+            // Callback
+            let out = f(self, key, &mut r);
+
+            // Save changes back to record table
+            record_table.store_rkyv(0, &k, &r).await?;
+
+            // Add to cache, do nothing with lru out
+            self.record_index.insert(rck, r, |_| {});
+
+            return Ok(Some(out));
+        };
+
+        Ok(None)
+    }
+
+    pub fn with_subkey<R, F>(
+        &mut self,
+        key: TypedKey,
+        subkey: ValueSubkey,
+        f: F,
+    ) -> EyreResult<Option<R>>
+    where
+        F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &ValueRecordData) -> R,
+    {
+        // Get subkey table
+        let Some(subkey_table) = self.subkey_table.clone() else {
+            bail!("record store not initialized");
+        };
+
+        // If subkey exists in subkey cache, use that
+        let skck = SubkeyCacheKey { key, subkey };
+        if let Some(rd) = self.subkey_cache.get(&skck) {
+            // Callback
+            return Ok(Some(f(self, key, subkey, rd)));
+        }
+        // If not in cache, try to pull from table store
+        let k = skck.bytes();
+        if let Some(rd) = subkey_table.load_rkyv(0, &k)? {
+            // Callback
+            let out = f(self, key, subkey, &rd);
+
+            // Add to cache, do nothing with lru out
+            self.subkey_cache.insert(skck, rd, |_| {});
+
+            return Ok(Some(out));
+        };
+
+        return Ok(None);
+    }
+
+    pub fn with_subkey_mut<R, F>(
+        &mut self,
+        key: TypedKey,
+        subkey: ValueSubkey,
+        f: F,
+    ) -> EyreResult<Option<R>>
+    where
+        F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &mut ValueRecordData) -> R,
+    {
+        // Get subkey table
+        let Some(subkey_table) = self.subkey_table.clone() else {
+            bail!("record store not initialized");
+        };
+
+        // If subkey exists in cache, use that
+        let skck = SubkeyCacheKey { key, subkey };
+        if let Some(rd) = self.subkey_cache.get_mut(&skck) {
+            // Callback
+            return Ok(Some(f(self, key, subkey, rd)));
+        }
+        // If not in cache, try to pull from table store
+        let k = skck.bytes();
+        if let Some(mut rd) = subkey_table.load_rkyv(0, &k)? {
+            // Callback
+            let out = f(self, key, subkey, &mut rd);
+
+            // Save changes back to subkey table
+            subkey_table.store_rkyv(0, &k, &rd).await?;
+
+            // Add to cache, do nothing with lru out
+            self.subkey_cache.insert(skck, rd, |_| {});
+
+            return Ok(Some(out));
+        };
+
+        Ok(None)
+    }
 }
diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store_limits.rs
index 1b879b26..45601ba0 100644
--- a/veilid-core/src/storage_manager/record_store_limits.rs
+++ b/veilid-core/src/storage_manager/record_store_limits.rs
@@ -3,9 +3,12 @@ use super::*;
 /// Configuration for the record store
 #[derive(Debug, Default, Copy, Clone)]
 pub struct RecordStoreLimits {
-    pub record_cache_size: u32,
+    /// Number of subkeys to keep in the memory cache
     pub subkey_cache_size: u32,
+    /// Limit on the total number of records in the table store
     pub max_records: Option<usize>,
-    pub max_cache_memory_mb: Option<usize>,
+    /// Limit on the amount of subkey cache memory to use before evicting cache items
+    pub max_subkey_cache_memory_mb: Option<usize>,
+    /// Limit on the amount of disk space to use for subkey data
     pub max_disk_space_mb: Option<usize>,
 }
diff --git a/veilid-core/src/storage_manager/value_record.rs b/veilid-core/src/storage_manager/value_record.rs
index 2b9ab997..ae460424 100644
--- a/veilid-core/src/storage_manager/value_record.rs
+++ b/veilid-core/src/storage_manager/value_record.rs
@@ -32,27 +32,38 @@ pub struct ValueRecordData {
 )]
 #[archive_attr(repr(C), derive(CheckBytes))]
 pub struct ValueRecord {
+    last_touched_ts: Timestamp,
     secret: Option<SecretKey>,
     schema: DHTSchema,
     safety_selection: SafetySelection,
-    total_size: usize,
-    subkeys: Vec<Vec<u8>>,
+    data_size: usize,
 }
 
 impl ValueRecord {
     pub fn new(
+        cur_ts: Timestamp,
         secret: Option<SecretKey>,
         schema: DHTSchema,
        safety_selection: SafetySelection,
     ) -> Self {
-        // Get number of subkeys
-        let subkey_count = schema.subkey_count();
-
         Self {
+            last_touched_ts: cur_ts,
             secret,
             schema,
             safety_selection,
-            subkeys: vec![Vec::new(); subkey_count],
+            data_size: 0,
         }
     }
+
+    pub fn subkey_count(&self) -> usize {
+        self.schema.subkey_count()
+    }
+
+    pub fn touch(&mut self, cur_ts: Timestamp) {
+        self.last_touched_ts = cur_ts;
+    }
+
+    pub fn last_touched(&self) -> Timestamp {
+        self.last_touched_ts
+    }
 }
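
Illustrative sketch (not part of the patch): the record and subkey tables introduced above are keyed by the fixed-width byte layout produced by RecordIndexKey::bytes() and SubkeyCacheKey::bytes() (a 4-byte kind FourCC, the raw public key bytes, and, for subkey entries, a little-endian ValueSubkey), and RecordStore::init() depends on decoding those same bytes back through TryFrom<&[u8]> when it rebuilds the record index from get_keys(0). The standalone Rust sketch below shows that round trip using hypothetical, simplified stand-ins for the veilid-core types (FourCC, PublicKey, TypedKey, ValueSubkey); it is only an assumption-laden illustration of the key layout, not code from veilid-core.

// Sketch: round-trip of the subkey table key layout used by the record store.
// FourCC/PublicKey/TypedKey here are minimal stand-ins, not the real veilid-core types.
use std::convert::TryInto;

const PUBLIC_KEY_LENGTH: usize = 32;

#[derive(Clone, Copy, PartialEq, Debug)]
struct FourCC([u8; 4]);
#[derive(Clone, Copy, PartialEq, Debug)]
struct PublicKey([u8; PUBLIC_KEY_LENGTH]);
#[derive(Clone, Copy, PartialEq, Debug)]
struct TypedKey {
    kind: FourCC,
    value: PublicKey,
}
type ValueSubkey = u32;

#[derive(Clone, Copy, PartialEq, Debug)]
struct SubkeyCacheKey {
    key: TypedKey,
    subkey: ValueSubkey,
}

impl SubkeyCacheKey {
    // Serialize in the same order as the patch: kind, key bytes, little-endian subkey
    fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] {
        let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4];
        bytes[0..4].copy_from_slice(&self.key.kind.0);
        bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.0);
        bytes[PUBLIC_KEY_LENGTH + 4..].copy_from_slice(&self.subkey.to_le_bytes());
        bytes
    }

    // Decode a table key back into its parts, mirroring what init() does for stored keys
    fn try_from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != PUBLIC_KEY_LENGTH + 4 + 4 {
            return Err("invalid bytes length".to_owned());
        }
        let mut kind = [0u8; 4];
        kind.copy_from_slice(&bytes[0..4]);
        let mut value = [0u8; PUBLIC_KEY_LENGTH];
        value.copy_from_slice(&bytes[4..PUBLIC_KEY_LENGTH + 4]);
        let subkey = ValueSubkey::from_le_bytes(bytes[PUBLIC_KEY_LENGTH + 4..].try_into().unwrap());
        Ok(Self {
            key: TypedKey { kind: FourCC(kind), value: PublicKey(value) },
            subkey,
        })
    }
}

fn main() {
    // "VLD0" is used here only as an example 4-byte kind value
    let skck = SubkeyCacheKey {
        key: TypedKey { kind: FourCC(*b"VLD0"), value: PublicKey([7u8; PUBLIC_KEY_LENGTH]) },
        subkey: 3,
    };
    // The key bytes written to the subkey table decode back losslessly
    let decoded = SubkeyCacheKey::try_from_bytes(&skck.bytes()).unwrap();
    assert_eq!(decoded, skck);
    println!("subkey table key is {} bytes", skck.bytes().len());
}

One consequence of this layout worth noting: because the kind bytes come first and the subkey is appended last in little-endian form, keys for the same record sort together in the table only by their raw byte order, so the record store re-sorts by last_touched() at init() rather than relying on table key order.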