diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index 7ae002ef..c6686f67 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -7,6 +7,83 @@ use stop_token::future::FutureExt as StopFutureExt; const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2; impl RoutingTable { + /// Fastest routes sort + fn route_sort_latency_fn(a: &(DHTKey, u64), b: &(DHTKey, u64)) -> cmp::Ordering { + let mut al = a.1; + let mut bl = b.1; + // Treat zero latency as uncalculated + if al == 0 { + al = u64::MAX; + } + if bl == 0 { + bl = u64::MAX; + } + // Less is better + al.cmp(&bl) + } + + /// Get the list of routes to test and drop + /// + /// Allocated routes to test: + /// * if a route 'needs_testing' + /// . all published allocated routes + /// . the fastest 0..N default length routes + /// Routes to drop: + /// * if a route 'needs_testing' + /// . the N.. default routes + /// . the rest of the allocated unpublished routes + /// + /// If a route doesn't 'need_testing', then we neither test nor drop it + #[instrument(level = "trace", skip(self))] + fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec { + let default_route_hop_count = + self.with_config(|c| c.network.rpc.default_route_hop_count as usize); + + let rss = self.route_spec_store(); + let mut must_test_routes = Vec::::new(); + let mut unpublished_routes = Vec::<(DHTKey, u64)>::new(); + let mut dead_routes = Vec::::new(); + rss.list_allocated_routes(|k, v| { + let stats = v.get_stats(); + // Ignore nodes that don't need testing + if !stats.needs_testing(cur_ts) { + return Option::<()>::None; + } + if v.is_published() { + must_test_routes.push(*k); + } else if v.hop_count() == default_route_hop_count { + unpublished_routes.push((*k, stats.latency_stats.average.as_u64())); + } else { + dead_routes.push(*k); + } + Option::<()>::None + }); + + // Sort unpublished routes by speed if we know the speed + unpublished_routes.sort_by(Self::route_sort_latency_fn); + + // Save up to N unpublished routes and test them + for x in 0..(usize::min(BACKGROUND_SAFETY_ROUTE_COUNT, unpublished_routes.len())) { + must_test_routes.push(unpublished_routes[x].0); + } + + // Kill off all but N unpublished routes rather than testing them + if unpublished_routes.len() > BACKGROUND_SAFETY_ROUTE_COUNT { + for x in &unpublished_routes[BACKGROUND_SAFETY_ROUTE_COUNT..] { + dead_routes.push(x.0); + } + } + + // Process dead routes + for r in &dead_routes { + log_rtab!(debug "Dead route no test: {}", r); + rss.release_route(r); + } + + // return routes to test + must_test_routes + } + /// Test set of routes and remove the ones that don't test clean #[instrument(level = "trace", skip(self, stop_token), err)] async fn test_route_set( @@ -60,7 +137,7 @@ impl RoutingTable { // Process failed routes let ctx = &mut *ctx.lock(); for r in &ctx.dead_routes { - log_rtab!(debug "Dead route: {}", &r); + log_rtab!(debug "Dead route failed to test: {}", &r); rss.release_route(r); } @@ -87,15 +164,7 @@ impl RoutingTable { // Test locally allocated routes first // This may remove dead routes - let rss = self.route_spec_store(); - let routes_needing_testing = rss.list_allocated_routes(|k, v| { - let stats = v.get_stats(); - if stats.needs_testing(cur_ts) { - return Some(*k); - } else { - return None; - } - }); + let routes_needing_testing = self.get_allocated_routes_to_test(cur_ts); if !routes_needing_testing.is_empty() { self.test_route_set(stop_token.clone(), routes_needing_testing) .await?; @@ -105,6 +174,7 @@ impl RoutingTable { let default_route_hop_count = self.with_config(|c| c.network.rpc.default_route_hop_count as usize); let mut local_unpublished_route_count = 0usize; + let rss = self.route_spec_store(); rss.list_allocated_routes(|_k, v| { if !v.is_published() && v.hop_count() == default_route_hop_count { local_unpublished_route_count += 1;