more refactor

This commit is contained in:
John Smith
2023-02-25 19:51:14 -05:00
parent 7962d3fe11
commit 4bc3f950df
17 changed files with 587 additions and 529 deletions

View File

@@ -8,7 +8,7 @@ const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2;
impl RoutingTable {
/// Fastest routes sort
fn route_sort_latency_fn(a: &(TypedKey, u64), b: &(TypedKey, u64)) -> cmp::Ordering {
fn route_sort_latency_fn(a: &(RouteId, u64), b: &(RouteId, u64)) -> cmp::Ordering {
let mut al = a.1;
let mut bl = b.1;
// Treat zero latency as uncalculated
@@ -35,14 +35,14 @@ impl RoutingTable {
///
/// If a route doesn't 'need_testing', then we neither test nor drop it
#[instrument(level = "trace", skip(self))]
fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec<TypedKey> {
fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec<RouteId> {
let default_route_hop_count =
self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
let rss = self.route_spec_store();
let mut must_test_routes = Vec::<TypedKey>::new();
let mut unpublished_routes = Vec::<(TypedKey, u64)>::new();
let mut expired_routes = Vec::<TypedKey>::new();
let mut must_test_routes = Vec::<RouteId>::new();
let mut unpublished_routes = Vec::<(RouteId, u64)>::new();
let mut expired_routes = Vec::<RouteId>::new();
rss.list_allocated_routes(|k, v| {
let stats = v.get_stats();
// Ignore nodes that don't need testing
@@ -81,7 +81,7 @@ impl RoutingTable {
}
// Process dead routes
for r in &expired_routes {
for r in expired_routes {
log_rtab!(debug "Expired route: {}", r);
rss.release_route(r);
}
@@ -95,7 +95,7 @@ impl RoutingTable {
async fn test_route_set(
&self,
stop_token: StopToken,
routes_needing_testing: Vec<TypedKey>,
routes_needing_testing: Vec<RouteId>,
) -> EyreResult<()> {
if routes_needing_testing.is_empty() {
return Ok(());
@@ -107,43 +107,45 @@ impl RoutingTable {
#[derive(Default, Debug)]
struct TestRouteContext {
failed: bool,
dead_routes: Vec<TypedKey>,
dead_routes: Vec<RouteId>,
}
let mut unord = FuturesUnordered::new();
let ctx = Arc::new(Mutex::new(TestRouteContext::default()));
for r in routes_needing_testing {
let rss = rss.clone();
let ctx = ctx.clone();
unord.push(
async move {
let success = match rss.test_route(&r).await {
Ok(v) => v,
Err(e) => {
log_rtab!(error "Test route failed: {}", e);
ctx.lock().failed = true;
{
let mut unord = FuturesUnordered::new();
for r in routes_needing_testing {
let rss = rss.clone();
let ctx = ctx.clone();
unord.push(
async move {
let success = match rss.test_route(r).await {
Ok(v) => v,
Err(e) => {
log_rtab!(error "Test route failed: {}", e);
ctx.lock().failed = true;
return;
}
};
if success {
// Route is okay, leave it alone
return;
}
};
if success {
// Route is okay, leave it alone
return;
// Route test failed
ctx.lock().dead_routes.push(r);
}
// Route test failed
ctx.lock().dead_routes.push(r);
}
.instrument(Span::current())
.boxed(),
);
.instrument(Span::current())
.boxed(),
);
}
// Wait for test_route futures to complete in parallel
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
}
// Wait for test_route futures to complete in parallel
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
// Process failed routes
let ctx = &mut *ctx.lock();
for r in &ctx.dead_routes {
log_rtab!(debug "Dead route failed to test: {}", &r);
let ctx = Arc::try_unwrap(ctx).unwrap().into_inner();
for r in ctx.dead_routes {
log_rtab!(debug "Dead route failed to test: {}", r);
rss.release_route(r);
}
@@ -176,13 +178,16 @@ impl RoutingTable {
.await?;
}
// Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops
// Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops and all our supported crypto kinds
let default_route_hop_count =
self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
let mut local_unpublished_route_count = 0usize;
let rss = self.route_spec_store();
rss.list_allocated_routes(|_k, v| {
if !v.is_published() && v.hop_count() == default_route_hop_count {
if !v.is_published()
&& v.hop_count() == default_route_hop_count
&& v.get_route_set_keys().kinds() == VALID_CRYPTO_KINDS
{
local_unpublished_route_count += 1;
}
Option::<()>::None
@@ -196,6 +201,7 @@ impl RoutingTable {
// Parameters here must be the default safety route spec
// These will be used by test_remote_route as well
if let Some(k) = rss.allocate_route(
&VALID_CRYPTO_KINDS,
Stability::default(),
Sequencing::default(),
default_route_hop_count,