From 36cb0687cb439e530da70a449858e5fc6e11463e Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 25 Apr 2023 21:52:53 -0400 Subject: [PATCH] record work --- .../src/rpc_processor/coders/value_detail.rs | 38 ----- veilid-core/src/storage_manager/mod.rs | 146 +++++++++++++----- .../src/storage_manager/record_store.rs | 16 +- .../types/local_record_detail.rs | 15 ++ veilid-core/src/storage_manager/types/mod.rs | 10 ++ .../storage_manager/types/opened_record.rs | 21 +++ .../src/storage_manager/{ => types}/record.rs | 27 ++-- .../{ => types}/record_data.rs | 0 .../types/remote_record_detail.rs | 10 ++ .../src/storage_manager/types/value_detail.rs | 77 --------- veilid-core/src/veilid_api/routing_context.rs | 6 +- .../types/dht/dht_record_descriptor.rs | 25 ++- veilid-tools/src/tools.rs | 17 +- 13 files changed, 231 insertions(+), 177 deletions(-) delete mode 100644 veilid-core/src/rpc_processor/coders/value_detail.rs create mode 100644 veilid-core/src/storage_manager/types/local_record_detail.rs create mode 100644 veilid-core/src/storage_manager/types/opened_record.rs rename veilid-core/src/storage_manager/{ => types}/record.rs (82%) rename veilid-core/src/storage_manager/{ => types}/record_data.rs (100%) create mode 100644 veilid-core/src/storage_manager/types/remote_record_detail.rs delete mode 100644 veilid-core/src/storage_manager/types/value_detail.rs diff --git a/veilid-core/src/rpc_processor/coders/value_detail.rs b/veilid-core/src/rpc_processor/coders/value_detail.rs deleted file mode 100644 index 4eac9ee6..00000000 --- a/veilid-core/src/rpc_processor/coders/value_detail.rs +++ /dev/null @@ -1,38 +0,0 @@ -use super::*; -use crate::storage_manager::ValueDetail; - -pub fn encode_value_detail( - value_detail: &ValueDetail, - builder: &mut veilid_capnp::value_detail::Builder, -) -> Result<(), RPCError> { - let mut svdb = builder.reborrow().init_signed_value_data(); - encode_signed_value_data(value_detail.signed_value_data(), &mut svdb)?; - if let Some(descriptor) = value_detail.descriptor() { - let mut db = builder.reborrow().init_descriptor(); - encode_signed_value_descriptor(descriptor, &mut db)?; - } - Ok(()) -} - -pub fn decode_value_detail( - reader: &veilid_capnp::value_detail::Reader, -) -> Result { - let svdr = reader.get_signed_value_data().map_err(RPCError::protocol)?; - let signed_value_data = decode_signed_value_data(&svdr)?; - - let descriptor = if reader.has_descriptor() { - let dr = reader - .reborrow() - .get_descriptor() - .map_err(RPCError::protocol)?; - let descriptor = decode_signed_value_descriptor(&dr)?; - Some(descriptor) - } else { - None - }; - - Ok(ValueDetail { - signed_value_data, - descriptor, - }) -} diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 68720be4..e20ba720 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,14 +1,10 @@ mod keys; -mod record; -mod record_data; mod record_store; mod record_store_limits; mod tasks; mod types; use keys::*; -use record::*; -use record_data::*; use record_store::*; use record_store_limits::*; @@ -28,10 +24,12 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1; struct StorageManagerInner { /// If we are started up initialized: bool, - /// Records that have been 'created' or 'opened' by this node - local_record_store: Option, - /// Records that have been pushed to this node for distribution by other nodes - remote_record_store: Option, + /// Records that have been 'opened' and are not yet closed + opened_records: HashMap, + /// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive + local_record_store: Option>, + /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish + remote_record_store: Option>, /// RPC processor if it is available rpc_processor: Option, /// Background processing task (not part of attachment manager tick tree so it happens when detached too) @@ -75,6 +73,7 @@ impl StorageManager { fn new_inner() -> StorageManagerInner { StorageManagerInner { initialized: false, + opened_records: HashMap::new(), local_record_store: None, remote_record_store: None, rpc_processor: None, @@ -201,7 +200,7 @@ impl StorageManager { } /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] - fn get_key(&self, vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey { + fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey { let compiled = record.descriptor().schema_data(); let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len()); hash_data.extend_from_slice(&vcrypto.kind().0); @@ -211,20 +210,12 @@ impl StorageManager { TypedKey::new(vcrypto.kind(), hash) } - async fn new_local_record( - &self, - vcrypto: CryptoSystemVersion, - record: Record, - ) -> Result { - // add value record to record store - let mut inner = self.inner.lock().await; + async fn lock(&self) -> Result, VeilidAPIError> { + let inner = asyncmutex_lock_arc!(&self.inner); if !inner.initialized { apibail_generic!("not initialized"); } - let local_record_store = inner.local_record_store.as_mut().unwrap(); - let key = self.get_key(vcrypto.clone(), &record); - local_record_store.new_record(key, record).await?; - Ok(key) + Ok(inner) } pub async fn create_record( @@ -232,7 +223,9 @@ impl StorageManager { kind: CryptoKind, schema: DHTSchema, safety_selection: SafetySelection, - ) -> Result { + ) -> Result { + let mut inner = self.lock().await?; + // Get cryptosystem let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { apibail_generic!("unsupported cryptosystem"); @@ -254,34 +247,113 @@ impl StorageManager { // Add new local value record let cur_ts = get_aligned_timestamp(); - let record = Record::new( - cur_ts, - signed_value_descriptor, - Some(owner.secret), - safety_selection, - )?; - let dht_key = self - .new_local_record(vcrypto, record) - .await - .map_err(VeilidAPIError::internal)?; + let local_record_detail = LocalRecordDetail { safety_selection }; + let record = + Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; - Ok(dht_key) + let local_record_store = inner.local_record_store.as_mut().unwrap(); + let dht_key = Self::get_key(vcrypto.clone(), &record); + local_record_store.new_record(dht_key, record).await?; + + // Open the record + self.open_record_inner(inner, dht_key, Some(owner), safety_selection) + .await + } + + async fn open_record_inner( + &self, + mut inner: AsyncMutexGuardArc, + key: TypedKey, + writer: Option, + safety_selection: SafetySelection, + ) -> Result { + // Get cryptosystem + let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + + // See if we have a local record already or not + let cb = |r: &Record| { + // Process local record + (r.owner().clone(), r.schema()) + }; + if let Some((owner, schema)) = inner.local_record_store.unwrap().with_record(key, cb) { + // Had local record + + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + + // Write open record + inner.opened_records.insert(key, OpenedRecord { writer }); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor { + key, + owner, + owner_secret, + schema, + }; + Ok(descriptor) + } else { + // No record yet + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor { + key, + owner, + owner_secret, + schema, + }; + Ok(descriptor) + } } pub async fn open_record( &self, key: TypedKey, - secret: Option, + writer: Option, safety_selection: SafetySelection, ) -> Result { - unimplemented!(); + let inner = self.lock().await?; + self.open_record_inner(inner, key, writer, safety_selection) + .await + } + + async fn close_record_inner( + &self, + mut inner: AsyncMutexGuardArc, + key: TypedKey, + ) -> Result<(), VeilidAPIError> { + let Some(opened_record) = inner.opened_records.remove(&key) else { + apibail_generic!("record not open"); + }; + Ok(()) } pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { - unimplemented!(); + let inner = self.lock().await?; + self.close_record_inner(inner, key).await } pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { + let inner = self.lock().await?; + + // Ensure the record is closed + if inner.opened_records.contains_key(&key) { + self.close_record_inner(inner, key).await?; + } + + // Remove + unimplemented!(); } @@ -291,6 +363,7 @@ impl StorageManager { subkey: ValueSubkey, force_refresh: bool, ) -> Result, VeilidAPIError> { + let inner = self.lock().await?; unimplemented!(); } @@ -300,6 +373,7 @@ impl StorageManager { subkey: ValueSubkey, data: Vec, ) -> Result, VeilidAPIError> { + let inner = self.lock().await?; unimplemented!(); } @@ -310,6 +384,7 @@ impl StorageManager { expiration: Timestamp, count: u32, ) -> Result { + let inner = self.lock().await?; unimplemented!(); } @@ -318,6 +393,7 @@ impl StorageManager { key: TypedKey, subkeys: &[ValueSubkeyRange], ) -> Result { + let inner = self.lock().await?; unimplemented!(); } } diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 9490fc27..06b659c4 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -7,25 +7,25 @@ use super::*; use hashlink::LruCache; -pub struct RecordStore { +pub struct RecordStore { table_store: TableStore, name: String, limits: RecordStoreLimits, record_table: Option, subkey_table: Option, - record_index: LruCache, + record_index: LruCache>, subkey_cache: LruCache, subkey_cache_total_size: usize, total_storage_space: usize, - dead_records: Vec<(RecordTableKey, Record)>, + dead_records: Vec<(RecordTableKey, Record)>, changed_records: HashSet, purge_dead_records_mutex: Arc>, } -impl RecordStore { +impl RecordStore { pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { let subkey_cache_size = limits.subkey_cache_size as usize; Self { @@ -92,7 +92,7 @@ impl RecordStore { Ok(()) } - fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { + fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { self.dead_records.push((key, record)); } @@ -135,7 +135,7 @@ impl RecordStore { async fn purge_dead_records(&mut self, lazy: bool) { let purge_dead_records_mutex = self.purge_dead_records_mutex.clone(); let _lock = if lazy { - match mutex_try_lock!(purge_dead_records_mutex) { + match asyncmutex_try_lock!(purge_dead_records_mutex) { Some(v) => v, None => { // If not ready now, just skip it if we're lazy @@ -221,7 +221,7 @@ impl RecordStore { pub async fn new_record( &mut self, key: TypedKey, - record: Record, + record: Record, ) -> Result<(), VeilidAPIError> { let rtk = RecordTableKey { key }; if self.record_index.contains_key(&rtk) { @@ -269,7 +269,7 @@ impl RecordStore { pub fn with_record(&mut self, key: TypedKey, f: F) -> Option where - F: FnOnce(&Record) -> R, + F: FnOnce(&Record) -> R, { // Get record from index let mut out = None; diff --git a/veilid-core/src/storage_manager/types/local_record_detail.rs b/veilid-core/src/storage_manager/types/local_record_detail.rs new file mode 100644 index 00000000..8c02f85e --- /dev/null +++ b/veilid-core/src/storage_manager/types/local_record_detail.rs @@ -0,0 +1,15 @@ +use super::*; + +use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; +use serde::*; + +/// Information required to handle locally opened records +#[derive( + Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, +)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct LocalRecordDetail { + /// The last 'safety selection' used when creating/opening this record. + /// Even when closed, this safety selection applies to republication attempts by the system. + safety_selection: SafetySelection, +} diff --git a/veilid-core/src/storage_manager/types/mod.rs b/veilid-core/src/storage_manager/types/mod.rs index 9eea4ad8..a295241b 100644 --- a/veilid-core/src/storage_manager/types/mod.rs +++ b/veilid-core/src/storage_manager/types/mod.rs @@ -1,7 +1,17 @@ +mod local_record_detail; +mod opened_record; +mod record; +mod record_data; +mod remote_record_detail; mod signed_value_data; mod signed_value_descriptor; use super::*; +pub use local_record_detail::*; +pub use opened_record::*; +pub use record::*; +pub use record_data::*; +pub use remote_record_detail::*; pub use signed_value_data::*; pub use signed_value_descriptor::*; diff --git a/veilid-core/src/storage_manager/types/opened_record.rs b/veilid-core/src/storage_manager/types/opened_record.rs new file mode 100644 index 00000000..17424bfa --- /dev/null +++ b/veilid-core/src/storage_manager/types/opened_record.rs @@ -0,0 +1,21 @@ +use super::*; + +/// The state associated with a local record when it is opened +/// This is not serialized to storage as it is ephemeral for the lifetime of the opened record +#[derive(Clone, Debug, Default)] +pub struct OpenedRecord { + /// The key pair used to perform writes to subkey on this opened record + /// Without this, set_value() will fail regardless of which key or subkey is being written to + /// as all writes are signed + writer: Option, +} + +impl OpenedRecord { + pub fn new(writer: Option) -> Self { + Self { writer } + } + + pub fn writer(&self) -> Option<&KeyPair> { + self.writer.as_ref() + } +} diff --git a/veilid-core/src/storage_manager/record.rs b/veilid-core/src/storage_manager/types/record.rs similarity index 82% rename from veilid-core/src/storage_manager/record.rs rename to veilid-core/src/storage_manager/types/record.rs index b80a8d47..6b07eafe 100644 --- a/veilid-core/src/storage_manager/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -6,32 +6,28 @@ use serde::*; Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, )] #[archive_attr(repr(C), derive(CheckBytes))] -pub struct Record { - last_touched_ts: Timestamp, +pub struct Record { descriptor: SignedValueDescriptor, subkey_count: usize, - - owner_secret: Option, - safety_selection: SafetySelection, + last_touched_ts: Timestamp, record_data_size: usize, + detail: D, } -impl Record { +impl Record { pub fn new( cur_ts: Timestamp, descriptor: SignedValueDescriptor, - owner_secret: Option, - safety_selection: SafetySelection, + detail: D, ) -> Result { let schema = descriptor.schema()?; let subkey_count = schema.subkey_count(); Ok(Self { - last_touched_ts: cur_ts, descriptor, subkey_count, - owner_secret, - safety_selection, + last_touched_ts: cur_ts, record_data_size: 0, + detail, }) } @@ -68,6 +64,13 @@ impl Record { } pub fn total_size(&self) -> usize { - mem::size_of::() + self.descriptor.total_size() + self.record_data_size + mem::size_of::>() + self.descriptor.total_size() + self.record_data_size + } + + pub fn detail(&self) -> &D { + &self.detail + } + pub fn detail_mut(&mut self) -> &mut D { + &mut self.detail } } diff --git a/veilid-core/src/storage_manager/record_data.rs b/veilid-core/src/storage_manager/types/record_data.rs similarity index 100% rename from veilid-core/src/storage_manager/record_data.rs rename to veilid-core/src/storage_manager/types/record_data.rs diff --git a/veilid-core/src/storage_manager/types/remote_record_detail.rs b/veilid-core/src/storage_manager/types/remote_record_detail.rs new file mode 100644 index 00000000..40d41895 --- /dev/null +++ b/veilid-core/src/storage_manager/types/remote_record_detail.rs @@ -0,0 +1,10 @@ +use super::*; + +use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; +use serde::*; + +#[derive( + Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, +)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RemoteRecordDetail {} diff --git a/veilid-core/src/storage_manager/types/value_detail.rs b/veilid-core/src/storage_manager/types/value_detail.rs deleted file mode 100644 index 4f633dcc..00000000 --- a/veilid-core/src/storage_manager/types/value_detail.rs +++ /dev/null @@ -1,77 +0,0 @@ -use super::*; -use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; -use serde::*; - -///////////////////////////////////////////////////////////////////////////////////////////////////// -/// - -#[derive( - Clone, - Debug, - PartialOrd, - PartialEq, - Eq, - Ord, - Serialize, - Deserialize, - RkyvArchive, - RkyvSerialize, - RkyvDeserialize, -)] -#[archive_attr(repr(C), derive(CheckBytes))] -pub struct ValueDetail { - signed_value_data: SignedValueData, - descriptor: Option, -} - -impl ValueDetail { - pub fn new( - signed_value_data: SignedValueData, - descriptor: Option, - ) -> Self { - Self { - signed_value_data, - descriptor, - } - } - - pub fn validate( - &self, - last_descriptor: Option<&SignedValueDescriptor>, - subkey: ValueSubkey, - vcrypto: CryptoSystemVersion, - ) -> Result<(), VeilidAPIError> { - // Get descriptor to validate with - let descriptor = if let Some(descriptor) = &self.descriptor { - if let Some(last_descriptor) = last_descriptor { - if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal { - return Err(VeilidAPIError::generic( - "value detail descriptor does not match last descriptor", - )); - } - } - descriptor - } else { - let Some(descriptor) = last_descriptor else { - return Err(VeilidAPIError::generic( - "no last descriptor, requires a descriptor", - )); - }; - descriptor - }; - - // Ensure the descriptor itself validates - descriptor.validate(vcrypto.clone())?; - - // And the signed value data - self.signed_value_data - .validate(descriptor.owner(), subkey, vcrypto) - } - - pub fn signed_value_data(&self) -> &SignedValueData { - &self.signed_value_data - } - pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { - self.descriptor.as_ref() - } -} diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index c61d26af..ef0607d6 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -203,7 +203,7 @@ impl RoutingContext { &self, kind: CryptoKind, schema: DHTSchema, - ) -> Result { + ) -> Result { let storage_manager = self.api.storage_manager()?; storage_manager .create_record(kind, schema, self.unlocked_inner.safety_selection) @@ -216,7 +216,7 @@ impl RoutingContext { pub async fn open_dht_record( &self, key: TypedKey, - secret: Option, + writer: Option, ) -> Result { let storage_manager = self.api.storage_manager()?; storage_manager @@ -232,7 +232,7 @@ impl RoutingContext { } /// Deletes a DHT record at a specific key. If the record is opened, it must be closed before it is deleted. - /// Deleting a record does not delete it from the network immediately, but will remove the storage of the record + /// Deleting a record does not delete it from the network, but will remove the storage of the record /// locally, and will prevent its value from being refreshed on the network by this node. pub async fn delete_dht_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let storage_manager = self.api.storage_manager()?; diff --git a/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs b/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs index 24e18337..1c4d114e 100644 --- a/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs +++ b/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs @@ -16,19 +16,40 @@ use super::*; )] #[archive_attr(repr(C), derive(CheckBytes))] pub struct DHTRecordDescriptor { + /// DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] + key: TypedKey, + /// The public key of the owner owner: PublicKey, + /// If this key is being created: Some(the secret key of the owner) + /// If this key is just being opened: None + owner_secret: Option, + /// The schema in use associated with the key schema: DHTSchema, } impl DHTRecordDescriptor { - pub fn new(owner: PublicKey, schema: DHTSchema) -> Self { - Self { owner, schema } + pub fn new( + key: TypedKey, + owner: PublicKey, + owner_secret: Option, + schema: DHTSchema, + ) -> Self { + Self { + key, + owner, + owner_secret, + schema, + } } pub fn owner(&self) -> &PublicKey { &self.owner } + pub fn owner_secret(&self) -> Option<&SecretKey> { + self.owner_secret.as_ref() + } + pub fn schema(&self) -> &DHTSchema { &self.schema } diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 380b30ad..a2ecb0fa 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -35,18 +35,31 @@ macro_rules! bail_io_error_other { cfg_if::cfg_if! { if #[cfg(feature="rt-tokio")] { #[macro_export] - macro_rules! mutex_try_lock { + macro_rules! asyncmutex_try_lock { ($x:expr) => { $x.try_lock().ok() }; } + + #[macro_export] + macro_rules! asyncmutex_lock_arc { + ($x:expr) => { + $x.clone().lock_owned().await + }; + } } else { #[macro_export] - macro_rules! mutex_try_lock { + macro_rules! asyncmutex_try_lock { ($x:expr) => { $x.try_lock() }; } + #[macro_export] + macro_rules! asyncmutex_lock_arc { + ($x:expr) => { + $x.lock_arc().await + }; + } } }