diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs new file mode 100644 index 00000000..54a45796 --- /dev/null +++ b/veilid-core/src/routing_table/find_peers.rs @@ -0,0 +1,103 @@ +use super::*; + +impl RoutingTable { + /// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info + pub fn find_all_closest_peers(&self, key: TypedKey) -> NetworkResult> { + let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else { + // Our own node info is not yet available, drop this request. + return NetworkResult::service_unavailable(); + }; + + // find N nodes closest to the target node in our routing table + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Ensure only things that are valid/signed in the PublicInternet domain are returned + rti.filter_has_valid_signed_node_info( + RoutingDomain::PublicInternet, + true, + opt_entry, + ) + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let node_count = { + let c = self.config.get(); + c.network.dht.max_find_node_count as usize + }; + + let closest_nodes = self.find_closest_nodes( + node_count, + key, + filters, + // transform + |rti, entry| { + rti.transform_to_peer_info(RoutingDomain::PublicInternet, &own_peer_info, entry) + }, + ); + + NetworkResult::value(closest_nodes) + } + + /// Utility to find nodes that are closer to a key than our own node, returning their peer info + pub fn find_peers_closer_to_key(&self, key: TypedKey) -> NetworkResult> { + // add node information for the requesting node to our routing table + let crypto_kind = key.kind; + let own_node_id = self.node_id(crypto_kind); + + // find N nodes closest to the target node in our routing table + // ensure the nodes returned are only the ones closer to the target node than ourself + let Some(vcrypto) = self.crypto().get(crypto_kind) else { + return NetworkResult::invalid_message("unsupported cryptosystem"); + }; + let own_distance = vcrypto.distance(&own_node_id.value, &key.value); + + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Exclude our own node + let Some(entry) = opt_entry else { + return false; + }; + // Ensure only things that are valid/signed in the PublicInternet domain are returned + if !rti.filter_has_valid_signed_node_info( + RoutingDomain::PublicInternet, + true, + Some(entry.clone()), + ) { + return false; + } + // Ensure things further from the key than our own node are not included + let Some(entry_node_id) = entry.with(rti, |_rti, e| e.node_ids().get(crypto_kind)) else { + return false; + }; + let entry_distance = vcrypto.distance(&entry_node_id.value, &key.value); + if entry_distance >= own_distance { + return false; + } + + true + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let node_count = { + let c = self.config.get(); + c.network.dht.max_find_node_count as usize + }; + + // + let closest_nodes = self.find_closest_nodes( + node_count, + key, + filters, + // transform + |rti, entry| { + entry.unwrap().with(rti, |_rti, e| { + e.make_peer_info(RoutingDomain::PublicInternet).unwrap() + }) + }, + ); + + NetworkResult::value(closest_nodes) + } +} diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 8cdf65f7..f8457d3e 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1,6 +1,7 @@ mod bucket; mod bucket_entry; mod debug; +mod find_peers; mod node_ref; mod node_ref_filter; mod privacy; @@ -22,6 +23,7 @@ use hashlink::LruCache; pub use bucket_entry::*; pub use debug::*; +pub use find_peers::*; pub use node_ref::*; pub use node_ref_filter::*; pub use privacy::*; diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index a910dc1b..07dcc1c2 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -90,48 +90,14 @@ impl RPCProcessor { }; let node_id = find_node_q.destructure(); - // add node information for the requesting node to our routing table + // Get a chunk of the routing table near the requested node id let routing_table = self.routing_table(); + let closest_nodes = network_result_try!(routing_table.find_all_closest_peers(node_id)); -xxx move this into routing table code, also do getvalue code - - let Some(own_peer_info) = routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else { - // Our own node info is not yet available, drop this request. - return Ok(NetworkResult::service_unavailable()); - }; - - // find N nodes closest to the target node in our routing table - let filter = Box::new( - move |rti: &RoutingTableInner, opt_entry: Option>| { - // Ensure only things that are valid/signed in the PublicInternet domain are returned - rti.filter_has_valid_signed_node_info( - RoutingDomain::PublicInternet, - true, - opt_entry, - ) - }, - ) as RoutingTableEntryFilter; - let filters = VecDeque::from([filter]); - - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; - - let closest_nodes = routing_table.find_closest_nodes( - node_count, - node_id, - filters, - // transform - |rti, entry| { - rti.transform_to_peer_info(RoutingDomain::PublicInternet, &own_peer_info, entry) - }, - ); - - // Make status answer + // Make FindNode answer let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; - // Send status answer + // Send FindNode answer self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a))) .await } diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index f2cc5f7d..26b45861 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -113,69 +113,26 @@ impl RPCProcessor { // Destructure let (key, subkey, want_descriptor) = get_value_q.destructure(); - // add node information for the requesting node to our routing table - let crypto_kind = key.kind; + // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let own_node_id = routing_table.node_id(crypto_kind); + let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key)); - // find N nodes closest to the target node in our routing table - // ensure the nodes returned are only the ones closer to the target node than ourself - let Some(vcrypto) = self.crypto.get(crypto_kind) else { - return Ok(NetworkResult::invalid_message("unsupported cryptosystem")); - }; - let own_distance = vcrypto.distance(&own_node_id.value, &key.value); + // See if we have this record ourselves + let storage_manager = self.storage_manager(); + let subkey_result = storage_manager + .handle_get_value(key, subkey, want_descriptor) + .await + .map_err(RPCError::internal)?; - let filter = Box::new( - move |rti: &RoutingTableInner, opt_entry: Option>| { - // Exclude our own node - let Some(entry) = opt_entry else { - return false; - }; - // Ensure only things that are valid/signed in the PublicInternet domain are returned - if !rti.filter_has_valid_signed_node_info( - RoutingDomain::PublicInternet, - true, - Some(entry.clone()), - ) { - return false; - } - // Ensure things further from the key than our own node are not included - let Some(entry_node_id) = entry.with(rti, |_rti, e| e.node_ids().get(crypto_kind)) else { - return false; - }; - let entry_distance = vcrypto.distance(&entry_node_id.value, &key.value); - if entry_distance >= own_distance { - return false; - } + // Make GetValue answer + let get_value_a = RPCOperationGetValueA::new( + subkey_result.value, + closer_to_key_peers, + subkey_result.descriptor, + )?; - true - }, - ) as RoutingTableEntryFilter; - let filters = VecDeque::from([filter]); - - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; - - // - let closest_nodes = routing_table.find_closest_nodes( - node_count, - key, - filters, - // transform - |rti, entry| { - entry.unwrap().with(rti, |_rti, e| { - e.make_peer_info(RoutingDomain::PublicInternet).unwrap() - }) - }, - ); - - // Make status answer - let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; - - // Send status answer - self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a))) + // Send GetValue answer + self.answer(msg, RPCAnswer::new(RPCAnswerDetail::GetValueA(get_value_a))) .await } } diff --git a/veilid-core/src/storage_manager/do_get_value.rs b/veilid-core/src/storage_manager/do_get_value.rs index c6567d7b..4671fe47 100644 --- a/veilid-core/src/storage_manager/do_get_value.rs +++ b/veilid-core/src/storage_manager/do_get_value.rs @@ -1,13 +1,5 @@ use super::*; -/// The result of the do_get_value_operation -pub struct DoGetValueResult { - /// The subkey value if we got one - pub value: Option, - /// The descriptor if we got a fresh one or empty if no descriptor was needed - pub descriptor: Option, -} - /// The context of the do_get_value operation struct DoGetValueContext { /// The latest value of the subkey, may be the value passed in @@ -22,6 +14,7 @@ struct DoGetValueContext { impl StorageManager { + /// Perform a 'get value' query on the network pub async fn do_get_value( &self, rpc_processor: RPCProcessor, @@ -30,7 +23,7 @@ impl StorageManager { last_value: Option, last_descriptor: Option, safety_selection: SafetySelection, - ) -> Result { + ) -> Result { let routing_table = rpc_processor.routing_table(); // Get the DHT parameters for 'GetValue' @@ -63,7 +56,7 @@ impl StorageManager { let context = context.clone(); let last_descriptor = last_descriptor.clone(); async move { - match rpc_processor + let vres = rpc_processor .clone() .rpc_call_get_value( Destination::direct(next_node).with_safety(safety_selection), @@ -71,75 +64,70 @@ impl StorageManager { subkey, last_descriptor, ) - .await - { - Ok(v) => { - let v = network_result_value_or_log!(v => { - // Any other failures, just try the next node - return Ok(None); - }); + .await?; + let gva = network_result_value_or_log!(vres => { + // Any other failures, just try the next node + return Ok(None); + }); - // Keep the descriptor if we got one. If we had a last_descriptor it will - // already be validated by rpc_call_get_value - if let Some(descriptor) = v.answer.descriptor { - let mut ctx = context.lock(); - if ctx.descriptor.is_none() && ctx.schema.is_none() { - ctx.schema = - Some(descriptor.schema().map_err(RPCError::invalid_format)?); - ctx.descriptor = Some(descriptor); - } - } - - // Keep the value if we got one and it is newer and it passes schema validation - if let Some(value) = v.answer.value { - let mut ctx = context.lock(); - - // Ensure we have a schema and descriptor - let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { - // Got a value but no descriptor for it - // Move to the next node - return Ok(None); - }; - - // Validate with schema - if !schema.check_subkey_value_data( - descriptor.owner(), - subkey, - value.value_data(), - ) { - // Validation failed, ignore this value - // Move to the next node - return Ok(None); - } - - // If we have a prior value, see if this is a newer sequence number - if let Some(prior_value) = &ctx.value { - let prior_seq = prior_value.value_data().seq(); - let new_seq = value.value_data().seq(); - - if new_seq == prior_seq { - // If sequence number is the same, the data should be the same - if prior_value.value_data() != value.value_data() { - // Move to the next node - return Ok(None); - } - // Increase the consensus count for the existing value - ctx.value_count += 1; - } else if new_seq > prior_seq { - // If the sequence number is greater, go with it - ctx.value = Some(value); - ctx.value_count = 1; - } else { - // If the sequence number is older, ignore it - } - } - } - - // Return peers if we have some - Ok(Some(v.answer.peers)) + // Keep the descriptor if we got one. If we had a last_descriptor it will + // already be validated by rpc_call_get_value + if let Some(descriptor) = gva.answer.descriptor { + let mut ctx = context.lock(); + if ctx.descriptor.is_none() && ctx.schema.is_none() { + ctx.schema = + Some(descriptor.schema().map_err(RPCError::invalid_format)?); + ctx.descriptor = Some(descriptor); } - Err(e) => Err(e), } + + // Keep the value if we got one and it is newer and it passes schema validation + if let Some(value) = gva.answer.value { + let mut ctx = context.lock(); + + // Ensure we have a schema and descriptor + let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { + // Got a value but no descriptor for it + // Move to the next node + return Ok(None); + }; + + // Validate with schema + if !schema.check_subkey_value_data( + descriptor.owner(), + subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(None); + } + + // If we have a prior value, see if this is a newer sequence number + if let Some(prior_value) = &ctx.value { + let prior_seq = prior_value.value_data().seq(); + let new_seq = value.value_data().seq(); + + if new_seq == prior_seq { + // If sequence number is the same, the data should be the same + if prior_value.value_data() != value.value_data() { + // Move to the next node + return Ok(None); + } + // Increase the consensus count for the existing value + ctx.value_count += 1; + } else if new_seq > prior_seq { + // If the sequence number is greater, go with it + ctx.value = Some(value); + ctx.value_count = 1; + } else { + // If the sequence number is older, ignore it + } + } + } + + // Return peers if we have some + Ok(Some(gva.answer.peers)) } }; @@ -173,7 +161,7 @@ impl StorageManager { TimeoutOr::Value(Ok(None)) => { // Return the best answer we've got let ctx = context.lock(); - Ok(DoGetValueResult{ + Ok(SubkeyResult{ value: ctx.value.clone(), descriptor: ctx.descriptor.clone(), }) @@ -185,4 +173,10 @@ impl StorageManager { } } } + + /// Handle a recieved 'Get Value' query + pub async fn handle_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result { + let mut inner = self.lock().await?; + inner.handle_get_remote_value(key, subkey, want_descriptor) + } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 86f7301c..9c87d84d 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -2,12 +2,14 @@ mod do_get_value; mod keys; mod record_store; mod record_store_limits; +mod storage_manager_inner; mod tasks; mod types; use keys::*; use record_store::*; use record_store_limits::*; +use storage_manager_inner::*; pub use types::*; @@ -21,22 +23,6 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576; /// Frequency to flush record stores to disk const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1; -/// Locked structure for storage manager -struct StorageManagerInner { - /// If we are started up - initialized: bool, - /// Records that have been 'opened' and are not yet closed - opened_records: HashMap, - /// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive - local_record_store: Option>, - /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish - remote_record_store: Option>, - /// RPC processor if it is available - rpc_processor: Option, - /// Background processing task (not part of attachment manager tick tree so it happens when detached too) - tick_future: Option>, -} - struct StorageManagerUnlockedInner { config: VeilidConfig, crypto: Crypto, @@ -72,14 +58,7 @@ impl StorageManager { } } fn new_inner() -> StorageManagerInner { - StorageManagerInner { - initialized: false, - opened_records: HashMap::new(), - local_record_store: None, - remote_record_store: None, - rpc_processor: None, - tick_future: None, - } + StorageManagerInner::default() } fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { @@ -266,63 +245,16 @@ impl StorageManager { local_record_store.new_record(dht_key, record).await?; // Open the record - self.open_record_inner(inner, dht_key, Some(owner), safety_selection) + self.open_record_common(inner, dht_key, Some(owner), safety_selection) .await } - fn open_record_inner_check_existing( + async fn open_record_common( &self, mut inner: AsyncMutexGuardArc, key: TypedKey, writer: Option, safety_selection: SafetySelection, - ) -> Option> { - // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - return Some(Err(VeilidAPIError::not_initialized())); - }; - - // See if we have a local record already or not - let cb = |r: &mut Record| { - // Process local record - - // Keep the safety selection we opened the record with - r.detail_mut().safety_selection = safety_selection; - - // Return record details - (r.owner().clone(), r.schema()) - }; - let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else { - return None; - }; - // Had local record - - // If the writer we chose is also the owner, we have the owner secret - // Otherwise this is just another subkey writer - let owner_secret = if let Some(writer) = writer { - if writer.key == owner { - Some(writer.secret) - } else { - None - } - } else { - None - }; - - // Write open record - inner.opened_records.insert(key, OpenedRecord::new(writer)); - - // Make DHT Record Descriptor to return - let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Some(Ok(descriptor)) - } - - async fn open_record_inner( - &self, - inner: AsyncMutexGuardArc, - key: TypedKey, - writer: Option, - safety_selection: SafetySelection, ) -> Result { // Ensure the record is closed if inner.opened_records.contains_key(&key) { @@ -330,25 +262,23 @@ impl StorageManager { } // See if we have a local record already or not - if let Some(res) = - self.open_record_inner_check_existing(inner, key, writer, safety_selection) - { + if let Some(res) = inner.open_record_check_existing(key, writer, safety_selection) { return res; } // No record yet, try to get it from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network - let rpc_processor = { - let inner = self.lock().await?; - let Some(rpc_processor) = inner.rpc_processor.clone() else { - // Offline, try again later - apibail_try_again!(); - }; - rpc_processor + let Some(rpc_processor) = inner.rpc_processor.clone() else { + // Offline, try again later + apibail_try_again!(); }; + // Drop the mutex so we dont block during network access + drop(inner); + // No last descriptor, no last value + // Use the safety selection we opened the record with let subkey: ValueSubkey = 0; let result = self .do_get_value(rpc_processor, key, subkey, None, None, safety_selection) @@ -405,7 +335,9 @@ impl StorageManager { } // Write open record - inner.opened_records.insert(key, OpenedRecord::new(writer)); + inner + .opened_records + .insert(key, OpenedRecord::new(writer, safety_selection)); // Make DHT Record Descriptor to return let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); @@ -419,24 +351,13 @@ impl StorageManager { safety_selection: SafetySelection, ) -> Result { let inner = self.lock().await?; - self.open_record_inner(inner, key, writer, safety_selection) + self.open_record_common(inner, key, writer, safety_selection) .await } - async fn close_record_inner( - &self, - inner: &mut AsyncMutexGuardArc, - key: TypedKey, - ) -> Result<(), VeilidAPIError> { - let Some(_opened_record) = inner.opened_records.remove(&key) else { - apibail_generic!("record not open"); - }; - Ok(()) - } - pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let mut inner = self.lock().await?; - self.close_record_inner(&mut inner, key).await + inner.close_record(key) } pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { @@ -444,7 +365,7 @@ impl StorageManager { // Ensure the record is closed if inner.opened_records.contains_key(&key) { - self.close_record_inner(&mut inner, key).await?; + inner.close_record(key)?; } let Some(local_record_store) = inner.local_record_store.as_mut() else { @@ -461,8 +382,60 @@ impl StorageManager { subkey: ValueSubkey, force_refresh: bool, ) -> Result, VeilidAPIError> { - let inner = self.lock().await?; - unimplemented!(); + let mut inner = self.lock().await?; + + // Get rpc processor and drop mutex so we don't block while getting the value from the network + let Some(opened_record) = inner.opened_records.remove(&key) else { + apibail_generic!("record not open"); + }; + + // See if the requested subkey is our local record store + let SubkeyResult { value, descriptor } = inner.handle_get_local_value(key, subkey, true)?; + + // Return the existing value if we have one unless we are forcing a refresh + if !force_refresh { + if let Some(value) = value { + return Ok(Some(value.into_value_data())); + } + } + + // Refresh if we can + let Some(rpc_processor) = inner.rpc_processor.clone() else { + // Offline, try again later + apibail_try_again!(); + }; + + // Drop the lock for network access + drop(inner); + + // May have last descriptor / value + // Use the safety selection we opened the record with + let opt_last_seq = value.as_ref().map(|v| v.value_data().seq()); + let result = self + .do_get_value( + rpc_processor, + key, + subkey, + value, + descriptor, + opened_record.safety_selection(), + ) + .await?; + + // See if we got a value back + let Some(result_value) = result.value else { + // If we got nothing back then we also had nothing beforehand, return nothing + return Ok(None); + }; + + // If we got a new value back then write it to the opened record + if Some(result_value.value_data().seq()) != opt_last_seq { + let mut inner = self.lock().await?; + inner + .handle_set_local_value(key, subkey, result_value.clone()) + .await?; + } + Ok(Some(result_value.into_value_data())) } pub async fn set_value( diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index dc30b35b..994e7b44 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -30,6 +30,14 @@ where purge_dead_records_mutex: Arc>, } +/// The result of the do_get_value_operation +pub struct SubkeyResult { + /// The subkey value if we got one + pub value: Option, + /// The descriptor if we got a fresh one or empty if no descriptor was needed + pub descriptor: Option, +} + impl RecordStore where D: Clone + RkyvArchive + RkyvSerialize, @@ -297,7 +305,7 @@ where Ok(()) } - pub fn with_record(&mut self, key: TypedKey, f: F) -> Option + pub(super) fn with_record(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&Record) -> R, { @@ -318,7 +326,7 @@ where out } - pub fn with_record_mut(&mut self, key: TypedKey, f: F) -> Option + pub(super) fn with_record_mut(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&mut Record) -> R, { @@ -339,24 +347,27 @@ where out } - pub async fn get_subkey( + // pub fn get_descriptor(&mut self, key: TypedKey) -> Option { + // self.with_record(key, |record| record.descriptor().clone()) + // } + + pub fn get_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, - ) -> Result, VeilidAPIError> { + want_descriptor: bool, + ) -> Result, VeilidAPIError> { // record from index - let rtk = RecordTableKey { key }; - let subkey_count = { - let Some(record) = self.record_index.get_mut(&rtk) else { - apibail_invalid_argument!("no record at this key", "key", key); - }; - - // Touch - record.touch(get_aligned_timestamp()); - - record.subkey_count() + let Some((subkey_count, opt_descriptor)) = self.with_record(key, |record| { + (record.subkey_count(), if want_descriptor { + Some(record.descriptor().clone()) + } else { + None + }) + }) else { + // Record not available + return Ok(None); }; - self.mark_record_changed(rtk); // Check if the subkey is in range if subkey as usize >= subkey_count { @@ -373,7 +384,10 @@ where if let Some(record_data) = self.subkey_cache.get_mut(&stk) { let out = record_data.signed_value_data().clone(); - return Ok(Some(out)); + return Ok(Some(SubkeyResult { + value: Some(out), + descriptor: opt_descriptor, + })); } // If not in cache, try to pull from table store if let Some(record_data) = subkey_table @@ -385,10 +399,17 @@ where // Add to cache, do nothing with lru out self.add_to_subkey_cache(stk, record_data); - return Ok(Some(out)); + return Ok(Some(SubkeyResult { + value: Some(out), + descriptor: opt_descriptor, + })); }; - return Ok(None); + // Record was available, but subkey was not found, maybe descriptor gets returned + Ok(Some(SubkeyResult { + value: None, + descriptor: opt_descriptor, + })) } pub async fn set_subkey( @@ -403,18 +424,11 @@ where } // Get record from index - let rtk = RecordTableKey { key }; - let (subkey_count, total_size) = { - let Some(record) = self.record_index.get_mut(&rtk) else { - apibail_invalid_argument!("no record at this key", "key", key); - }; - - // Touch - record.touch(get_aligned_timestamp()); - + let Some((subkey_count, total_size)) = self.with_record(key, |record| { (record.subkey_count(), record.total_size()) + }) else { + apibail_invalid_argument!("no record at this key", "key", key); }; - self.mark_record_changed(rtk); // Check if the subkey is in range if subkey as usize >= subkey_count { @@ -474,10 +488,10 @@ where self.add_to_subkey_cache(stk, record_data); // Update record - let Some(record) = self.record_index.get_mut(&rtk) else { - apibail_invalid_argument!("no record at this key", "key", key); - }; - record.set_record_data_size(new_record_data_size); + self.with_record_mut(key, |record| { + record.set_record_data_size(new_record_data_size); + }) + .expect("record should still be here"); Ok(()) } diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs new file mode 100644 index 00000000..d593d7a3 --- /dev/null +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -0,0 +1,187 @@ +use super::*; + +/// Locked structure for storage manager +#[derive(Default)] +pub(super) struct StorageManagerInner { + /// If we are started up + pub initialized: bool, + /// Records that have been 'opened' and are not yet closed + pub opened_records: HashMap, + /// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive + pub local_record_store: Option>, + /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish + pub remote_record_store: Option>, + /// RPC processor if it is available + pub rpc_processor: Option, + /// Background processing task (not part of attachment manager tick tree so it happens when detached too) + pub tick_future: Option>, +} + +impl StorageManagerInner { + pub fn open_record_check_existing( + &mut self, + key: TypedKey, + writer: Option, + safety_selection: SafetySelection, + ) -> Option> { + // Get local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + return Some(Err(VeilidAPIError::not_initialized())); + }; + + // See if we have a local record already or not + let cb = |r: &mut Record| { + // Process local record + + // Keep the safety selection we opened the record with + r.detail_mut().safety_selection = safety_selection; + + // Return record details + (r.owner().clone(), r.schema()) + }; + let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else { + return None; + }; + // Had local record + + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + + // Write open record + self.opened_records + .insert(key, OpenedRecord::new(writer, safety_selection)); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Some(Ok(descriptor)) + } + + pub async fn new_local_record( + &mut self, + key: TypedKey, + subkey: ValueSubkey, + signed_value_descriptor: SignedValueDescriptor, + signed_value_data: Option, + safety_selection: SafetySelection, + ) -> Result<(), VeilidAPIError> { + // Get local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Make and store a new record for this descriptor + let record = Record::::new( + get_aligned_timestamp(), + signed_value_descriptor, + LocalRecordDetail { safety_selection }, + )?; + local_record_store.new_record(key, record).await?; + + // If we got a subkey with the getvalue, it has already been validated against the schema, so store it + if let Some(signed_value_data) = signed_value_data { + // Write subkey to local store + local_record_store + .set_subkey(key, subkey, signed_value_data) + .await?; + } + // Write open record + self.opened_records + .insert(key, OpenedRecord::new(writer, safety_selection)); + + Ok(()) + } + + pub fn close_record(&mut self, key: TypedKey) -> Result<(), VeilidAPIError> { + let Some(_opened_record) = self.opened_records.remove(&key) else { + apibail_generic!("record not open"); + }; + Ok(()) + } + + pub fn handle_get_local_value( + &mut self, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> Result { + // See if it's in the local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(subkey_result) = local_record_store.get_subkey(key, subkey, want_descriptor)? { + return Ok(subkey_result); + } + + Ok(SubkeyResult { + value: None, + descriptor: None, + }) + } + + pub async fn handle_set_local_value( + &mut self, + key: TypedKey, + subkey: ValueSubkey, + signed_value_data: SignedValueData, + ) -> Result<(), VeilidAPIError> { + // See if it's in the local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Write subkey to local store + local_record_store + .set_subkey(key, subkey, signed_value_data) + .await?; + + Ok(()) + } + + pub fn handle_get_remote_value( + &mut self, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> Result { + // See if it's in the remote record store + let Some(remote_record_store) = self.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, want_descriptor)? { + return Ok(subkey_result); + } + + Ok(SubkeyResult { + value: None, + descriptor: None, + }) + } + + pub async fn handle_set_remote_value( + &mut self, + key: TypedKey, + subkey: ValueSubkey, + signed_value_data: SignedValueData, + ) -> Result<(), VeilidAPIError> { + // See if it's in the remote record store + let Some(remote_record_store) = self.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Write subkey to remote store + remote_record_store + .set_subkey(key, subkey, signed_value_data) + .await?; + + Ok(()) + } +} diff --git a/veilid-core/src/storage_manager/types/opened_record.rs b/veilid-core/src/storage_manager/types/opened_record.rs index 17424bfa..8f47786c 100644 --- a/veilid-core/src/storage_manager/types/opened_record.rs +++ b/veilid-core/src/storage_manager/types/opened_record.rs @@ -8,14 +8,24 @@ pub struct OpenedRecord { /// Without this, set_value() will fail regardless of which key or subkey is being written to /// as all writes are signed writer: Option, + + /// The safety selection in current use + safety_selection: SafetySelection, } impl OpenedRecord { - pub fn new(writer: Option) -> Self { - Self { writer } + pub fn new(writer: Option, safety_selection: SafetySelection) -> Self { + Self { + writer, + safety_selection, + } } pub fn writer(&self) -> Option<&KeyPair> { self.writer.as_ref() } + + pub fn safety_selection(&self) -> SafetySelection { + self.safety_selection + } } diff --git a/veilid-core/src/storage_manager/types/signed_value_data.rs b/veilid-core/src/storage_manager/types/signed_value_data.rs index 5f2759f7..ca5231e7 100644 --- a/veilid-core/src/storage_manager/types/signed_value_data.rs +++ b/veilid-core/src/storage_manager/types/signed_value_data.rs @@ -61,6 +61,10 @@ impl SignedValueData { &self.value_data } + pub fn into_value_data(self) -> ValueData { + self.value_data + } + pub fn signature(&self) -> &Signature { &self.signature }