diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index 03b109b3..647dc78b 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -1,8 +1,10 @@ use super::*; impl RoutingTable { - /// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info - pub fn find_all_closest_peers( + /// Utility to find the closest nodes to a particular key, preferring reliable nodes first, + /// including possibly our own node and nodes further away from the key than our own, + /// returning their peer info + pub fn find_preferred_closest_peers( &self, key: TypedKey, capabilities: &[Capability], @@ -49,7 +51,7 @@ impl RoutingTable { }; let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); - let closest_nodes = match self.find_closest_nodes( + let closest_nodes = match self.find_preferred_closest_nodes( node_count, key, filters, @@ -68,9 +70,10 @@ impl RoutingTable { NetworkResult::value(closest_nodes) } - /// Utility to find nodes that are closer to a key than our own node, returning their peer info + /// Utility to find nodes that are closer to a key than our own node, + /// preferring reliable nodes first, and returning their peer info /// Can filter based on a particular set of capabiltiies - pub fn find_peers_closer_to_key( + pub fn find_preferred_peers_closer_to_key( &self, key: TypedKey, required_capabilities: Vec, @@ -126,7 +129,7 @@ impl RoutingTable { }; // - let closest_nodes = match self.find_closest_nodes( + let closest_nodes = match self.find_preferred_closest_nodes( node_count, key, filters, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index a3917be0..45808558 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -939,7 +939,7 @@ impl RoutingTable { let filters = VecDeque::from([filter]); - self.find_fastest_nodes( + self.find_preferred_fastest_nodes( protocol_types_len * 2 * max_per_type, filters, |_rti, entry: Option>| { @@ -990,7 +990,7 @@ impl RoutingTable { .find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform) } - pub fn find_fastest_nodes<'a, T, O>( + pub fn find_preferred_fastest_nodes<'a, T, O>( &self, node_count: usize, filters: VecDeque, @@ -1001,10 +1001,10 @@ impl RoutingTable { { self.inner .read() - .find_fastest_nodes(node_count, filters, transform) + .find_preferred_fastest_nodes(node_count, filters, transform) } - pub fn find_closest_nodes<'a, T, O>( + pub fn find_preferred_closest_nodes<'a, T, O>( &self, node_count: usize, node_id: TypedKey, @@ -1016,14 +1016,14 @@ impl RoutingTable { { self.inner .read() - .find_closest_nodes(node_count, node_id, filters, transform) + .find_preferred_closest_nodes(node_count, node_id, filters, transform) } pub fn sort_and_clean_closest_noderefs( &self, node_id: TypedKey, - closest_nodes: &mut Vec, - ) { + closest_nodes: &[NodeRef], + ) -> Vec { self.inner .read() .sort_and_clean_closest_noderefs(node_id, closest_nodes) diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 8ba6cf88..44c001a4 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -963,7 +963,7 @@ impl RoutingTableInner { }) as RoutingTableEntryFilter; filters.push_front(public_node_filter); - self.find_fastest_nodes( + self.find_preferred_fastest_nodes( node_count, filters, |_rti: &RoutingTableInner, v: Option>| { @@ -1062,7 +1062,7 @@ impl RoutingTableInner { out } - pub fn find_fastest_nodes( + pub fn find_preferred_fastest_nodes( &self, node_count: usize, mut filters: VecDeque, @@ -1154,7 +1154,7 @@ impl RoutingTableInner { out } - pub fn find_closest_nodes( + pub fn find_preferred_closest_nodes( &self, node_count: usize, node_id: TypedKey, @@ -1242,8 +1242,8 @@ impl RoutingTableInner { pub fn sort_and_clean_closest_noderefs( &self, node_id: TypedKey, - closest_nodes: &mut Vec, - ) { + closest_nodes: &[NodeRef], + ) -> Vec { // Lock all noderefs let kind = node_id.kind; let mut closest_nodes_locked: Vec = closest_nodes @@ -1263,7 +1263,7 @@ impl RoutingTableInner { closest_nodes_locked.sort_by(sort); // Unlock noderefs - *closest_nodes = closest_nodes_locked.iter().map(|x| x.unlocked()).collect(); + closest_nodes_locked.iter().map(|x| x.unlocked()).collect() } } @@ -1271,7 +1271,6 @@ fn make_closest_noderef_sort( crypto: Crypto, node_id: TypedKey, ) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering { - let cur_ts = get_aligned_timestamp(); let kind = node_id.kind; // Get cryptoversion to check distance with let vcrypto = crypto.get(kind).unwrap(); @@ -1282,19 +1281,8 @@ fn make_closest_noderef_sort( return core::cmp::Ordering::Equal; } - // reliable nodes come first, pessimistically treating our own node as unreliable a.operate(|_rti, a_entry| { b.operate(|_rti, b_entry| { - let ra = a_entry.check_reliable(cur_ts); - let rb = b_entry.check_reliable(cur_ts); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } - // get keys let a_key = a_entry.node_ids().get(kind).unwrap(); let b_key = b_entry.node_ids().get(kind).unwrap(); diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index f2dc92fc..00581179 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -67,7 +67,7 @@ impl RoutingTable { ) as RoutingTableEntryFilter; filters.push_front(filter); - let noderefs = routing_table.find_fastest_nodes( + let noderefs = routing_table.find_preferred_fastest_nodes( min_peer_count, filters, |_rti, entry: Option>| { diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 94dfda66..3f7529ad 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -4,8 +4,7 @@ struct FanoutContext where R: Unpin, { - closest_nodes: Vec, - called_nodes: HashSet, + fanout_queue: FanoutQueue, result: Option>, } @@ -72,8 +71,7 @@ where check_done: D, ) -> Arc { let context = Mutex::new(FanoutContext { - closest_nodes: Vec::with_capacity(node_count), - called_nodes: HashSet::new(), + fanout_queue: FanoutQueue::new(node_id.kind), result: None, }); @@ -91,82 +89,44 @@ where }) } - fn add_new_nodes(self: Arc, new_nodes: Vec) { - let mut ctx = self.context.lock(); - - for nn in new_nodes { - // Make sure the new node isnt already in the list - let mut dup = false; - for cn in &ctx.closest_nodes { - if cn.same_entry(&nn) { - dup = true; - break; - } - } - if !dup { - // Add the new node if we haven't already called it before (only one call per node ever) - if let Some(key) = nn.node_ids().get(self.crypto_kind) { - if !ctx.called_nodes.contains(&key) { - ctx.closest_nodes.push(nn.clone()); - } - } - } - } - - self.routing_table - .sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes); - ctx.closest_nodes.truncate(self.node_count); - } - - fn remove_node(self: Arc, dead_node: NodeRef) { - let mut ctx = self.context.lock(); - for n in 0..ctx.closest_nodes.len() { - let cn = &ctx.closest_nodes[n]; - if cn.same_entry(&dead_node) { - ctx.closest_nodes.remove(n); - break; - } - } - } - - fn get_next_node(self: Arc) -> Option { - let mut next_node = None; - let mut ctx = self.context.lock(); - for cn in ctx.closest_nodes.clone() { - if let Some(key) = cn.node_ids().get(self.crypto_kind) { - if !ctx.called_nodes.contains(&key) { - // New fanout call candidate found - next_node = Some(cn.clone()); - ctx.called_nodes.insert(key); - break; - } - } - } - next_node - } - - fn evaluate_done(self: Arc) -> bool { - let mut ctx = self.context.lock(); - + fn evaluate_done(self: Arc, ctx: &mut FanoutContext) -> bool { // If we have a result, then we're done if ctx.result.is_some() { return true; } // Check for a new done result - ctx.result = (self.check_done)(&ctx.closest_nodes).map(|o| Ok(o)); + ctx.result = (self.check_done)(ctx.fanout_queue.nodes()).map(|o| Ok(o)); ctx.result.is_some() } + fn add_to_fanout_queue(self: Arc, new_nodes: &[NodeRef]) { + let ctx = &mut *self.context.lock(); + let this = self.clone(); + ctx.fanout_queue.add(&new_nodes, |current_nodes| { + let mut current_nodes_vec = this + .routing_table + .sort_and_clean_closest_noderefs(this.node_id, current_nodes); + current_nodes_vec.truncate(self.node_count); + current_nodes_vec + }); + } + async fn fanout_processor(self: Arc) { - // Check to see if we have a result or are done - while !self.clone().evaluate_done() { - // Get the closest node we haven't processed yet - let next_node = self.clone().get_next_node(); + // Loop until we have a result or are done + loop { + // Get the closest node we haven't processed yet if we're not done yet + let next_node = { + let mut ctx = self.context.lock(); + if self.clone().evaluate_done(&mut ctx) { + break; + } + self.context.lock().fanout_queue.next() + }; // If we don't have a node to process, stop fanning out let Some(next_node) = next_node else { - return; + break; }; // Do the call for this node @@ -188,20 +148,18 @@ where .collect(); // Call succeeded - // Register the returned nodes and add them to the closest nodes list in sorted order + // Register the returned nodes and add them to the fanout queue in sorted order let new_nodes = self .routing_table .register_find_node_answer(self.crypto_kind, filtered_v); - self.clone().add_new_nodes(new_nodes); + self.clone().add_to_fanout_queue(&new_nodes); } Ok(None) => { - // Call failed, remove the node so it isn't considered as part of the fanout - self.clone().remove_node(next_node); + // Call failed, node will node be considered again } Err(e) => { // Error happened, abort everything and return the error - let mut ctx = self.context.lock(); - ctx.result = Some(Err(e)); + self.context.lock().result = Some(Err(e)); return; } }; @@ -231,7 +189,7 @@ where return false; } - // Check our node info ilter + // Check our node info filter let node_ids = e.node_ids().to_vec(); if !(node_info_filter)(&node_ids, signed_node_info.node_info()) { return false; @@ -248,12 +206,10 @@ where }; routing_table - .find_closest_nodes(self.node_count, self.node_id, filters, transform) + .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform) .map_err(RPCError::invalid_format)? }; - - let mut ctx = self.context.lock(); - ctx.closest_nodes = closest_nodes; + self.clone().add_to_fanout_queue(&closest_nodes); Ok(()) } @@ -272,9 +228,11 @@ where } // Do a quick check to see if we're already done - if self.clone().evaluate_done() { + { let mut ctx = self.context.lock(); - return TimeoutOr::value(ctx.result.take().transpose()); + if self.clone().evaluate_done(&mut ctx) { + return TimeoutOr::value(ctx.result.take().transpose()); + } } // If not, do the fanout @@ -287,19 +245,12 @@ where } } // Wait for them to complete - timeout(timeout_ms, async { - while let Some(_) = unord.next().await { - if self.clone().evaluate_done() { - break; - } - } - }) - .await - .into_timeout_or() - .map(|_| { - // Finished, return whatever value we came up with - let mut ctx = self.context.lock(); - ctx.result.take().transpose() - }) + timeout(timeout_ms, async { while unord.next().await.is_some() {} }) + .await + .into_timeout_or() + .map(|_| { + // Finished, return whatever value we came up with + self.context.lock().result.take().transpose() + }) } } diff --git a/veilid-core/src/rpc_processor/fanout_queue.rs b/veilid-core/src/rpc_processor/fanout_queue.rs new file mode 100644 index 00000000..3655012b --- /dev/null +++ b/veilid-core/src/rpc_processor/fanout_queue.rs @@ -0,0 +1,73 @@ +use super::*; + +pub struct FanoutQueue { + crypto_kind: CryptoKind, + current_nodes: VecDeque, + returned_nodes: HashSet, +} + +impl FanoutQueue { + // Create a queue for fanout candidates that have a crypto-kind compatible node id + pub fn new(crypto_kind: CryptoKind) -> Self { + Self { + crypto_kind, + current_nodes: VecDeque::new(), + returned_nodes: HashSet::new(), + } + } + + // Add new nodes to list of fanout candidates + // Run a cleanup routine afterwards to trim down the list of candidates so it doesn't grow too large + pub fn add Vec>( + &mut self, + new_nodes: &[NodeRef], + cleanup: F, + ) { + for nn in new_nodes { + // Ensure the node has a comparable key with our current crypto kind + let Some(key) = nn.node_ids().get(self.crypto_kind) else { + continue; + }; + // Check if we have already done this node before (only one call per node ever) + if self.returned_nodes.contains(&key) { + continue; + } + + // Make sure the new node isnt already in the list + let mut dup = false; + for cn in &self.current_nodes { + if cn.same_entry(nn) { + dup = true; + break; + } + } + if !dup { + // Add the new node + self.current_nodes.push_front(nn.clone()); + } + } + + // Make sure the deque is a single slice + self.current_nodes.make_contiguous(); + + // Sort and trim the candidate set + self.current_nodes = + VecDeque::from_iter(cleanup(self.current_nodes.as_slices().0).iter().cloned()); + } + + // Return next fanout candidate + pub fn next(&mut self) -> Option { + let cn = self.current_nodes.pop_front()?; + self.current_nodes.make_contiguous(); + let key = cn.node_ids().get(self.crypto_kind).unwrap(); + + // Ensure we don't return this node again + self.returned_nodes.insert(key); + Some(cn) + } + + // Get a slice of all the current fanout candidates + pub fn nodes(&self) -> &[NodeRef] { + self.current_nodes.as_slices().0 + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b4a06628..8f4baea8 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,6 +1,7 @@ mod coders; mod destination; mod fanout_call; +mod fanout_queue; mod operation_waiter; mod rpc_app_call; mod rpc_app_message; @@ -31,6 +32,7 @@ mod rpc_start_tunnel; pub use coders::*; pub use destination::*; pub use fanout_call::*; +pub use fanout_queue::*; pub use operation_waiter::*; pub use rpc_error::*; pub use rpc_status::*; diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 6bbcf5fa..d205081f 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -105,7 +105,7 @@ impl RPCProcessor { // Get a chunk of the routing table near the requested node id let routing_table = self.routing_table(); let closest_nodes = - network_result_try!(routing_table.find_all_closest_peers(node_id, &capabilities)); + network_result_try!(routing_table.find_preferred_closest_peers(node_id, &capabilities)); // Make FindNode answer let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 3895d9cb..4dff0c0d 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -201,7 +201,7 @@ impl RPCProcessor { // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); + let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let debug_string = format!( "IN <=== GetValueQ({} #{}{}) <== {}", diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 82f9afc4..62ad8da8 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -213,7 +213,7 @@ impl RPCProcessor { // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); + let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let debug_string = format!( "IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",