From 0aa7cf5ef22c4314ba4e9e3eafbea0a85fd0978a Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 8 Sep 2023 20:38:31 -0400 Subject: [PATCH 1/6] fanout work --- veilid-core/src/routing_table/find_peers.rs | 15 +- veilid-core/src/routing_table/mod.rs | 14 +- .../src/routing_table/routing_table_inner.rs | 24 +-- .../tasks/peer_minimum_refresh.rs | 2 +- veilid-core/src/rpc_processor/fanout_call.rs | 139 ++++++------------ veilid-core/src/rpc_processor/fanout_queue.rs | 73 +++++++++ veilid-core/src/rpc_processor/mod.rs | 2 + .../src/rpc_processor/rpc_find_node.rs | 2 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- .../src/rpc_processor/rpc_set_value.rs | 2 +- 10 files changed, 146 insertions(+), 129 deletions(-) create mode 100644 veilid-core/src/rpc_processor/fanout_queue.rs diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index 03b109b3..647dc78b 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -1,8 +1,10 @@ use super::*; impl RoutingTable { - /// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info - pub fn find_all_closest_peers( + /// Utility to find the closest nodes to a particular key, preferring reliable nodes first, + /// including possibly our own node and nodes further away from the key than our own, + /// returning their peer info + pub fn find_preferred_closest_peers( &self, key: TypedKey, capabilities: &[Capability], @@ -49,7 +51,7 @@ impl RoutingTable { }; let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); - let closest_nodes = match self.find_closest_nodes( + let closest_nodes = match self.find_preferred_closest_nodes( node_count, key, filters, @@ -68,9 +70,10 @@ impl RoutingTable { NetworkResult::value(closest_nodes) } - /// Utility to find nodes that are closer to a key than our own node, returning their peer info + /// Utility to find nodes that are closer to a key than our own node, + /// preferring reliable nodes first, and returning their peer info /// Can filter based on a particular set of capabiltiies - pub fn find_peers_closer_to_key( + pub fn find_preferred_peers_closer_to_key( &self, key: TypedKey, required_capabilities: Vec, @@ -126,7 +129,7 @@ impl RoutingTable { }; // - let closest_nodes = match self.find_closest_nodes( + let closest_nodes = match self.find_preferred_closest_nodes( node_count, key, filters, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index a3917be0..45808558 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -939,7 +939,7 @@ impl RoutingTable { let filters = VecDeque::from([filter]); - self.find_fastest_nodes( + self.find_preferred_fastest_nodes( protocol_types_len * 2 * max_per_type, filters, |_rti, entry: Option>| { @@ -990,7 +990,7 @@ impl RoutingTable { .find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform) } - pub fn find_fastest_nodes<'a, T, O>( + pub fn find_preferred_fastest_nodes<'a, T, O>( &self, node_count: usize, filters: VecDeque, @@ -1001,10 +1001,10 @@ impl RoutingTable { { self.inner .read() - .find_fastest_nodes(node_count, filters, transform) + .find_preferred_fastest_nodes(node_count, filters, transform) } - pub fn find_closest_nodes<'a, T, O>( + pub fn find_preferred_closest_nodes<'a, T, O>( &self, node_count: usize, node_id: TypedKey, @@ -1016,14 +1016,14 @@ 
impl RoutingTable { { self.inner .read() - .find_closest_nodes(node_count, node_id, filters, transform) + .find_preferred_closest_nodes(node_count, node_id, filters, transform) } pub fn sort_and_clean_closest_noderefs( &self, node_id: TypedKey, - closest_nodes: &mut Vec, - ) { + closest_nodes: &[NodeRef], + ) -> Vec { self.inner .read() .sort_and_clean_closest_noderefs(node_id, closest_nodes) diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 8ba6cf88..44c001a4 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -963,7 +963,7 @@ impl RoutingTableInner { }) as RoutingTableEntryFilter; filters.push_front(public_node_filter); - self.find_fastest_nodes( + self.find_preferred_fastest_nodes( node_count, filters, |_rti: &RoutingTableInner, v: Option>| { @@ -1062,7 +1062,7 @@ impl RoutingTableInner { out } - pub fn find_fastest_nodes( + pub fn find_preferred_fastest_nodes( &self, node_count: usize, mut filters: VecDeque, @@ -1154,7 +1154,7 @@ impl RoutingTableInner { out } - pub fn find_closest_nodes( + pub fn find_preferred_closest_nodes( &self, node_count: usize, node_id: TypedKey, @@ -1242,8 +1242,8 @@ impl RoutingTableInner { pub fn sort_and_clean_closest_noderefs( &self, node_id: TypedKey, - closest_nodes: &mut Vec, - ) { + closest_nodes: &[NodeRef], + ) -> Vec { // Lock all noderefs let kind = node_id.kind; let mut closest_nodes_locked: Vec = closest_nodes @@ -1263,7 +1263,7 @@ impl RoutingTableInner { closest_nodes_locked.sort_by(sort); // Unlock noderefs - *closest_nodes = closest_nodes_locked.iter().map(|x| x.unlocked()).collect(); + closest_nodes_locked.iter().map(|x| x.unlocked()).collect() } } @@ -1271,7 +1271,6 @@ fn make_closest_noderef_sort( crypto: Crypto, node_id: TypedKey, ) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering { - let cur_ts = get_aligned_timestamp(); let kind = node_id.kind; // Get cryptoversion to check distance with let vcrypto = crypto.get(kind).unwrap(); @@ -1282,19 +1281,8 @@ fn make_closest_noderef_sort( return core::cmp::Ordering::Equal; } - // reliable nodes come first, pessimistically treating our own node as unreliable a.operate(|_rti, a_entry| { b.operate(|_rti, b_entry| { - let ra = a_entry.check_reliable(cur_ts); - let rb = b_entry.check_reliable(cur_ts); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } - // get keys let a_key = a_entry.node_ids().get(kind).unwrap(); let b_key = b_entry.node_ids().get(kind).unwrap(); diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index f2dc92fc..00581179 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -67,7 +67,7 @@ impl RoutingTable { ) as RoutingTableEntryFilter; filters.push_front(filter); - let noderefs = routing_table.find_fastest_nodes( + let noderefs = routing_table.find_preferred_fastest_nodes( min_peer_count, filters, |_rti, entry: Option>| { diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 94dfda66..3f7529ad 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -4,8 +4,7 @@ struct FanoutContext where R: Unpin, { - closest_nodes: Vec, - called_nodes: HashSet, + fanout_queue: 
FanoutQueue, result: Option>, } @@ -72,8 +71,7 @@ where check_done: D, ) -> Arc { let context = Mutex::new(FanoutContext { - closest_nodes: Vec::with_capacity(node_count), - called_nodes: HashSet::new(), + fanout_queue: FanoutQueue::new(node_id.kind), result: None, }); @@ -91,82 +89,44 @@ where }) } - fn add_new_nodes(self: Arc, new_nodes: Vec) { - let mut ctx = self.context.lock(); - - for nn in new_nodes { - // Make sure the new node isnt already in the list - let mut dup = false; - for cn in &ctx.closest_nodes { - if cn.same_entry(&nn) { - dup = true; - break; - } - } - if !dup { - // Add the new node if we haven't already called it before (only one call per node ever) - if let Some(key) = nn.node_ids().get(self.crypto_kind) { - if !ctx.called_nodes.contains(&key) { - ctx.closest_nodes.push(nn.clone()); - } - } - } - } - - self.routing_table - .sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes); - ctx.closest_nodes.truncate(self.node_count); - } - - fn remove_node(self: Arc, dead_node: NodeRef) { - let mut ctx = self.context.lock(); - for n in 0..ctx.closest_nodes.len() { - let cn = &ctx.closest_nodes[n]; - if cn.same_entry(&dead_node) { - ctx.closest_nodes.remove(n); - break; - } - } - } - - fn get_next_node(self: Arc) -> Option { - let mut next_node = None; - let mut ctx = self.context.lock(); - for cn in ctx.closest_nodes.clone() { - if let Some(key) = cn.node_ids().get(self.crypto_kind) { - if !ctx.called_nodes.contains(&key) { - // New fanout call candidate found - next_node = Some(cn.clone()); - ctx.called_nodes.insert(key); - break; - } - } - } - next_node - } - - fn evaluate_done(self: Arc) -> bool { - let mut ctx = self.context.lock(); - + fn evaluate_done(self: Arc, ctx: &mut FanoutContext) -> bool { // If we have a result, then we're done if ctx.result.is_some() { return true; } // Check for a new done result - ctx.result = (self.check_done)(&ctx.closest_nodes).map(|o| Ok(o)); + ctx.result = (self.check_done)(ctx.fanout_queue.nodes()).map(|o| Ok(o)); ctx.result.is_some() } + fn add_to_fanout_queue(self: Arc, new_nodes: &[NodeRef]) { + let ctx = &mut *self.context.lock(); + let this = self.clone(); + ctx.fanout_queue.add(&new_nodes, |current_nodes| { + let mut current_nodes_vec = this + .routing_table + .sort_and_clean_closest_noderefs(this.node_id, current_nodes); + current_nodes_vec.truncate(self.node_count); + current_nodes_vec + }); + } + async fn fanout_processor(self: Arc) { - // Check to see if we have a result or are done - while !self.clone().evaluate_done() { - // Get the closest node we haven't processed yet - let next_node = self.clone().get_next_node(); + // Loop until we have a result or are done + loop { + // Get the closest node we haven't processed yet if we're not done yet + let next_node = { + let mut ctx = self.context.lock(); + if self.clone().evaluate_done(&mut ctx) { + break; + } + self.context.lock().fanout_queue.next() + }; // If we don't have a node to process, stop fanning out let Some(next_node) = next_node else { - return; + break; }; // Do the call for this node @@ -188,20 +148,18 @@ where .collect(); // Call succeeded - // Register the returned nodes and add them to the closest nodes list in sorted order + // Register the returned nodes and add them to the fanout queue in sorted order let new_nodes = self .routing_table .register_find_node_answer(self.crypto_kind, filtered_v); - self.clone().add_new_nodes(new_nodes); + self.clone().add_to_fanout_queue(&new_nodes); } Ok(None) => { - // Call failed, remove the node so it isn't 
considered as part of the fanout
-                    self.clone().remove_node(next_node);
+                    // Call failed, node will not be considered again
                 }
                 Err(e) => {
                     // Error happened, abort everything and return the error
-                    let mut ctx = self.context.lock();
-                    ctx.result = Some(Err(e));
+                    self.context.lock().result = Some(Err(e));
                     return;
                 }
             };
@@ -231,7 +189,7 @@ where
                     return false;
                 }
-                // Check our node info ilter
+                // Check our node info filter
                 let node_ids = e.node_ids().to_vec();
                 if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
                     return false;
@@ -248,12 +206,10 @@ where
             };
             routing_table
-                .find_closest_nodes(self.node_count, self.node_id, filters, transform)
+                .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
                 .map_err(RPCError::invalid_format)?
         };
-
-        let mut ctx = self.context.lock();
-        ctx.closest_nodes = closest_nodes;
+        self.clone().add_to_fanout_queue(&closest_nodes);

         Ok(())
     }
@@ -272,9 +228,11 @@ where
         }

         // Do a quick check to see if we're already done
-        if self.clone().evaluate_done() {
+        {
             let mut ctx = self.context.lock();
-            return TimeoutOr::value(ctx.result.take().transpose());
+            if self.clone().evaluate_done(&mut ctx) {
+                return TimeoutOr::value(ctx.result.take().transpose());
+            }
         }

         // If not, do the fanout
@@ -287,19 +245,12 @@ where
             }
         }

         // Wait for them to complete
-        timeout(timeout_ms, async {
-            while let Some(_) = unord.next().await {
-                if self.clone().evaluate_done() {
-                    break;
-                }
-            }
-        })
-        .await
-        .into_timeout_or()
-        .map(|_| {
-            // Finished, return whatever value we came up with
-            let mut ctx = self.context.lock();
-            ctx.result.take().transpose()
-        })
+        timeout(timeout_ms, async { while unord.next().await.is_some() {} })
+            .await
+            .into_timeout_or()
+            .map(|_| {
+                // Finished, return whatever value we came up with
+                self.context.lock().result.take().transpose()
+            })
     }
 }
diff --git a/veilid-core/src/rpc_processor/fanout_queue.rs b/veilid-core/src/rpc_processor/fanout_queue.rs
new file mode 100644
index 00000000..3655012b
--- /dev/null
+++ b/veilid-core/src/rpc_processor/fanout_queue.rs
@@ -0,0 +1,73 @@
+use super::*;
+
+pub struct FanoutQueue {
+    crypto_kind: CryptoKind,
+    current_nodes: VecDeque<NodeRef>,
+    returned_nodes: HashSet<TypedKey>,
+}
+
+impl FanoutQueue {
+    // Create a queue for fanout candidates that have a crypto-kind compatible node id
+    pub fn new(crypto_kind: CryptoKind) -> Self {
+        Self {
+            crypto_kind,
+            current_nodes: VecDeque::new(),
+            returned_nodes: HashSet::new(),
+        }
+    }
+
+    // Add new nodes to the list of fanout candidates
+    // Run a cleanup routine afterwards to trim down the list of candidates so it doesn't grow too large
+    pub fn add<F: FnOnce(&[NodeRef]) -> Vec<NodeRef>>(
+        &mut self,
+        new_nodes: &[NodeRef],
+        cleanup: F,
+    ) {
+        for nn in new_nodes {
+            // Ensure the node has a comparable key with our current crypto kind
+            let Some(key) = nn.node_ids().get(self.crypto_kind) else {
+                continue;
+            };
+            // Check if we have already done this node before (only one call per node ever)
+            if self.returned_nodes.contains(&key) {
+                continue;
+            }
+
+            // Make sure the new node isn't already in the list
+            let mut dup = false;
+            for cn in &self.current_nodes {
+                if cn.same_entry(nn) {
+                    dup = true;
+                    break;
+                }
+            }
+            if !dup {
+                // Add the new node
+                self.current_nodes.push_front(nn.clone());
+            }
+        }
+
+        // Make sure the deque is a single slice
+        self.current_nodes.make_contiguous();
+
+        // Sort and trim the candidate set
+        self.current_nodes =
+            VecDeque::from_iter(cleanup(self.current_nodes.as_slices().0).iter().cloned());
+    }
+
+    // Return next fanout candidate
+    pub fn next(&mut
self) -> Option { + let cn = self.current_nodes.pop_front()?; + self.current_nodes.make_contiguous(); + let key = cn.node_ids().get(self.crypto_kind).unwrap(); + + // Ensure we don't return this node again + self.returned_nodes.insert(key); + Some(cn) + } + + // Get a slice of all the current fanout candidates + pub fn nodes(&self) -> &[NodeRef] { + self.current_nodes.as_slices().0 + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b4a06628..8f4baea8 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,6 +1,7 @@ mod coders; mod destination; mod fanout_call; +mod fanout_queue; mod operation_waiter; mod rpc_app_call; mod rpc_app_message; @@ -31,6 +32,7 @@ mod rpc_start_tunnel; pub use coders::*; pub use destination::*; pub use fanout_call::*; +pub use fanout_queue::*; pub use operation_waiter::*; pub use rpc_error::*; pub use rpc_status::*; diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 6bbcf5fa..d205081f 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -105,7 +105,7 @@ impl RPCProcessor { // Get a chunk of the routing table near the requested node id let routing_table = self.routing_table(); let closest_nodes = - network_result_try!(routing_table.find_all_closest_peers(node_id, &capabilities)); + network_result_try!(routing_table.find_preferred_closest_peers(node_id, &capabilities)); // Make FindNode answer let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 3895d9cb..4dff0c0d 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -201,7 +201,7 @@ impl RPCProcessor { // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); + let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let debug_string = format!( "IN <=== GetValueQ({} #{}{}) <== {}", diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 82f9afc4..62ad8da8 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -213,7 +213,7 @@ impl RPCProcessor { // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); + let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let debug_string = format!( "IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}", From 36957d84f10d18a5b502bb748afd7d66c4b73ee0 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 8 Sep 2023 20:50:05 -0400 Subject: [PATCH 2/6] fix bug --- veilid-core/src/rpc_processor/fanout_call.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 3f7529ad..06656731 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ 
b/veilid-core/src/rpc_processor/fanout_call.rs
@@ -121,7 +121,7 @@ where
                 if self.clone().evaluate_done(&mut ctx) {
                     break;
                 }
-                self.context.lock().fanout_queue.next()
+                ctx.fanout_queue.next()
             };

             // If we don't have a node to process, stop fanning out

From 853976789fc5f545f95d0d4b2eda0cac2cbcc214 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Sat, 9 Sep 2023 13:30:48 -0400
Subject: [PATCH 3/6] fix crash and clean up record data size housekeeping

---
 .../src/storage_manager/limited_size.rs       |   3 +-
 .../src/storage_manager/record_store.rs       | 130 ++++++++++++------
 .../src/storage_manager/types/record.rs       |   4 +-
 .../src/storage_manager/types/record_data.rs  |   6 +-
 .../types/signed_value_data.rs                |   4 +
 .../src/veilid_api/types/dht/value_data.rs    |   4 +
 6 files changed, 103 insertions(+), 48 deletions(-)

diff --git a/veilid-core/src/storage_manager/limited_size.rs b/veilid-core/src/storage_manager/limited_size.rs
index dbb82bec..22a4a93b 100644
--- a/veilid-core/src/storage_manager/limited_size.rs
+++ b/veilid-core/src/storage_manager/limited_size.rs
@@ -93,13 +93,14 @@ impl LimitedSize {
                 }
             }
             log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
+            self.uncommitted_value = None;
             self.value = uncommitted_value;
         }
         Ok(self.value)
     }

     pub fn rollback(&mut self) -> T {
-        if let Some(uv) = self.uncommitted_value {
+        if let Some(uv) = self.uncommitted_value.take() {
             log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
         }
         return self.value;
diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs
index 1de083f2..8a52d93a 100644
--- a/veilid-core/src/storage_manager/record_store.rs
+++ b/veilid-core/src/storage_manager/record_store.rs
@@ -7,6 +7,21 @@ use super::*;

 use hashlink::LruCache;

+#[derive(Debug, Clone)]
+/// A dead record that is yet to be purged from disk and statistics
+struct DeadRecord<D>
+where
+    D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
+{
+    // The key used in the record_index
+    key: RecordTableKey,
+    // The actual record
+    record: Record<D>,
+    // True if this record is accounted for in the total storage
+    // and needs to have the statistics updated or not when purged
+    in_total_storage: bool,
+}
+
 pub struct RecordStore<D>
 where
     D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@@ -15,16 +30,24 @@ where
     name: String,
     limits: RecordStoreLimits,

+    /// The tabledb used for record data
     record_table: Option<TableDB>,
+    /// The tabledb used for subkey data
     subkey_table: Option<TableDB>,
+    /// The in-memory index that keeps track of what records are in the tabledb
     record_index: LruCache<RecordTableKey, Record<D>>,
+    /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
     subkey_cache: LruCache<SubkeyTableKey, RecordData>,
+    /// Total storage space of subkey data inclusive of structures in memory
     subkey_cache_total_size: LimitedSize<usize>,
+    /// Total storage space of records in the tabledb inclusive of subkey data and structures
     total_storage_space: LimitedSize<u64>,
-
-    dead_records: Vec<(RecordTableKey, Record<D>)>,
+    /// Records to be removed from the tabledb upon next purge
+    dead_records: Vec<DeadRecord<D>>,
+    /// The list of records that have changed since last flush to disk (optimization for batched writes)
     changed_records: HashSet<RecordTableKey>,
+    /// A mutex to ensure the dead-record purge is not run concurrently
     purge_dead_records_mutex: Arc<AsyncMutex<()>>,
 }
@@ -97,33 +120,45 @@ where
         // Sort the record index by last touched time and insert in sorted order
         record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
-        let mut
dead_records = Vec::new(); + let mut dead_records = Vec::>::new(); for ri in record_index_saved { // total the storage space self.total_storage_space - .add(mem::size_of::() as u64) - .unwrap(); - self.total_storage_space - .add(ri.1.total_size() as u64) + .add((mem::size_of::() + ri.1.total_size()) as u64) .unwrap(); if let Err(_) = self.total_storage_space.commit() { - // If we overflow the limit, kill off the record - dead_records.push((ri.0, ri.1)); + // Revert the total storage space because the commit failed + self.total_storage_space.rollback(); + + // If we overflow the limit, kill off the record, noting that it has not yet been added to the total storage space + dead_records.push(DeadRecord { + key: ri.0, + record: ri.1, + in_total_storage: false, + }); continue; } // add to index and ensure we deduplicate in the case of an error if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| { // If the configuration change, we only want to keep the 'limits.max_records' records - dead_records.push((k, v)); + dead_records.push(DeadRecord { + key: k, + record: v, + in_total_storage: true, + }); }) { // This shouldn't happen, but deduplicate anyway log_stor!(warn "duplicate record in table: {:?}", ri.0); - dead_records.push((ri.0, v)); + dead_records.push(DeadRecord { + key: ri.0, + record: v, + in_total_storage: true, + }); } } - for (k, v) in dead_records { - self.add_dead_record(k, v); + for dr in dead_records { + self.dead_records.push(dr); } self.record_table = Some(record_table); @@ -132,7 +167,11 @@ where } fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { - self.dead_records.push((key, record)); + self.dead_records.push(DeadRecord { + key, + record, + in_total_storage: true, + }); } fn mark_record_changed(&mut self, key: RecordTableKey) { @@ -208,23 +247,23 @@ where let rt_xact = record_table.transact(); let st_xact = subkey_table.transact(); let dead_records = mem::take(&mut self.dead_records); - for (k, v) in dead_records { + for dr in dead_records { // Record should already be gone from index - if self.record_index.contains_key(&k) { - log_stor!(error "dead record found in index: {:?}", k); + if self.record_index.contains_key(&dr.key) { + log_stor!(error "dead record found in index: {:?}", dr.key); } // Delete record - if let Err(e) = rt_xact.delete(0, &k.bytes()) { + if let Err(e) = rt_xact.delete(0, &dr.key.bytes()) { log_stor!(error "record could not be deleted: {}", e); } // Delete subkeys - let stored_subkeys = v.stored_subkeys(); + let stored_subkeys = dr.record.stored_subkeys(); for sk in stored_subkeys.iter() { // From table let stk = SubkeyTableKey { - key: k.key, + key: dr.key.key, subkey: sk, }; let stkb = stk.bytes(); @@ -237,11 +276,12 @@ where } // Remove from total size - self.total_storage_space - .saturating_sub(mem::size_of::() as u64); - self.total_storage_space - .saturating_sub(v.total_size() as u64); - self.total_storage_space.commit().unwrap(); + if dr.in_total_storage { + self.total_storage_space.saturating_sub( + (mem::size_of::() + dr.record.total_size()) as u64, + ); + self.total_storage_space.commit().unwrap(); + } } if let Err(e) = rt_xact.commit().await { log_stor!(error "failed to commit record table transaction: {}", e); @@ -306,6 +346,9 @@ where .await .map_err(VeilidAPIError::internal)?; + // Update storage space (won't fail due to check_limit above) + self.total_storage_space.commit().unwrap(); + // Save to record index let mut dead_records = Vec::new(); if let Some(v) = 
self.record_index.insert_with_callback(rtk, record, |k, v| { @@ -319,9 +362,6 @@ where self.add_dead_record(dr.0, dr.1); } - // Update storage space - self.total_storage_space.commit().unwrap(); - Ok(()) } @@ -407,7 +447,7 @@ where subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { - // record from index + // Get record from index let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| { (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor { Some(record.descriptor().clone()) @@ -545,9 +585,9 @@ where ); } - // Get record from index - let Some((subkey_count, total_size)) = self.with_record(key, |record| { - (record.subkey_count(), record.total_size()) + // Get record subkey count and total size of all record subkey data exclusive of structures + let Some((subkey_count, prior_record_data_size)) = self.with_record(key, |record| { + (record.subkey_count(), record.record_data_size()) }) else { apibail_invalid_argument!("no record at this key", "key", key); }; @@ -563,14 +603,14 @@ where }; // Get the previous subkey and ensure we aren't going over the record size limit - let mut prior_record_data_size = 0usize; + let mut prior_subkey_size = 0usize; // If subkey exists in subkey cache, use that let stk = SubkeyTableKey { key, subkey }; let stk_bytes = stk.bytes(); if let Some(record_data) = self.subkey_cache.peek(&stk) { - prior_record_data_size = record_data.total_size(); + prior_subkey_size = record_data.data_size(); } else { // If not in cache, try to pull from table store if let Some(record_data) = subkey_table @@ -578,26 +618,26 @@ where .await .map_err(VeilidAPIError::internal)? { - prior_record_data_size = record_data.total_size(); + prior_subkey_size = record_data.data_size(); } } // Make new record data - let record_data = RecordData::new(signed_value_data); + let subkey_record_data = RecordData::new(signed_value_data); // Check new total record size - let new_record_data_size = record_data.total_size(); - let new_total_size = total_size + new_record_data_size - prior_record_data_size; - if new_total_size > self.limits.max_record_total_size { + let new_subkey_size = subkey_record_data.data_size(); + let new_record_data_size = prior_record_data_size - prior_subkey_size + new_subkey_size; + if new_record_data_size > self.limits.max_record_total_size { apibail_generic!("dht record too large"); } // Check new total storage space self.total_storage_space - .sub(prior_record_data_size as u64) + .sub(prior_subkey_size as u64) .unwrap(); self.total_storage_space - .add(new_record_data_size as u64) + .add(new_subkey_size as u64) .unwrap(); if !self.total_storage_space.check_limit() { apibail_try_again!(); @@ -605,17 +645,17 @@ where // Write subkey subkey_table - .store_json(0, &stk_bytes, &record_data) + .store_json(0, &stk_bytes, &subkey_record_data) .await .map_err(VeilidAPIError::internal)?; // Write to subkey cache - self.add_to_subkey_cache(stk, record_data); + self.add_to_subkey_cache(stk, subkey_record_data); // Update record self.with_record_mut(key, |record| { record.store_subkey(subkey); - record.set_record_data_size(new_total_size); + record.set_record_data_size(new_record_data_size); }) .expect("record should still be here"); @@ -666,7 +706,7 @@ where out += &format!("Total Storage Space: {}\n", self.total_storage_space.get()); out += &format!("Dead Records: {}\n", self.dead_records.len()); for dr in &self.dead_records { - out += &format!(" {}\n", dr.0.key.to_string()); + out += &format!(" {}\n", 
dr.key.key.to_string()); } out += &format!("Changed Records: {}\n", self.changed_records.len()); for cr in &self.changed_records { diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index 76f61bc0..d8ca49ee 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -74,7 +74,9 @@ where } pub fn total_size(&self) -> usize { - mem::size_of::>() + self.descriptor.total_size() + self.record_data_size + (mem::size_of::() - mem::size_of::()) + + self.descriptor.total_size() + + self.record_data_size } // pub fn detail(&self) -> &D { diff --git a/veilid-core/src/storage_manager/types/record_data.rs b/veilid-core/src/storage_manager/types/record_data.rs index 532f4e0b..8b646c7a 100644 --- a/veilid-core/src/storage_manager/types/record_data.rs +++ b/veilid-core/src/storage_manager/types/record_data.rs @@ -12,7 +12,11 @@ impl RecordData { pub fn signed_value_data(&self) -> &SignedValueData { &self.signed_value_data } + pub fn data_size(&self) -> usize { + self.signed_value_data.data_size() + } pub fn total_size(&self) -> usize { - mem::size_of::() + self.signed_value_data.value_data().data().len() + (mem::size_of::() - mem::size_of::()) + + self.signed_value_data.total_size() } } diff --git a/veilid-core/src/storage_manager/types/signed_value_data.rs b/veilid-core/src/storage_manager/types/signed_value_data.rs index 1a2a3cf8..5724f0c3 100644 --- a/veilid-core/src/storage_manager/types/signed_value_data.rs +++ b/veilid-core/src/storage_manager/types/signed_value_data.rs @@ -56,6 +56,10 @@ impl SignedValueData { &self.signature } + pub fn data_size(&self) -> usize { + self.value_data.data_size() + } + pub fn total_size(&self) -> usize { (mem::size_of::() - mem::size_of::()) + self.value_data.total_size() } diff --git a/veilid-core/src/veilid_api/types/dht/value_data.rs b/veilid-core/src/veilid_api/types/dht/value_data.rs index 34d6d8ac..98f73910 100644 --- a/veilid-core/src/veilid_api/types/dht/value_data.rs +++ b/veilid-core/src/veilid_api/types/dht/value_data.rs @@ -56,6 +56,10 @@ impl ValueData { &self.data } + pub fn data_size(&self) -> usize { + self.data.len() + } + pub fn total_size(&self) -> usize { mem::size_of::() + self.data.len() } From 07f92b6e3fe22e9b564d2c95e98a8be49106966a Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 9 Sep 2023 18:35:25 -0400 Subject: [PATCH 4/6] more setvalue fixes and concurrency cleanup --- veilid-core/src/routing_table/mod.rs | 3 + .../routing_table/routing_domain_editor.rs | 11 ++- .../src/routing_table/routing_table_inner.rs | 8 +- veilid-core/src/routing_table/tasks/mod.rs | 27 +++---- .../src/rpc_processor/rpc_get_value.rs | 79 +++++++++++-------- .../src/rpc_processor/rpc_set_value.rs | 53 +++++++------ veilid-flutter/lib/default_config.dart | 6 +- veilid-tools/run_tests.sh | 2 +- veilid-tools/src/async_tag_lock.rs | 31 ++++++++ .../src/tests/common/test_async_tag_lock.rs | 24 ++++++ veilid-tools/src/tools.rs | 13 +++ veilid-tools/tests/web.rs | 1 - 12 files changed, 168 insertions(+), 90 deletions(-) diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 45808558..38b89504 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -54,6 +54,9 @@ const ROUTING_TABLE: &str = "routing_table"; const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map"; const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key"; +// Critical sections +const LOCK_TAG_TICK: 
&str = "TICK"; + pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; #[derive(Clone, Debug)] diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index 1b22657e..385fe4b5 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -129,9 +129,11 @@ impl RoutingDomainEditor { } // Briefly pause routing table ticker while changes are made - if pause_tasks { - self.routing_table.pause_tasks(true).await; - } + let _tick_guard = if pause_tasks { + Some(self.routing_table.pause_tasks().await) + } else { + None + }; // Apply changes let mut changed = false; @@ -262,8 +264,5 @@ impl RoutingDomainEditor { rss.reset(); } } - - // Unpause routing table ticker - self.routing_table.pause_tasks(false).await; } } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 44c001a4..6da5cefa 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -2,6 +2,7 @@ use super::*; use weak_table::PtrWeakHashSet; const RECENT_PEERS_TABLE_SIZE: usize = 64; + pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; ////////////////////////////////////////////////////////////////////////// @@ -34,8 +35,9 @@ pub struct RoutingTableInner { pub(super) recent_peers: LruCache, /// Storage for private/safety RouteSpecs pub(super) route_spec_store: Option, - /// Tick paused or not - pub(super) tick_paused: bool, + /// Async tagged critical sections table + /// Tag: "tick" -> in ticker + pub(super) critical_sections: AsyncTagLockTable<&'static str>, } impl RoutingTableInner { @@ -52,7 +54,7 @@ impl RoutingTableInner { self_transfer_stats: TransferStatsDownUp::default(), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), route_spec_store: None, - tick_paused: false, + critical_sections: AsyncTagLockTable::new(), } } diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index 25a67cd0..0a599b26 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -126,9 +126,13 @@ impl RoutingTable { /// to run tick tasks which may run at slower tick rates as configured pub async fn tick(&self) -> EyreResult<()> { // Don't tick if paused - if self.inner.read().tick_paused { + let opt_tick_guard = { + let inner = self.inner.read(); + inner.critical_sections.try_lock_tag(LOCK_TAG_TICK) + }; + let Some(_tick_guard) = opt_tick_guard else { return Ok(()); - } + }; // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs self.unlocked_inner.rolling_transfers_task.tick().await?; @@ -183,22 +187,9 @@ impl RoutingTable { Ok(()) } - pub(crate) async fn pause_tasks(&self, paused: bool) { - let cancel = { - let mut inner = self.inner.write(); - if !inner.tick_paused && paused { - inner.tick_paused = true; - true - } else if inner.tick_paused && !paused { - inner.tick_paused = false; - false - } else { - false - } - }; - if cancel { - self.cancel_tasks().await; - } + pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> { + let critical_sections = self.inner.read().critical_sections.clone(); + critical_sections.lock_tag(LOCK_TAG_TICK).await } pub(crate) async fn cancel_tasks(&self) { diff --git 
a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 4dff0c0d..caeb894d 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -78,7 +78,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest, question, Some(question_context)) + self.question(dest.clone(), question, Some(question_context)) .await? ); @@ -99,29 +99,35 @@ impl RPCProcessor { }; let (value, peers, descriptor) = get_value_a.destructure(); + #[cfg(feature="debug-dht")] + { + let debug_string_value = value.as_ref().map(|v| { + format!(" len={} seq={} writer={}", + v.value_data().data().len(), + v.value_data().seq(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + + let debug_string_answer = format!( + "OUT <== GetValueA({} #{}{}{} peers={}) <= {}", + key, + subkey, + debug_string_value, + if descriptor.is_some() { + " +desc" + } else { + "" + }, + peers.len(), + dest + ); - let debug_string_value = value.as_ref().map(|v| { - format!(" len={} seq={} writer={}", - v.value_data().data().len(), - v.value_data().seq(), - v.value_data().writer(), - ) - }).unwrap_or_default(); - - let debug_string_answer = format!( - "OUT <== GetValueA({} #{}{}{} peers={})", - key, - subkey, - debug_string_value, - if descriptor.is_some() { - " +desc" - } else { - "" - }, - peers.len(), - ); - - log_rpc!(debug "{}", debug_string_answer); + log_rpc!(debug "{}", debug_string_answer); + + let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + log_rpc!(debug "Peers: {:#?}", peers); + } // Validate peers returned are, in fact, closer to the key than the node we sent this to let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { @@ -203,19 +209,22 @@ impl RPCProcessor { let routing_table = self.routing_table(); let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); - let debug_string = format!( - "IN <=== GetValueQ({} #{}{}) <== {}", - key, - subkey, - if want_descriptor { - " +wantdesc" - } else { - "" - }, - msg.header.direct_sender_node_id() - ); + #[cfg(feature="debug-dht")] + { + let debug_string = format!( + "IN <=== GetValueQ({} #{}{}) <== {}", + key, + subkey, + if want_descriptor { + " +wantdesc" + } else { + "" + }, + msg.header.direct_sender_node_id() + ); - log_rpc!(debug "{}", debug_string); + log_rpc!(debug "{}", debug_string); + } // See if we have this record ourselves let storage_manager = self.storage_manager(); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 62ad8da8..cab0d6e6 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -92,7 +92,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( - self.question(dest, question, Some(question_context)) + self.question(dest.clone(), question, Some(question_context)) .await? 
); @@ -114,28 +114,35 @@ impl RPCProcessor { }; let (set, value, peers) = set_value_a.destructure(); - - let debug_string_value = value.as_ref().map(|v| { - format!(" len={} writer={}", - v.value_data().data().len(), - v.value_data().writer(), - ) - }).unwrap_or_default(); - - let debug_string_answer = format!( - "OUT <== SetValueA({} #{}{}{} peers={})", - key, - subkey, - if set { - " +set" - } else { - "" - }, - debug_string_value, - peers.len(), - ); + + #[cfg(feature="debug-dht")] + { + let debug_string_value = value.as_ref().map(|v| { + format!(" len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + - log_rpc!(debug "{}", debug_string_answer); + let debug_string_answer = format!( + "OUT <== SetValueA({} #{}{}{} peers={}) <= {}", + key, + subkey, + if set { + " +set" + } else { + "" + }, + debug_string_value, + peers.len(), + dest, + ); + + log_rpc!(debug "{}", debug_string_answer); + let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + log_rpc!(debug "Peers: {:#?}", peer_ids); + } // Validate peers returned are, in fact, closer to the key than the node we sent this to let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { @@ -235,7 +242,7 @@ impl RPCProcessor { // If there are less than 'set_value_count' peers that are closer, then store here too let set_value_count = { let c = self.config.get(); - c.network.dht.set_value_fanout as usize + c.network.dht.set_value_count as usize }; let (set, new_value) = if closer_to_key_peers.len() >= set_value_count { // Not close enough diff --git a/veilid-flutter/lib/default_config.dart b/veilid-flutter/lib/default_config.dart index 660722cd..eac855bc 100644 --- a/veilid-flutter/lib/default_config.dart +++ b/veilid-flutter/lib/default_config.dart @@ -120,16 +120,16 @@ Future getDefaultVeilidConfig(String programName) async { defaultRouteHopCount: 1, ), dht: VeilidConfigDHT( + maxFindNodeCount: 20, resolveNodeTimeoutMs: 10000, resolveNodeCount: 1, resolveNodeFanout: 4, - maxFindNodeCount: 20, getValueTimeoutMs: 10000, getValueCount: 3, getValueFanout: 4, setValueTimeoutMs: 10000, - setValueCount: 4, - setValueFanout: 6, + setValueCount: 5, + setValueFanout: 4, minPeerCount: 20, minPeerRefreshTimeMs: 60000, validateDialInfoReceiptTimeMs: 2000, diff --git a/veilid-tools/run_tests.sh b/veilid-tools/run_tests.sh index be64d808..615f3e63 100755 --- a/veilid-tools/run_tests.sh +++ b/veilid-tools/run_tests.sh @@ -3,7 +3,7 @@ SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" pushd $SCRIPTDIR 2>/dev/null if [[ "$1" == "wasm" ]]; then - WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --features=rt-wasm-bindgen + WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --no-default-features --features=rt-wasm-bindgen elif [[ "$1" == "ios" ]]; then SYMROOT=/tmp/testout APPNAME=veilidtools-tests diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs index 7dcaec02..cf3950b8 100644 --- a/veilid-tools/src/async_tag_lock.rs +++ b/veilid-tools/src/async_tag_lock.rs @@ -135,4 +135,35 @@ where // Return the locked guard AsyncTagLockGuard::new(self.clone(), tag, guard) } + + pub fn try_lock_tag(&self, tag: T) -> Option> { + // Get or create a tag lock entry + let mutex = { + let mut inner = self.inner.lock(); + + // See if this tag is in the table + // and if not, add a new mutex for this tag + let entry = inner + .table + 
.entry(tag.clone()) + .or_insert_with(|| AsyncTagLockTableEntry { + mutex: Arc::new(AsyncMutex::new(())), + waiters: 0, + }); + + // Increment the number of waiters + entry.waiters += 1; + + // Return the mutex associated with the tag + entry.mutex.clone() + + // Drop the table guard + }; + + // Lock the tag lock + let opt_guard = asyncmutex_try_lock_arc!(mutex); + + // Return the locked guard + opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard)) + } } diff --git a/veilid-tools/src/tests/common/test_async_tag_lock.rs b/veilid-tools/src/tests/common/test_async_tag_lock.rs index c1b2ecc4..caef3001 100644 --- a/veilid-tools/src/tests/common/test_async_tag_lock.rs +++ b/veilid-tools/src/tests/common/test_async_tag_lock.rs @@ -55,6 +55,29 @@ pub async fn test_simple_single_contention() { assert_eq!(table.len(), 1); } +pub async fn test_simple_try() { + info!("test_simple_try"); + + let table = AsyncTagLockTable::new(); + + let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); + let a2 = SocketAddr::new("1.2.3.5".parse().unwrap(), 1235); + + { + let _g1 = table.lock_tag(a1).await; + + let opt_g2 = table.try_lock_tag(a1); + let opt_g3 = table.try_lock_tag(a2); + + assert!(opt_g2.is_none()); + assert!(opt_g3.is_some()); + } + let opt_g4 = table.try_lock_tag(a1); + assert!(opt_g4.is_some()); + + assert_eq!(table.len(), 1); +} + pub async fn test_simple_double_contention() { info!("test_simple_double_contention"); @@ -153,6 +176,7 @@ pub async fn test_parallel_single_contention() { pub async fn test_all() { test_simple_no_contention().await; + test_simple_try().await; test_simple_single_contention().await; test_parallel_single_contention().await; } diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 92ce1444..76d0a2d8 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -47,6 +47,13 @@ cfg_if::cfg_if! { $x.clone().lock_owned().await }; } + + #[macro_export] + macro_rules! asyncmutex_try_lock_arc { + ($x:expr) => { + $x.try_lock_owned().ok() + }; + } } else { #[macro_export] macro_rules! asyncmutex_try_lock { @@ -60,6 +67,12 @@ cfg_if::cfg_if! { $x.lock_arc().await }; } + #[macro_export] + macro_rules! 
asyncmutex_try_lock_arc { + ($x:expr) => { + $x.try_lock_arc() + }; + } } } diff --git a/veilid-tools/tests/web.rs b/veilid-tools/tests/web.rs index e3997307..af72b762 100644 --- a/veilid-tools/tests/web.rs +++ b/veilid-tools/tests/web.rs @@ -4,7 +4,6 @@ use cfg_if::*; use parking_lot::Once; use veilid_tools::tests::*; -use veilid_tools::*; use wasm_bindgen_test::*; From 60a7e907120c144925c5e406e814a6c91cae44ff Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 9 Sep 2023 21:44:16 -0400 Subject: [PATCH 5/6] dht fixes --- .../src/rpc_processor/rpc_get_value.rs | 4 +- .../src/rpc_processor/rpc_set_value.rs | 3 +- veilid-core/src/storage_manager/get_value.rs | 12 ++- veilid-core/src/storage_manager/set_value.rs | 45 ++++++++--- veilid-tools/src/async_tag_lock.rs | 77 ++++++++----------- veilid-tools/src/tools.rs | 2 +- 6 files changed, 82 insertions(+), 61 deletions(-) diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index caeb894d..b6137796 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -126,7 +126,7 @@ impl RPCProcessor { log_rpc!(debug "{}", debug_string_answer); let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); - log_rpc!(debug "Peers: {:#?}", peers); + log_rpc!(debug "Peers: {:#?}", peer_ids); } // Validate peers returned are, in fact, closer to the key than the node we sent this to @@ -224,7 +224,7 @@ impl RPCProcessor { ); log_rpc!(debug "{}", debug_string); - } + } // See if we have this record ourselves let storage_manager = self.storage_manager(); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index cab0d6e6..48f7563b 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -114,7 +114,7 @@ impl RPCProcessor { }; let (set, value, peers) = set_value_a.destructure(); - + #[cfg(feature="debug-dht")] { let debug_string_value = value.as_ref().map(|v| { @@ -140,6 +140,7 @@ impl RPCProcessor { ); log_rpc!(debug "{}", debug_string_answer); + let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); log_rpc!(debug "Peers: {:#?}", peer_ids); } diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 8092b1d0..ebf8969d 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -176,9 +176,13 @@ impl StorageManager { } // If we finished with consensus (enough nodes returning the same value) TimeoutOr::Value(Ok(Some(()))) => { - log_stor!(debug "GetValue Fanout Consensus"); // Return the best answer we've got let ctx = context.lock(); + if ctx.value_count >= consensus_count { + log_stor!(debug "GetValue Fanout Consensus"); + } else { + log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count); + } Ok(SubkeyResult { value: ctx.value.clone(), descriptor: ctx.descriptor.clone(), @@ -188,7 +192,11 @@ impl StorageManager { TimeoutOr::Value(Ok(None)) => { // Return the best answer we've got let ctx = context.lock(); - log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count); + if ctx.value_count >= consensus_count { + log_stor!(debug "GetValue Fanout Exhausted Consensus"); + } else { + log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count); + } Ok(SubkeyResult { value: ctx.value.clone(), 
descriptor: ctx.descriptor.clone(), diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 17f65448..e1b4be01 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -5,7 +5,9 @@ struct OutboundSetValueContext { /// The latest value of the subkey, may be the value passed in pub value: SignedValueData, /// The consensus count for the value we have received - pub value_count: usize, + pub set_count: usize, + /// The number of non-sets since the last set we have received + pub missed_since_last_set: usize, /// The parsed schema from the descriptor if we have one pub schema: DHTSchema, } @@ -38,7 +40,8 @@ impl StorageManager { let schema = descriptor.schema()?; let context = Arc::new(Mutex::new(OutboundSetValueContext { value, - value_count: 0, + set_count: 0, + missed_since_last_set: 0, schema, })); @@ -98,7 +101,8 @@ impl StorageManager { // If the sequence number is greater, keep it ctx.value = value; // One node has shown us this value so far - ctx.value_count = 1; + ctx.set_count = 1; + ctx.missed_since_last_set = 0; } else { // If the sequence number is older, or an equal sequence number, // node should have not returned a value here. @@ -108,8 +112,12 @@ impl StorageManager { } else { // It was set on this node and no newer value was found and returned, // so increase our consensus count - ctx.value_count += 1; + ctx.set_count += 1; + ctx.missed_since_last_set = 0; } + } else { + let mut ctx = context.lock(); + ctx.missed_since_last_set += 1; } // Return peers if we have some @@ -122,9 +130,18 @@ impl StorageManager { // Routine to call to check if we're done at each step let check_done = |_closest_nodes: &[NodeRef]| { - // If we have reached sufficient consensus, return done let ctx = context.lock(); - if ctx.value_count >= consensus_count { + + // If we have reached sufficient consensus, return done + if ctx.set_count >= consensus_count { + return Some(()); + } + // If we have missed more than our consensus count since our last set, return done + // This keeps the traversal from searching too many nodes when we aren't converging + // Only do this if we have gotten at least half our desired sets. 
+            if ctx.set_count >= ((consensus_count + 1) / 2)
+                && ctx.missed_since_last_set >= consensus_count
+            {
                 return Some(());
             }
             None
         };
@@ -150,18 +167,26 @@ impl StorageManager {
                 let ctx = context.lock();
                 Ok(ctx.value.clone())
             }
-            // If we finished with consensus (enough nodes returning the same value)
+            // If we finished with or without consensus (enough nodes returning the same value)
             TimeoutOr::Value(Ok(Some(()))) => {
-                log_stor!(debug "SetValue Fanout Consensus");
                 // Return the best answer we've got
                 let ctx = context.lock();
+                if ctx.set_count >= consensus_count {
+                    log_stor!(debug "SetValue Fanout Consensus");
+                } else {
+                    log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.set_count);
+                }
                 Ok(ctx.value.clone())
             }
-            // If we finished without consensus (ran out of nodes before getting consensus)
+            // If we ran out of nodes before getting consensus
             TimeoutOr::Value(Ok(None)) => {
                 // Return the best answer we've got
                 let ctx = context.lock();
-                log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count);
+                if ctx.set_count >= consensus_count {
+                    log_stor!(debug "SetValue Fanout Exhausted Consensus");
+                } else {
+                    log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.set_count);
+                }
                 Ok(ctx.value.clone())
             }
             // Failed
diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs
index cf3950b8..ec9e8ec3 100644
--- a/veilid-tools/src/async_tag_lock.rs
+++ b/veilid-tools/src/async_tag_lock.rs
@@ -33,16 +33,16 @@ where
     fn drop(&mut self) {
         let mut inner = self.table.inner.lock();
         // Inform the table we're dropping this guard
-        let waiters = {
+        let guards = {
             // Get the table entry, it must exist since we have a guard locked
             let entry = inner.table.get_mut(&self.tag).unwrap();
-            // Decrement the number of waiters
-            entry.waiters -= 1;
-            // Return the number of waiters left
-            entry.waiters
+            // Decrement the number of guards
+            entry.guards -= 1;
+            // Return the number of guards left
+            entry.guards
         };
-        // If there are no waiters left, we remove the tag from the table
-        if waiters == 0 {
+        // If there are no guards left, we remove the tag from the table
+        if guards == 0 {
             inner.table.remove(&self.tag).unwrap();
         }
         // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
     }
 }
@@ -52,7 +52,7 @@
 #[derive(Clone, Debug)]
 struct AsyncTagLockTableEntry {
     mutex: Arc<AsyncMutex<()>>,
-    waiters: usize,
+    guards: usize,
 }

 struct AsyncTagLockTableInner
 where
@@ -108,11 +108,11 @@ where
             .entry(tag.clone())
             .or_insert_with(|| AsyncTagLockTableEntry {
                 mutex: Arc::new(AsyncMutex::new(())),
-                waiters: 0,
+                guards: 0,
             });

-            // Increment the number of waiters
-            entry.waiters += 1;
+            // Increment the number of guards
+            entry.guards += 1;

             // Return the mutex associated with the tag
             entry.mutex.clone()
@@ -121,16 +121,7 @@ where
         };

         // Lock the tag lock
-        let guard;
-        cfg_if!
{ - if #[cfg(feature="rt-tokio")] { - // tokio version - guard = mutex.lock_owned().await; - } else { - // async-std and wasm async-lock version - guard = mutex.lock_arc().await; - } - } + let guard = asyncmutex_lock_arc!(mutex); // Return the locked guard AsyncTagLockGuard::new(self.clone(), tag, guard) @@ -138,32 +129,28 @@ where pub fn try_lock_tag(&self, tag: T) -> Option> { // Get or create a tag lock entry - let mutex = { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock(); - // See if this tag is in the table - // and if not, add a new mutex for this tag - let entry = inner - .table - .entry(tag.clone()) - .or_insert_with(|| AsyncTagLockTableEntry { - mutex: Arc::new(AsyncMutex::new(())), - waiters: 0, - }); - - // Increment the number of waiters - entry.waiters += 1; - - // Return the mutex associated with the tag - entry.mutex.clone() - - // Drop the table guard - }; + // See if this tag is in the table + // and if not, add a new mutex for this tag + let entry = inner.table.entry(tag.clone()); // Lock the tag lock - let opt_guard = asyncmutex_try_lock_arc!(mutex); - - // Return the locked guard - opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard)) + let guard = match entry { + std::collections::hash_map::Entry::Occupied(mut o) => { + let e = o.get_mut(); + let guard = asyncmutex_try_lock_arc!(e.mutex)?; + e.guards += 1; + guard + } + std::collections::hash_map::Entry::Vacant(v) => { + let mutex = Arc::new(AsyncMutex::new(())); + let guard = asyncmutex_try_lock_arc!(mutex)?; + v.insert(AsyncTagLockTableEntry { mutex, guards: 1 }); + guard + } + }; + // Return guard + Some(AsyncTagLockGuard::new(self.clone(), tag, guard)) } } diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 76d0a2d8..648da9b2 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -51,7 +51,7 @@ cfg_if::cfg_if! { #[macro_export] macro_rules! asyncmutex_try_lock_arc { ($x:expr) => { - $x.try_lock_owned().ok() + $x.clone().try_lock_owned().ok() }; } } else { From 2ff2ab7aa83dbcafb03e99775b614a2e8b39e8e1 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 9 Sep 2023 22:34:42 -0400 Subject: [PATCH 6/6] debugging for public address --- veilid-core/src/network_manager/native/discovery_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index b4175e28..5cc70cf4 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -116,7 +116,7 @@ impl DiscoveryContext { ); log_net!( - "request_public_address {:?}: Value({:?})", + debug "request_public_address {:?}: Value({:?})", node_ref, res.answer );
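---

A note on the fanout queue introduced in PATCH 1/6: its contract is that (a) candidates are deduplicated, (b) a caller-supplied cleanup closure decides sort order and fanout width, and (c) each node is handed out at most once, ever, even if peers re-advertise it. The sketch below models that discipline in isolation. It is illustrative only, not veilid code: plain u64 node ids stand in for NodeRef/TypedKey, and an ascending sort stands in for the XOR-distance sort that sort_and_clean_closest_noderefs performs.

use std::collections::{HashSet, VecDeque};

/// Illustrative stand-in for the patch's FanoutQueue (not veilid code).
struct SimpleFanoutQueue {
    current: VecDeque<u64>, // queued candidates (stand-in for NodeRef)
    returned: HashSet<u64>, // ids already handed out, never requeued
}

impl SimpleFanoutQueue {
    fn new() -> Self {
        Self { current: VecDeque::new(), returned: HashSet::new() }
    }

    /// Queue new candidates, skipping duplicates and already-called nodes,
    /// then let `cleanup` sort and trim the candidate set.
    fn add<F: FnOnce(&[u64]) -> Vec<u64>>(&mut self, new_nodes: &[u64], cleanup: F) {
        for &n in new_nodes {
            if self.returned.contains(&n) || self.current.contains(&n) {
                continue;
            }
            self.current.push_front(n);
        }
        // Make the deque a single slice so cleanup can see everything at once
        self.current.make_contiguous();
        let trimmed = cleanup(self.current.as_slices().0);
        self.current = VecDeque::from(trimmed);
    }

    /// Hand out the next candidate, marking it so it is never returned again.
    fn next(&mut self) -> Option<u64> {
        let n = self.current.pop_front()?;
        self.returned.insert(n);
        Some(n)
    }
}

fn main() {
    let mut q = SimpleFanoutQueue::new();
    // Ascending sort as a stand-in for XOR distance; keep the closest 3
    let cleanup = |nodes: &[u64]| {
        let mut v = nodes.to_vec();
        v.sort_unstable();
        v.truncate(3);
        v
    };
    q.add(&[5, 1, 9, 1], cleanup);
    assert_eq!(q.next(), Some(1));
    // Re-adding node 1 is a no-op: it has already been handed out once
    q.add(&[1, 2], cleanup);
    assert_eq!(q.next(), Some(2));
    assert_eq!(q.next(), Some(5));
}

This one-call-per-node-ever property is what lets PATCH 1/6 delete the separate called_nodes bookkeeping from fanout_call.rs: the queue itself guarantees that a failed or already-contacted node cannot be re-enqueued by later FindNode answers.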