diff --git a/veilid-core/src/crypto/types/crypto_typed_set.rs b/veilid-core/src/crypto/types/crypto_typed_set.rs index 3906d559..b8e17bf6 100644 --- a/veilid-core/src/crypto/types/crypto_typed_set.rs +++ b/veilid-core/src/crypto/types/crypto_typed_set.rs @@ -141,9 +141,9 @@ where } false } - pub fn contains_key(&self, key: &K) -> bool { + pub fn contains_value(&self, value: &K) -> bool { for tk in &self.items { - if tk.value == *key { + if tk.value == *value { return true; } } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 4ec518cb..c2c92708 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -1209,9 +1209,6 @@ impl RoutingTableInner { }; // distance is the next metric, closer nodes first - // since multiple cryptosystems are in use, the distance for a key is the shortest - // distance to that key over all supported cryptosystems - let da = vcrypto.distance(&a_key.value, &node_id.value); let db = vcrypto.distance(&b_key.value, &node_id.value); da.cmp(&db) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 77764d1c..87286060 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -401,28 +401,84 @@ impl RPCProcessor { /// If no node was found in the timeout, this returns None pub async fn search_dht_single_key( &self, - _node_id: PublicKey, - _count: u32, - _fanout: u32, - _timeout: TimestampDuration, - + node_id: TypedKey, + count: usize, + fanout: usize, + timeout_us: TimestampDuration, ) -> Result, RPCError> { let routing_table = self.routing_table(); + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Exclude our own node + if opt_entry.is_none() { + return false; + } + + // Ensure only things that are valid/signed in the PublicInternet domain are returned + rti.filter_has_valid_signed_node_info( + RoutingDomain::PublicInternet, + true, + opt_entry, + ) + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let transform = |_rti: &RoutingTableInner, v: Option>| { + NodeRef::new(routing_table.clone(), v.unwrap().clone(), None) + }; + // Get the 'count' closest nodes to the key out of our routing table - let mut closest_nodes = Vec::new(); - routing_table.find_closest_nodes(count, node_id, filters, transform) + let closest_nodes = routing_table.find_closest_nodes(count, node_id, filters, transform); + // If the node we want to locate is one of the closest nodes, return it immediately + if let Some(out) = closest_nodes + .iter() + .find(|x| x.node_ids().contains(&node_id)) + { + return Ok(Some(out.clone())); + } - + // Make accessible to fanout tasks + struct FanoutContext { + closest_nodes: Vec, + called_nodes: TypedKeySet, + } + let closest_nodes = Arc::new(Mutex::new(closest_nodes)); + + // Otherwise contact the 'fanout' closest nodes to see if there's closer nodes + let mut unord = FuturesUnordered::new(); + { + // Spin up 'fanout' tasks to process the fanout + for n in 0..4 { + // Fanout processor + let closest_nodes = closest_nodes.clone(); + let h = async move { + // Find the nth node to iterate on + let cn = closest_nodes.lock(); + let n = n.clamp(0, cn.len()); xxx dont do this, use called nodes set, shouldnt need stop token canceller, but maybe at the top level? nothing is spawning. so maybe not. + let mut node = + + }; + unord.push(h); + } + } + // Wait for them to complete + timeout((timeout_us.as_u64() / 1000u64) as u32, async { + while let Some(_) = unord.next().await {} + }) + .await; + + Ok(None) } /// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references pub async fn search_dht_multi_key( &self, - _node_id: PublicKey, - _count: u32, - _fanout: u32, + _node_id: TypedKey, + _count: usize, + _fanout: usize, _timeout: TimestampDuration, ) -> Result, RPCError> { // xxx return closest nodes after the timeout @@ -433,14 +489,14 @@ impl RPCProcessor { /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form pub fn resolve_node( &self, - node_id: PublicKey, xxx switch to typedkey for the api. everything else is going to need it. + node_id: TypedKey, ) -> SendPinBoxFuture, RPCError>> { let this = self.clone(); Box::pin(async move { let routing_table = this.routing_table(); // First see if we have the node in our routing table already - if let Some(nr) = routing_table.lookup_any_node_ref(node_id) { + if let Some(nr) = routing_table.lookup_node_ref(node_id) { // ensure we have some dial info for the entry already, // if not, we should do the find_node anyway if nr.has_any_dial_info() { @@ -452,8 +508,8 @@ impl RPCProcessor { let (count, fanout, timeout) = { let c = this.config.get(); ( - c.network.dht.resolve_node_count, - c.network.dht.resolve_node_fanout, + c.network.dht.resolve_node_count as usize, + c.network.dht.resolve_node_fanout as usize, TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), ) }; @@ -464,7 +520,7 @@ impl RPCProcessor { .await?; if let Some(nr) = &nr { - if nr.node_ids().contains_key(&node_id) { + if nr.node_ids().contains(&node_id) { // found a close node, but not exact within our configured resolve_node timeout return Ok(None); }