From 8d80fbb2283dd90112dfff3b65aa5bdea3d029bb Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 15 Dec 2022 18:41:44 -0500 Subject: [PATCH] route grooming fix --- external/keyring-manager | 2 +- veilid-cli/src/ui.rs | 1 - .../src/routing_table/route_spec_store.rs | 34 +++- .../tasks/private_route_management.rs | 157 ++++++++++++------ 4 files changed, 139 insertions(+), 55 deletions(-) diff --git a/external/keyring-manager b/external/keyring-manager index c153eb30..b127b2d3 160000 --- a/external/keyring-manager +++ b/external/keyring-manager @@ -1 +1 @@ -Subproject commit c153eb3015d6d118e5d467865510d053ddd84533 +Subproject commit b127b2d3c653fea163a776dd58b3798f28aeeee3 diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index 9f127056..39909958 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -353,7 +353,6 @@ impl UI { format!(" Error: {}", e), color, )); - return; } } // save to history unless it's a duplicate diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 5f7d1b64..fbf21951 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -188,6 +188,12 @@ impl RouteSpecDetail { pub fn get_stats_mut(&mut self) -> &mut RouteStats { &mut self.stats } + pub fn is_published(&self) -> bool { + self.published + } + pub fn hop_count(&self) -> usize { + self.hops.len() + } } /// The core representation of the RouteSpecStore that can be serialized @@ -1082,6 +1088,11 @@ impl RouteSpecStore { avoid_node_ids: &[DHTKey], ) -> Option { let cur_ts = get_timestamp(); + + let mut routes = Vec::new(); + + // Get all valid routes, allow routes that need testing + // but definitely prefer routes that have been recently tested for detail in &inner.content.details { if detail.1.stability >= stability && detail.1.sequencing >= sequencing @@ -1089,7 +1100,6 @@ impl RouteSpecStore { && detail.1.hops.len() <= max_hop_count && detail.1.directions.is_superset(directions) && !detail.1.published - && !detail.1.stats.needs_testing(cur_ts) { let mut avoid = false; for h in &detail.1.hops { @@ -1099,11 +1109,29 @@ impl RouteSpecStore { } } if !avoid { - return Some(*detail.0); + routes.push(detail); } } } - None + + // Sort the routes by preference + routes.sort_by(|a, b| { + let a_needs_testing = a.1.stats.needs_testing(cur_ts); + let b_needs_testing = b.1.stats.needs_testing(cur_ts); + if !a_needs_testing && b_needs_testing { + return cmp::Ordering::Less; + } + if !b_needs_testing && a_needs_testing { + return cmp::Ordering::Greater; + } + let a_latency = a.1.stats.latency_stats().average; + let b_latency = b.1.stats.latency_stats().average; + + a_latency.cmp(&b_latency) + }); + + // Return the best one if we got one + routes.first().map(|r| *r.0) } /// List all allocated routes diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index f9929e2c..2b239e7c 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -4,7 +4,70 @@ use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::FutureExt; use stop_token::future::FutureExt as StopFutureExt; +const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2; + impl RoutingTable { + /// Test set of routes and remove the ones that don't test clean + #[instrument(level = "trace", skip(self, stop_token), err)] + async fn test_route_set( + &self, + stop_token: StopToken, + routes_needing_testing: Vec, + ) -> EyreResult<()> { + let rss = self.route_spec_store(); + log_rtab!(debug "Testing routes: {:?}", routes_needing_testing); + + #[derive(Default, Debug)] + struct TestRouteContext { + failed: bool, + dead_routes: Vec, + } + + if routes_needing_testing.is_empty() { + return Ok(()); + } + + // Test all the routes that need testing at the same time + let mut unord = FuturesUnordered::new(); + let ctx = Arc::new(Mutex::new(TestRouteContext::default())); + for r in routes_needing_testing { + let rss = rss.clone(); + let ctx = ctx.clone(); + unord.push( + async move { + let success = match rss.test_route(&r).await { + Ok(v) => v, + Err(e) => { + log_rtab!(error "Test route failed: {}", e); + ctx.lock().failed = true; + return; + } + }; + if success { + // Route is okay, leave it alone + return; + } + // Route test failed + ctx.lock().dead_routes.push(r); + } + .instrument(Span::current()) + .boxed(), + ); + } + + // Wait for test_route futures to complete in parallel + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + // Process failed routes + let ctx = &mut *ctx.lock(); + for r in &ctx.dead_routes { + log_rtab!(debug "Dead route: {}", &r); + rss.release_route(r); + } + + Ok(()) + } + /// Keep private routes assigned and accessible #[instrument(level = "trace", skip(self, stop_token), err)] pub(crate) async fn private_route_management_task_routine( @@ -23,9 +86,10 @@ impl RoutingTable { return Ok(()); } - // Collect any routes that need that need testing + // Test locally allocated routes first + // This may remove dead routes let rss = self.route_spec_store(); - let mut routes_needing_testing = rss.list_allocated_routes(|k, v| { + let routes_needing_testing = rss.list_allocated_routes(|k, v| { let stats = v.get_stats(); if stats.needs_testing(cur_ts) { return Some(*k); @@ -33,7 +97,45 @@ impl RoutingTable { return None; } }); - let mut remote_routes_needing_testing = rss.list_remote_routes(|k, v| { + self.test_route_set(stop_token.clone(), routes_needing_testing) + .await?; + + // Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops + let default_route_hop_count = + self.with_config(|c| c.network.rpc.default_route_hop_count as usize); + let mut local_unpublished_route_count = 0usize; + rss.list_allocated_routes(|_k, v| { + if !v.is_published() && v.hop_count() == default_route_hop_count { + local_unpublished_route_count += 1; + } + Option::<()>::None + }); + if local_unpublished_route_count < BACKGROUND_SAFETY_ROUTE_COUNT { + let routes_to_allocate = BACKGROUND_SAFETY_ROUTE_COUNT - local_unpublished_route_count; + + // Newly allocated routes + let mut newly_allocated_routes = Vec::new(); + for _n in 0..routes_to_allocate { + // Parameters here must be the default safety route spec + // These will be used by test_remote_route as well + if let Some(k) = rss.allocate_route( + Stability::default(), + Sequencing::default(), + default_route_hop_count, + DirectionSet::all(), + &[], + )? { + newly_allocated_routes.push(k); + } + } + + // Immediately test them + self.test_route_set(stop_token.clone(), newly_allocated_routes) + .await?; + } + + // Test remote routes next + let remote_routes_needing_testing = rss.list_remote_routes(|k, v| { let stats = v.get_stats(); if stats.needs_testing(cur_ts) { return Some(*k); @@ -41,53 +143,8 @@ impl RoutingTable { return None; } }); - routes_needing_testing.append(&mut remote_routes_needing_testing); - - // Test all the routes that need testing at the same time - #[derive(Default, Debug)] - struct TestRouteContext { - failed: bool, - dead_routes: Vec, - } - - if !routes_needing_testing.is_empty() { - let mut unord = FuturesUnordered::new(); - let ctx = Arc::new(Mutex::new(TestRouteContext::default())); - for r in routes_needing_testing { - let rss = rss.clone(); - let ctx = ctx.clone(); - unord.push( - async move { - let success = match rss.test_route(&r).await { - Ok(v) => v, - Err(e) => { - log_rtab!(error "test route failed: {}", e); - ctx.lock().failed = true; - return; - } - }; - if success { - // Route is okay, leave it alone - return; - } - // Route test failed - ctx.lock().dead_routes.push(r); - } - .instrument(Span::current()) - .boxed(), - ); - } - - // Wait for test_route futures to complete in parallel - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} - - // Process failed routes - let ctx = &mut *ctx.lock(); - for r in &ctx.dead_routes { - log_rtab!(debug "Dead route: {}", &r); - rss.release_route(r); - } - } + self.test_route_set(stop_token.clone(), remote_routes_needing_testing) + .await?; // Send update (also may send updates for released routes done by other parts of the program) rss.send_route_update();