diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 3884b4c1..54676493 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -338,7 +338,7 @@ struct ValueData @0xb4b7416f169f2a3d { struct OperationGetValueQ @0xf88a5b6da5eda5d0 { key @0 :TypedKey; # the location of the value - subkey @1 :Subkey; # the index of the subkey (0 for the default subkey) + subkey @1 :Subkey; # the index of the subkey wantSchema @2 :bool; # whether or not to include the schema for the key } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 9fe53969..245d6cd2 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,6 +1,6 @@ mod record_store; -mod value_record; mod record_store_limits; +mod value_record; use record_store::*; use record_store_limits::*; use value_record::*; @@ -58,7 +58,6 @@ impl StorageManager { fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { RecordStoreLimits { - record_cache_size: todo!(), subkey_cache_size: todo!(), max_records: None, max_subkey_cache_memory_mb: Some(xxx), @@ -68,11 +67,10 @@ impl StorageManager { fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { RecordStoreLimits { - record_cache_size: todo!(), subkey_cache_size: todo!(), max_records: Some(xxx), max_subkey_cache_memory_mb: Some(xxx), - max_disk_space_mb: Some(xxx) + max_disk_space_mb: Some(xxx), } } @@ -84,7 +82,6 @@ impl StorageManager { block_store: BlockStore, rpc_processor: RPCProcessor, ) -> StorageManager { - StorageManager { unlocked_inner: Arc::new(Self::new_unlocked_inner( config, @@ -94,7 +91,7 @@ impl StorageManager { block_store, rpc_processor, )), - inner: Arc::new(Mutex::new(Self::new_inner())) + inner: Arc::new(Mutex::new(Self::new_inner())), } } @@ -105,8 +102,16 @@ impl StorageManager { let local_limits = Self::local_limits_from_config(config.clone()); let remote_limits = 
Self::remote_limits_from_config(config.clone()); - inner.local_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "local", local_limits)); - inner.remote_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "remote", remote_limits)); + inner.local_record_store = Some(RecordStore::new( + self.unlocked_inner.table_store.clone(), + "local", + local_limits, + )); + inner.remote_record_store = Some(RecordStore::new( + self.unlocked_inner.table_store.clone(), + "remote", + remote_limits, + )); Ok(()) } @@ -120,15 +125,17 @@ impl StorageManager { debug!("finished storage manager shutdown"); } - async fn add_value_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { + async fn new_local_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { // add value record to record store let mut inner = self.inner.lock(); - inner.record_store. + let Some(local_record_store) = inner.local_record_store else { + apibail_generic!("not initialized"); + + }; + local_record_store.new_record(key, record) } - /// Creates a new DHT value with a specified crypto kind and schema - /// Returns the newly allocated DHT Key if successful. - pub async fn create_value( + pub async fn create_record( &self, kind: CryptoKind, schema: &DHTSchema, @@ -144,60 +151,50 @@ impl StorageManager { let key = TypedKey::new(kind, keypair.key); let secret = keypair.secret; - // Add value record - let record = ValueRecord::new(Some(secret), schema, safety_selection); - self.add_value_record(key, record) + // Add new local value record + let cur_ts = get_aligned_timestamp(); + let record = ValueRecord::new(cur_ts, Some(secret), schema, safety_selection); + self.new_local_record(key, record) .await .map_err(VeilidAPIError::internal)?; Ok(key) } - /// Opens a DHT value at a specific key. Associates an owner secret if one is provided. 
- /// Returns the DHT key descriptor for the opened key if successful - /// Value may only be opened or created once. To re-open with a different routing context, first close the value. - pub async fn open_value( + pub async fn open_record( key: TypedKey, secret: Option, safety_selection: SafetySelection, - ) -> Result { + ) -> Result { unimplemented!(); } - /// Closes a DHT value at a specific key that was opened with create_value or open_value. - /// Closing a value allows you to re-open it with a different routing context - pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> { + pub async fn close_record(key: TypedKey) -> Result<(), VeilidAPIError> { + unimplemented!(); + } + + pub async fn delete_value(key: TypedKey) -> Result<(), VeilidAPIError> { unimplemented!(); } - /// Gets the latest value of a subkey from the network - /// Returns the possibly-updated value data of the subkey pub async fn get_value( &self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool, - ) -> Result { - unimplemented!(); - } - - /// Pushes a changed subkey value to the network - /// Returns None if the value was successfully put - /// Returns Some(newer_value) if the value put was older than the one available on the network - pub async fn set_value( - &self, - key: TypedKey, - subkey: ValueSubkey, - value_data: ValueData, ) -> Result, VeilidAPIError> { unimplemented!(); } - /// Watches changes to an opened or created value - /// Changes to subkeys within the subkey range are returned via a ValueChanged callback - /// If the subkey range is empty, all subkey changes are considered - /// Expiration can be infinite to keep the watch for the maximum amount of time - /// Return value upon success is the amount of time allowed for the watch + pub async fn set_value( + &self, + key: TypedKey, + subkey: ValueSubkey, + data: Vec, + ) -> Result, VeilidAPIError> { + unimplemented!(); + } + pub async fn watch_value( &self, key: TypedKey, @@ -208,8 +205,6 @@ impl 
StorageManager { unimplemented!(); } - /// Cancels a watch early - /// This is a convenience function that cancels watching all subkeys in a range pub async fn cancel_watch_value( &self, key: TypedKey, diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index f7db44d7..e26a5203 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -1,9 +1,15 @@ +/// RecordStore +/// Keeps an LRU cache of dht keys and their associated subkey valuedata. +/// Instances of this store are used for 'local' (persistent) and 'remote' (ephemeral) dht key storage. +/// This store does not perform any validation on the schema, and all ValueRecordData passed in must have been previously validated. +/// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy, +/// and backed to the TableStore for persistence. use super::*; use hashlink::LruCache; pub type RecordIndex = u32; -#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] struct RecordIndexKey { pub key: TypedKey, } @@ -30,7 +36,7 @@ impl TryFrom<&[u8]> for RecordIndexKey { } } -#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] struct SubkeyCacheKey { pub key: TypedKey, pub subkey: ValueSubkey, @@ -71,6 +77,9 @@ pub struct RecordStore { subkey_table: Option, record_index: LruCache, subkey_cache: LruCache, + + dead_records: Vec<(RecordIndexKey, ValueRecord)>, + changed_records: HashSet<(RecordIndexKey, Timestamp)>, } impl RecordStore { @@ -84,6 +93,8 @@ impl RecordStore { subkey_table: None, record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), subkey_cache: LruCache::new(subkey_cache_size), + dead_records: Vec::new(), + changed_records: HashSet::new(), } } @@ -118,145 
+129,163 @@ impl RecordStore { }) } - // Delete dead keys - if !dead_records.empty() { - let rt_xact = record_table.transact(); - let st_xact = subkey_table.transact(); - for (k, v) in dead_records { - // Delete record - rt_xact.delete(0, &k.bytes()); - - // Delete subkeys - let subkey_count = v.subkey_count(); - for sk in 0..subkey_count { - let sck = SubkeyCacheKey { - key: k.key, - subkey: sk, - }; - st_xact.delete(0, &sck.bytes())?; - } - } - rt_xact.commit().await?; - st_xact.commit().await?; - } - self.record_table = Some(record_table); self.subkey_table = Some(record_table); Ok(()) } - fix up new record + fn add_dead_record(&mut self, key: RecordIndexKey, record: ValueRecord) { + self.dead_records.push((key, record)); + } - pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { + fn mark_record_changed(&mut self, key: RecordIndexKey) { + let cur_ts = get_aligned_timestamp(); + self.changed_records.insert((key, cur_ts)); + } + + async fn purge_dead_records(&mut self) { + // Delete dead keys + if self.dead_records.empty() { + return; + } + + let rt_xact = record_table.transact(); + let st_xact = subkey_table.transact(); + let mut dead_records = mem::take(&mut self.dead_records); + for (k, v) in dead_records { + // Delete record + rt_xact.delete(0, &k.bytes()); + + // Delete subkeys + let subkey_count = v.subkey_count(); + for sk in 0..subkey_count { + // From table + let sck = SubkeyCacheKey { + key: k.key, + subkey: sk, + }; + st_xact.delete(0, &sck.bytes())?; + + // From cache + self.subkey_cache.remove(&sck); + } + } + if let Err(e) = rt_xact.commit().await { + log_stor!(error "failed to commit record table transaction: {}", e); + } + if let Err(e) = st_xact.commit().await { + log_stor!(error "failed to commit subkey table transaction: {}", e); + } + } + + async fn flush_records(&mut self) { + // touch records + if self.changed_records.empty() { + return; + } + + let rt_xact = record_table.transact(); + let st_xact = 
subkey_table.transact(); + let mut changed_records = mem::take(&mut self.changed_records); + for (rik, ts) in changed_records { + // Flush record and changed subkeys + } + if let Err(e) = rt_xact.commit().await { + log_stor!(error "failed to commit record table transaction: {}", e); + } + if let Err(e) = st_xact.commit().await { + log_stor!(error "failed to commit subkey table transaction: {}", e); + } + } + + pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { + self.flush_records().await; + self.purge_dead_records().await; + } + + pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> { if self.with_record(key, |_| {})?.is_some() { - bail!("record already exists"); + apibail_generic!("record already exists"); } // Get record table let Some(record_table) = self.record_table.clone() else { - bail!("record store not initialized"); + apibail_internal!("record store not initialized"); }; // Save to record table - record_table.store_rkyv(0, &key, &r).await?; + record_table + .store_rkyv(0, &key, &r) + .await + .map_err(VeilidAPIError::internal)?; // Cache it - self.record_cache.insert(key, value, |_| {}); + self.record_cache.insert(key, value, |k, v| { + self.add_dead_record(k, v); + }); Ok(()) } - pub fn with_record(&mut self, key: TypedKey, f: F) -> EyreResult> + pub fn with_record(&mut self, key: TypedKey, f: F) -> Option where - F: FnOnce(&mut RecordStore, TypedKey, &ValueRecord) -> R, + F: FnOnce(&ValueRecord) -> R, { - // Get record table - let Some(record_table) = self.record_table.clone() else { - bail!("record store not initialized"); - }; - - // If record exists in cache, use that + // Get record from index let rck = RecordIndexKey { key }; - if let Some(r) = self.record_cache.get(&rck) { + if let Some(r) = self.record_index.get_mut(&rck) { + // Touch + r.touch(get_aligned_timestamp()); + self.mark_record_changed(&rck); + // Callback - return Ok(Some(f(self, key, r))); + return Some(f(key, r)); } - // 
If not in cache, try to pull from table store - let k = rck.bytes(); - if let Some(r) = record_table.load_rkyv(0, &k)? { - // Callback - let out = f(self, key, &r); - - // Add to cache, do nothing with lru out - self.record_cache.insert(rck, r, |_| {}); - - return Ok(Some(out)); - }; - - return Ok(None); + None } - pub fn with_record_mut(&mut self, key: TypedKey, f: F) -> EyreResult> - where - F: FnOnce(&mut RecordStore, TypedKey, &mut ValueRecord) -> R, - { - // Get record table - let Some(record_table) = self.record_table.clone() else { - bail!("record store not initialized"); - }; - - // If record exists in cache, use that - let rck = RecordIndexKey { key }; - if let Some(r) = self.record_cache.get_mut(&rck) { - // Callback - return Ok(Some(f(self, key, r))); - } - // If not in cache, try to pull from table store - let k = rck.bytes(); - if let Some(r) = record_table.load_rkyv(0, &k)? { - // Callback - let out = f(self, key, &mut r); - - // Save changes back to record table - record_table.store_rkyv(0, &k, &r).await?; - - // Add to cache, do nothing with lru out - self.record_cache.insert(rck, r, |_| {}); - - return Ok(Some(out)); - }; - - Ok(None) - } - - pub fn with_subkey( + pub fn get_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, - f: F, - ) -> EyreResult> - where - F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &ValueRecordData) -> R, - { + ) -> Result, VeilidAPIError> { + // record from index + let rck = RecordIndexKey { key }; + let Some(r) = self.record_index.get_mut(&rck) else { + apibail_invalid_argument!("no record at this key", "key", key); + }; + + // Touch + r.touch(get_aligned_timestamp()); + self.mark_record_changed(&rck); + + // Check if the subkey is in range + if subkey >= r.subkey_count() { + apibail_invalid_argument!("subkey out of range", "subkey", subkey); + } + // Get subkey table let Some(subkey_table) = self.subkey_table.clone() else { - bail!("record store not initialized"); + apibail_internal!("record store not 
initialized"); }; // If subkey exists in subkey cache, use that let skck = SubkeyCacheKey { key, subkey }; - if let Some(rd) = self.subkey_cache.get(&skck) { - // Callback - return Ok(Some(f(self, key, subkey, rd))); + if let Some(rd) = self.subkey_cache.get_mut(&skck) { + let out = rd.clone(); + + return Ok(Some(out)); } // If not in cache, try to pull from table store let k = skck.bytes(); - if let Some(rd) = subkey_table.load_rkyv(0, &k)? { - // Callback - let out = f(self, key, subkey, &rd); + if let Some(rd) = subkey_table + .load_rkyv::(0, &k) + .map_err(VeilidAPIError::internal)? + { + let out = rd.clone(); // Add to cache, do nothing with lru out - self.subkey_cache.insert(skck, r, |_| {}); + self.subkey_cache.insert(skck, rd, |_| {}); return Ok(Some(out)); }; @@ -264,41 +293,52 @@ impl RecordStore { return Ok(None); } - pub fn with_subkey_mut( + pub fn set_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, - f: F, - ) -> EyreResult> - where - F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &mut ValueRecord) -> R, - { - // Get record table - let Some(subkey_table) = self.subkey_table.clone() else { - bail!("record store not initialized"); + data: ValueRecordData, + ) -> Result<(), VeilidAPIError> { + // Get record from index + let rck = RecordIndexKey { key }; + let Some(r) = self.record_index.get_mut(&rck) else { + apibail_invalid_argument!("no record at this key", "key", key); }; - // If subkey exists in cache, use that - let skck = SubkeyCacheKey { key, subkey }; - if let Some(rd) = self.subkey_cache.get_mut(&skck) { - // Callback - return Ok(Some(f(self, key, subkey, rd))); + // Touch + r.touch(get_aligned_timestamp()); + self.mark_record_changed(&rck); + + // Check if the subkey is in range + if subkey >= r.subkey_count() { + apibail_invalid_argument!("subkey out of range", "subkey", subkey); } - // If not in cache, try to pull from table store - let k = skck.bytes(); - if let Some(rd) = subkey_table.load_rkyv(0, &k)? 
{ - // Callback - let out = f(self, key, subkey, &mut rd); - // Save changes back to record table - subkey_table.store_rkyv(0, &k, &rd).await?; - - // Add to cache, do nothing with lru out - self.subkey_cache.insert(key, r, |_| {}); - - return Ok(Some(out)); + // Get subkey table + let Some(subkey_table) = self.subkey_table.clone() else { + apibail_internal!("record store not initialized"); }; - Ok(None) + // Write to subkey cache + let skck = SubkeyCacheKey { key, subkey }; + if let Some(rd) = self.subkey_cache.insert(skck, data, |_, _| {}) { + return Ok(Some(out)); + } + + +xxx do we flush this now or queue it? + + // Write subkey + // let k = skck.bytes(); + // if let Some(rd) = subkey_table.load_rkyv::(0, &k)? { + // let out = rd.data.clone(); + + // // Add to cache, do nothing with lru out + // self.subkey_cache.insert(skck, rd, |_| {}); + + // return Ok(Some(out)); + // }; + + return Ok(None); } } diff --git a/veilid-core/src/storage_manager/value_record.rs b/veilid-core/src/storage_manager/value_record.rs index ae460424..e3500c10 100644 --- a/veilid-core/src/storage_manager/value_record.rs +++ b/veilid-core/src/storage_manager/value_record.rs @@ -14,8 +14,8 @@ use super::*; )] #[archive_attr(repr(C), derive(CheckBytes))] pub struct ValueRecordData { - data: ValueData, - signature: Signature, + pub data: ValueData, + pub signature: Signature, } #[derive( diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 5ca61c6e..8a2b2d69 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -195,64 +195,74 @@ impl RoutingContext { } /////////////////////////////////// - /// DHT Values + /// DHT Records - /// Creates a new DHT value with a specified crypto kind and schema - /// Returns the newly allocated DHT Key if successful. 
- pub async fn create_value( + /// Creates a new DHT record with a specified crypto kind and schema + /// Returns the newly allocated DHT record's key if successful. The record is considered 'open' after the create operation succeeds. + pub async fn create_dht_record( &self, kind: CryptoKind, schema: &DHTSchema, ) -> Result { let storage_manager = self.api.storage_manager()?; storage_manager - .create_value(kind, schema, self.unlocked_inner.safety_selection) + .create_record(kind, schema, self.unlocked_inner.safety_selection) .await } - /// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability. - /// Returns the DHT key descriptor for the opened key if successful - /// Value may only be opened or created once. To re-open with a different routing context, first close the value. - pub async fn open_value( + /// Opens a DHT record at a specific key. Associates a secret if one is provided to provide writer capability. + /// Returns the DHT record descriptor for the opened record if successful + /// Records may only be opened or created once. To re-open with a different routing context, first close the value. + pub async fn open_dht_record( key: TypedKey, secret: Option, - ) -> Result { + ) -> Result { let storage_manager = self.api.storage_manager()?; storage_manager - .open_value(key, secret, self.unlocked_inner.safety_selection) + .open_record(key, secret, self.unlocked_inner.safety_selection) .await } - /// Closes a DHT value at a specific key that was opened with create_value or open_value. - /// Closing a value allows you to re-open it with a different routing context - pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> { + /// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record.
+ /// Closing a record allows you to re-open it with a different routing context + pub async fn close_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> { let storage_manager = self.api.storage_manager()?; - storage_manager.close_value(key).await + storage_manager.close_record(key).await } - /// Gets the latest value of a subkey from the network - /// Returns the possibly-updated value data of the subkey - pub async fn get_value( + /// Deletes a DHT record at a specific key. If the record is opened, it must be closed before it is deleted. + /// Deleting a record does not delete it from the network immediately, but will remove the storage of the record + /// locally, and will prevent its value from being refreshed on the network by this node. + pub async fn delete_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> { + let storage_manager = self.api.storage_manager()?; + storage_manager.delete_record(key).await + } + + /// Gets the latest value of a subkey + /// May pull the latest value from the network, but by setting 'force_refresh' you can force a network data refresh + /// Returns None if the value subkey has not yet been set + /// Returns Some(data) if the value subkey has valid data + pub async fn get_dht_value( &self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool, - ) -> Result { + ) -> Result, VeilidAPIError> { let storage_manager = self.api.storage_manager()?; storage_manager.get_value(key, subkey, force_refresh).await } /// Pushes a changed subkey value to the network /// Returns None if the value was successfully put - /// Returns Some(newer_value) if the value put was older than the one available on the network - pub async fn set_value( + /// Returns Some(data) if the value put was older than the one available on the network + pub async fn set_dht_value( &self, key: TypedKey, subkey: ValueSubkey, - value_data: ValueData, + data: Vec, ) -> Result, VeilidAPIError> { let storage_manager = self.api.storage_manager()?; -
storage_manager.set_value(key, subkey, value_data).await + storage_manager.set_value(key, subkey, data).await } /// Watches changes to an opened or created value @@ -260,7 +270,7 @@ impl RoutingContext { /// If the subkey range is empty, all subkey changes are considered /// Expiration can be infinite to keep the watch for the maximum amount of time /// Return value upon success is the amount of time allowed for the watch - pub async fn watch_value( + pub async fn watch_dht_values( &self, key: TypedKey, subkeys: &[ValueSubkeyRange], @@ -275,7 +285,7 @@ impl RoutingContext { /// Cancels a watch early /// This is a convenience function that cancels watching all subkeys in a range - pub async fn cancel_watch_value( + pub async fn cancel_dht_watch( &self, key: TypedKey, subkeys: &[ValueSubkeyRange], diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index 5383afa9..c8e5baab 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -353,12 +353,15 @@ pub struct VeilidState { )] #[archive_attr(repr(C), derive(CheckBytes))] pub struct ValueData { - pub seq: ValueSeqNum, - pub data: Vec, - pub writer: PublicKey, + seq: ValueSeqNum, + data: Vec, + writer: PublicKey, } impl ValueData { + pub const MAX_LEN: usize = 32768; + pub fn new(data: Vec, writer: PublicKey) -> Self { + assert!(data.len() <= Self::MAX_LEN); Self { seq: 0, data, @@ -366,11 +369,30 @@ impl ValueData { } } pub fn new_with_seq(seq: ValueSeqNum, data: Vec, writer: PublicKey) -> Self { + assert!(data.len() <= Self::MAX_LEN); Self { seq, data, writer } } - pub fn change(&mut self, data: Vec) { - self.data = data; + + pub fn seq(&self) -> ValueSeqNum { + self.seq + } + + pub fn writer(&self) -> PublicKey { + self.writer + } + + pub fn data(&self) -> &[u8] { + &self.data + } + + pub fn with_data_mut(&mut self, f: F) + where + F: FnOnce(&mut Vec) -> R, + { + let out = f(&mut self.data); + assert(self.data.len() <= Self::MAX_LEN); self.seq += 
1; + out } } @@ -2444,7 +2466,7 @@ impl DHTSchemaDFLT { out } - /// Get the number of subkeys this schema allocates + /// Get the number of subkeys this schema allocates pub fn subkey_count(&self) -> usize { self.o_cnt as usize } @@ -2492,7 +2514,7 @@ impl DHTSchemaSMPL { out } - /// Get the number of subkeys this schema allocates + /// Get the number of subkeys this schema allocates pub fn subkey_count(&self) -> usize { self.members .iter() @@ -2527,7 +2549,7 @@ impl DHTSchema { } } - /// Get the number of subkeys this schema allocates + /// Get the number of subkeys this schema allocates pub fn subkey_count(&self) -> usize { match self { DHTSchema::DFLT(d) => d.subkey_count(), @@ -2536,12 +2558,12 @@ impl DHTSchema { } } -/// DHT Key Descriptor +/// DHT Record Descriptor #[derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, )] #[archive_attr(repr(C), derive(CheckBytes))] -pub struct DHTDescriptor { +pub struct DHTRecordDescriptor { pub owner: PublicKey, pub schema: DHTSchema, } diff --git a/veilid-tools/src/log_thru.rs b/veilid-tools/src/log_thru.rs index 587a8393..89ef8233 100644 --- a/veilid-tools/src/log_thru.rs +++ b/veilid-tools/src/log_thru.rs @@ -1,6 +1,6 @@ // LogThru // Pass errors through and log them simultaneously via map_err() -// Also contains common log facilities (net, rpc, rtab, pstore, crypto, etc ) +// Also contains common log facilities (net, rpc, rtab, stor, pstore, crypto, etc ) use super::*; @@ -123,6 +123,42 @@ macro_rules! log_rtab { } } +#[macro_export] +macro_rules! 
log_stor { + (error $text:expr) => { error!( + target: "stor", + "{}", + $text, + )}; + (error $fmt:literal, $($arg:expr),+) => { + error!(target:"stor", $fmt, $($arg),+); + }; + (warn $text:expr) => { warn!( + target: "stor", + "{}", + $text, + )}; + (warn $fmt:literal, $($arg:expr),+) => { + warn!(target:"stor", $fmt, $($arg),+); + }; + (debug $text:expr) => { debug!( + target: "stor", + "{}", + $text, + )}; + (debug $fmt:literal, $($arg:expr),+) => { + debug!(target:"stor", $fmt, $($arg),+); + }; + ($text:expr) => {trace!( + target: "stor", + "{}", + $text, + )}; + ($fmt:literal, $($arg:expr),+) => { + trace!(target:"stor", $fmt, $($arg),+); + } +} + #[macro_export] macro_rules! log_pstore { (error $text:expr) => { error!( @@ -216,6 +252,18 @@ macro_rules! logthru_rtab { } } #[macro_export] +macro_rules! logthru_stor { + ($($level:ident)?) => { + logthru!($($level)? "stor") + }; + ($($level:ident)? $text:literal) => { + logthru!($($level)? "stor", $text) + }; + ($($level:ident)? $fmt:literal, $($arg:expr),+) => { + logthru!($($level)? "stor", $fmt, $($arg),+) + } +} +#[macro_export] macro_rules! logthru_pstore { ($($level:ident)?) => { logthru!($($level)? "pstore")