diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index d0b180f0..ad2da244 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -540,10 +540,21 @@ impl RoutingTable { &self, routing_domain_set: RoutingDomainSet, min_state: BucketEntryState, + crypto_kinds: &[CryptoKind], ) -> usize { self.inner .read() - .get_entry_count(routing_domain_set, min_state) + .get_entry_count(routing_domain_set, min_state, crypto_kinds) + } + + pub fn get_entry_count_per_crypto_kind( + &self, + routing_domain_set: RoutingDomainSet, + min_state: BucketEntryState, + ) -> BTreeMap { + self.inner + .read() + .get_entry_count_per_crypto_kind(routing_domain_set, min_state) } pub fn get_nodes_needing_ping( @@ -774,14 +785,19 @@ impl RoutingTable { .find_fast_public_nodes_filtered(self.clone(), node_count, filters) } - /// Retrieve up to N of each type of protocol capable nodes - pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec { + /// Retrieve up to N of each type of protocol capable nodes for a single crypto kind + fn find_bootstrap_nodes_filtered_per_crypto_kind( + &self, + crypto_kind: CryptoKind, + max_per_type: usize, + ) -> Vec { let protocol_types = vec![ ProtocolType::UDP, ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS, ]; + let protocol_types_len = protocol_types.len(); let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; @@ -795,6 +811,11 @@ impl RoutingTable { return false; } + // Ensure crypto kind is supported + if !e.crypto_kinds().contains(&crypto_kind) { + return false; + } + // does it have some dial info we need? let filter = |n: &NodeInfo| { let mut keep = false; @@ -840,6 +861,27 @@ impl RoutingTable { ) } + /// Retrieve up to N of each type of protocol capable nodes for all crypto kinds + pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec { + let mut out = + self.find_bootstrap_nodes_filtered_per_crypto_kind(VALID_CRYPTO_KINDS[0], max_per_type); + + // Merge list of nodes so we don't have duplicates + for crypto_kind in &VALID_CRYPTO_KINDS[1..] { + let nrs = + self.find_bootstrap_nodes_filtered_per_crypto_kind(*crypto_kind, max_per_type); + 'nrloop: for nr in nrs { + for nro in out { + if nro.same_entry(&nr) { + continue 'nrloop; + } + } + out.push(nr); + } + } + out + } + pub fn find_peers_with_sort_and_filter( &self, node_count: usize, diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index de60b8b0..798102cc 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -2,7 +2,7 @@ use super::*; use weak_table::PtrWeakHashSet; const RECENT_PEERS_TABLE_SIZE: usize = 64; - +pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; ////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, Copy)] @@ -18,6 +18,8 @@ pub struct RoutingTableInner { pub(super) buckets: BTreeMap>, /// A weak set of all the entries we have in the buckets for faster iteration pub(super) all_entries: PtrWeakHashSet>, + /// A rough count of the entries in the table per routing domain and crypto kind + pub(super) live_entry_count: EntryCounts, /// The public internet routing domain pub(super) public_internet_routing_domain: PublicInternetRoutingDomainDetail, /// The dial info we use on the local network @@ -42,6 +44,7 @@ impl RoutingTableInner { public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), all_entries: PtrWeakHashSet::new(), + live_entry_count: BTreeMap::new(), self_latency_stats_accounting: LatencyStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), @@ -409,18 +412,45 @@ impl RoutingTableInner { } } + /// Build the counts of entries per routing domain and crypto kind and cache them + pub fn refresh_cached_entry_counts(&mut self) -> EntryCounts { + self.live_entry_count.clear(); + let cur_ts = get_aligned_timestamp(); + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { + entry.with_inner(|e| { + if let Some(rd) = e.best_routing_domain(rti, RoutingDomainSet::all()) { + for crypto_kind in e.crypto_kinds() { + self.live_entry_count + .entry((rd, crypto_kind)) + .and_modify(|x| *x += 1) + .or_insert(1); + } + } + }); + Option::<()>::None + }); + self.live_entry_count.clone() + } + + /// Return the last cached entry counts + pub fn cached_entry_counts(&self) -> EntryCounts { + self.live_entry_count.clone() + } + + /// Count entries that match some criteria pub fn get_entry_count( &self, routing_domain_set: RoutingDomainSet, min_state: BucketEntryState, - crypto_kinds: &[CryptoKind], xxx finish this and peer minimum refresh and bootstrap tick, then both routines + crypto_kinds: &[CryptoKind], ) -> usize { let mut count = 0usize; let cur_ts = get_aligned_timestamp(); self.with_entries(cur_ts, min_state, |rti, e| { - if e.with_inner(|e| e.best_routing_domain(rti, routing_domain_set)) - .is_some() - { + if e.with_inner(|e| { + e.best_routing_domain(rti, routing_domain_set).is_some() + && !common_crypto_kinds(&e.crypto_kinds(), crypto_kinds).is_empty() + }) { count += 1; } Option::<()>::None @@ -428,6 +458,33 @@ impl RoutingTableInner { count } + /// Count entries per crypto kind that match some criteria + pub fn get_entry_count_per_crypto_kind( + &self, + routing_domain_set: RoutingDomainSet, + min_state: BucketEntryState, + ) -> BTreeMap { + let mut counts = BTreeMap::new(); + let cur_ts = get_aligned_timestamp(); + self.with_entries(cur_ts, min_state, |rti, e| { + if let Some(crypto_kinds) = e.with_inner(|e| { + if e.best_routing_domain(rti, routing_domain_set).is_some() { + Some(e.crypto_kinds()) + } else { + None + } + }) { + // Got crypto kinds, add to map + for ck in crypto_kinds { + counts.entry(ck).and_modify(|x| *x += 1).or_insert(1); + } + } + Option::<()>::None + }); + counts + } + + /// Iterate entries with a filter pub fn with_entries) -> Option>( &self, cur_ts: Timestamp, @@ -445,6 +502,7 @@ impl RoutingTableInner { None } + /// Iterate entries with a filter mutably pub fn with_entries_mut) -> Option>( &mut self, cur_ts: Timestamp, @@ -615,7 +673,7 @@ impl RoutingTableInner { new_entry.with_mut_inner(|e| update_func(self, e)); // Kick the bucket - log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable)); + log_rtab!(debug "Routing table now has {} nodes, {} live", self.bucket_entry_count(), self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable, &VALID_CRYPTO_KINDS)); Some(nr) } diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index ccdf948d..cc717131 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -294,6 +294,9 @@ impl RoutingTable { log_rtab!(debug "--- bootstrap_task"); + // Get counts by crypto kind + let entry_count = self.inner.read().cached_entry_counts(); + // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism let mut bootstrap_dialinfos = Vec::::new(); for b in &bootstrap { @@ -341,6 +344,14 @@ impl RoutingTable { { // Add this our futures to process in parallel for crypto_kind in VALID_CRYPTO_KINDS { + // Do we need to bootstrap this crypto kind? + let eckey = (RoutingDomain::PublicInternet, crypto_kind); + let cnt = entry_count.get(&eckey).copied().unwrap_or_default(); + if cnt != 0 { + continue; + } + + // Bootstrap this crypto kind let nr = nr.clone(); let routing_table = self.clone(); unord.push( diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index 1cc3e317..b7841b78 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -134,21 +134,30 @@ impl RoutingTable { self.unlocked_inner.kick_buckets_task.tick().await?; } - // See how many live PublicInternet entries we have - let live_public_internet_entry_count = self.get_entry_count( - RoutingDomain::PublicInternet.into(), - BucketEntryState::Unreliable, - ); + // Refresh entry counts + let entry_counts = { + let mut inner = self.inner.write(); + inner.refresh_cached_entry_counts() + }; + let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); - // If none, then add the bootstrap nodes to it - if live_public_internet_entry_count == 0 { + // Figure out which tables need bootstrap or peer minimum refresh + let mut needs_bootstrap = false; + let mut needs_peer_minimum_refresh = false; + for ck in VALID_CRYPTO_KINDS { + let eckey = (RoutingDomain::PublicInternet, ck); + let cnt = entry_counts.get(&eckey).copied().unwrap_or_default(); + if cnt == 0 { + needs_bootstrap = true; + } else if cnt < min_peer_count { + needs_peer_minimum_refresh = true; + } + } + if needs_bootstrap { self.unlocked_inner.bootstrap_task.tick().await?; } - // If we still don't have enough peers, find nodes until we do - else if !self.unlocked_inner.bootstrap_task.is_running() - && live_public_internet_entry_count < min_peer_count - { + if needs_peer_minimum_refresh { self.unlocked_inner.peer_minimum_refresh_task.tick().await?; } diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index 71c6deea..0caa3399 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -16,6 +16,9 @@ impl RoutingTable { self, stop_token: StopToken, ) -> EyreResult<()> { + // Get counts by crypto kind + let entry_count = self.inner.read().cached_entry_counts(); + let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); // For the PublicInternet routing domain, get list of all peers we know about @@ -24,11 +27,21 @@ impl RoutingTable { let mut ord = FuturesOrdered::new(); for crypto_kind in VALID_CRYPTO_KINDS { + // Do we need to peer minimum refresh this crypto kind? + let eckey = (RoutingDomain::PublicInternet, crypto_kind); + let cnt = entry_count.get(&eckey).copied().unwrap_or_default(); + if cnt == 0 || cnt > min_peer_count { + // If we have enough nodes, skip it + // If we have zero nodes, bootstrap will get it + continue; + } + let routing_table = self.clone(); let mut filters = VecDeque::new(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { + // Keep only the entries that contain the crypto kind we're looking for if let Some(entry) = opt_entry { entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind)) } else { @@ -49,8 +62,12 @@ impl RoutingTable { for nr in noderefs { let routing_table = self.clone(); ord.push_back( - async move { routing_table.reverse_find_node(nr, false).await } - .instrument(Span::current()), + async move { + routing_table + .reverse_find_node(crypto_kind, nr, false) + .await + } + .instrument(Span::current()), ); } }