pr management work

This commit is contained in:
John Smith 2022-11-25 14:21:55 -05:00
parent 05be3c8cc5
commit 79f55f1a0c
11 changed files with 464 additions and 90 deletions

View File

@ -405,7 +405,22 @@ reply - reply to an AppCall not handled directly by the server
self.inner_mut().ui.set_config(config.config) self.inner_mut().ui.set_config(config.config)
} }
pub fn update_route(&mut self, route: veilid_core::VeilidStateRoute) { pub fn update_route(&mut self, route: veilid_core::VeilidStateRoute) {
//self.inner_mut().ui.set_config(config.config) let mut out = String::new();
if !route.dead_routes.is_empty() {
out.push_str(&format!("Dead routes: {:?}", route.dead_routes));
}
if !route.dead_remote_routes.is_empty() {
if !out.is_empty() {
out.push_str("\n");
}
out.push_str(&format!(
"Dead remote routes: {:?}",
route.dead_remote_routes
));
}
if !out.is_empty() {
self.inner().ui.add_node_event(out);
}
} }
pub fn update_log(&mut self, log: veilid_core::VeilidLog) { pub fn update_log(&mut self, log: veilid_core::VeilidLog) {

View File

@ -283,6 +283,14 @@ impl NetworkManager {
.connection_manager .connection_manager
.clone() .clone()
} }
pub fn update_callback(&self) -> UpdateCallback {
self.unlocked_inner
.update_callback
.read()
.as_ref()
.unwrap()
.clone()
}
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {

View File

@ -127,6 +127,9 @@ impl RoutingTable {
pub fn rpc_processor(&self) -> RPCProcessor { pub fn rpc_processor(&self) -> RPCProcessor {
self.network_manager().rpc_processor() self.network_manager().rpc_processor()
} }
pub fn update_callback(&self) -> UpdateCallback {
self.network_manager().update_callback()
}
pub fn with_config<F, R>(&self, f: F) -> R pub fn with_config<F, R>(&self, f: F) -> R
where where
F: FnOnce(&VeilidConfigInner) -> R, F: FnOnce(&VeilidConfigInner) -> R,

View File

@ -116,6 +116,18 @@ impl PrivateRoute {
PrivateRouteHops::Empty => return None, PrivateRouteHops::Empty => return None,
} }
} }
pub fn first_hop_node_id(&self) -> Option<DHTKey> {
let PrivateRouteHops::FirstHop(pr_first_hop) = &self.hops else {
return None;
};
// Get the safety route to use from the spec
Some(match &pr_first_hop.node {
RouteNode::NodeId(n) => n.key,
RouteNode::PeerInfo(p) => p.node_id.key,
})
}
} }
impl fmt::Display for PrivateRoute { impl fmt::Display for PrivateRoute {

View File

@ -8,6 +8,8 @@ use rkyv::{
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
/// Remote private route cache entries expire in 5 minutes if they haven't been used /// Remote private route cache entries expire in 5 minutes if they haven't been used
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: u64 = 300_000_000u64; const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: u64 = 300_000_000u64;
/// Amount of time a route can remain idle before it gets tested
const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000;
/// Compiled route (safety route + private route) /// Compiled route (safety route + private route)
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -32,25 +34,25 @@ pub struct KeyPair {
pub struct RouteStats { pub struct RouteStats {
/// Consecutive failed to send count /// Consecutive failed to send count
#[with(Skip)] #[with(Skip)]
failed_to_send: u32, pub failed_to_send: u32,
/// Questions lost /// Questions lost
#[with(Skip)] #[with(Skip)]
questions_lost: u32, pub questions_lost: u32,
/// Timestamp of when the route was created /// Timestamp of when the route was created
created_ts: u64, pub created_ts: u64,
/// Timestamp of when the route was last checked for validity /// Timestamp of when the route was last checked for validity
#[with(Skip)] #[with(Skip)]
last_tested_ts: Option<u64>, pub last_tested_ts: Option<u64>,
/// Timestamp of when the route was last sent to /// Timestamp of when the route was last sent to
#[with(Skip)] #[with(Skip)]
last_sent_ts: Option<u64>, pub last_sent_ts: Option<u64>,
/// Timestamp of when the route was last received over /// Timestamp of when the route was last received over
#[with(Skip)] #[with(Skip)]
last_received_ts: Option<u64>, pub last_received_ts: Option<u64>,
/// Transfers up and down /// Transfers up and down
transfer_stats_down_up: TransferStatsDownUp, pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats /// Latency stats
latency_stats: LatencyStats, pub latency_stats: LatencyStats,
/// Accounting mechanism for this route's RPC latency /// Accounting mechanism for this route's RPC latency
#[with(Skip)] #[with(Skip)]
latency_stats_accounting: LatencyStatsAccounting, latency_stats_accounting: LatencyStatsAccounting,
@ -129,6 +131,28 @@ impl RouteStats {
self.last_sent_ts = None; self.last_sent_ts = None;
self.last_received_ts = None; self.last_received_ts = None;
} }
/// Check if a route needs testing
pub fn needs_testing(&self, cur_ts: u64) -> bool {
// Has the route had any failures lately?
if self.questions_lost > 0 || self.failed_to_send > 0 {
// If so, always test
return true;
}
// Has the route been tested within the idle time we'd want to check things?
// (also if we've received successfully over the route, this will get set)
if let Some(last_tested_ts) = self.last_tested_ts {
if cur_ts.saturating_sub(last_tested_ts) > (ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) {
return true;
}
} else {
// If this route has never been tested, it needs to be
return true;
}
false
}
} }
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
@ -157,6 +181,15 @@ pub struct RouteSpecDetail {
stats: RouteStats, stats: RouteStats,
} }
impl RouteSpecDetail {
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
}
/// The core representation of the RouteSpecStore that can be serialized /// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))] #[archive_attr(repr(C), derive(CheckBytes))]
@ -178,6 +211,15 @@ pub struct RemotePrivateRouteInfo {
stats: RouteStats, stats: RouteStats,
} }
impl RemotePrivateRouteInfo {
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}
pub fn get_stats_mut(&mut self) -> &mut RouteStats {
&mut self.stats
}
}
/// Ephemeral data used to help the RouteSpecStore operate efficiently /// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)] #[derive(Debug)]
pub struct RouteSpecStoreCache { pub struct RouteSpecStoreCache {
@ -189,6 +231,10 @@ pub struct RouteSpecStoreCache {
hop_cache: HashSet<Vec<u8>>, hop_cache: HashSet<Vec<u8>>,
/// Has a remote private route responded to a question and when /// Has a remote private route responded to a question and when
remote_private_route_cache: LruCache<DHTKey, RemotePrivateRouteInfo>, remote_private_route_cache: LruCache<DHTKey, RemotePrivateRouteInfo>,
/// List of dead allocated routes
dead_routes: Vec<DHTKey>,
/// List of dead remote routes
dead_remote_routes: Vec<DHTKey>,
} }
impl Default for RouteSpecStoreCache { impl Default for RouteSpecStoreCache {
@ -198,6 +244,8 @@ impl Default for RouteSpecStoreCache {
used_end_nodes: Default::default(), used_end_nodes: Default::default(),
hop_cache: Default::default(), hop_cache: Default::default(),
remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE),
dead_routes: Default::default(),
dead_remote_routes: Default::default(),
} }
} }
} }
@ -341,6 +389,7 @@ impl RouteSpecStore {
} }
} }
#[instrument(level = "trace", skip(routing_table), err)]
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> { pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
let (max_route_hop_count, default_route_hop_count) = { let (max_route_hop_count, default_route_hop_count) = {
let config = routing_table.network_manager().config(); let config = routing_table.network_manager().config();
@ -413,6 +462,7 @@ impl RouteSpecStore {
Ok(rss) Ok(rss)
} }
#[instrument(level = "trace", skip(self), err)]
pub async fn save(&self) -> EyreResult<()> { pub async fn save(&self) -> EyreResult<()> {
let content = { let content = {
let inner = self.inner.lock(); let inner = self.inner.lock();
@ -448,6 +498,29 @@ impl RouteSpecStore {
Ok(()) Ok(())
} }
#[instrument(level = "trace", skip(self))]
pub fn send_route_update(&self) {
let update_callback = self.unlocked_inner.routing_table.update_callback();
let (dead_routes, dead_remote_routes) = {
let mut inner = self.inner.lock();
if inner.cache.dead_routes.is_empty() && inner.cache.dead_remote_routes.is_empty() {
// Nothing to do
return;
}
let dead_routes = core::mem::take(&mut inner.cache.dead_routes);
let dead_remote_routes = core::mem::take(&mut inner.cache.dead_remote_routes);
(dead_routes, dead_remote_routes)
};
let update = VeilidUpdate::Route(VeilidStateRoute {
dead_routes,
dead_remote_routes,
});
update_callback(update);
}
fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rsd: &RouteSpecDetail) { fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rsd: &RouteSpecDetail) {
if !cache.hop_cache.insert(cache_key) { if !cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice"); panic!("route should never be inserted twice");
@ -500,6 +573,7 @@ impl RouteSpecStore {
/// Prefers nodes that are not currently in use by another route /// Prefers nodes that are not currently in use by another route
/// The route is not yet tested for its reachability /// The route is not yet tested for its reachability
/// Returns None if no route could be allocated at this time /// Returns None if no route could be allocated at this time
#[instrument(level = "trace", skip(self), ret, err)]
pub fn allocate_route( pub fn allocate_route(
&self, &self,
stability: Stability, stability: Stability,
@ -523,6 +597,7 @@ impl RouteSpecStore {
) )
} }
#[instrument(level = "trace", skip(self, inner, rti), ret, err)]
fn allocate_route_inner( fn allocate_route_inner(
&self, &self,
inner: &mut RouteSpecStoreInner, inner: &mut RouteSpecStoreInner,
@ -789,6 +864,7 @@ impl RouteSpecStore {
Ok(Some(public_key)) Ok(Some(public_key))
} }
#[instrument(level = "trace", skip(self, data), ret, err)]
pub fn validate_signatures( pub fn validate_signatures(
&self, &self,
public_key: &DHTKey, public_key: &DHTKey,
@ -835,10 +911,8 @@ impl RouteSpecStore {
))) )))
} }
/// Test an allocated route for continuity #[instrument(level = "trace", skip(self), ret, err)]
pub async fn test_route(&self, key: &DHTKey) -> EyreResult<bool> { async fn test_allocated_route(&self, key: &DHTKey) -> EyreResult<bool> {
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
// Make loopback route to test with // Make loopback route to test with
let dest = { let dest = {
let private_route = self.assemble_private_route(key, None)?; let private_route = self.assemble_private_route(key, None)?;
@ -864,6 +938,7 @@ impl RouteSpecStore {
}; };
// Test with double-round trip ping to self // Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? { let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
_ => { _ => {
@ -875,13 +950,71 @@ impl RouteSpecStore {
Ok(true) Ok(true)
} }
/// Release an allocated route that is no longer in use #[instrument(level = "trace", skip(self), ret, err)]
pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> { async fn test_remote_route(&self, key: &DHTKey) -> EyreResult<bool> {
let mut inner = self.inner.lock(); // Make private route test
let Some(detail) = inner.content.details.remove(&public_key) else { let dest = {
bail!("can't release route that was never allocated"); // Get the route to test
let private_route = match self.peek_remote_private_route(key) {
Some(pr) => pr,
None => return Ok(false),
};
// Get a safety route that is good enough
let safety_spec = SafetySpec {
preferred_route: None,
hop_count: self.unlocked_inner.default_route_hop_count,
stability: Stability::LowLatency,
sequencing: Sequencing::NoPreference,
};
let safety_selection = SafetySelection::Safe(safety_spec);
Destination::PrivateRoute {
private_route,
safety_selection,
}
}; };
// Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
// Did not error, but did not come back, just return false
return Ok(false);
}
};
Ok(true)
}
/// Test an allocated route for continuity
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn test_route(&self, key: &DHTKey) -> EyreResult<bool> {
let is_remote = {
let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some()
};
if is_remote {
self.test_remote_route(key).await
} else {
self.test_allocated_route(key).await
}
}
/// Release an allocated route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
fn release_allocated_route(&self, public_key: &DHTKey) -> bool {
let mut inner = self.inner.lock();
let Some(detail) = inner.content.details.remove(public_key) else {
return false;
};
// Mark it as dead for the update
inner.cache.dead_routes.push(*public_key);
// Remove from hop cache // Remove from hop cache
let cache_key = route_hops_to_hop_cache(&detail.hops); let cache_key = route_hops_to_hop_cache(&detail.hops);
if !inner.cache.hop_cache.remove(&cache_key) { if !inner.cache.hop_cache.remove(&cache_key) {
@ -917,11 +1050,27 @@ impl RouteSpecStore {
panic!("used_end_nodes cache should have contained hop"); panic!("used_end_nodes cache should have contained hop");
} }
} }
Ok(()) true
}
/// Release an allocated or remote route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
pub fn release_route(&self, key: &DHTKey) -> bool {
let is_remote = {
let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some()
};
if is_remote {
self.release_remote_private_route(key)
} else {
self.release_allocated_route(key)
}
} }
/// Find first matching unpublished route that fits into the selection criteria /// Find first matching unpublished route that fits into the selection criteria
fn first_unpublished_route_inner<'a>( /// Don't pick any routes that have failed and haven't been tested yet
fn first_available_route_inner<'a>(
inner: &'a RouteSpecStoreInner, inner: &'a RouteSpecStoreInner,
min_hop_count: usize, min_hop_count: usize,
max_hop_count: usize, max_hop_count: usize,
@ -930,6 +1079,7 @@ impl RouteSpecStore {
directions: DirectionSet, directions: DirectionSet,
avoid_node_ids: &[DHTKey], avoid_node_ids: &[DHTKey],
) -> Option<DHTKey> { ) -> Option<DHTKey> {
let cur_ts = intf::get_timestamp();
for detail in &inner.content.details { for detail in &inner.content.details {
if detail.1.stability >= stability if detail.1.stability >= stability
&& detail.1.sequencing >= sequencing && detail.1.sequencing >= sequencing
@ -937,6 +1087,7 @@ impl RouteSpecStore {
&& detail.1.hops.len() <= max_hop_count && detail.1.hops.len() <= max_hop_count
&& detail.1.directions.is_subset(directions) && detail.1.directions.is_subset(directions)
&& !detail.1.published && !detail.1.published
&& !detail.1.stats.needs_testing(cur_ts)
{ {
let mut avoid = false; let mut avoid = false;
for h in &detail.1.hops { for h in &detail.1.hops {
@ -953,19 +1104,47 @@ impl RouteSpecStore {
None None
} }
/// List all routes /// List all allocated routes
pub fn list_routes(&self) -> Vec<DHTKey> { pub fn list_allocated_routes<F, R>(&self, mut filter: F) -> Vec<R>
where
F: FnMut(&DHTKey, &RouteSpecDetail) -> Option<R>,
{
let inner = self.inner.lock(); let inner = self.inner.lock();
let mut out = Vec::with_capacity(inner.content.details.len()); let mut out = Vec::with_capacity(inner.content.details.len());
for detail in &inner.content.details { for detail in &inner.content.details {
out.push(*detail.0); if let Some(x) = filter(detail.0, detail.1) {
out.push(x);
}
}
out
}
/// List all allocated routes
pub fn list_remote_routes<F, R>(&self, mut filter: F) -> Vec<R>
where
F: FnMut(&DHTKey, &RemotePrivateRouteInfo) -> Option<R>,
{
let inner = self.inner.lock();
let mut out = Vec::with_capacity(inner.cache.remote_private_route_cache.len());
for info in &inner.cache.remote_private_route_cache {
if let Some(x) = filter(info.0, info.1) {
out.push(x);
}
} }
out out
} }
/// Get the debug description of a route /// Get the debug description of a route
pub fn debug_route(&self, key: &DHTKey) -> Option<String> { pub fn debug_route(&self, key: &DHTKey) -> Option<String> {
let inner = &*self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
// If this is a remote route, print it
if let Some(s) =
Self::with_peek_remote_private_route(inner, cur_ts, key, |rpi| format!("{:#?}", rpi))
{
return Some(s);
}
// Otherwise check allocated routes
Self::detail(inner, key).map(|rsd| format!("{:#?}", rsd)) Self::detail(inner, key).map(|rsd| format!("{:#?}", rsd))
} }
@ -1028,19 +1207,13 @@ impl RouteSpecStore {
} }
}; };
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else {
bail!("compiled private route should have first hop");
};
// If the safety route requested is also the private route, this is a loopback test, just accept it // If the safety route requested is also the private route, this is a loopback test, just accept it
let sr_pubkey = if safety_spec.preferred_route == Some(private_route.public_key) { let sr_pubkey = if safety_spec.preferred_route == Some(private_route.public_key) {
// Private route is also safety route during loopback test // Private route is also safety route during loopback test
private_route.public_key private_route.public_key
} else { } else {
// Get the safety route to use from the spec let Some(avoid_node_id) = private_route.first_hop_node_id() else {
let avoid_node_id = match &pr_first_hop.node { bail!("compiled private route should have first hop");
RouteNode::NodeId(n) => n.key,
RouteNode::PeerInfo(p) => p.node_id.key,
}; };
let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else { let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else {
// No safety route could be found for this spec // No safety route could be found for this spec
@ -1176,6 +1349,7 @@ impl RouteSpecStore {
} }
/// Get a route that matches a particular safety spec /// Get a route that matches a particular safety spec
#[instrument(level = "trace", skip(self, inner, rti), ret, err)]
fn get_route_for_safety_spec_inner( fn get_route_for_safety_spec_inner(
&self, &self,
inner: &mut RouteSpecStoreInner, inner: &mut RouteSpecStoreInner,
@ -1204,7 +1378,7 @@ impl RouteSpecStore {
} }
// Select a safety route from the pool or make one if we don't have one that matches // Select a safety route from the pool or make one if we don't have one that matches
let sr_pubkey = if let Some(sr_pubkey) = Self::first_unpublished_route_inner( let sr_pubkey = if let Some(sr_pubkey) = Self::first_available_route_inner(
inner, inner,
safety_spec.hop_count, safety_spec.hop_count,
safety_spec.hop_count, safety_spec.hop_count,
@ -1238,6 +1412,7 @@ impl RouteSpecStore {
} }
/// Get a private sroute to use for the answer to question /// Get a private sroute to use for the answer to question
#[instrument(level = "trace", skip(self), ret, err)]
pub fn get_private_route_for_safety_spec( pub fn get_private_route_for_safety_spec(
&self, &self,
safety_spec: &SafetySpec, safety_spec: &SafetySpec,
@ -1257,6 +1432,7 @@ impl RouteSpecStore {
} }
/// Assemble private route for publication /// Assemble private route for publication
#[instrument(level = "trace", skip(self), err)]
pub fn assemble_private_route( pub fn assemble_private_route(
&self, &self,
key: &DHTKey, key: &DHTKey,
@ -1341,30 +1517,59 @@ impl RouteSpecStore {
} }
/// Import a remote private route for compilation /// Import a remote private route for compilation
#[instrument(level = "trace", skip(self, blob), ret, err)]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<DHTKey> { pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<DHTKey> {
// decode the pr blob // decode the pr blob
let private_route = RouteSpecStore::blob_to_private_route(blob)?; let private_route = RouteSpecStore::blob_to_private_route(blob)?;
// store the private route in our cache // ensure private route has first hop
let inner = &mut *self.inner.lock(); if !matches!(private_route.hops, PrivateRouteHops::FirstHop(_)) {
let cur_ts = intf::get_timestamp(); bail!("private route must have first hop");
}
// ensure this isn't also an allocated route
let inner = &mut *self.inner.lock();
if Self::detail(inner, &private_route.public_key).is_some() {
bail!("should not import allocated route");
}
// store the private route in our cache
let cur_ts = intf::get_timestamp();
let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| { let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| {
r.private_route.as_ref().unwrap().public_key.clone() r.private_route.as_ref().unwrap().public_key.clone()
}); });
Ok(key) Ok(key)
} }
/// Release a remote private route that is no longer in use
#[instrument(level = "trace", skip(self), ret)]
fn release_remote_private_route(&self, key: &DHTKey) -> bool {
let inner = &mut *self.inner.lock();
if inner.cache.remote_private_route_cache.remove(key).is_some() {
// Mark it as dead for the update
inner.cache.dead_remote_routes.push(*key);
true
} else {
false
}
}
/// Retrieve an imported remote private route by its public key /// Retrieve an imported remote private route by its public key
pub fn get_remote_private_route(&self, key: &DHTKey) -> EyreResult<PrivateRoute> { pub fn get_remote_private_route(&self, key: &DHTKey) -> Option<PrivateRoute> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp(); let cur_ts = intf::get_timestamp();
let Some(pr) = Self::with_get_remote_private_route(inner, cur_ts, key, |r| { Self::with_get_remote_private_route(inner, cur_ts, key, |r| {
r.private_route.as_ref().unwrap().clone() r.private_route.as_ref().unwrap().clone()
}) else { })
bail!("remote private route not found"); }
};
Ok(pr) /// Retrieve an imported remote private route by its public key but don't 'touch' it
pub fn peek_remote_private_route(&self, key: &DHTKey) -> Option<PrivateRoute> {
let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
Self::with_peek_remote_private_route(inner, cur_ts, key, |r| {
r.private_route.as_ref().unwrap().clone()
})
} }
// get or create a remote private route cache entry // get or create a remote private route cache entry
@ -1401,7 +1606,19 @@ impl RouteSpecStore {
last_touched_ts: cur_ts, last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts), stats: RouteStats::new(cur_ts),
}); });
f(rpr)
let out = f(rpr);
// Ensure we LRU out items
if inner.cache.remote_private_route_cache.len()
> inner.cache.remote_private_route_cache.capacity()
{
let (dead_k, _) = inner.cache.remote_private_route_cache.remove_lru().unwrap();
// Mark it as dead for the update
inner.cache.dead_remote_routes.push(dead_k);
}
out
} }
// get a remote private route cache entry // get a remote private route cache entry
@ -1420,16 +1637,41 @@ impl RouteSpecStore {
return Some(f(rpr)); return Some(f(rpr));
} }
inner.cache.remote_private_route_cache.remove(key); inner.cache.remote_private_route_cache.remove(key);
inner.cache.dead_remote_routes.push(*key);
None None
} }
// peek a remote private route cache entry
fn with_peek_remote_private_route<F, R>(
inner: &mut RouteSpecStoreInner,
cur_ts: u64,
key: &DHTKey,
f: F,
) -> Option<R>
where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{
match inner.cache.remote_private_route_cache.entry(*key) {
hashlink::lru_cache::Entry::Occupied(mut o) => {
let rpr = o.get_mut();
if cur_ts - rpr.last_touched_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
return Some(f(rpr));
}
o.remove();
inner.cache.dead_remote_routes.push(*key);
None
}
hashlink::lru_cache::Entry::Vacant(_) => None,
}
}
/// Check to see if this remote (not ours) private route has seen our node info yet /// Check to see if this remote (not ours) private route has seen our node info yet
/// This returns true if we have sent non-safety-route node info to the /// This returns true if we have sent non-safety-route node info to the
/// private route and gotten a response before /// private route and gotten a response before
pub fn has_remote_private_route_seen_our_node_info(&self, key: &DHTKey) -> bool { pub fn has_remote_private_route_seen_our_node_info(&self, key: &DHTKey) -> bool {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp(); let cur_ts = intf::get_timestamp();
Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| rpr.seen_our_node_info) Self::with_peek_remote_private_route(inner, cur_ts, key, |rpr| rpr.seen_our_node_info)
.unwrap_or_default() .unwrap_or_default()
} }
@ -1468,7 +1710,7 @@ impl RouteSpecStore {
} }
// Check for remote route // Check for remote route
if let Some(res) = if let Some(res) =
Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| f(&mut rpr.stats)) Self::with_peek_remote_private_route(inner, cur_ts, key, |rpr| f(&mut rpr.stats))
{ {
return Some(res); return Some(res);
} }
@ -1478,6 +1720,7 @@ impl RouteSpecStore {
} }
/// Clear caches when local our local node info changes /// Clear caches when local our local node info changes
#[instrument(level = "trace", skip(self))]
pub fn reset(&self) { pub fn reset(&self) {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();

View File

@ -1,33 +1,97 @@
use super::super::*; use super::super::*;
use crate::xx::*; use crate::xx::*;
use futures_util::stream::{FuturesOrdered, StreamExt}; use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopFutureExt; use stop_token::future::FutureExt as StopFutureExt;
impl RoutingTable { impl RoutingTable {
// Keep private routes assigned and accessible /// Keep private routes assigned and accessible
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self, stop_token), err)]
pub(crate) async fn private_route_management_task_routine( pub(crate) async fn private_route_management_task_routine(
self, self,
_stop_token: StopToken, stop_token: StopToken,
_last_ts: u64, _last_ts: u64,
cur_ts: u64, cur_ts: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
// Get our node's current node info and network class and do the right thing // Get our node's current node info and network class and do the right thing
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); let network_class = self
let network_class = self.get_network_class(RoutingDomain::PublicInternet); .get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
// Get routing domain editor // If we don't know our network class then don't do this yet
let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); if network_class == NetworkClass::Invalid {
return Ok(());
// Do we know our network class yet?
if let Some(network_class) = network_class {
// see if we have any routes that need testing
} }
// Commit the changes // Collect any routes that need that need testing
editor.commit().await; let rss = self.route_spec_store();
let mut routes_needing_testing = rss.list_allocated_routes(|k, v| {
let stats = v.get_stats();
if stats.needs_testing(cur_ts) {
return Some(*k);
} else {
return None;
}
});
let mut remote_routes_needing_testing = rss.list_remote_routes(|k, v| {
let stats = v.get_stats();
if stats.needs_testing(cur_ts) {
return Some(*k);
} else {
return None;
}
});
routes_needing_testing.append(&mut remote_routes_needing_testing);
// Test all the routes that need testing at the same time
#[derive(Default, Debug)]
struct TestRouteContext {
failed: bool,
dead_routes: Vec<DHTKey>,
}
if !routes_needing_testing.is_empty() {
let mut unord = FuturesUnordered::new();
let ctx = Arc::new(Mutex::new(TestRouteContext::default()));
for r in routes_needing_testing {
let rss = rss.clone();
let ctx = ctx.clone();
unord.push(
async move {
let success = match rss.test_route(&r).await {
Ok(v) => v,
Err(e) => {
log_rtab!(error "test route failed: {}", e);
ctx.lock().failed = true;
return;
}
};
if success {
// Route is okay, leave it alone
return;
}
// Route test failed
ctx.lock().dead_routes.push(r);
}
.instrument(Span::current())
.boxed(),
);
}
// Wait for test_route futures to complete in parallel
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
// Process failed routes
let ctx = &mut *ctx.lock();
for r in &ctx.dead_routes {
log_rtab!(debug "Dead route: {}", &r);
rss.release_route(r);
}
}
// Send update (also may send updates for released routes done by other parts of the program)
rss.send_route_update();
Ok(()) Ok(())
} }

