From 39ade462c239fe4dde8971421c7d87fcd46cb4c7 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 2 Apr 2023 20:40:46 -0400 Subject: [PATCH] storage manager work --- veilid-core/src/storage_manager/mod.rs | 43 ++++- .../src/storage_manager/record_store.rs | 152 ++++++++++++++++++ .../storage_manager/record_store_limits.rs | 11 ++ .../src/storage_manager/value_record.rs | 58 +++++++ veilid-core/src/veilid_api/types.rs | 23 +++ 5 files changed, 283 insertions(+), 4 deletions(-) create mode 100644 veilid-core/src/storage_manager/record_store.rs create mode 100644 veilid-core/src/storage_manager/record_store_limits.rs create mode 100644 veilid-core/src/storage_manager/value_record.rs diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 94ee3ccd..da7bc588 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,7 +1,16 @@ +mod record_store; +mod value_record; +mod record_store_limits; +use record_store::*; +use record_store_limits::*; +use value_record::*; + use super::*; use crate::rpc_processor::*; -struct StorageManagerInner {} +struct StorageManagerInner { + record_store: RecordStore, +} struct StorageManagerUnlockedInner { config: VeilidConfig, @@ -36,7 +45,11 @@ impl StorageManager { rpc_processor, } } - fn new_inner() -> StorageManagerInner {} + fn new_inner() -> StorageManagerInner { + StorageManagerInner { + record_store: RecordStore::new(table_store), + } + } pub fn new( config: VeilidConfig, @@ -76,6 +89,12 @@ impl StorageManager { debug!("finished storage manager shutdown"); } + async fn add_value_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { + // add value record to record store + let mut inner = self.inner.lock(); + inner.record_store. + } + /// Creates a new DHT value with a specified crypto kind and schema /// Returns the newly allocated DHT Key if successful. pub async fn create_value( @@ -84,10 +103,26 @@ impl StorageManager { schema: &DHTSchema, safety_selection: SafetySelection, ) -> Result { - unimplemented!(); + // Get cryptosystem + let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + + // New values require a new owner key + let keypair = vcrypto.generate_keypair(); + let key = TypedKey::new(kind, keypair.key); + let secret = keypair.secret; + + // Add value record + let record = ValueRecord::new(Some(secret), schema, safety_selection); + self.add_value_record(key, record) + .await + .map_err(VeilidAPIError::internal)?; + + Ok(key) } - /// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability. + /// Opens a DHT value at a specific key. Associates an owner secret if one is provided. /// Returns the DHT key descriptor for the opened key if successful /// Value may only be opened or created once. To re-open with a different routing context, first close the value. pub async fn open_value( diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs new file mode 100644 index 00000000..bb59c871 --- /dev/null +++ b/veilid-core/src/storage_manager/record_store.rs @@ -0,0 +1,152 @@ +use super::*; +use hashlink::LruCache; + +pub type RecordIndex = u32; + +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct RecordCacheKey { + record_idx: RecordIndex, +} + +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct SubkeyCacheKey { + record_idx: RecordIndex, + subkey: ValueSubkey, +} + +pub struct RecordStore { + table_store: TableStore, + name: String, + limits: RecordStoreLimits, + + record_table: Option, + subkey_table: Option, + record_index: HashMap, + free_record_index_list: Vec, + record_cache: LruCache, + subkey_cache: LruCache, +} + +impl RecordStore { + pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { + let record_cache_size = limits.record_cache_size as usize; + let subkey_cache_size = limits.subkey_cache_size as usize; + Self { + table_store, + name: name.to_owned(), + limits, + record_table: None, + subkey_table: None, + record_index: HashMap::new(), + free_record_index_list: Vec::new(), // xxx can this be auto-recovered? should we ever compact the allocated indexes? + record_cache: LruCache::new(record_cache_size), + subkey_cache: LruCache::new(subkey_cache_size), + } + } + + pub async fn init(&mut self) -> EyreResult<()> { + let record_table = self + .table_store + .open(&format!("{}_records", self.name), 1) + .await?; + let subkey_table = self + .table_store + .open(&&format!("{}_subkeys", self.name), 1) + .await?; + + // xxx get record index and free record index list + + self.record_table = Some(record_table); + self.subkey_table = Some(record_table); + Ok(()) + } + + fn key_bytes(key: TypedKey) -> [u8; PUBLIC_KEY_LENGTH + 4] { + let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4]; + bytes[0..4] = key.kind.0; + bytes[4..PUBLIC_KEY_LENGTH + 4] = key.value.bytes; + bytes + } + + pub fn with_record R>( + &mut self, + key: TypedKey, + f: F, + ) -> EyreResult> { + // Get record table + let Some(record_table) = self.record_table.clone() else { + bail!("record store not initialized"); + }; + + // If record exists in cache, use that + if let Some(r) = self.record_cache.get(&key) { + // Callback + return Ok(Some(f(key, r))); + } + // If not in cache, try to pull from table store + let k = Self::key_bytes(key); + if let Some(r) = record_table.load_rkyv(0, &k)? { + // Callback + let out = f(key, &r); + + // Add to cache, do nothing with lru out + self.record_cache.insert(key, r, |_| {}); + + return Ok(Some(out)); + }; + + return Ok(None); + } + + pub fn with_record_mut R>( + &mut self, + key: TypedKey, + f: F, + ) -> EyreResult> { + // Get record table + let Some(record_table) = self.record_table.clone() else { + bail!("record store not initialized"); + }; + + // If record exists in cache, use that + if let Some(r) = self.record_cache.get_mut(&key) { + // Callback + return Ok(Some(f(key, r))); + } + // If not in cache, try to pull from table store + let k = Self::key_bytes(key); + if let Some(r) = record_table.load_rkyv(0, &k)? { + // Callback + let out = f(key, &mut r); + + // Save changes back to record table + record_table.store_rkyv(0, &k, &r).await?; + + // Add to cache, do nothing with lru out + self.record_cache.insert(key, r, |_| {}); + + return Ok(Some(out)); + }; + + Ok(None) + } + + pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { + if self.with_record(key, |_| {})?.is_some() { + bail!("record already exists"); + } + + // Get record table + let Some(record_table) = self.record_table.clone() else { + bail!("record store not initialized"); + }; + + // Save to record table + record_table.store_rkyv(0, &key, &r).await?; + + // Cache it + self.record_cache.insert(key, value, |_| {}); + + Ok(()) + } +} diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store_limits.rs new file mode 100644 index 00000000..1b879b26 --- /dev/null +++ b/veilid-core/src/storage_manager/record_store_limits.rs @@ -0,0 +1,11 @@ +use super::*; + +/// Configuration for the record store +#[derive(Debug, Default, Copy, Clone)] +pub struct RecordStoreLimits { + pub record_cache_size: u32, + pub subkey_cache_size: u32, + pub max_records: Option, + pub max_cache_memory_mb: Option, + pub max_disk_space_mb: Option, +} diff --git a/veilid-core/src/storage_manager/value_record.rs b/veilid-core/src/storage_manager/value_record.rs new file mode 100644 index 00000000..2b9ab997 --- /dev/null +++ b/veilid-core/src/storage_manager/value_record.rs @@ -0,0 +1,58 @@ +use super::*; + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Eq, + Serialize, + Deserialize, + RkyvArchive, + RkyvSerialize, + RkyvDeserialize, +)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct ValueRecordData { + data: ValueData, + signature: Signature, +} + +#[derive( + Clone, + Debug, + Default, + PartialEq, + Eq, + Serialize, + Deserialize, + RkyvArchive, + RkyvSerialize, + RkyvDeserialize, +)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct ValueRecord { + secret: Option, + schema: DHTSchema, + safety_selection: SafetySelection, + total_size: usize, + subkeys: Vec, +} + +impl ValueRecord { + pub fn new( + secret: Option, + schema: DHTSchema, + safety_selection: SafetySelection, + ) -> Self { + // Get number of subkeys + let subkey_count = schema.subkey_count(); + + Self { + secret, + schema, + safety_selection, + subkeys: vec![Vec::new(); subkey_count], + } + } +} diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index e2752321..5383afa9 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -2434,6 +2434,7 @@ pub struct DHTSchemaDFLT { } impl DHTSchemaDFLT { + /// Build the data representation of the schema pub fn compile(&self) -> Vec { let mut out = Vec::::with_capacity(6); // kind @@ -2442,6 +2443,11 @@ impl DHTSchemaDFLT { out.extend_from_slice(&self.o_cnt.to_le_bytes()); out } + + /// Get the number of subkeys this schema allocates + pub fn subkey_count(&self) -> usize { + self.o_cnt as usize + } } /// Simple DHT Schema (SMPL) Member @@ -2469,6 +2475,7 @@ pub struct DHTSchemaSMPL { } impl DHTSchemaSMPL { + /// Build the data representation of the schema pub fn compile(&self) -> Vec { let mut out = Vec::::with_capacity(6 + (self.members.len() * (PUBLIC_KEY_LENGTH + 2))); // kind @@ -2484,6 +2491,13 @@ impl DHTSchemaSMPL { } out } + + /// Get the number of subkeys this schema allocates + pub fn subkey_count(&self) -> usize { + self.members + .iter() + .fold(o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) + } } /// Enum over all the supported DHT Schemas @@ -2505,12 +2519,21 @@ impl DHTSchema { DHTSchema::SMPL(DHTSchemaSMPL { o_cnt, members }) } + /// Build the data representation of the schema pub fn compile(&self) -> Vec { match self { DHTSchema::DFLT(d) => d.compile(), DHTSchema::SMPL(s) => s.compile(), } } + + /// Get the number of subkeys this schema allocates + pub fn subkey_count(&self) -> usize { + match self { + DHTSchema::DFLT(d) => d.subkey_count(), + DHTSchema::SMPL(s) => s.subkey_count(), + } + } } /// DHT Key Descriptor