route grooming fix
This commit is contained in:
		@@ -188,6 +188,12 @@ impl RouteSpecDetail {
 | 
			
		||||
    pub fn get_stats_mut(&mut self) -> &mut RouteStats {
 | 
			
		||||
        &mut self.stats
 | 
			
		||||
    }
 | 
			
		||||
    pub fn is_published(&self) -> bool {
 | 
			
		||||
        self.published
 | 
			
		||||
    }
 | 
			
		||||
    pub fn hop_count(&self) -> usize {
 | 
			
		||||
        self.hops.len()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// The core representation of the RouteSpecStore that can be serialized
 | 
			
		||||
@@ -1082,6 +1088,11 @@ impl RouteSpecStore {
 | 
			
		||||
        avoid_node_ids: &[DHTKey],
 | 
			
		||||
    ) -> Option<DHTKey> {
 | 
			
		||||
        let cur_ts = get_timestamp();
 | 
			
		||||
 | 
			
		||||
        let mut routes = Vec::new();
 | 
			
		||||
 | 
			
		||||
        // Get all valid routes, allow routes that need testing
 | 
			
		||||
        // but definitely prefer routes that have been recently tested
 | 
			
		||||
        for detail in &inner.content.details {
 | 
			
		||||
            if detail.1.stability >= stability
 | 
			
		||||
                && detail.1.sequencing >= sequencing
 | 
			
		||||
@@ -1089,7 +1100,6 @@ impl RouteSpecStore {
 | 
			
		||||
                && detail.1.hops.len() <= max_hop_count
 | 
			
		||||
                && detail.1.directions.is_superset(directions)
 | 
			
		||||
                && !detail.1.published
 | 
			
		||||
                && !detail.1.stats.needs_testing(cur_ts)
 | 
			
		||||
            {
 | 
			
		||||
                let mut avoid = false;
 | 
			
		||||
                for h in &detail.1.hops {
 | 
			
		||||
@@ -1099,11 +1109,29 @@ impl RouteSpecStore {
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                if !avoid {
 | 
			
		||||
                    return Some(*detail.0);
 | 
			
		||||
                    routes.push(detail);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        None
 | 
			
		||||
 | 
			
		||||
        // Sort the routes by preference
 | 
			
		||||
        routes.sort_by(|a, b| {
 | 
			
		||||
            let a_needs_testing = a.1.stats.needs_testing(cur_ts);
 | 
			
		||||
            let b_needs_testing = b.1.stats.needs_testing(cur_ts);
 | 
			
		||||
            if !a_needs_testing && b_needs_testing {
 | 
			
		||||
                return cmp::Ordering::Less;
 | 
			
		||||
            }
 | 
			
		||||
            if !b_needs_testing && a_needs_testing {
 | 
			
		||||
                return cmp::Ordering::Greater;
 | 
			
		||||
            }
 | 
			
		||||
            let a_latency = a.1.stats.latency_stats().average;
 | 
			
		||||
            let b_latency = b.1.stats.latency_stats().average;
 | 
			
		||||
 | 
			
		||||
            a_latency.cmp(&b_latency)
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        // Return the best one if we got one
 | 
			
		||||
        routes.first().map(|r| *r.0)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// List all allocated routes
 | 
			
		||||
 
 | 
			
		||||
@@ -4,7 +4,70 @@ use futures_util::stream::{FuturesUnordered, StreamExt};
 | 
			
		||||
use futures_util::FutureExt;
 | 
			
		||||
use stop_token::future::FutureExt as StopFutureExt;
 | 
			
		||||
 | 
			
		||||
const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2;
 | 
			
		||||
 | 
			
		||||
impl RoutingTable {
 | 
			
		||||
    /// Test set of routes and remove the ones that don't test clean
 | 
			
		||||
    #[instrument(level = "trace", skip(self, stop_token), err)]
 | 
			
		||||
    async fn test_route_set(
 | 
			
		||||
        &self,
 | 
			
		||||
        stop_token: StopToken,
 | 
			
		||||
        routes_needing_testing: Vec<DHTKey>,
 | 
			
		||||
    ) -> EyreResult<()> {
 | 
			
		||||
        let rss = self.route_spec_store();
 | 
			
		||||
        log_rtab!(debug "Testing routes: {:?}", routes_needing_testing);
 | 
			
		||||
 | 
			
		||||
        #[derive(Default, Debug)]
 | 
			
		||||
        struct TestRouteContext {
 | 
			
		||||
            failed: bool,
 | 
			
		||||
            dead_routes: Vec<DHTKey>,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if routes_needing_testing.is_empty() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Test all the routes that need testing at the same time
 | 
			
		||||
        let mut unord = FuturesUnordered::new();
 | 
			
		||||
        let ctx = Arc::new(Mutex::new(TestRouteContext::default()));
 | 
			
		||||
        for r in routes_needing_testing {
 | 
			
		||||
            let rss = rss.clone();
 | 
			
		||||
            let ctx = ctx.clone();
 | 
			
		||||
            unord.push(
 | 
			
		||||
                async move {
 | 
			
		||||
                    let success = match rss.test_route(&r).await {
 | 
			
		||||
                        Ok(v) => v,
 | 
			
		||||
                        Err(e) => {
 | 
			
		||||
                            log_rtab!(error "Test route failed: {}", e);
 | 
			
		||||
                            ctx.lock().failed = true;
 | 
			
		||||
                            return;
 | 
			
		||||
                        }
 | 
			
		||||
                    };
 | 
			
		||||
                    if success {
 | 
			
		||||
                        // Route is okay, leave it alone
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
                    // Route test failed
 | 
			
		||||
                    ctx.lock().dead_routes.push(r);
 | 
			
		||||
                }
 | 
			
		||||
                .instrument(Span::current())
 | 
			
		||||
                .boxed(),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Wait for test_route futures to complete in parallel
 | 
			
		||||
        while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
 | 
			
		||||
 | 
			
		||||
        // Process failed routes
 | 
			
		||||
        let ctx = &mut *ctx.lock();
 | 
			
		||||
        for r in &ctx.dead_routes {
 | 
			
		||||
            log_rtab!(debug "Dead route: {}", &r);
 | 
			
		||||
            rss.release_route(r);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Keep private routes assigned and accessible
 | 
			
		||||
    #[instrument(level = "trace", skip(self, stop_token), err)]
 | 
			
		||||
    pub(crate) async fn private_route_management_task_routine(
 | 
			
		||||
@@ -23,9 +86,10 @@ impl RoutingTable {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Collect any routes that need that need testing
 | 
			
		||||
        // Test locally allocated routes first
 | 
			
		||||
        // This may remove dead routes
 | 
			
		||||
        let rss = self.route_spec_store();
 | 
			
		||||
        let mut routes_needing_testing = rss.list_allocated_routes(|k, v| {
 | 
			
		||||
        let routes_needing_testing = rss.list_allocated_routes(|k, v| {
 | 
			
		||||
            let stats = v.get_stats();
 | 
			
		||||
            if stats.needs_testing(cur_ts) {
 | 
			
		||||
                return Some(*k);
 | 
			
		||||
@@ -33,7 +97,45 @@ impl RoutingTable {
 | 
			
		||||
                return None;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        let mut remote_routes_needing_testing = rss.list_remote_routes(|k, v| {
 | 
			
		||||
        self.test_route_set(stop_token.clone(), routes_needing_testing)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops
 | 
			
		||||
        let default_route_hop_count =
 | 
			
		||||
            self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
 | 
			
		||||
        let mut local_unpublished_route_count = 0usize;
 | 
			
		||||
        rss.list_allocated_routes(|_k, v| {
 | 
			
		||||
            if !v.is_published() && v.hop_count() == default_route_hop_count {
 | 
			
		||||
                local_unpublished_route_count += 1;
 | 
			
		||||
            }
 | 
			
		||||
            Option::<()>::None
 | 
			
		||||
        });
 | 
			
		||||
        if local_unpublished_route_count < BACKGROUND_SAFETY_ROUTE_COUNT {
 | 
			
		||||
            let routes_to_allocate = BACKGROUND_SAFETY_ROUTE_COUNT - local_unpublished_route_count;
 | 
			
		||||
 | 
			
		||||
            // Newly allocated routes
 | 
			
		||||
            let mut newly_allocated_routes = Vec::new();
 | 
			
		||||
            for _n in 0..routes_to_allocate {
 | 
			
		||||
                // Parameters here must be the default safety route spec
 | 
			
		||||
                // These will be used by test_remote_route as well
 | 
			
		||||
                if let Some(k) = rss.allocate_route(
 | 
			
		||||
                    Stability::default(),
 | 
			
		||||
                    Sequencing::default(),
 | 
			
		||||
                    default_route_hop_count,
 | 
			
		||||
                    DirectionSet::all(),
 | 
			
		||||
                    &[],
 | 
			
		||||
                )? {
 | 
			
		||||
                    newly_allocated_routes.push(k);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Immediately test them
 | 
			
		||||
            self.test_route_set(stop_token.clone(), newly_allocated_routes)
 | 
			
		||||
                .await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Test remote routes next
 | 
			
		||||
        let remote_routes_needing_testing = rss.list_remote_routes(|k, v| {
 | 
			
		||||
            let stats = v.get_stats();
 | 
			
		||||
            if stats.needs_testing(cur_ts) {
 | 
			
		||||
                return Some(*k);
 | 
			
		||||
@@ -41,53 +143,8 @@ impl RoutingTable {
 | 
			
		||||
                return None;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        routes_needing_testing.append(&mut remote_routes_needing_testing);
 | 
			
		||||
 | 
			
		||||
        // Test all the routes that need testing at the same time
 | 
			
		||||
        #[derive(Default, Debug)]
 | 
			
		||||
        struct TestRouteContext {
 | 
			
		||||
            failed: bool,
 | 
			
		||||
            dead_routes: Vec<DHTKey>,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if !routes_needing_testing.is_empty() {
 | 
			
		||||
            let mut unord = FuturesUnordered::new();
 | 
			
		||||
            let ctx = Arc::new(Mutex::new(TestRouteContext::default()));
 | 
			
		||||
            for r in routes_needing_testing {
 | 
			
		||||
                let rss = rss.clone();
 | 
			
		||||
                let ctx = ctx.clone();
 | 
			
		||||
                unord.push(
 | 
			
		||||
                    async move {
 | 
			
		||||
                        let success = match rss.test_route(&r).await {
 | 
			
		||||
                            Ok(v) => v,
 | 
			
		||||
                            Err(e) => {
 | 
			
		||||
                                log_rtab!(error "test route failed: {}", e);
 | 
			
		||||
                                ctx.lock().failed = true;
 | 
			
		||||
                                return;
 | 
			
		||||
                            }
 | 
			
		||||
                        };
 | 
			
		||||
                        if success {
 | 
			
		||||
                            // Route is okay, leave it alone
 | 
			
		||||
                            return;
 | 
			
		||||
                        }
 | 
			
		||||
                        // Route test failed
 | 
			
		||||
                        ctx.lock().dead_routes.push(r);
 | 
			
		||||
                    }
 | 
			
		||||
                    .instrument(Span::current())
 | 
			
		||||
                    .boxed(),
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Wait for test_route futures to complete in parallel
 | 
			
		||||
            while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
 | 
			
		||||
 | 
			
		||||
            // Process failed routes
 | 
			
		||||
            let ctx = &mut *ctx.lock();
 | 
			
		||||
            for r in &ctx.dead_routes {
 | 
			
		||||
                log_rtab!(debug "Dead route: {}", &r);
 | 
			
		||||
                rss.release_route(r);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.test_route_set(stop_token.clone(), remote_routes_needing_testing)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // Send update (also may send updates for released routes done by other parts of the program)
 | 
			
		||||
        rss.send_route_update();
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user