diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 54676493..e9352711 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -321,7 +321,7 @@ struct ValueData @0xb4b7416f169f2a3d { data @1 :Data; # value or subvalue contents owner @2 :PublicKey; # the public key of the owner writer @3 :PublicKey; # the public key of the writer - signature @5 :Signature; # signature of data at this subkey, using the writer key (which may be the same as the owner key) + signature @4 :Signature; # signature of data at this subkey, using the writer key (which may be the same as the owner key) # signature covers: # * ownerKey # * subkey @@ -329,7 +329,7 @@ struct ValueData @0xb4b7416f169f2a3d { # * data # signature does not need to cover schema because schema is validated upon every set # so the data either fits, or it doesn't. - schema @6 :Data; # (optional) the schema in use + schema @5 :Data; # (optional) the schema in use # If not set and seqnum == 0, uses the default schema. # If not set and If seqnum != 0, the schema must have been set prior and no other schema may be used, but this field may be eliminated to save space # Changing this after key creation is not supported as it would change the dht key @@ -339,7 +339,7 @@ struct ValueData @0xb4b7416f169f2a3d { struct OperationGetValueQ @0xf88a5b6da5eda5d0 { key @0 :TypedKey; # the location of the value subkey @1 :Subkey; # the index of the subkey - wantSchema @2 :bool; # whether or not to include the schema for the key + wantSchema @2 :Bool; # whether or not to include the schema for the key } @@ -352,8 +352,8 @@ struct OperationGetValueA @0xd896bb46f2e0249f { struct OperationSetValueQ @0xbac06191ff8bdbc5 { key @0 :TypedKey; # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] - subkey @3 :Subkey; # the index of the subkey - value @4 :ValueData; # value or subvalue contents (older or equal seq number gets dropped) + subkey @1 :Subkey; # the index of the subkey + value @2 :ValueData; # value or subvalue contents (older or equal seq number gets dropped) } struct OperationSetValueA @0x9378d0732dc95be2 { diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index 7f884aca..36e307f1 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -195,8 +195,8 @@ pub struct VeilidCoreContext { pub table_store: TableStore, pub block_store: BlockStore, pub crypto: Crypto, - pub storage_manager: StorageManager, pub attachment_manager: AttachmentManager, + pub storage_manager: StorageManager, } impl VeilidCoreContext { @@ -248,8 +248,8 @@ impl VeilidCoreContext { table_store: sc.table_store.unwrap(), block_store: sc.block_store.unwrap(), crypto: sc.crypto.unwrap(), - storage_manager: sc.storage_manager.unwrap(), attachment_manager: sc.attachment_manager.unwrap(), + storage_manager: sc.storage_manager.unwrap(), }) } @@ -262,8 +262,8 @@ impl VeilidCoreContext { self.table_store, self.block_store, self.crypto, - self.storage_manager, self.attachment_manager, + self.storage_manager, ); sc.shutdown().await; } diff --git a/veilid-core/src/crypto/byte_array_types.rs b/veilid-core/src/crypto/byte_array_types.rs index 3a404698..ff75900c 100644 --- a/veilid-core/src/crypto/byte_array_types.rs +++ b/veilid-core/src/crypto/byte_array_types.rs @@ -238,9 +238,9 @@ macro_rules! byte_array_type { Self::try_decode(value) } } - impl TryFrom(&[u8]) for $name { + impl TryFrom<&[u8]> for $name { type Error = VeilidAPIError; - pub fn try_from(v: &[u8]) -> Result { + fn try_from(v: &[u8]) -> Result { let vl = v.len(); Ok(Self { bytes: v.try_into().map_err(|_| { diff --git a/veilid-core/src/storage_manager/keys.rs b/veilid-core/src/storage_manager/keys.rs new file mode 100644 index 00000000..547e4aa9 --- /dev/null +++ b/veilid-core/src/storage_manager/keys.rs @@ -0,0 +1,63 @@ +use super::*; + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct RecordTableKey { + pub key: TypedKey, +} +impl RecordTableKey { + pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] { + let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4]; + bytes[0..4].copy_from_slice(&self.key.kind.0); + bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes); + bytes + } +} + +impl TryFrom<&[u8]> for RecordTableKey { + type Error = EyreReport; + fn try_from(bytes: &[u8]) -> Result { + if bytes.len() != PUBLIC_KEY_LENGTH + 4 { + bail!("invalid bytes length"); + } + let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?; + let value = + PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?; + let key = TypedKey::new(kind, value); + Ok(RecordTableKey { key }) + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct SubkeyTableKey { + pub key: TypedKey, + pub subkey: ValueSubkey, +} +impl SubkeyTableKey { + pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] { + let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4]; + bytes[0..4].copy_from_slice(&self.key.kind.0); + bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes); + bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4] + .copy_from_slice(&self.subkey.to_le_bytes()); + bytes + } +} +impl TryFrom<&[u8]> for SubkeyTableKey { + type Error = EyreReport; + fn try_from(bytes: &[u8]) -> Result { + if bytes.len() != PUBLIC_KEY_LENGTH + 4 { + bail!("invalid bytes length"); + } + let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?; + let value = + PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?; + let subkey = ValueSubkey::from_le_bytes( + bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4] + .try_into() + .wrap_err("invalid subkey")?, + ); + + let key = TypedKey::new(kind, value); + Ok(SubkeyTableKey { key, subkey }) + } +} diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 245d6cd2..79e5369f 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,6 +1,9 @@ +mod keys; mod record_store; mod record_store_limits; mod value_record; + +use keys::*; use record_store::*; use record_store_limits::*; use value_record::*; @@ -8,6 +11,11 @@ use value_record::*; use super::*; use crate::rpc_processor::*; +/// The maximum size of a single subkey +const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN; +/// The maximum total size of all subkeys of a record +const MAX_RECORD_DATA_SIZE: usize = 1_048_576; + /// Locked structure for storage manager struct StorageManagerInner { /// Records that have been 'created' or 'opened' by this node @@ -62,6 +70,8 @@ impl StorageManager { max_records: None, max_subkey_cache_memory_mb: Some(xxx), max_disk_space_mb: None, + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_data_size: MAX_RECORD_DATA_SIZE, } } @@ -71,6 +81,8 @@ impl StorageManager { max_records: Some(xxx), max_subkey_cache_memory_mb: Some(xxx), max_disk_space_mb: Some(xxx), + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_data_size: MAX_RECORD_DATA_SIZE, } } @@ -116,7 +128,7 @@ impl StorageManager { Ok(()) } - pub fn terminate(&self) { + pub async fn terminate(&self) { debug!("starting storage manager shutdown"); // Release the storage manager @@ -125,10 +137,14 @@ impl StorageManager { debug!("finished storage manager shutdown"); } - async fn new_local_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> { + async fn new_local_record( + &self, + key: TypedKey, + record: ValueRecord, + ) -> Result<(), VeilidAPIError> { // add value record to record store let mut inner = self.inner.lock(); - let Some(local_record_store) = inner.local_record_store else { + let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_generic!("not initialized"); }; @@ -138,7 +154,7 @@ impl StorageManager { pub async fn create_record( &self, kind: CryptoKind, - schema: &DHTSchema, + schema: DHTSchema, safety_selection: SafetySelection, ) -> Result { // Get cryptosystem @@ -162,6 +178,7 @@ impl StorageManager { } pub async fn open_record( + &self, key: TypedKey, secret: Option, safety_selection: SafetySelection, @@ -169,11 +186,11 @@ impl StorageManager { unimplemented!(); } - pub async fn close_record(key: TypedKey) -> Result<(), VeilidAPIError> { + pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { unimplemented!(); } - pub async fn delete_value(key: TypedKey) -> Result<(), VeilidAPIError> { + pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { unimplemented!(); } @@ -195,7 +212,7 @@ impl StorageManager { unimplemented!(); } - pub async fn watch_value( + pub async fn watch_values( &self, key: TypedKey, subkeys: &[ValueSubkeyRange], @@ -205,7 +222,7 @@ impl StorageManager { unimplemented!(); } - pub async fn cancel_watch_value( + pub async fn cancel_watch_values( &self, key: TypedKey, subkeys: &[ValueSubkeyRange], diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index e26a5203..49f469eb 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -7,67 +7,6 @@ use super::*; use hashlink::LruCache; -pub type RecordIndex = u32; - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -struct RecordIndexKey { - pub key: TypedKey, -} -impl RecordIndexKey { - pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] { - let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4]; - bytes[0..4] = self.key.kind.0; - bytes[4..PUBLIC_KEY_LENGTH + 4] = self.key.value.bytes; - bytes - } -} - -impl TryFrom<&[u8]> for RecordIndexKey { - type Error = EyreReport; - fn try_from(bytes: &[u8]) -> Result { - if bytes.len() != PUBLIC_KEY_LENGTH + 4 { - bail!("invalid bytes length"); - } - let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?; - let value = - PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?; - let key = TypedKey::new(kind, value); - Ok(RecordIndexKey { key }) - } -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -struct SubkeyCacheKey { - pub key: TypedKey, - pub subkey: ValueSubkey, -} -impl SubkeyCacheKey { - pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] { - let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4]; - bytes[0..4] = self.key.kind.0; - bytes[4..PUBLIC_KEY_LENGTH + 4] = self.key.value.bytes; - bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4] = self.subkey.to_le_bytes(); - bytes - } -} -impl TryFrom<&[u8]> for SubkeyCacheKey { - type Error = EyreReport; - fn try_from(bytes: &[u8]) -> Result { - if bytes.len() != PUBLIC_KEY_LENGTH + 4 { - bail!("invalid bytes length"); - } - let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?; - let value = - PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?; - let subkey = - ValueSubkey::try_from(&bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4]) - .wrap_err("invalid subkey")?; - - let key = TypedKey::new(kind, value); - Ok(SubkeyCacheKey { key, subkey }) - } -} - pub struct RecordStore { table_store: TableStore, name: String, @@ -75,11 +14,11 @@ pub struct RecordStore { record_table: Option, subkey_table: Option, - record_index: LruCache, - subkey_cache: LruCache, + record_index: LruCache, + subkey_cache: LruCache, - dead_records: Vec<(RecordIndexKey, ValueRecord)>, - changed_records: HashSet<(RecordIndexKey, Timestamp)>, + dead_records: Vec<(RecordTableKey, ValueRecord)>, + changed_records: HashSet<(RecordTableKey, Timestamp)>, } impl RecordStore { @@ -110,11 +49,12 @@ impl RecordStore { // Pull record index from table into a vector to ensure we sort them let record_table_keys = record_table.get_keys(0)?; - let mut record_index_saved: Vec<(RecordIndexKey, ValueRecord)> = + let mut record_index_saved: Vec<(RecordTableKey, ValueRecord)> = Vec::with_capacity(record_table_keys.len()); for rtk in record_table_keys { if let Some(vr) = record_table.load_rkyv::(0, &rtk)? { - record_index_saved.push((rtk, vr)); + let rik = RecordTableKey::try_from(rtk.as_ref())?; + record_index_saved.push((rik, vr)); } } @@ -122,49 +62,51 @@ impl RecordStore { record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched())); let mut dead_records = Vec::new(); for ri in record_index_saved { - let rik = RecordIndexKey::try_from(&ri.0)?; - self.record_index.insert(rik, ri.1, |k, v| { + self.record_index.insert(ri.0, ri.1, |k, v| { // If the configuration change, we only want to keep the 'limits.max_records' records dead_records.push((k, v)); - }) + }); } self.record_table = Some(record_table); - self.subkey_table = Some(record_table); + self.subkey_table = Some(subkey_table); Ok(()) } - fn add_dead_record(&mut self, key: RecordIndexKey, record: ValueRecord) { + fn add_dead_record(&mut self, key: RecordTableKey, record: ValueRecord) { self.dead_records.push((key, record)); } - fn mark_record_changed(&mut self, key: RecordIndexKey) { + fn mark_record_changed(&mut self, key: RecordTableKey) { let cur_ts = get_aligned_timestamp(); self.changed_records.insert((key, cur_ts)); } async fn purge_dead_records(&mut self) { // Delete dead keys - if self.dead_records.empty() { + if self.dead_records.is_empty() { return; } + let record_table = self.record_table.clone().unwrap(); + let subkey_table = self.subkey_table.clone().unwrap(); + let rt_xact = record_table.transact(); let st_xact = subkey_table.transact(); - let mut dead_records = mem::take(&mut self.dead_records); + let dead_records = mem::take(&mut self.dead_records); for (k, v) in dead_records { // Delete record rt_xact.delete(0, &k.bytes()); // Delete subkeys - let subkey_count = v.subkey_count(); + let subkey_count = v.subkey_count() as u32; for sk in 0..subkey_count { // From table - let sck = SubkeyCacheKey { + let sck = SubkeyTableKey { key: k.key, subkey: sk, }; - st_xact.delete(0, &sck.bytes())?; + st_xact.delete(0, &sck.bytes()); // From cache self.subkey_cache.remove(&sck); @@ -184,18 +126,21 @@ impl RecordStore { return; } + let record_table = self.record_table.clone().unwrap(); + let subkey_table = self.subkey_table.clone().unwrap(); + let rt_xact = record_table.transact(); - let st_xact = subkey_table.transact(); let mut changed_records = mem::take(&mut self.changed_records); for (rik, ts) in changed_records { - // Flush record and changed subkeys + // Flush changed records + if let Some(r) = self.record_index.peek(&rik) { + record_table.store_rkyv(0, &rtk)?; + xxx + } } if let Err(e) = rt_xact.commit().await { log_stor!(error "failed to commit record table transaction: {}", e); } - if let Err(e) = st_xact.commit().await { - log_stor!(error "failed to commit subkey table transaction: {}", e); - } } pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { @@ -204,7 +149,8 @@ impl RecordStore { } pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> { - if self.with_record(key, |_| {})?.is_some() { + let rik = RecordTableKey { key }; + if self.record_index.contains_key(&rik) { apibail_generic!("record already exists"); } @@ -215,7 +161,7 @@ impl RecordStore { // Save to record table record_table - .store_rkyv(0, &key, &r) + .store_rkyv(0, &rik, &r) .await .map_err(VeilidAPIError::internal)?; @@ -232,7 +178,7 @@ impl RecordStore { F: FnOnce(&ValueRecord) -> R, { // Get record from index - let rck = RecordIndexKey { key }; + let rck = RecordTableKey { key }; if let Some(r) = self.record_index.get_mut(&rck) { // Touch r.touch(get_aligned_timestamp()); @@ -250,7 +196,7 @@ impl RecordStore { subkey: ValueSubkey, ) -> Result, VeilidAPIError> { // record from index - let rck = RecordIndexKey { key }; + let rck = RecordTableKey { key }; let Some(r) = self.record_index.get_mut(&rck) else { apibail_invalid_argument!("no record at this key", "key", key); }; @@ -270,7 +216,7 @@ impl RecordStore { }; // If subkey exists in subkey cache, use that - let skck = SubkeyCacheKey { key, subkey }; + let skck = SubkeyTableKey { key, subkey }; if let Some(rd) = self.subkey_cache.get_mut(&skck) { let out = rd.clone(); @@ -299,8 +245,13 @@ impl RecordStore { subkey: ValueSubkey, data: ValueRecordData, ) -> Result<(), VeilidAPIError> { + // Check size limit for data + if data.data.len() > self.limits.max_subkey_size { + return Err(VeilidAPIError::generic("record subkey too large")); + } + // Get record from index - let rck = RecordIndexKey { key }; + let rck = RecordTableKey { key }; let Some(r) = self.record_index.get_mut(&rck) else { apibail_invalid_argument!("no record at this key", "key", key); }; @@ -319,26 +270,41 @@ impl RecordStore { apibail_internal!("record store not initialized"); }; - // Write to subkey cache - let skck = SubkeyCacheKey { key, subkey }; - if let Some(rd) = self.subkey_cache.insert(skck, data, |_, _| {}) { - return Ok(Some(out)); + // Get the previous subkey and ensure we aren't going over the record size limit + let mut prior_subkey_size = 0usize; + + // If subkey exists in subkey cache, use that + let skck = SubkeyTableKey { key, subkey }; + if let Some(rd) = self.subkey_cache.peek(&skck) { + prior_subkey_size = rd.data.data().len(); + } else { + // If not in cache, try to pull from table store + let k = skck.bytes(); + if let Some(rd) = subkey_table + .load_rkyv::(0, &k) + .map_err(VeilidAPIError::internal)? + { + prior_subkey_size = rd.data.data().len(); + } } - -xxx do we flush this now or queue it? + // Check new data size + let new_data_size = r.data_size() + data.data().len() - priod_subkey_size; + if new_data_size > self.limits.max_record_data_size { + return Err(VeilidAPIError::generic("dht record too large")); + } // Write subkey - // let k = skck.bytes(); - // if let Some(rd) = subkey_table.load_rkyv::(0, &k)? { - // let out = rd.data.clone(); + let k = skck.bytes(); + subkey_table.store_rkyv(0, &k, &data)?; - // // Add to cache, do nothing with lru out - // self.subkey_cache.insert(skck, rd, |_| {}); + // Write to subkey cache + let skck = SubkeyTableKey { key, subkey }; + self.subkey_cache.insert(skck, data, |_, _| {}); - // return Ok(Some(out)); - // }; + // Update record + r.set_data_size(new_data_size); - return Ok(None); + Ok(()) } } diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store_limits.rs index 45601ba0..cabc00ea 100644 --- a/veilid-core/src/storage_manager/record_store_limits.rs +++ b/veilid-core/src/storage_manager/record_store_limits.rs @@ -1,14 +1,16 @@ -use super::*; - /// Configuration for the record store #[derive(Debug, Default, Copy, Clone)] pub struct RecordStoreLimits { /// Number of subkeys to keep in the memory cache - pub subkey_cache_size: u32, + pub subkey_cache_size: usize, + /// Maximum size of a subkey + pub max_subkey_size: usize, + /// Maximum total record data size: + pub max_record_data_size: usize, /// Limit on the total number of records in the table store - pub max_records: Option, + pub max_records: Option, /// Limit on the amount of subkey cache memory to use before evicting cache items - pub max_subkey_cache_memory_mb: Option, + pub max_subkey_cache_memory_mb: Option, /// Limit on the amount of disk space to use for subkey data - pub max_disk_space_mb: Option, + pub max_disk_space_mb: Option, } diff --git a/veilid-core/src/storage_manager/value_record.rs b/veilid-core/src/storage_manager/value_record.rs index e3500c10..ac5c0c56 100644 --- a/veilid-core/src/storage_manager/value_record.rs +++ b/veilid-core/src/storage_manager/value_record.rs @@ -1,4 +1,6 @@ use super::*; +use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; +use serde::*; #[derive( Clone, @@ -66,4 +68,12 @@ impl ValueRecord { pub fn last_touched(&self) -> Timestamp { self.last_touched_ts } + + pub fn set_data_size(&mut self, size: usize) { + self.data_size = size; + } + + pub fn data_size(&self) -> usize { + self.data_size + } } diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 8a2b2d69..c61d26af 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -202,7 +202,7 @@ impl RoutingContext { pub async fn create_dht_record( &self, kind: CryptoKind, - schema: &DHTSchema, + schema: DHTSchema, ) -> Result { let storage_manager = self.api.storage_manager()?; storage_manager @@ -214,6 +214,7 @@ impl RoutingContext { /// Returns the DHT record descriptor for the opened record if successful /// Records may only be opened or created . To re-open with a different routing context, first close the value. pub async fn open_dht_record( + &self, key: TypedKey, secret: Option, ) -> Result { @@ -225,7 +226,7 @@ impl RoutingContext { /// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record. /// Closing a record allows you to re-open it with a different routing context - pub async fn close_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> { + pub async fn close_dht_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let storage_manager = self.api.storage_manager()?; storage_manager.close_record(key).await } @@ -233,7 +234,7 @@ impl RoutingContext { /// Deletes a DHT record at a specific key. If the record is opened, it must be closed before it is deleted. /// Deleting a record does not delete it from the network immediately, but will remove the storage of the record /// locally, and will prevent its value from being refreshed on the network by this node. - pub async fn delete_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> { + pub async fn delete_dht_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let storage_manager = self.api.storage_manager()?; storage_manager.delete_record(key).await } @@ -291,7 +292,7 @@ impl RoutingContext { subkeys: &[ValueSubkeyRange], ) -> Result { let storage_manager = self.api.storage_manager()?; - storage_manager.cancel_watch_value(key, subkey).await + storage_manager.cancel_watch_values(key, subkeys).await } /////////////////////////////////// diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index c8e5baab..acdf0b15 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -385,12 +385,12 @@ impl ValueData { &self.data } - pub fn with_data_mut(&mut self, f: F) + pub fn with_data_mut(&mut self, f: F) -> R where F: FnOnce(&mut Vec) -> R, { let out = f(&mut self.data); - assert(self.data.len() <= Self::MAX_LEN); + assert!(self.data.len() <= Self::MAX_LEN); self.seq += 1; out } @@ -537,6 +537,12 @@ impl SafetySelection { } } +impl Default for SafetySelection { + fn default() -> Self { + Self::Unsafe(Sequencing::NoPreference) + } +} + /// Options for safety routes (sender privacy) #[derive( Copy, @@ -2456,11 +2462,14 @@ pub struct DHTSchemaDFLT { } impl DHTSchemaDFLT { + const FCC: [u8; 4] = *b"DFLT"; + const FIXED_SIZE: usize = 6; + /// Build the data representation of the schema pub fn compile(&self) -> Vec { - let mut out = Vec::::with_capacity(6); + let mut out = Vec::::with_capacity(Self::FIXED_SIZE); // kind - out.extend_from_slice(&FourCC::from_str("DFLT").unwrap().0); + out.extend_from_slice(&Self::FCC); // o_cnt out.extend_from_slice(&self.o_cnt.to_le_bytes()); out @@ -2472,6 +2481,22 @@ impl DHTSchemaDFLT { } } +impl TryFrom<&[u8]> for DHTSchemaDFLT { + type Error = VeilidAPIError; + fn try_from(b: &[u8]) -> Result { + if b.len() != Self::FIXED_SIZE { + apibail_generic!("invalid size"); + } + if &b[0..4] != &Self::FCC { + apibail_generic!("wrong fourcc"); + } + + let o_cnt = u16::from_le_bytes(b[4..6].try_into().map_err(VeilidAPIError::internal)?); + + Ok(Self { o_cnt }) + } +} + /// Simple DHT Schema (SMPL) Member #[derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, @@ -2497,11 +2522,16 @@ pub struct DHTSchemaSMPL { } impl DHTSchemaSMPL { + const FCC: [u8; 4] = *b"SMPL"; + const FIXED_SIZE: usize = 6; + /// Build the data representation of the schema pub fn compile(&self) -> Vec { - let mut out = Vec::::with_capacity(6 + (self.members.len() * (PUBLIC_KEY_LENGTH + 2))); + let mut out = Vec::::with_capacity( + Self::FIXED_SIZE + (self.members.len() * (PUBLIC_KEY_LENGTH + 2)), + ); // kind - out.extend_from_slice(&FourCC::from_str("SMPL").unwrap().0); + out.extend_from_slice(&Self::FCC); // o_cnt out.extend_from_slice(&self.o_cnt.to_le_bytes()); // members @@ -2518,7 +2548,40 @@ impl DHTSchemaSMPL { pub fn subkey_count(&self) -> usize { self.members .iter() - .fold(o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) + .fold(self.o_cnt as usize, |acc, x| acc + (x.m_cnt as usize)) + } +} + +impl TryFrom<&[u8]> for DHTSchemaSMPL { + type Error = VeilidAPIError; + fn try_from(b: &[u8]) -> Result { + if b.len() != Self::FIXED_SIZE { + apibail_generic!("invalid size"); + } + if &b[0..4] != &Self::FCC { + apibail_generic!("wrong fourcc"); + } + if (b.len() - Self::FIXED_SIZE) % (PUBLIC_KEY_LENGTH + 2) != 0 { + apibail_generic!("invalid member length"); + } + + let o_cnt = u16::from_le_bytes(b[4..6].try_into().map_err(VeilidAPIError::internal)?); + + let members_len = (b.len() - Self::FIXED_SIZE) / (PUBLIC_KEY_LENGTH + 2); + let mut members: Vec = Vec::with_capacity(members_len); + for n in 0..members_len { + let mstart = Self::FIXED_SIZE + n * (PUBLIC_KEY_LENGTH + 2); + let m_key = PublicKey::try_from(&b[mstart..mstart + PUBLIC_KEY_LENGTH]) + .map_err(VeilidAPIError::internal)?; + let m_cnt = u16::from_le_bytes( + b[mstart + PUBLIC_KEY_LENGTH..mstart + PUBLIC_KEY_LENGTH + 2] + .try_into() + .map_err(VeilidAPIError::internal)?, + ); + members.push(DHTSchemaSMPLMember { m_key, m_cnt }); + } + + Ok(Self { o_cnt, members }) } } @@ -2558,6 +2621,29 @@ impl DHTSchema { } } +impl Default for DHTSchema { + fn default() -> Self { + Self::dflt(1) + } +} + +impl TryFrom<&[u8]> for DHTSchema { + type Error = VeilidAPIError; + fn try_from(b: &[u8]) -> Result { + if b.len() < 4 { + apibail_generic!("invalid size"); + } + let fcc: [u8; 4] = b[0..4].try_into().unwrap(); + match fcc { + DHTSchemaDFLT::FCC => Ok(DHTSchema::DFLT(DHTSchemaDFLT::try_from(b)?)), + DHTSchemaSMPL::FCC => Ok(DHTSchema::SMPL(DHTSchemaSMPL::try_from(b)?)), + _ => { + apibail_generic!("unknown fourcc"); + } + } + } +} + /// DHT Record Descriptor #[derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,