From a3e2dbc74475bf4ca6358c179793177bcdb8b6d6 Mon Sep 17 00:00:00 2001
From: John Smith
Date: Mon, 8 May 2023 22:05:51 -0400
Subject: [PATCH] checkpoint

---
 veilid-core/src/storage_manager/mod.rs        | 224 ++++++++++++------
 .../storage_manager/storage_manager_inner.rs  | 129 +++++++++-
 .../serialize_helpers/rkyv_range_set_blaze.rs |  12 +-
 veilid-core/src/veilid_api/types/dht/mod.rs   |   2 +
 .../src/veilid_api/types/dht/value_data.rs    |  10 -
 .../types/dht/value_subkey_range_set.rs       |  46 ++++
 6 files changed, 337 insertions(+), 86 deletions(-)
 create mode 100644 veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs

diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index c390ad80..528e339b 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -61,34 +61,6 @@ impl StorageManager {
         StorageManagerInner::new(unlocked_inner)
     }
 
-    fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
-        let c = config.get();
-        RecordStoreLimits {
-            subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
-            max_subkey_size: MAX_SUBKEY_SIZE,
-            max_record_total_size: MAX_RECORD_DATA_SIZE,
-            max_records: None,
-            max_subkey_cache_memory_mb: Some(
-                c.network.dht.local_max_subkey_cache_memory_mb as usize,
-            ),
-            max_storage_space_mb: None,
-        }
-    }
-
-    fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
-        let c = config.get();
-        RecordStoreLimits {
-            subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
-            max_subkey_size: MAX_SUBKEY_SIZE,
-            max_record_total_size: MAX_RECORD_DATA_SIZE,
-            max_records: Some(c.network.dht.remote_max_records as usize),
-            max_subkey_cache_memory_mb: Some(
-                c.network.dht.remote_max_subkey_cache_memory_mb as usize,
-            ),
-            max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
-        }
-    }
-
     pub fn new(
         config: VeilidConfig,
         crypto: Crypto,
@@ -118,39 +90,7 @@ impl StorageManager {
         debug!("startup storage manager");
         let mut inner = self.inner.lock().await;
 
-        let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone());
-        let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone());
-
-        let mut local_record_store = RecordStore::new(
-            self.unlocked_inner.table_store.clone(),
-            "local",
-            local_limits,
-        );
-        local_record_store.init().await?;
-
-        let mut remote_record_store = RecordStore::new(
-            self.unlocked_inner.table_store.clone(),
-            "remote",
-            remote_limits,
-        );
-        remote_record_store.init().await?;
-
-        inner.local_record_store = Some(local_record_store);
-        inner.remote_record_store = Some(remote_record_store);
-
-        // Schedule tick
-        let this = self.clone();
-        let tick_future = interval(1000, move || {
-            let this = this.clone();
-            async move {
-                if let Err(e) = this.tick().await {
-                    warn!("storage manager tick failed: {}", e);
-                }
-            }
-        });
-        inner.tick_future = Some(tick_future);
-
-        inner.initialized = true;
+        inner.init(self.clone()).await?;
 
         Ok(())
     }
@@ -159,12 +99,7 @@ impl StorageManager {
         debug!("starting storage manager shutdown");
         let mut inner = self.inner.lock().await;
-
-        // Stop ticker
-        let tick_future = inner.tick_future.take();
-        if let Some(f) = tick_future {
-            f.await;
-        }
+        inner.terminate().await;
 
         // Cancel all tasks
         self.cancel_tasks().await;
@@ -353,14 +288,165 @@ impl StorageManager {
         Ok(Some(subkey_result_value.into_value_data()))
     }
 
+    /// Set the value of a subkey on an opened local record
+    /// Puts changes to the network immediately and may refresh
+    /// the record if there is a newer subkey available online
     pub async fn set_value(
         &self,
         key: TypedKey,
         subkey: ValueSubkey,
         data: Vec<u8>,
     ) -> Result<Option<ValueData>, VeilidAPIError> {
-        let inner = self.lock().await?;
-        unimplemented!();
+        let mut inner = self.lock().await?;
+
+        // Get cryptosystem
+        let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else {
+            apibail_generic!("unsupported cryptosystem");
+        };
+
+        // Get the opened record, leaving it in the opened records table
+        let Some(opened_record) = inner.opened_records.get(&key) else {
+            apibail_generic!("record not open");
+        };
+
+        // Use the safety selection we opened the record with
+        let safety_selection = opened_record.safety_selection();
+
+        // If we don't have a writer then we can't write
+        let Some(writer) = opened_record.writer().cloned() else {
+            apibail_generic!("value is not writable");
+        };
+
+        // See if the subkey we are modifying has a last known local value
+        let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
+
+        // Get the descriptor and schema for the key
+        let Some(descriptor) = last_subkey_result.descriptor else {
+            apibail_generic!("must have a descriptor");
+        };
+        let schema = descriptor.schema()?;
+
+        // Make new subkey data, bumping the sequence number past the last known value
+        let value_data = if let Some(signed_value_data) = last_subkey_result.value {
+            let seq = signed_value_data.value_data().seq();
+            ValueData::new_with_seq(seq + 1, data, writer.key)
+        } else {
+            ValueData::new(data, writer.key)
+        };
+
+        // Validate with schema
+        if !schema.check_subkey_value_data(descriptor.owner(), subkey, &value_data) {
+            // Validation failed, ignore this value
+            apibail_generic!("failed schema validation");
+        }
+
+        // Sign the new value data with the writer
+        let signed_value_data = SignedValueData::make_signature(
+            value_data,
+            descriptor.owner(),
+            subkey,
+            vcrypto,
+            writer.secret,
+        )?;
+
+        // Write the new subkey data locally first so it is never lost
+        inner
+            .handle_set_local_value(key, subkey, signed_value_data.clone())
+            .await?;
+
+        // Get rpc processor and drop mutex so we don't block while setting the value on the network
+        let Some(rpc_processor) = inner.rpc_processor.clone() else {
+            // Offline, note that the subkey still needs to be pushed to the network
+            inner
+                .offline_subkey_writes
+                .entry(key)
+                .or_default()
+                .insert(subkey);
+            return Ok(None);
+        };
+
+        // Drop the lock for network access
+        drop(inner);
+
+        // Remember the sequence number we wrote so we can detect a newer value coming back
+        let new_seq = signed_value_data.value_data().seq();
+        let subkey_result = SubkeyResult {
+            value: Some(signed_value_data),
+            descriptor: Some(descriptor),
+        };
+
+        // Set the value on the network
+        let final_subkey_result = self
+            .do_set_value(rpc_processor, key, subkey, safety_selection, subkey_result)
+            .await?;
+
+        // See if we got a value back
+        let Some(subkey_result_value) = final_subkey_result.value else {
+            // If we got nothing back then the network accepted our value as-is
+            return Ok(None);
+        };
+
+        // If we got a newer value back then write it to the opened record
+        if subkey_result_value.value_data().seq() != new_seq {
+            let mut inner = self.lock().await?;
+            inner
+                .handle_set_local_value(key, subkey, subkey_result_value.clone())
+                .await?;
+        }
+        Ok(Some(subkey_result_value.into_value_data()))
     }
 
     pub async fn watch_values(
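In effect, set_value bumps the sequence number past the last known local value, writes locally, then pushes to the network and adopts whatever newer value comes back. A minimal standalone sketch of that reconciliation rule, using hypothetical stand-in types rather than veilid-core's SignedValueData machinery:

// Hypothetical model types; the real code carries signatures and descriptors.
#[derive(Clone, Debug, PartialEq)]
struct Value {
    seq: u32,
    data: Vec<u8>,
}

// Decide which value ends up in the local record store after a network set.
fn reconcile(ours: Value, network: Option<Value>) -> Value {
    match network {
        // A different sequence number means a newer value exists online
        Some(theirs) if theirs.seq != ours.seq => theirs,
        // Nothing (or our own value) came back: our write stands
        _ => ours,
    }
}

fn main() {
    let ours = Value { seq: 5, data: b"mine".to_vec() };
    assert_eq!(reconcile(ours.clone(), None), ours);

    let newer = Value { seq: 9, data: b"theirs".to_vec() };
    assert_eq!(reconcile(ours, Some(newer.clone())), newer);
}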
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index d756cbe8..149feb6f 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -11,12 +11,44 @@ pub(super) struct StorageManagerInner {
     pub local_record_store: Option<RecordStore<LocalRecordDetail>>,
     /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
     pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
+    /// Record subkeys that have not been pushed to the network because they were written to offline
+    pub offline_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
+    /// Storage manager metadata that is persistent, including a copy of the offline subkey writes
+    pub metadata_db: Option<TableDB>,
     /// RPC processor if it is available
    pub rpc_processor: Option<RPCProcessor>,
     /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
     pub tick_future: Option<SendPinBoxFuture<()>>,
 }
 
+fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
+    let c = config.get();
+    RecordStoreLimits {
+        subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
+        max_subkey_size: MAX_SUBKEY_SIZE,
+        max_record_total_size: MAX_RECORD_DATA_SIZE,
+        max_records: None,
+        max_subkey_cache_memory_mb: Some(
+            c.network.dht.local_max_subkey_cache_memory_mb as usize,
+        ),
+        max_storage_space_mb: None,
+    }
+}
+
+fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
+    let c = config.get();
+    RecordStoreLimits {
+        subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
+        max_subkey_size: MAX_SUBKEY_SIZE,
+        max_record_total_size: MAX_RECORD_DATA_SIZE,
+        max_records: Some(c.network.dht.remote_max_records as usize),
+        max_subkey_cache_memory_mb: Some(
+            c.network.dht.remote_max_subkey_cache_memory_mb as usize,
+        ),
+        max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
+    }
+}
+
 impl StorageManagerInner {
     pub fn new(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> Self {
         Self {
@@ -25,11 +57,106 @@ impl StorageManagerInner {
             opened_records: Default::default(),
             local_record_store: Default::default(),
             remote_record_store: Default::default(),
+            offline_subkey_writes: Default::default(),
+            metadata_db: Default::default(),
             rpc_processor: Default::default(),
             tick_future: Default::default(),
         }
     }
 
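offline_subkey_writes is a per-record queue of subkeys awaiting a network push; the flush task that drains it is still a TODO below. A sketch of the drain shape such a queue implies, using plain std types instead of TypedKey and ValueSubkeyRangeSet (all names here are hypothetical):

use std::collections::HashMap;

// Push every queued (record, subkey) pair; keep only the ones that still fail.
fn flush_offline_writes(
    queue: &mut HashMap<String, Vec<u32>>,
    mut push: impl FnMut(&str, u32) -> bool, // true = push succeeded
) {
    queue.retain(|key, subkeys| {
        subkeys.retain(|&subkey| !push(key.as_str(), subkey));
        !subkeys.is_empty() // drop records with nothing left to push
    });
}

fn main() {
    let mut queue = HashMap::from([("record".to_string(), vec![0, 1, 7])]);
    // Pretend pushing subkey 7 fails; it stays queued for the next flush
    flush_offline_writes(&mut queue, |_key, subkey| subkey != 7);
    assert_eq!(queue["record"], vec![7]);
}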
+    pub async fn init(&mut self, outer_self: StorageManager) -> EyreResult<()> {
+        let metadata_db = self
+            .unlocked_inner
+            .table_store
+            .open("storage_manager_metadata", 1)
+            .await?;
+
+        let local_limits = local_limits_from_config(self.unlocked_inner.config.clone());
+        let remote_limits = remote_limits_from_config(self.unlocked_inner.config.clone());
+
+        let mut local_record_store = RecordStore::new(
+            self.unlocked_inner.table_store.clone(),
+            "local",
+            local_limits,
+        );
+        local_record_store.init().await?;
+
+        let mut remote_record_store = RecordStore::new(
+            self.unlocked_inner.table_store.clone(),
+            "remote",
+            remote_limits,
+        );
+        remote_record_store.init().await?;
+
+        self.metadata_db = Some(metadata_db);
+        self.local_record_store = Some(local_record_store);
+        self.remote_record_store = Some(remote_record_store);
+
+        self.load_metadata().await?;
+
+        // Schedule tick
+        let tick_future = interval(1000, move || {
+            let this = outer_self.clone();
+            async move {
+                if let Err(e) = this.tick().await {
+                    log_stor!(warn "storage manager tick failed: {}", e);
+                }
+            }
+        });
+        self.tick_future = Some(tick_future);
+
+        self.initialized = true;
+
+        Ok(())
+    }
+
+    pub async fn terminate(&mut self) {
+        // Stop ticker
+        let tick_future = self.tick_future.take();
+        if let Some(f) = tick_future {
+            f.await;
+        }
+
+        // Final flush on record stores
+        if let Some(mut local_record_store) = self.local_record_store.take() {
+            local_record_store.tick().await;
+        }
+        if let Some(mut remote_record_store) = self.remote_record_store.take() {
+            remote_record_store.tick().await;
+        }
+
+        // Save metadata
+        if self.metadata_db.is_some() {
+            if let Err(e) = self.save_metadata().await {
+                log_stor!(error "termination metadata save failed: {}", e);
+            }
+            self.metadata_db = None;
+        }
+        self.offline_subkey_writes.clear();
+
+        // Mark not initialized
+        self.initialized = false;
+    }
+
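terminate() stops the ticker by taking tick_future and awaiting it, the handshake that init() sets up through the interval helper. Roughly the same stop-on-await pattern, modeled with tokio primitives instead of veilid-core's interval (the Ticker type and channel choice here are assumptions for illustration, not the project's API):

// Sketch only: tokio stand-in for the interval()/tick_future pattern.
use std::time::Duration;
use tokio::{sync::watch, task::JoinHandle, time::sleep};

struct Ticker {
    stop_tx: watch::Sender<bool>,
    handle: JoinHandle<()>,
}

fn start_ticker() -> Ticker {
    let (stop_tx, mut stop_rx) = watch::channel(false);
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = sleep(Duration::from_millis(1000)) => {
                    // do one tick's worth of background work here
                }
                _ = stop_rx.changed() => break, // terminate() was called
            }
        }
    });
    Ticker { stop_tx, handle }
}

#[tokio::main]
async fn main() {
    let ticker = start_ticker();
    sleep(Duration::from_millis(2500)).await;
    let _ = ticker.stop_tx.send(true); // signal stop
    let _ = ticker.handle.await;       // wait for the tick loop to exit
}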
+    async fn save_metadata(&mut self) -> EyreResult<()> {
+        if let Some(metadata_db) = &self.metadata_db {
+            let tx = metadata_db.transact();
+            tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes);
+            tx.commit().await.wrap_err("failed to commit")?;
+        }
+        Ok(())
+    }
+
+    async fn load_metadata(&mut self) -> EyreResult<()> {
+        if let Some(metadata_db) = &self.metadata_db {
+            self.offline_subkey_writes = metadata_db
+                .load_rkyv(0, b"offline_subkey_writes")?
+                .unwrap_or_default();
+        }
+        Ok(())
+    }
+
+    // TODO: write the offline subkey write flush background task, or make a
+    // ticket for it and get back to it after the rest of set_value
+
     pub async fn create_new_owned_local_record(
         &mut self,
         kind: CryptoKind,
@@ -223,7 +350,7 @@ impl StorageManagerInner {
     ) -> Result<(), VeilidAPIError> {
         // See if it's in the local record store
         let Some(local_record_store) = self.local_record_store.as_mut() else {
-            apibail_not_initialized!();
+            apibail_not_initialized!();
         };
 
         // Write subkey to local store
diff --git a/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs b/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs
index 1acc20f6..6a2c4736 100644
--- a/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs
+++ b/veilid-core/src/veilid_api/serialize_helpers/rkyv_range_set_blaze.rs
@@ -52,20 +52,20 @@ where
     D: rkyv::Fallible + ?Sized,
     T: rkyv::Archive + Integer,
     rkyv::Archived<T>: rkyv::Deserialize<T, D>,
-    D::Error: From<String>,
+    // D::Error: From<String>, // XXX: this bound doesn't work
 {
     fn deserialize_with(
         field: &rkyv::Archived<Vec<T>>,
         deserializer: &mut D,
     ) -> Result<RangeSetBlaze<T>, D::Error> {
         let mut out = RangeSetBlaze::<T>::new();
-        if field.len() % 2 == 1 {
-            return Err("invalid range set length".to_owned().into());
-        }
+        // if field.len() % 2 == 1 {
+        //     return Err("invalid range set length".to_owned().into());
+        // }
         let f = field.as_slice();
         for i in 0..field.len() / 2 {
-            let l: T = f[i].deserialize(deserializer)?;
-            let u: T = f[i + 1].deserialize(deserializer)?;
+            let l: T = f[i * 2].deserialize(deserializer)?;
+            let u: T = f[i * 2 + 1].deserialize(deserializer)?;
             out.ranges_insert(l..=u);
         }
         Ok(out)
diff --git a/veilid-core/src/veilid_api/types/dht/mod.rs b/veilid-core/src/veilid_api/types/dht/mod.rs
index 55ca7803..3a830b47 100644
--- a/veilid-core/src/veilid_api/types/dht/mod.rs
+++ b/veilid-core/src/veilid_api/types/dht/mod.rs
@@ -1,12 +1,14 @@
 mod dht_record_descriptor;
 mod schema;
 mod value_data;
+mod value_subkey_range_set;
 
 use super::*;
 
 pub use dht_record_descriptor::*;
 pub use schema::*;
 pub use value_data::*;
+pub use value_subkey_range_set::*;
 
 /// Value subkey
 pub type ValueSubkey = u32;
diff --git a/veilid-core/src/veilid_api/types/dht/value_data.rs b/veilid-core/src/veilid_api/types/dht/value_data.rs
index 0cb34143..734e735d 100644
--- a/veilid-core/src/veilid_api/types/dht/value_data.rs
+++ b/veilid-core/src/veilid_api/types/dht/value_data.rs
@@ -48,16 +48,6 @@ impl ValueData {
         &self.data
     }
 
-    pub fn with_data_mut<F, R>(&mut self, f: F) -> R
-    where
-        F: FnOnce(&mut Vec<u8>) -> R,
-    {
-        let out = f(&mut self.data);
-        assert!(self.data.len() <= Self::MAX_LEN);
-        self.seq += 1;
-        out
-    }
-
     pub fn total_size(&self) -> usize {
         mem::size_of::<Self>() + self.data.len()
     }
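The indexing fix in rkyv_range_set_blaze.rs matters because the range set is archived as a flat list [lo0, hi0, lo1, hi1, ...]: range k lives at positions k * 2 and k * 2 + 1, while the old f[i]/f[i + 1] indexing read overlapping pairs. A standalone sketch of the intended layout, without the rkyv plumbing:

use std::ops::RangeInclusive;

// Decode a flattened [lo0, hi0, lo1, hi1, ...] list back into ranges.
fn decode_ranges(f: &[u32]) -> Vec<RangeInclusive<u32>> {
    debug_assert!(f.len() % 2 == 0, "flattened range list must have even length");
    (0..f.len() / 2).map(|k| f[k * 2]..=f[k * 2 + 1]).collect()
}

fn main() {
    // Two ranges: 1..=3 and 7..=7
    assert_eq!(decode_ranges(&[1, 3, 7, 7]), vec![1..=3, 7..=7]);
    // The old f[i]/f[i + 1] indexing would have read [1,3] then [3,7] here
}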
diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs
new file mode 100644
index 00000000..5cac4c93
--- /dev/null
+++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs
@@ -0,0 +1,46 @@
+use super::*;
+use core::ops::{Deref, DerefMut};
+use range_set_blaze::*;
+
+#[derive(
+    Clone,
+    Debug,
+    Default,
+    PartialOrd,
+    PartialEq,
+    Eq,
+    Ord,
+    Serialize,
+    Deserialize,
+    RkyvArchive,
+    RkyvSerialize,
+    RkyvDeserialize,
+)]
+#[archive_attr(repr(C), derive(CheckBytes))]
+pub struct ValueSubkeyRangeSet {
+    #[with(RkyvRangeSetBlaze)]
+    #[serde(with = "serialize_range_set_blaze")]
+    data: RangeSetBlaze<ValueSubkey>,
+}
+
+impl ValueSubkeyRangeSet {
+    pub fn new() -> Self {
+        Self {
+            data: Default::default(),
+        }
+    }
+}
+
+impl Deref for ValueSubkeyRangeSet {
+    type Target = RangeSetBlaze<ValueSubkey>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.data
+    }
+}
+
+impl DerefMut for ValueSubkeyRangeSet {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.data
+    }
+}
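Since ValueSubkeyRangeSet derefs to RangeSetBlaze<ValueSubkey>, callers get the whole range-set API, where adjacent subkey ranges coalesce automatically and keep the offline-write bookkeeping compact. A standalone sketch against the range_set_blaze crate itself; the same calls work on ValueSubkeyRangeSet through Deref/DerefMut:

use range_set_blaze::RangeSetBlaze;

fn main() {
    let mut subkeys = RangeSetBlaze::<u32>::new();
    subkeys.ranges_insert(0..=3); // subkeys 0..=3 written while offline
    subkeys.insert(7);            // one more single subkey
    subkeys.ranges_insert(4..=6); // fills the gap: coalesces into 0..=7
    assert_eq!(subkeys.ranges().count(), 1);
    assert!(subkeys.contains(5));
}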