From 7962d3fe110ff6046ad0cd3d6f2fc19418545401 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 24 Feb 2023 21:02:24 -0500 Subject: [PATCH] more private route work --- .../src/routing_table/route_spec_store/mod.rs | 35 + .../remote_private_route_info.rs | 51 ++ .../route_spec_store/route_set_spec_detail.rs | 112 +++ .../route_spec_store.rs | 691 ++---------------- .../route_spec_store_cache.rs | 325 ++++++++ .../route_spec_store_content.rs | 68 ++ .../route_spec_store/route_stats.rs | 129 ++++ .../src/routing_table/routing_table_inner.rs | 4 +- veilid-core/src/rpc_processor/destination.rs | 4 +- veilid-core/src/veilid_api/routing_context.rs | 10 +- veilid-core/src/veilid_api/types.rs | 4 +- 11 files changed, 810 insertions(+), 623 deletions(-) create mode 100644 veilid-core/src/routing_table/route_spec_store/mod.rs create mode 100644 veilid-core/src/routing_table/route_spec_store/remote_private_route_info.rs create mode 100644 veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs rename veilid-core/src/routing_table/{ => route_spec_store}/route_spec_store.rs (73%) create mode 100644 veilid-core/src/routing_table/route_spec_store/route_spec_store_cache.rs create mode 100644 veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs create mode 100644 veilid-core/src/routing_table/route_spec_store/route_stats.rs diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs new file mode 100644 index 00000000..d453caa9 --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -0,0 +1,35 @@ +use super::*; + +mod remote_private_route_info; +mod route_set_spec_detail; +mod route_spec_store; +mod route_spec_store_cache; +mod route_spec_store_content; +mod route_stats; + +pub use remote_private_route_info::*; +pub use route_set_spec_detail::*; +pub use route_spec_store::*; +pub use route_spec_store_cache::*; +pub use route_spec_store_content::*; +pub use route_stats::*; + +use crate::veilid_api::*; +use rkyv::{ + with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize, +}; + +/// The size of the remote private route cache +const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; +/// Remote private route cache entries expire in 5 minutes if they haven't been used +const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64); +/// Amount of time a route can remain idle before it gets tested +const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000; +/// The size of the compiled route cache +const COMPILED_ROUTE_CACHE_SIZE: usize = 256; + +/// The type of an allocated route set id +pub type RouteSetSpecId = String; + +/// Type type of an imported remote route set id +pub type RemotePrivateRouteId = String; diff --git a/veilid-core/src/routing_table/route_spec_store/remote_private_route_info.rs b/veilid-core/src/routing_table/route_spec_store/remote_private_route_info.rs new file mode 100644 index 00000000..e931b504 --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/remote_private_route_info.rs @@ -0,0 +1,51 @@ +use super::*; + +/// What remote private routes have seen +#[derive(Debug, Clone, Default)] +pub struct RemotePrivateRouteInfo { + /// The private routes themselves + private_routes: Vec, + /// Did this remote private route see our node info due to no safety route in use + last_seen_our_node_info_ts: Timestamp, + /// Last time this remote private route was requested for any reason (cache expiration) + last_touched_ts: Timestamp, + /// Stats + stats: RouteStats, +} + +impl RemotePrivateRouteInfo { + pub fn new(private_routes: Vec, cur_ts: Timestamp) -> Self { + RemotePrivateRouteInfo { + private_routes, + last_seen_our_node_info_ts: Timestamp::new(0), + last_touched_ts: cur_ts, + stats: RouteStats::new(cur_ts), + } + } + pub fn get_private_routes(&self) -> &[PrivateRoute] { + &self.private_routes + } + pub fn get_stats(&self) -> &RouteStats { + &self.stats + } + pub fn get_stats_mut(&mut self) -> &mut RouteStats { + &mut self.stats + } + + // Check to see if this remote private route has expired + pub fn did_expire(&self, cur_ts: Timestamp) -> bool { + cur_ts.saturating_sub(self.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY + } + + /// Start fresh if this had expired + pub fn unexpire(&mut self, cur_ts: Timestamp) { + self.last_seen_our_node_info_ts = Timestamp::new(0); + self.last_touched_ts = cur_ts; + self.stats = RouteStats::new(cur_ts); + } + + /// Note when this was last used + pub fn touch(&mut self, cur_ts: Timestamp) { + self.last_touched_ts = cur_ts; + } +} diff --git a/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs b/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs new file mode 100644 index 00000000..86b1d73d --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/route_set_spec_detail.rs @@ -0,0 +1,112 @@ +use super::*; + +#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RouteSpecDetail { + /// Crypto kind + pub crypto_kind: CryptoKind, + /// Secret key + #[with(Skip)] + pub secret_key: SecretKey, + /// Route hops (node id keys) + pub hops: Vec, +} + +#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RouteSetSpecDetail { + /// Route set per crypto kind + route_set: BTreeMap, + /// Route noderefs + #[with(Skip)] + hop_node_refs: Vec, + /// Published private route, do not reuse for ephemeral routes + /// Not serialized because all routes should be re-published when restarting + #[with(Skip)] + published: bool, + /// Directions this route is guaranteed to work in + #[with(RkyvEnumSet)] + directions: DirectionSet, + /// Stability preference (prefer reliable nodes over faster) + stability: Stability, + /// Sequencing capability (connection oriented protocols vs datagram) + can_do_sequenced: bool, + /// Stats + stats: RouteStats, +} + +impl RouteSetSpecDetail { + pub fn get_route_by_key(&self, key: &PublicKey) -> Option<&RouteSpecDetail> { + self.route_set.get(key) + } + pub fn get_route_by_key_mut(&mut self, key: &PublicKey) -> Option<&mut RouteSpecDetail> { + self.route_set.get_mut(key) + } + pub fn get_route_set_keys(&self) -> TypedKeySet { + let mut tks = TypedKeySet::new(); + for (k, v) in &self.route_set { + tks.add(TypedKey::new(v.crypto_kind, *k)); + } + tks + } + pub fn iter_route_set( + &self, + ) -> alloc::collections::btree_map::Iter { + self.route_set.iter() + } + pub fn get_stats(&self) -> &RouteStats { + &self.stats + } + pub fn get_stats_mut(&mut self) -> &mut RouteStats { + &mut self.stats + } + pub fn is_published(&self) -> bool { + self.published + } + pub fn set_published(&mut self, published: bool) { + self.published = self.published; + } + pub fn hop_count(&self) -> usize { + self.hop_node_refs.len() + } + pub fn get_stability(&self) -> Stability { + self.stability + } + pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool { + match sequencing { + Sequencing::NoPreference => true, + Sequencing::PreferOrdered => true, + Sequencing::EnsureOrdered => self.can_do_sequenced, + } + } + + /// Generate a key for the cache that can be used to uniquely identify this route's contents + pub fn make_cache_key(&self) -> Vec { + let hops = &self.hop_node_refs; + let mut cache: Vec = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH); + for hop in hops { + cache.extend_from_slice(&hop.best_node_id().key.bytes); + } + cache + } + + /// Generate a user-facing identifier for this allocated route + pub fn make_id(&self) -> RouteSetSpecId { + let mut idbytes = [0u8; 16]; + for (pk, _) in self.route_set.iter() { + for (i, x) in pk.bytes.iter().enumerate() { + idbytes[i % 16] ^= *x; + } + } + let id = format!( + "{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}", + u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")), + u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits")) + ); + id + } +} diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs similarity index 73% rename from veilid-core/src/routing_table/route_spec_store.rs rename to veilid-core/src/routing_table/route_spec_store/route_spec_store.rs index f82d5faa..dc4046fd 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs @@ -1,371 +1,4 @@ use super::*; -use crate::veilid_api::*; -use rkyv::{ - with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize, -}; - -/// The size of the remote private route cache -const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; -/// Remote private route cache entries expire in 5 minutes if they haven't been used -const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64); -/// Amount of time a route can remain idle before it gets tested -const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000; -/// The size of the compiled route cache -const COMPILED_ROUTE_CACHE_SIZE: usize = 256; - - -// Compiled route key for caching -#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct CompiledRouteCacheKey { - sr_pubkey: PublicKey, - pr_pubkey: PublicKey, -} - -/// Compiled route (safety route + private route) -#[derive(Clone, Debug)] -pub struct CompiledRoute { - /// The safety route attached to the private route - pub safety_route: SafetyRoute, - /// The secret used to encrypt the message payload - pub secret: SecretKey, - /// The node ref to the first hop in the compiled route - pub first_hop: NodeRef, -} - -#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] -#[archive_attr(repr(C), derive(CheckBytes))] -pub struct RouteStats { - /// Consecutive failed to send count - #[with(Skip)] - pub failed_to_send: u32, - /// Questions lost - #[with(Skip)] - pub questions_lost: u32, - /// Timestamp of when the route was created - pub created_ts: Timestamp, - /// Timestamp of when the route was last checked for validity - #[with(Skip)] - pub last_tested_ts: Option, - /// Timestamp of when the route was last sent to - #[with(Skip)] - pub last_sent_ts: Option, - /// Timestamp of when the route was last received over - #[with(Skip)] - pub last_received_ts: Option, - /// Transfers up and down - pub transfer_stats_down_up: TransferStatsDownUp, - /// Latency stats - pub latency_stats: LatencyStats, - /// Accounting mechanism for this route's RPC latency - #[with(Skip)] - latency_stats_accounting: LatencyStatsAccounting, - /// Accounting mechanism for the bandwidth across this route - #[with(Skip)] - transfer_stats_accounting: TransferStatsAccounting, -} - -impl RouteStats { - /// Make new route stats - pub fn new(created_ts: Timestamp) -> Self { - Self { - created_ts, - ..Default::default() - } - } - /// Mark a route as having failed to send - pub fn record_send_failed(&mut self) { - self.failed_to_send += 1; - } - - /// Mark a route as having lost a question - pub fn record_question_lost(&mut self) { - self.questions_lost += 1; - } - - /// Mark a route as having received something - pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) { - self.last_received_ts = Some(cur_ts); - self.last_tested_ts = Some(cur_ts); - self.transfer_stats_accounting.add_down(bytes); - } - - /// Mark a route as having been sent to - pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) { - self.last_sent_ts = Some(cur_ts); - self.transfer_stats_accounting.add_up(bytes); - } - - /// Mark a route as having been sent to - pub fn record_latency(&mut self, latency: TimestampDuration) { - self.latency_stats = self.latency_stats_accounting.record_latency(latency); - } - - /// Mark a route as having been tested - pub fn record_tested(&mut self, cur_ts: Timestamp) { - self.last_tested_ts = Some(cur_ts); - - // Reset question_lost and failed_to_send if we test clean - self.failed_to_send = 0; - self.questions_lost = 0; - } - - /// Roll transfers for these route stats - pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { - self.transfer_stats_accounting.roll_transfers( - last_ts, - cur_ts, - &mut self.transfer_stats_down_up, - ) - } - - /// Get the latency stats - pub fn latency_stats(&self) -> &LatencyStats { - &self.latency_stats - } - - /// Get the transfer stats - pub fn transfer_stats(&self) -> &TransferStatsDownUp { - &self.transfer_stats_down_up - } - - /// Reset stats when network restarts - pub fn reset(&mut self) { - self.last_tested_ts = None; - self.last_sent_ts = None; - self.last_received_ts = None; - } - - /// Check if a route needs testing - pub fn needs_testing(&self, cur_ts: Timestamp) -> bool { - // Has the route had any failures lately? - if self.questions_lost > 0 || self.failed_to_send > 0 { - // If so, always test - return true; - } - - // Has the route been tested within the idle time we'd want to check things? - // (also if we've received successfully over the route, this will get set) - if let Some(last_tested_ts) = self.last_tested_ts { - if cur_ts.saturating_sub(last_tested_ts) - > TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) - { - return true; - } - } else { - // If this route has never been tested, it needs to be - return true; - } - - false - } -} - -#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] -#[archive_attr(repr(C), derive(CheckBytes))] -pub struct RouteSpecDetail { - /// Crypto kind - pub crypto_kind: CryptoKind, - /// Secret key - #[with(Skip)] - pub secret_key: SecretKey, - /// Route hops (node id keys) - pub hops: Vec, -} - -#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] -#[archive_attr(repr(C), derive(CheckBytes))] -pub struct RouteSetSpecDetail { - /// Route set per crypto kind - route_set: BTreeMap, - /// Route noderefs - #[with(Skip)] - hop_node_refs: Vec, - /// Published private route, do not reuse for ephemeral routes - /// Not serialized because all routes should be re-published when restarting - #[with(Skip)] - published: bool, - /// Directions this route is guaranteed to work in - #[with(RkyvEnumSet)] - directions: DirectionSet, - /// Stability preference (prefer reliable nodes over faster) - stability: Stability, - /// Sequencing capability (connection oriented protocols vs datagram) - can_do_sequenced: bool, - /// Stats - stats: RouteStats, -} - -impl RouteSetSpecDetail { - pub fn get_route_by_key(&self, key: PublicKey) -> Option<&RouteSpecDetail> { - self.route_set.get(&key) - } - pub fn get_route_by_key_mut(&mut self, key: PublicKey) -> Option<&mut RouteSpecDetail> { - self.route_set.get_mut(&key) - } - pub fn get_route_set_keys(&self) -> TypedKeySet { - let mut tks = TypedKeySet::new(); - for (k, v) in &self.route_set { - tks.add(TypedKey::new(v.crypto_kind, *k)); - } - tks - } - pub fn get_stats(&self) -> &RouteStats { - &self.stats - } - pub fn get_stats_mut(&mut self) -> &mut RouteStats { - &mut self.stats - } - pub fn is_published(&self) -> bool { - self.published - } - pub fn hop_count(&self) -> usize { - self.hop_node_refs.len() - } - pub fn get_stability(&self) -> Stability { - self.stability - } - pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool { - match sequencing { - Sequencing::NoPreference => true, - Sequencing::PreferOrdered => true, - Sequencing::EnsureOrdered => { - self.can_do_sequenced - } - } - } -} - -/// The core representation of the RouteSpecStore that can be serialized -#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] -#[archive_attr(repr(C, align(8)), derive(CheckBytes))] -pub struct RouteSpecStoreContent { - /// All of the route sets we have allocated so far indexed by key - id_by_key: HashMap, - /// All of the route sets we have allocated so far - details: HashMap, -} - -impl RouteSpecStoreContent { - pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> String { - - // generate unique key string - let mut idbytes = [0u8; 16]; - for (pk, _) in &detail.route_set { - for (i, x) in pk.bytes.iter().enumerate() { - idbytes[i % 16] ^= *x; - } - } - let id = format!("{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}", - u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")), - u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")), - u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")), - u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")), - u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")), - u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits"))); - - // also store in id by key table - for (pk, _) in &detail.route_set { - self.id_by_key.insert(*pk, id.clone()); - } - self.details.insert(id.clone(), detail); - - id - } - pub fn remove_detail(&mut self, id: &String) { - let detail = self.details.remove(id).unwrap(); - for (pk, _) in &detail.route_set { - self.id_by_key.remove(&pk).unwrap(); - } - } - pub fn get_detail(&self, id: &String) -> Option<&RouteSetSpecDetail> { - self.details.get(id) - } - pub fn get_detail_mut(&mut self, id: &String) -> Option<&mut RouteSetSpecDetail> { - self.details.get_mut(id) - } - pub fn get_id_by_key(&self, key: &PublicKey) -> Option { - self.id_by_key.get(key).cloned() - } -} - -/// What remote private routes have seen -#[derive(Debug, Clone, Default)] -pub struct RemotePrivateRouteInfo { - /// The private routes themselves - private_routes: HashMap, - /// Did this remote private route see our node info due to no safety route in use - last_seen_our_node_info_ts: Timestamp, - /// Last time this remote private route was requested for any reason (cache expiration) - last_touched_ts: Timestamp, - /// Stats - stats: RouteStats, -} - -impl RemotePrivateRouteInfo { - pub fn get_stats(&self) -> &RouteStats { - &self.stats - } - pub fn get_stats_mut(&mut self) -> &mut RouteStats { - &mut self.stats - } -} - -/// Ephemeral data used to help the RouteSpecStore operate efficiently -#[derive(Debug)] -pub struct RouteSpecStoreCache { - /// How many times nodes have been used - used_nodes: HashMap, - /// How many times nodes have been used at the terminal point of a route - used_end_nodes: HashMap, - /// Route spec hop cache, used to quickly disqualify routes - hop_cache: HashSet>, - /// Remote private routes we've imported and statistics - remote_private_route_set_cache: LruCache, - /// Remote private routes indexed by public key - remote_private_routes_by_key: HashMap, - /// Compiled route cache - compiled_route_cache: LruCache, - /// List of dead allocated routes - dead_routes: Vec, - /// List of dead remote routes - dead_remote_routes: Vec, -} - -impl RouteSpecStoreCache { - pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize { - node_ids.iter().fold(0usize, |acc, k| { - acc + self - .used_nodes - .get(&k) - .cloned() - .unwrap_or_default() - }) - } - pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize { - node_ids.iter().fold(0usize, |acc, k| { - acc + self - .used_end_nodes - .get(&k) - .cloned() - .unwrap_or_default() - }) - } -} - -impl Default for RouteSpecStoreCache { - fn default() -> Self { - Self { - used_nodes: Default::default(), - used_end_nodes: Default::default(), - hop_cache: Default::default(), - remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), - remote_private_routes_by_key: HashMap::new(), - compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE), - dead_routes: Default::default(), - dead_remote_routes: Default::default(), - } - } -} #[derive(Debug)] pub struct RouteSpecStoreInner { @@ -400,23 +33,6 @@ pub struct RouteSpecStore { unlocked_inner: Arc, } -fn route_hops_to_hop_cache(hops: &[NodeRef]) -> Vec { - let mut cache: Vec = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH); - for hop in hops { - cache.extend_from_slice(&hop.best_node_id().key.bytes); - } - cache -} - -/// get the hop cache key for a particular route permutation -fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec { - let mut cache: Vec = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH); - for n in perm { - cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes) - } - cache -} - /// number of route permutations is the number of unique orderings /// for a set of nodes, given that the first node is fixed fn _get_route_permutation_count(hop_count: usize) -> usize { @@ -526,7 +142,7 @@ impl RouteSpecStore { // Look up all route hop noderefs since we can't serialize those let mut dead_ids = Vec::new(); - for (rsid, rssd) in &mut content.details { + for (rsid, rssd) in content.iter_details_mut() { // Get first route since they all should resolve let Some((pk, rsd)) = rssd.route_set.first_key_value() else { dead_ids.push(rsid.clone()); @@ -555,7 +171,7 @@ impl RouteSpecStore { // Ensure we got secret keys for all the public keys let mut got_secret_key_ids = HashSet::new(); - for (rsid, rssd) in &mut content.details { + for (rsid, rssd) in content.iter_details_mut() { let mut found_all = true; for (pk, rsd) in &mut rssd.route_set { if let Some(sk) = secret_key_map.get(pk) { @@ -571,7 +187,7 @@ impl RouteSpecStore { } // If we missed any, nuke those route ids - let dead_ids:Vec = content.details.keys().filter_map(|id| { + let dead_ids:Vec = content.keys().filter_map(|id| { if !got_secret_key_ids.contains(id) { Some(id.clone()) } else { @@ -589,8 +205,11 @@ impl RouteSpecStore { }; // Rebuild the routespecstore cache - Self::rebuild_cache(&mut inner); + for (_, rssd) in inner.content.iter_details() { + inner.cache.add_to_cache(&rssd); + } + // Return the loaded RouteSpecStore let rss = RouteSpecStore { unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { max_route_hop_count, @@ -602,6 +221,7 @@ impl RouteSpecStore { Ok(rss) } + #[instrument(level = "trace", skip(self), err)] pub async fn save(&self) -> EyreResult<()> { @@ -628,7 +248,7 @@ impl RouteSpecStore { .protected_store(); let mut out: HashMap = HashMap::new(); - for (rsid, rssd) in &content.details { + for (rsid, rssd) in content.iter_details() { for (pk, rsd) in &rssd.route_set { out.insert(*pk, rsd.secret_key); } @@ -641,17 +261,9 @@ impl RouteSpecStore { #[instrument(level = "trace", skip(self))] pub fn send_route_update(&self) { - let update_callback = self.unlocked_inner.routing_table.update_callback(); - let (dead_routes, dead_remote_routes) = { let mut inner = self.inner.lock(); - if inner.cache.dead_routes.is_empty() && inner.cache.dead_remote_routes.is_empty() { - // Nothing to do - return; - } - let dead_routes = core::mem::take(&mut inner.cache.dead_routes); - let dead_remote_routes = core::mem::take(&mut inner.cache.dead_remote_routes); - (dead_routes, dead_remote_routes) + inner.cache.take_dead_routes() }; let update = VeilidUpdate::Route(VeilidStateRoute { @@ -659,35 +271,11 @@ impl RouteSpecStore { dead_remote_routes, }); + let update_callback = self.unlocked_inner.routing_table.update_callback(); update_callback(update); } - fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec, rssd: &RouteSetSpecDetail) { - if !cache.hop_cache.insert(cache_key) { - panic!("route should never be inserted twice"); - } - for (pk, rsd) in &rssd.route_set { - for h in &rsd.hops { - cache - .used_nodes - .entry(TypedKey::new(rsd.crypto_kind, *h)) - .and_modify(|e| *e += 1) - .or_insert(1); - } - cache - .used_end_nodes - .entry(TypedKey::new(rsd.crypto_kind, *rsd.hops.last().unwrap())) - .and_modify(|e| *e += 1) - .or_insert(1); - } - } - fn rebuild_cache(inner: &mut RouteSpecStoreInner) { - for rssd in inner.content.details.values() { - let cache_key = route_hops_to_hop_cache(&rssd.hop_node_refs); - Self::add_to_cache(&mut inner.cache, cache_key, &rssd); - } - } /// Purge the route spec store pub async fn purge(&self) -> EyreResult<()> { @@ -918,11 +506,20 @@ impl RouteSpecStore { // Now go through nodes and try to build a route we haven't seen yet let perm_func = Box::new(|permutation: &[usize]| { - // Get the route cache key + + /// Get the hop cache key for a particular route permutation + /// uses the same algorithm as RouteSetSpecDetail::make_cache_key + fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec { + let mut cache: Vec = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH); + for n in perm { + cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes) + } + cache + } let cache_key = route_permutation_to_hop_cache(rti, &nodes, permutation); // Skip routes we have already seen - if inner.cache.hop_cache.contains(&cache_key) { + if inner.cache.contains_route(&cache_key) { return None; } @@ -1020,18 +617,16 @@ impl RouteSpecStore { } // Keep this route let route_nodes = permutation.to_vec(); - Some((route_nodes, cache_key, can_do_sequenced)) + Some((route_nodes, can_do_sequenced)) }) as PermFunc; let mut route_nodes: Vec = Vec::new(); - let mut cache_key: Vec = Vec::new(); let mut can_do_sequenced: bool = true; for start in 0..(nodes.len() - hop_count) { // Try the permutations available starting with 'start' if let Some((rn, ck, cds)) = with_route_permutations(hop_count, start, &perm_func) { route_nodes = rn; - cache_key = ck; can_do_sequenced = cds; break; } @@ -1072,7 +667,7 @@ impl RouteSpecStore { drop(perm_func); // Add to cache - Self::add_to_cache(&mut inner.cache, cache_key, &rssd); + inner.cache.add_to_cache(&rssd); // Keep route in spec store let id = inner.content.add_detail(rssd); @@ -1108,7 +703,7 @@ impl RouteSpecStore { log_rpc!(debug "route detail does not exist: {:?}", rsid); return None; }; - let Some(rsd) = rssd.route_set.get(&public_key.key) else { + let Some(rsd) = rssd.get_route_by_key(&public_key.key) else { log_rpc!(debug "route set {:?} does not have key: {:?}", rsid, public_key.key); return None; }; @@ -1145,14 +740,21 @@ impl RouteSpecStore { async fn test_allocated_route(&self, id: &String) -> EyreResult { // Make loopback route to test with let dest = { - xxx figure out how to pick best crypto for the private route - let private_route = self.assemble_private_route(id, None)?; - - let inner = &mut *self.inner.lock(); - let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + // Get best route from set // Match the private route's hop length for safety route length - let hop_count = rsd.hops.len(); + let (key, hop_count) = { + let inner = &mut *self.inner.lock(); + let Some(rssd) = inner.content.get_detail(id) else { + bail!("route id not allocated"); + }; + let Some(tkey) = rssd.get_route_set_keys().best() else { + bail!("route does not have best key"); + }; + (tkey.key, rssd.hop_count()) + }; + let private_route = self.assemble_private_route(&key, None)?; + // Always test routes with safety routes that are more likely to succeed let stability = Stability::Reliable; // Routes can test with whatever sequencing they were allocated with @@ -1186,11 +788,14 @@ impl RouteSpecStore { } #[instrument(level = "trace", skip(self), ret, err)] - async fn test_remote_route(&self, key: &TypedKey) -> EyreResult { + async fn test_remote_route(&self, id: &String) -> EyreResult { + // Make private route test let dest = { + // Get best remote route from imported set + // Get the route to test - let private_route = match self.peek_remote_private_route(key) { + let private_route = match self.peek_remote_private_route(id) { Some(pr) => pr, None => return Ok(false), }; @@ -1241,72 +846,39 @@ impl RouteSpecStore { /// Release an allocated route that is no longer in use #[instrument(level = "trace", skip(self), ret)] - fn release_allocated_route(&self, public_key: &PublicKey) -> bool { + fn release_allocated_route(&self, id: &String) -> bool { let mut inner = self.inner.lock(); - let Some(detail) = inner.content.details.remove(public_key) else { + let Some(rssd) = inner.content.remove_detail(id) else { return false; }; - // Mark it as dead for the update - inner.cache.dead_routes.push(*public_key); - // Remove from hop cache - let cache_key = route_hops_to_hop_cache(&detail.hops); - if !inner.cache.hop_cache.remove(&cache_key) { + if !inner.cache.remove_from_cache(&rssd) { panic!("hop cache should have contained cache key"); } - // Remove from used nodes cache - for h in &detail.hops { - match inner.cache.used_nodes.entry(*h) { - std::collections::hash_map::Entry::Occupied(mut o) => { - *o.get_mut() -= 1; - if *o.get() == 0 { - o.remove(); - } - } - std::collections::hash_map::Entry::Vacant(_) => { - panic!("used_nodes cache should have contained hop"); - } - } - } - // Remove from end nodes cache - match inner - .cache - .used_end_nodes - .entry(*detail.hops.last().unwrap()) - { - std::collections::hash_map::Entry::Occupied(mut o) => { - *o.get_mut() -= 1; - if *o.get() == 0 { - o.remove(); - } - } - std::collections::hash_map::Entry::Vacant(_) => { - panic!("used_end_nodes cache should have contained hop"); - } - } + true } /// Release an allocated or remote route that is no longer in use #[instrument(level = "trace", skip(self), ret)] - pub fn release_route(&self, key: &PublicKey) -> bool { + pub fn release_route(&self, id: &String) -> bool { let is_remote = { let inner = &mut *self.inner.lock(); // Release from compiled route cache if it's used there - self.invalidate_compiled_route_cache(inner, key); + self.invalidate_compiled_route_cache(inner, id); // Check to see if this is a remote route let cur_ts = get_aligned_timestamp(); - Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some() + Self::with_peek_remote_private_route(inner, cur_ts, id, |_| {}).is_some() }; if is_remote { - self.release_remote_private_route(key) + self.release_remote_private_route(id) } else { - self.release_allocated_route(key) + self.release_allocated_route(id) } } @@ -1869,14 +1441,16 @@ impl RouteSpecStore { } /// Import a remote private route for compilation + /// returns a route set id #[instrument(level = "trace", skip(self, blob), ret, err)] - pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { + pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { + // decode the pr blob - let private_routes = RouteSpecStore::blob_to_private_routes(blob)?; + let private_routes = RouteSpecStore::blob_to_private_routes(self.unlocked_inner.routing_table.crypto(), blob)?; - let mut out = TypedKeySet::new(); let inner = &mut *self.inner.lock(); - + + // validate the private routes for private_route in private_routes { // ensure private route has first hop @@ -1885,37 +1459,26 @@ impl RouteSpecStore { } // ensure this isn't also an allocated route - if Self::detail(inner, &private_route.public_key.key).is_some() { + if inner.content.get_id_by_key(&private_route.public_key.key).is_some() { bail!("should not import allocated route"); } - - // store the private route in our cache - let cur_ts = get_aligned_timestamp(); - let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| { - r.private_route.as_ref().unwrap().public_key.clone() - }); - - out.add(key); } - Ok(out) + let cur_ts = get_aligned_timestamp(); + let id = inner.cache.import_remote_private_route(cur_ts, private_routes); + + Ok(id) } /// Release a remote private route that is no longer in use #[instrument(level = "trace", skip(self), ret)] - fn release_remote_private_route(&self, key: &PublicKey) -> bool { + fn release_remote_private_route(&self, id: &String) -> bool { let inner = &mut *self.inner.lock(); - if inner.cache.remote_private_route_cache.remove(key).is_some() { - // Mark it as dead for the update - inner.cache.dead_remote_routes.push(*key); - true - } else { - false - } + inner.cache.remove_remote_private_route(id) } /// Retrieve an imported remote private route by its public key - pub fn get_remote_private_route(&self, key: &PublicKey) -> Option { + pub fn get_remote_private_route(&self, id: &String) -> Option { let inner = &mut *self.inner.lock(); let cur_ts = get_aligned_timestamp(); Self::with_get_remote_private_route(inner, cur_ts, key, |r| { @@ -1924,7 +1487,8 @@ impl RouteSpecStore { } /// Retrieve an imported remote private route by its public key but don't 'touch' it - pub fn peek_remote_private_route(&self, key: &PublicKey) -> Option { + pub fn peek_remote_private_route(&self, id: &String) -> Option { + xx fix these let inner = &mut *self.inner.lock(); let cur_ts = get_aligned_timestamp(); Self::with_peek_remote_private_route(inner, cur_ts, key, |r| { @@ -1932,99 +1496,6 @@ impl RouteSpecStore { }) } - // get or create a remote private route cache entry - fn with_create_remote_private_route( - inner: &mut RouteSpecStoreInner, - cur_ts: Timestamp, - private_route: PrivateRoute, - f: F, - ) -> R - where - F: FnOnce(&mut RemotePrivateRouteInfo) -> R, - { - let pr_pubkey = private_route.public_key.key; - - let rpr = inner - .cache - .remote_private_route_cache - .entry(pr_pubkey) - .and_modify(|rpr| { - if cur_ts.saturating_sub(rpr.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { - // Start fresh if this had expired - rpr.last_seen_our_node_info_ts = Timestamp::new(0); - rpr.last_touched_ts = cur_ts; - rpr.stats = RouteStats::new(cur_ts); - } else { - // If not expired, just mark as being used - rpr.last_touched_ts = cur_ts; - } - }) - .or_insert_with(|| RemotePrivateRouteInfo { - // New remote private route cache entry - private_route: Some(private_route), - last_seen_our_node_info_ts: Timestamp::new(0), - last_touched_ts: cur_ts, - stats: RouteStats::new(cur_ts), - }); - - let out = f(rpr); - - // Ensure we LRU out items - if inner.cache.remote_private_route_cache.len() - > inner.cache.remote_private_route_cache.capacity() - { - let (dead_k, _) = inner.cache.remote_private_route_cache.remove_lru().unwrap(); - // Mark it as dead for the update - inner.cache.dead_remote_routes.push(dead_k); - } - - out - } - - // get a remote private route cache entry - fn with_get_remote_private_route( - inner: &mut RouteSpecStoreInner, - cur_ts: Timestamp, - key: &PublicKey, - f: F, - ) -> Option - where - F: FnOnce(&mut RemotePrivateRouteInfo) -> R, - { - let rpr = inner.cache.remote_private_route_cache.get_mut(key)?; - if cur_ts.saturating_sub(rpr.last_touched_ts) < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { - rpr.last_touched_ts = cur_ts; - return Some(f(rpr)); - } - inner.cache.remote_private_route_cache.remove(key); - inner.cache.dead_remote_routes.push(*key); - None - } - - // peek a remote private route cache entry - fn with_peek_remote_private_route( - inner: &mut RouteSpecStoreInner, - cur_ts: Timestamp, - key: &PublicKey, - f: F, - ) -> Option - where - F: FnOnce(&mut RemotePrivateRouteInfo) -> R, - { - match inner.cache.remote_private_route_cache.entry(*key) { - hashlink::lru_cache::Entry::Occupied(mut o) => { - let rpr = o.get_mut(); - if cur_ts.saturating_sub(rpr.last_touched_ts) < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { - return Some(f(rpr)); - } - o.remove(); - inner.cache.dead_remote_routes.push(*key); - None - } - hashlink::lru_cache::Entry::Vacant(_) => None, - } - } - /// Check to see if this remote (not ours) private route has seen our current node info yet /// This happens when you communicate with a private route without a safety route pub fn has_remote_private_route_seen_our_node_info(&self, key: &PublicKey) -> bool { @@ -2119,28 +1590,21 @@ impl RouteSpecStore { let inner = &mut *self.inner.lock(); // Clean up local allocated routes - for (_k, v) in &mut inner.content.details { - // Must republish route now - v.published = false; - // Restart stats for routes so we test the route again - v.stats.reset(); - } + inner.content.reset_details(); // Reset private route cache - for (_k, v) in &mut inner.cache.remote_private_route_cache { - // Restart stats for routes so we test the route again - v.stats.reset(); - } + inner.cache.reset_remote_private_routes(); } /// Mark route as published /// When first deserialized, routes must be re-published in order to ensure they remain /// in the RouteSpecStore. - pub fn mark_route_published(&self, key: &PublicKey, published: bool) -> EyreResult<()> { + pub fn mark_route_published(&self, id: &String, published: bool) -> EyreResult<()> { let inner = &mut *self.inner.lock(); - Self::detail_mut(inner, key) - .ok_or_else(|| eyre!("route does not exist"))? - .published = published; + let Some(rssd) = inner.content.get_detail_mut(id) else { + bail!("route does not exist"); + }; + rssd.set_published(published); Ok(()) } @@ -2149,8 +1613,8 @@ impl RouteSpecStore { let inner = &mut *self.inner.lock(); // Roll transfers for locally allocated routes - for rsd in inner.content.details.values_mut() { - rsd.stats.roll_transfers(last_ts, cur_ts); + for rssd in inner.content.details.values_mut() { + rssd.stats.roll_transfers(last_ts, cur_ts); } // Roll transfers for remote private routes for (_k, v) in inner.cache.remote_private_route_cache.iter_mut() { @@ -2216,6 +1680,7 @@ impl RouteSpecStore { let private_route = decode_private_route(&pr_reader, crypto).wrap_err("failed to decode private route")?; out.push(private_route); } + Ok(out) } } diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store_cache.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store_cache.rs new file mode 100644 index 00000000..1abe6e8a --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store_cache.rs @@ -0,0 +1,325 @@ +use super::*; + +// Compiled route key for caching +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +struct CompiledRouteCacheKey { + sr_pubkey: PublicKey, + pr_pubkey: PublicKey, +} + +/// Compiled route (safety route + private route) +#[derive(Clone, Debug)] +pub struct CompiledRoute { + /// The safety route attached to the private route + pub safety_route: SafetyRoute, + /// The secret used to encrypt the message payload + pub secret: SecretKey, + /// The node ref to the first hop in the compiled route + pub first_hop: NodeRef, +} + +/// Ephemeral data used to help the RouteSpecStore operate efficiently +#[derive(Debug)] +pub struct RouteSpecStoreCache { + /// How many times nodes have been used + used_nodes: HashMap, + /// How many times nodes have been used at the terminal point of a route + used_end_nodes: HashMap, + /// Route spec hop cache, used to quickly disqualify routes + hop_cache: HashSet>, + /// Remote private routes we've imported and statistics + remote_private_route_set_cache: LruCache, + /// Remote private routes indexed by public key + remote_private_routes_by_key: HashMap, + /// Compiled route cache + compiled_route_cache: LruCache, + /// List of dead allocated routes + dead_routes: Vec, + /// List of dead remote routes + dead_remote_routes: Vec, +} + +impl RouteSpecStoreCache { + /// add an allocated route set to our cache via its cache key + pub fn add_to_cache(&mut self, rssd: &RouteSetSpecDetail) { + let cache_key = rssd.make_cache_key(); + if !self.hop_cache.insert(cache_key) { + panic!("route should never be inserted twice"); + } + for (pk, rsd) in rssd.iter_route_set() { + for h in &rsd.hops { + self.used_nodes + .entry(*h) + .and_modify(|e| *e += 1) + .or_insert(1); + } + self.used_end_nodes + .entry(*rsd.hops.last().unwrap()) + .and_modify(|e| *e += 1) + .or_insert(1); + } + } + + /// checks if an allocated route is in our cache + pub fn contains_route(&self, cache_key: &Vec) -> bool { + self.hop_cache.contains(cache_key) + } + + /// removes an allocated route set from our cache + pub fn remove_from_cache(&mut self, rssd: &RouteSetSpecDetail) -> bool { + let cache_key = rssd.make_cache_key(); + + // Remove from hop cache + if !self.hop_cache.remove(&cache_key) { + return false; + } + for (pk, rsd) in rssd.iter_route_set() { + for h in &rsd.hops { + // Remove from used nodes cache + match self.used_nodes.entry(*h) { + std::collections::hash_map::Entry::Occupied(mut o) => { + *o.get_mut() -= 1; + if *o.get() == 0 { + o.remove(); + } + } + std::collections::hash_map::Entry::Vacant(_) => { + panic!("used_nodes cache should have contained hop"); + } + } + } + // Remove from end nodes cache + match self.used_end_nodes.entry(*rsd.hops.last().unwrap()) { + std::collections::hash_map::Entry::Occupied(mut o) => { + *o.get_mut() -= 1; + if *o.get() == 0 { + o.remove(); + } + } + std::collections::hash_map::Entry::Vacant(_) => { + panic!("used_end_nodes cache should have contained hop"); + } + } + } + + // Mark it as dead for the update + self.dead_routes.push(rssd.make_id()); + + true + } + + /// calculate how many times a node with a particular node id set has been used anywhere in the path of our allocated routes + pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize { + node_ids.iter().fold(0usize, |acc, k| { + acc + self.used_nodes.get(&k.key).cloned().unwrap_or_default() + }) + } + + /// calculate how many times a node with a particular node id set has been used at the end of the path of our allocated routes + pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize { + node_ids.iter().fold(0usize, |acc, k| { + acc + self.used_end_nodes.get(&k.key).cloned().unwrap_or_default() + }) + } + + /// generate unique remote private route set id for a remote private route set + fn make_remote_private_route_id(private_routes: &[PrivateRoute]) -> String { + let mut idbytes = [0u8; 16]; + for (pk, _) in &rprinfo.private_routes { + for (i, x) in pk.bytes.iter().enumerate() { + idbytes[i % 16] ^= *x; + } + } + let id = format!( + "{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}", + u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")), + u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")), + u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")), + u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits")) + ); + id + } + + /// add remote private route to caches + /// returns a remote private route set id + fn add_remote_private_route( + &mut self, + rprinfo: RemotePrivateRouteInfo, + ) -> RemotePrivateRouteId { + let id = Self::make_remote_private_route_id(rprinfo.get_private_routes()); + + // also store in id by key table + for (pk, _) in rprinfo.get_private_routes() { + self.remote_private_routes_by_key.insert(*pk, id.clone()); + } + self.remote_private_route_set_cache + .insert(id.clone(), rprinfo, |dead_id, dead_rpri| { + // If anything LRUs out, remove from the by-key table + for (dead_pk, _) in dead_rpri.get_private_routes() { + self.remote_private_routes_by_key.remove(&dead_pk).unwrap(); + } + self.dead_remote_routes.push(dead_id); + }); + + id + } + + /// remote private route cache accessor + fn get_remote_private_route( + &mut self, + id: &RemotePrivateRouteId, + ) -> Option<&RemotePrivateRouteInfo> { + self.remote_private_route_set_cache.get(id) + } + /// mutable remote private route cache accessor + fn get_remote_private_route_mut( + &mut self, + id: &RemotePrivateRouteId, + ) -> Option<&mut RemotePrivateRouteInfo> { + self.remote_private_route_set_cache.get_mut(id) + } + /// mutable remote private route cache accessor without lru action + fn peek_remote_private_route_mut( + &mut self, + id: &RemotePrivateRouteId, + ) -> Option<&mut RemotePrivateRouteInfo> { + self.remote_private_route_set_cache.peek_mut(id) + } + + /// look up a remote private route id by one of the route public keys + pub fn get_remote_private_route_id_by_key( + &self, + key: &PublicKey, + ) -> Option { + self.remote_private_routes_by_key.get(key).cloned() + } + + /// get or create a remote private route cache entry + /// may LRU and/or expire other cache entries to make room for the new one + /// or update an existing entry with the same private route set + /// returns the route set id + pub fn import_remote_private_route( + &mut self, + cur_ts: Timestamp, + private_routes: Vec, + ) -> RemotePrivateRouteId { + // get id for this route set + let id = RouteSpecStoreCache::make_remote_private_route_id(&private_routes); + let rpri = if let Some(rpri) = self.get_remote_private_route_mut(&id) { + if rpri.did_expire(cur_ts) { + // Start fresh if this had expired + rpri.unexpire(cur_ts); + } else { + // If not expired, just mark as being used + rpri.touch(cur_ts); + } + } else { + let rpri = RemotePrivateRouteInfo { + // New remote private route cache entry + private_routes, + last_seen_our_node_info_ts: Timestamp::new(0), + last_touched_ts: cur_ts, + stats: RouteStats::new(cur_ts), + }; + let new_id = self.add_remote_private_route(rpri); + assert_eq!(id, new_id); + if self.get_remote_private_route_mut(&id).is_none() { + bail!("remote private route should exist"); + }; + }; + id + } + + /// remove a remote private route from the cache + pub fn remove_remote_private_route(&mut self, id: &RemotePrivateRouteId) -> bool { + let Some(rprinfo) = self.remote_private_route_set_cache.remove(id) else { + return false; + }; + for (pk, _) in rprinfo.get_private_routes() { + self.remote_private_routes_by_key.remove(&pk).unwrap(); + } + self.dead_remote_routes.push(id.clone()); + true + } + + /// get an existing remote private route cache entry + /// will LRU entries and may expire entries and not return them if they are stale + /// calls a callback with the remote private route info if returned + pub fn with_get_remote_private_route( + &mut self, + cur_ts: Timestamp, + id: &RemotePrivateRouteId, + f: F, + ) -> Option + where + F: FnOnce(&mut RemotePrivateRouteInfo) -> R, + { + if let Some(rpri) = self.get_remote_private_route_mut(&id) { + if !rpri.did_expire(cur_ts) { + rpri.touch(cur_ts); + return Some(f(rpri)); + } + } + self.remove_remote_private_route(&id); + None + } + + // peek a remote private route cache entry + // will not LRU entries but may expire entries and not return them if they are stale + /// calls a callback with the remote private route info if returned + pub fn with_peek_remote_private_route( + &mut self, + cur_ts: Timestamp, + id: &RemotePrivateRouteId, + f: F, + ) -> Option + where + F: FnOnce(&mut RemotePrivateRouteInfo) -> R, + { + if let Some(rpri) = self.peek_remote_private_route_mut(&id) { + if !rpri.did_expire(cur_ts) { + rpri.touch(cur_ts); + return Some(f(rpri)); + } + } + self.remove_remote_private_route(&id); + None + } + + /// Take the dead local and remote routes so we can update clients + pub fn take_dead_routes(&mut self) -> (Vec, Vec) { + if self.dead_routes.is_empty() && self.dead_remote_routes.is_empty() { + // Nothing to do + return; + } + let dead_routes = core::mem::take(&mut self.dead_routes); + let dead_remote_routes = core::mem::take(&mut self.dead_remote_routes); + (dead_routes, dead_remote_routes) + } + + /// Clean up imported remote routes + /// Resets statistics for when our node info changes + pub fn reset_details(&mut self) { + for (_k, v) in self.remote_private_route_cache { + // Restart stats for routes so we test the route again + v.stats.reset(); + } + } +} + +impl Default for RouteSpecStoreCache { + fn default() -> Self { + Self { + used_nodes: Default::default(), + used_end_nodes: Default::default(), + hop_cache: Default::default(), + remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), + remote_private_routes_by_key: HashMap::new(), + compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE), + dead_routes: Default::default(), + dead_remote_routes: Default::default(), + } + } +} diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs new file mode 100644 index 00000000..7a8688b8 --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs @@ -0,0 +1,68 @@ +use super::*; + +/// The core representation of the RouteSpecStore that can be serialized +#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C, align(8)), derive(CheckBytes))] +pub struct RouteSpecStoreContent { + /// All of the route sets we have allocated so far indexed by key + id_by_key: HashMap, + /// All of the route sets we have allocated so far + details: HashMap, +} + +impl RouteSpecStoreContent { + pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> RouteSetSpecId { + // generate unique key string + let id = detail.make_id(); + assert!(!self.details.contains_key(&id)); + + // also store in id by key table + for (pk, _) in detail.iter_route_set() { + self.id_by_key.insert(*pk, id.clone()); + } + self.details.insert(id.clone(), detail); + + id + } + pub fn remove_detail(&mut self, id: &RouteSetSpecId) -> Option { + let detail = self.details.remove(id)?; + for (pk, _) in detail.iter_route_set() { + self.id_by_key.remove(&pk).unwrap(); + } + Some(detail) + } + pub fn get_detail(&self, id: &RouteSetSpecId) -> Option<&RouteSetSpecDetail> { + self.details.get(id) + } + pub fn get_detail_mut(&mut self, id: &RouteSetSpecId) -> Option<&mut RouteSetSpecDetail> { + self.details.get_mut(id) + } + pub fn get_id_by_key(&self, key: &PublicKey) -> Option { + self.id_by_key.get(key).cloned() + } + pub fn iter_ids(&self) -> std::collections::hash_map::Keys { + self.details.keys() + } + pub fn iter_details( + &self, + ) -> std::collections::hash_map::Iter { + self.details.iter() + } + pub fn iter_details_mut( + &mut self, + ) -> std::collections::hash_map::IterMut { + self.details.iter_mut() + } + + /// Clean up local allocated routes + /// Resets publication status and statistics for when our node info changes + /// Routes must be republished + pub fn reset_details(&mut self) { + for (_k, v) in &mut self.details { + // Must republish route now + v.set_published(false); + // Restart stats for routes so we test the route again + v.get_stats_mut().reset(); + } + } +} diff --git a/veilid-core/src/routing_table/route_spec_store/route_stats.rs b/veilid-core/src/routing_table/route_spec_store/route_stats.rs new file mode 100644 index 00000000..93b4f304 --- /dev/null +++ b/veilid-core/src/routing_table/route_spec_store/route_stats.rs @@ -0,0 +1,129 @@ +use super::*; + +#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct RouteStats { + /// Consecutive failed to send count + #[with(Skip)] + pub failed_to_send: u32, + /// Questions lost + #[with(Skip)] + pub questions_lost: u32, + /// Timestamp of when the route was created + pub created_ts: Timestamp, + /// Timestamp of when the route was last checked for validity + #[with(Skip)] + pub last_tested_ts: Option, + /// Timestamp of when the route was last sent to + #[with(Skip)] + pub last_sent_ts: Option, + /// Timestamp of when the route was last received over + #[with(Skip)] + pub last_received_ts: Option, + /// Transfers up and down + pub transfer_stats_down_up: TransferStatsDownUp, + /// Latency stats + pub latency_stats: LatencyStats, + /// Accounting mechanism for this route's RPC latency + #[with(Skip)] + latency_stats_accounting: LatencyStatsAccounting, + /// Accounting mechanism for the bandwidth across this route + #[with(Skip)] + transfer_stats_accounting: TransferStatsAccounting, +} + +impl RouteStats { + /// Make new route stats + pub fn new(created_ts: Timestamp) -> Self { + Self { + created_ts, + ..Default::default() + } + } + /// Mark a route as having failed to send + pub fn record_send_failed(&mut self) { + self.failed_to_send += 1; + } + + /// Mark a route as having lost a question + pub fn record_question_lost(&mut self) { + self.questions_lost += 1; + } + + /// Mark a route as having received something + pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) { + self.last_received_ts = Some(cur_ts); + self.last_tested_ts = Some(cur_ts); + self.transfer_stats_accounting.add_down(bytes); + } + + /// Mark a route as having been sent to + pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) { + self.last_sent_ts = Some(cur_ts); + self.transfer_stats_accounting.add_up(bytes); + } + + /// Mark a route as having been sent to + pub fn record_latency(&mut self, latency: TimestampDuration) { + self.latency_stats = self.latency_stats_accounting.record_latency(latency); + } + + /// Mark a route as having been tested + pub fn record_tested(&mut self, cur_ts: Timestamp) { + self.last_tested_ts = Some(cur_ts); + + // Reset question_lost and failed_to_send if we test clean + self.failed_to_send = 0; + self.questions_lost = 0; + } + + /// Roll transfers for these route stats + pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { + self.transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut self.transfer_stats_down_up, + ) + } + + /// Get the latency stats + pub fn latency_stats(&self) -> &LatencyStats { + &self.latency_stats + } + + /// Get the transfer stats + pub fn transfer_stats(&self) -> &TransferStatsDownUp { + &self.transfer_stats_down_up + } + + /// Reset stats when network restarts + pub fn reset(&mut self) { + self.last_tested_ts = None; + self.last_sent_ts = None; + self.last_received_ts = None; + } + + /// Check if a route needs testing + pub fn needs_testing(&self, cur_ts: Timestamp) -> bool { + // Has the route had any failures lately? + if self.questions_lost > 0 || self.failed_to_send > 0 { + // If so, always test + return true; + } + + // Has the route been tested within the idle time we'd want to check things? + // (also if we've received successfully over the route, this will get set) + if let Some(last_tested_ts) = self.last_tested_ts { + if cur_ts.saturating_sub(last_tested_ts) + > TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) + { + return true; + } + } else { + // If this route has never been tested, it needs to be + return true; + } + + false + } +} diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 090b32d5..d405e3aa 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -862,7 +862,9 @@ impl RoutingTableInner { pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: ConnectionDescriptor) { self.recent_peers - .insert(node_id, RecentPeersEntry { last_connection }); + .insert(node_id, RecentPeersEntry { last_connection }, |_k, _v| { + // do nothing on lru eviction + }); } ////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 98dffccc..af9cbd17 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -21,8 +21,8 @@ pub enum Destination { }, /// Send to private route (privateroute) PrivateRoute { - /// A private route to send to - private_route: PrivateRoute, + /// A private route set id to send to + private_route: String, /// Require safety route or not safety_selection: SafetySelection, }, diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 2fcf43be..79441c1b 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -4,8 +4,8 @@ use super::*; #[derive(Clone, Debug)] pub enum Target { - NodeId(PublicKey), - PrivateRoute(PublicKey), + NodeId(PublicKey), // Node by any of its public keys + PrivateRoute(String), // Private route by its route set id } pub struct RoutingContextInner {} @@ -118,17 +118,17 @@ impl RoutingContext { safety_selection: self.unlocked_inner.safety_selection, }) } - Target::PrivateRoute(pr) => { + Target::PrivateRoute(rsid) => { // Get remote private route let rss = self.api.routing_table()?.route_spec_store(); let Some(private_route) = rss - .get_remote_private_route(&pr) + .get_remote_private_route(&rsid) else { apibail_key_not_found!(pr); }; Ok(rpc_processor::Destination::PrivateRoute { - private_route, + private_route: rsid, safety_selection: self.unlocked_inner.safety_selection, }) } diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index c0f55cee..7374cda8 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -512,8 +512,8 @@ impl SafetySelection { )] #[archive_attr(repr(C), derive(CheckBytes))] pub struct SafetySpec { - /// preferred safety route if it still exists - pub preferred_route: Option, + /// preferred safety route set id if it still exists + pub preferred_route: Option, /// must be greater than 0 pub hop_count: usize, /// prefer reliability over speed