View File

@ -205,7 +205,7 @@ impl RPCProcessor {
private_route, private_route,
safety_selection, safety_selection,
} => { } => {
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else { let Some(avoid_node_id) = private_route.first_hop_node_id() else {
return Err(RPCError::internal("destination private route must have first hop")); return Err(RPCError::internal("destination private route must have first hop"));
}; };
@ -238,11 +238,6 @@ impl RPCProcessor {
private_route.public_key private_route.public_key
} else { } else {
// Get the privat route to respond to that matches the safety route spec we sent the request with // Get the privat route to respond to that matches the safety route spec we sent the request with
let avoid_node_id = match &pr_first_hop.node {
RouteNode::NodeId(n) => n.key,
RouteNode::PeerInfo(p) => p.node_id.key,
};
let Some(pr_key) = rss let Some(pr_key) = rss
.get_private_route_for_safety_spec(safety_spec, &[avoid_node_id]) .get_private_route_for_safety_spec(safety_spec, &[avoid_node_id])
.map_err(RPCError::internal)? else { .map_err(RPCError::internal)? else {

View File

@ -34,13 +34,13 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<DHTKey> {
return move |text: &str| { return move |text: &str| {
match DHTKey::try_decode(text).ok() { match DHTKey::try_decode(text).ok() {
Some(key) => { Some(key) => {
let routes = rss.list_routes(); let routes = rss.list_allocated_routes(|k, _| Some(*k));
if routes.contains(&key) { if routes.contains(&key) {
return Some(key); return Some(key);
} }
} }
None => { None => {
let routes = rss.list_routes(); let routes = rss.list_allocated_routes(|k, _| Some(*k));
for r in routes { for r in routes {
let rkey = r.encode(); let rkey = r.encode();
if rkey.starts_with(text) { if rkey.starts_with(text) {
@ -126,14 +126,11 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
let mut dc = DEBUG_CACHE.lock(); let mut dc = DEBUG_CACHE.lock();
let pr_pubkey = dc.imported_routes.get(n)?; let pr_pubkey = dc.imported_routes.get(n)?;
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
let private_route = match rss.get_remote_private_route(&pr_pubkey) { let Some(private_route) = rss.get_remote_private_route(&pr_pubkey) else {
Err(_) => { // Remove imported route
// Remove imported route dc.imported_routes.remove(n);
dc.imported_routes.remove(n); info!("removed dead imported route {}", n);
info!("removed dead imported route {}", n); return None;
return None;
}
Ok(v) => v,
}; };
Some(Destination::private_route( Some(Destination::private_route(
private_route, private_route,
@ -636,11 +633,9 @@ impl VeilidAPI {
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?; let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
// Release route // Release route
let out = match rss.release_route(route_id) { let out = match rss.release_route(&route_id) {
Ok(()) => format!("Released"), true => "Released".to_owned(),
Err(e) => { false => "Route does not exist".to_owned(),
format!("Route release failed: {}", e)
}
}; };
Ok(out) Ok(out)
@ -730,7 +725,7 @@ impl VeilidAPI {
let routing_table = netman.routing_table(); let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
let routes = rss.list_routes(); let routes = rss.list_allocated_routes(|k, _| Some(*k));
let mut out = format!("Routes: (count = {}):\n", routes.len()); let mut out = format!("Routes: (count = {}):\n", routes.len());
for r in routes { for r in routes {
out.push_str(&format!("{}\n", r.encode())); out.push_str(&format!("{}\n", r.encode()));

View File

@ -2789,8 +2789,7 @@ impl VeilidAPI {
.await .await
.map_err(VeilidAPIError::no_connection)? .map_err(VeilidAPIError::no_connection)?
{ {
rss.release_route(pr_pubkey) rss.release_route(&pr_pubkey);
.map_err(VeilidAPIError::generic)?;
return Err(VeilidAPIError::generic("allocated route failed to test")); return Err(VeilidAPIError::generic("allocated route failed to test"));
} }
let private_route = rss let private_route = rss
@ -2799,8 +2798,7 @@ impl VeilidAPI {
let blob = match RouteSpecStore::private_route_to_blob(&private_route) { let blob = match RouteSpecStore::private_route_to_blob(&private_route) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
rss.release_route(pr_pubkey) rss.release_route(&pr_pubkey);
.map_err(VeilidAPIError::generic)?;
return Err(VeilidAPIError::internal(e)); return Err(VeilidAPIError::internal(e));
} }
}; };

View File

@ -129,9 +129,12 @@ impl RoutingContext {
Target::PrivateRoute(pr) => { Target::PrivateRoute(pr) => {
// Get remote private route // Get remote private route
let rss = self.api.routing_table()?.route_spec_store(); let rss = self.api.routing_table()?.route_spec_store();
let private_route = rss let Some(private_route) = rss
.get_remote_private_route(&pr) .get_remote_private_route(&pr)
.map_err(|_| VeilidAPIError::KeyNotFound { key: pr })?; else {
return Err(VeilidAPIError::KeyNotFound { key: pr });
};
Ok(rpc_processor::Destination::PrivateRoute { Ok(rpc_processor::Destination::PrivateRoute {
private_route, private_route,
safety_selection: self.unlocked_inner.safety_selection, safety_selection: self.unlocked_inner.safety_selection,

View File

@ -1266,6 +1266,10 @@ abstract class VeilidUpdate {
{ {
return VeilidUpdateConfig(state: VeilidStateConfig.fromJson(json)); return VeilidUpdateConfig(state: VeilidStateConfig.fromJson(json));
} }
case "Route":
{
return VeilidUpdateRoute(state: VeilidStateRoute.fromJson(json));
}
default: default:
{ {
throw VeilidAPIExceptionInternal( throw VeilidAPIExceptionInternal(
@ -1380,6 +1384,19 @@ class VeilidUpdateConfig implements VeilidUpdate {
} }
} }
class VeilidUpdateRoute implements VeilidUpdate {
final VeilidStateRoute state;
//
VeilidUpdateRoute({required this.state});
@override
Map<String, dynamic> get json {
var jsonRep = state.json;
jsonRep['kind'] = "Route";
return jsonRep;
}
}
////////////////////////////////////// //////////////////////////////////////
/// VeilidStateAttachment /// VeilidStateAttachment
@ -1444,7 +1461,28 @@ class VeilidStateConfig {
: config = jsonDecode(json['config']); : config = jsonDecode(json['config']);
Map<String, dynamic> get json { Map<String, dynamic> get json {
return {'config': jsonEncode(config)}; return {'config': config};
}
}
//////////////////////////////////////
/// VeilidStateRoute
class VeilidStateRoute {
final List<String> deadRoutes;
final List<String> deadRemoteRoutes;
VeilidStateRoute({
required this.deadRoutes,
required this.deadRemoteRoutes,
});
VeilidStateRoute.fromJson(Map<String, dynamic> json)
: deadRoutes = jsonDecode(json['dead_routes']),
deadRemoteRoutes = jsonDecode(json['dead_remote_routes']);
Map<String, dynamic> get json {
return {'dead_routes': deadRoutes, 'dead_remote_routes': deadRemoteRoutes};
} }
} }