From dec7bd27dad0ba72ba42534728b5f02264eebbf5 Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 22 Nov 2022 18:26:39 -0500 Subject: [PATCH] checkpoint --- veilid-core/src/routing_table/mod.rs | 2 + .../{veilid_api => routing_table}/privacy.rs | 0 .../src/routing_table/route_spec_store.rs | 209 +++++++++++++----- veilid-core/src/rpc_processor/mod.rs | 18 +- veilid-core/src/veilid_api/debug.rs | 36 ++- veilid-core/src/veilid_api/mod.rs | 84 ++++++- veilid-core/src/veilid_api/routing_context.rs | 19 +- 7 files changed, 280 insertions(+), 88 deletions(-) rename veilid-core/src/{veilid_api => routing_table}/privacy.rs (100%) diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 2336396c..89370fc4 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -3,6 +3,7 @@ mod bucket_entry; mod debug; mod node_ref; mod node_ref_filter; +mod privacy; mod route_spec_store; mod routing_domain_editor; mod routing_domains; @@ -21,6 +22,7 @@ pub use debug::*; use hashlink::LruCache; pub use node_ref::*; pub use node_ref_filter::*; +pub use privacy::*; pub use route_spec_store::*; pub use routing_domain_editor::*; pub use routing_domains::*; diff --git a/veilid-core/src/veilid_api/privacy.rs b/veilid-core/src/routing_table/privacy.rs similarity index 100% rename from veilid-core/src/veilid_api/privacy.rs rename to veilid-core/src/routing_table/privacy.rs diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 9bb8d50d..66d80cf6 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -81,14 +81,14 @@ pub struct RouteSpecStoreContent { /// What remote private routes have seen #[derive(Debug, Clone, Default)] struct RemotePrivateRouteInfo { - // When this remote private route was last modified - modified_ts: u64, - /// Did this remote private route see our node info due to no safety route in use - seen_our_node_info: bool, + // The private route itself + private_route: Option, + /// Timestamp of when the route was last used for anything + last_used_ts: u64, /// The time this remote private route last responded last_replied_ts: Option, - /// Timestamp of when the route was last used for anything - last_used_ts: Option, + /// Did this remote private route see our node info due to no safety route in use + seen_our_node_info: bool, } /// Ephemeral data used to help the RouteSpecStore operate efficiently @@ -743,7 +743,7 @@ impl RouteSpecStore { } } } - // We got the correct signatures, return a key ans + // We got the correct signatures, return a key and response safety spec Ok(Some(( rsd.secret_key, SafetySpec { @@ -755,34 +755,56 @@ impl RouteSpecStore { ))) } + /// Test an allocated route for continuity + pub async fn test_route(&self, key: &DHTKey) -> EyreResult { + let inner = &mut *self.inner.lock(); + let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); + + // Target is the last hop + let target = rsd.hop_node_refs.last().unwrap().clone(); + let hop_count = rsd.hops.len(); + let stability = rsd.stability; + let sequencing = rsd.sequencing; + + // Test with ping to end + let res = match rpc_processor + .rpc_call_status(Destination::Direct { + target, + safety_selection: SafetySelection::Safe(SafetySpec { + preferred_route: Some(key.clone()), + hop_count, + stability, + sequencing, + }), + }) + .await? + { + NetworkResult::Value(v) => v, + _ => { + // Did not error, but did not come back, just return false + return Ok(false); + } + }; + + Ok(true) + } + + /// Release an allocated route that is no longer in use pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> { let mut inner = self.inner.lock(); - if let Some(detail) = inner.content.details.remove(&public_key) { - // Remove from hop cache - let cache_key = route_hops_to_hop_cache(&detail.hops); - if !inner.cache.hop_cache.remove(&cache_key) { - panic!("hop cache should have contained cache key"); - } - // Remove from used nodes cache - for h in &detail.hops { - match inner.cache.used_nodes.entry(*h) { - std::collections::hash_map::Entry::Occupied(mut o) => { - *o.get_mut() -= 1; - if *o.get() == 0 { - o.remove(); - } - } - std::collections::hash_map::Entry::Vacant(_) => { - panic!("used_nodes cache should have contained hop"); - } - } - } - // Remove from end nodes cache - match inner - .cache - .used_end_nodes - .entry(*detail.hops.last().unwrap()) - { + let Some(detail) = inner.content.details.remove(&public_key) else { + bail!("can't release route that was never allocated"); + }; + + // Remove from hop cache + let cache_key = route_hops_to_hop_cache(&detail.hops); + if !inner.cache.hop_cache.remove(&cache_key) { + panic!("hop cache should have contained cache key"); + } + // Remove from used nodes cache + for h in &detail.hops { + match inner.cache.used_nodes.entry(*h) { std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { @@ -790,11 +812,25 @@ impl RouteSpecStore { } } std::collections::hash_map::Entry::Vacant(_) => { - panic!("used_end_nodes cache should have contained hop"); + panic!("used_nodes cache should have contained hop"); } } - } else { - bail!("can't release route that was never allocated"); + } + // Remove from end nodes cache + match inner + .cache + .used_end_nodes + .entry(*detail.hops.last().unwrap()) + { + std::collections::hash_map::Entry::Occupied(mut o) => { + *o.get_mut() -= 1; + if *o.get() == 0 { + o.remove(); + } + } + std::collections::hash_map::Entry::Vacant(_) => { + panic!("used_end_nodes cache should have contained hop"); + } } Ok(()) } @@ -1205,37 +1241,66 @@ impl RouteSpecStore { Ok(private_route) } + /// Import a remote private route for compilation + pub fn import_remote_private_route(&self, blob: Vec) -> EyreResult { + // decode the pr blob + let private_route = RouteSpecStore::blob_to_private_route(blob)?; + + // store the private route in our cache + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + + let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| { + r.private_route.as_ref().unwrap().public_key.clone() + }); + Ok(key) + } + + /// Retrieve an imported remote private route by its public key + pub fn get_remote_private_route(&self, key: &DHTKey) -> EyreResult { + let inner = &mut *self.inner.lock(); + let cur_ts = intf::get_timestamp(); + let Some(pr) = Self::with_get_remote_private_route(inner, cur_ts, key, |r| { + r.private_route.as_ref().unwrap().clone() + }) else { + bail!("remote private route not found"); + }; + Ok(pr) + } + // get or create a remote private route cache entry fn with_create_remote_private_route( inner: &mut RouteSpecStoreInner, cur_ts: u64, - key: &DHTKey, + private_route: PrivateRoute, f: F, ) -> R where F: FnOnce(&mut RemotePrivateRouteInfo) -> R, { + let pr_pubkey = private_route.public_key; + let rpr = inner .cache .remote_private_route_cache - .entry(*key) + .entry(pr_pubkey) .and_modify(|rpr| { - if cur_ts - rpr.modified_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { - *rpr = RemotePrivateRouteInfo { - modified_ts: cur_ts, - seen_our_node_info: false, - last_replied_ts: None, - last_used_ts: None, - }; + if cur_ts - rpr.last_used_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + // Start fresh if this had expired + rpr.last_used_ts = cur_ts; + rpr.last_replied_ts = None; + rpr.seen_our_node_info = false; } else { - rpr.modified_ts = cur_ts; + // If not expired, just mark as being used + rpr.last_used_ts = cur_ts; } }) .or_insert_with(|| RemotePrivateRouteInfo { - modified_ts: cur_ts, - seen_our_node_info: false, + // New remote private route cache entry + private_route: Some(private_route), + last_used_ts: cur_ts, last_replied_ts: None, - last_used_ts: None, + seen_our_node_info: false, }); f(rpr) } @@ -1248,10 +1313,10 @@ impl RouteSpecStore { f: F, ) -> Option where - F: FnOnce(&RemotePrivateRouteInfo) -> R, + F: FnOnce(&mut RemotePrivateRouteInfo) -> R, { - let rpr = inner.cache.remote_private_route_cache.get(key)?; - if cur_ts - rpr.modified_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { + let rpr = inner.cache.remote_private_route_cache.get_mut(key)?; + if cur_ts - rpr.last_used_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { return Some(f(rpr)); } inner.cache.remote_private_route_cache.remove(key); @@ -1269,27 +1334,46 @@ impl RouteSpecStore { } /// Mark a remote private route as having seen our node info - pub fn mark_remote_private_route_seen_our_node_info(&self, key: &DHTKey, cur_ts: u64) { + pub fn mark_remote_private_route_seen_our_node_info( + &self, + key: &DHTKey, + cur_ts: u64, + ) -> EyreResult<()> { let inner = &mut *self.inner.lock(); - Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { + if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { rpr.seen_our_node_info = true; }) + .is_none() + { + bail!("private route is missing from store: {}", key); + } + Ok(()) } /// Mark a remote private route as having replied to a question { - pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) { + pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { let inner = &mut *self.inner.lock(); - Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { + if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { rpr.last_replied_ts = Some(cur_ts); }) + .is_none() + { + bail!("private route is missing from store: {}", key); + } + Ok(()) } /// Mark a remote private route as having beed used { - pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) { + pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { let inner = &mut *self.inner.lock(); - Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { - rpr.last_used_ts = Some(cur_ts); + if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| { + rpr.last_used_ts = cur_ts; }) + .is_none() + { + bail!("private route is missing from store: {}", key); + } + Ok(()) } /// Clear caches when local our local node info changes @@ -1306,8 +1390,11 @@ impl RouteSpecStore { v.last_checked_ts = None; } - // Clean up remote private routes - inner.cache.remote_private_route_cache.clear(); + // Reset private route cache + for (_k, v) in &mut inner.cache.remote_private_route_cache { + v.last_replied_ts = None; + v.seen_our_node_info = false; + } } /// Mark route as published diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index c661dec7..1a5c1781 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -452,15 +452,21 @@ impl RPCProcessor { let rss = self.routing_table.route_spec_store(); // If we received a reply from a private route, mark it as such - rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts); + if let Err(e) = + rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts) + { + log_rpc!(error "private route missing: {}", e); + } // If we sent to a private route without a safety route // We need to mark our own node info as having been seen so we can optimize sending it if let SafetySelection::Unsafe(_) = safety_selection { - rss.mark_remote_private_route_seen_our_node_info( + if let Err(e) = rss.mark_remote_private_route_seen_our_node_info( &private_route.public_key, recv_ts, - ); + ) { + log_rpc!(error "private route missing: {}", e); + } } } } @@ -761,7 +767,11 @@ impl RPCProcessor { } = &dest { let rss = self.routing_table.route_spec_store(); - rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp()); + if let Err(e) = + rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp()) + { + log_rpc!(error "private route missing: {}", e); + } } // Pass back waitable reply completion diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 2b747695..ef0ecef7 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -7,7 +7,7 @@ use routing_table::*; #[derive(Default, Debug)] struct DebugCache { - imported_routes: Vec, + imported_routes: Vec, } static DEBUG_CACHE: Mutex = Mutex::new(DebugCache { @@ -123,10 +123,20 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option { + // Remove imported route + dc.imported_routes.remove(n); + info!("removed dead imported route {}", n); + return None; + } + Ok(v) => v, + }; Some(Destination::private_route( - r.clone(), + private_route, ss.unwrap_or(SafetySelection::Unsafe(Sequencing::NoPreference)), )) } else { @@ -734,17 +744,24 @@ impl VeilidAPI { let blob_dec = BASE64URL_NOPAD .decode(blob.as_bytes()) .map_err(VeilidAPIError::generic)?; - let pr = - RouteSpecStore::blob_to_private_route(blob_dec).map_err(VeilidAPIError::generic)?; + let rss = self.routing_table()?.route_spec_store(); + let pr_pubkey = rss + .import_remote_private_route(blob_dec) + .map_err(VeilidAPIError::generic)?; let mut dc = DEBUG_CACHE.lock(); let n = dc.imported_routes.len(); - let out = format!("Private route #{} imported: {}", n, pr.public_key); - dc.imported_routes.push(pr); + let out = format!("Private route #{} imported: {}", n, pr_pubkey); + dc.imported_routes.push(pr_pubkey); return Ok(out); } + async fn debug_route_test(&self, _args: Vec) -> Result { + let out = "xxx".to_string(); + return Ok(out); + } + async fn debug_route(&self, args: String) -> Result { let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); @@ -764,6 +781,8 @@ impl VeilidAPI { self.debug_route_list(args).await } else if command == "import" { self.debug_route_import(args).await + } else if command == "test" { + self.debug_route_test(args).await } else { Ok(">>> Unknown command\n".to_owned()) } @@ -791,6 +810,7 @@ impl VeilidAPI { print list import + test is: * direct: [+][] diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 0512a391..605fcf90 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1,12 +1,10 @@ #![allow(dead_code)] mod debug; -mod privacy; mod routing_context; mod serialize_helpers; pub use debug::*; -pub use privacy::*; pub use routing_context::*; pub use serialize_helpers::*; @@ -25,12 +23,13 @@ pub use intf::BlockStore; pub use intf::ProtectedStore; pub use intf::TableStore; pub use network_manager::NetworkManager; -pub use routing_table::{NodeRef, NodeRefBase, RoutingTable}; +pub use routing_table::{NodeRef, NodeRefBase}; use core::fmt; use core_context::{api_shutdown, VeilidCoreContext}; use enumset::*; use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; +use routing_table::{RouteSpecStore, RoutingTable}; use rpc_processor::*; use serde::*; use xx::*; @@ -86,8 +85,8 @@ pub enum VeilidAPIError { Timeout, #[error("Shutdown")] Shutdown, - #[error("Node not found: {node_id}")] - NodeNotFound { node_id: NodeId }, + #[error("Key not found: {key}")] + KeyNotFound { key: DHTKey }, #[error("No connection: {message}")] NoConnection { message: String }, #[error("No peer info: {node_id}")] @@ -123,11 +122,13 @@ impl VeilidAPIError { pub fn shutdown() -> Self { Self::Shutdown } - pub fn node_not_found(node_id: NodeId) -> Self { - Self::NodeNotFound { node_id } + pub fn key_not_found(key: DHTKey) -> Self { + Self::KeyNotFound { key } } - pub fn no_connection(message: String) -> Self { - Self::NoConnection { message } + pub fn no_connection(msg: T) -> Self { + Self::NoConnection { + message: msg.to_string(), + } } pub fn no_peer_info(node_id: NodeId) -> Self { Self::NoPeerInfo { node_id } @@ -2681,6 +2682,13 @@ impl VeilidAPI { } Err(VeilidAPIError::NotInitialized) } + pub fn routing_table(&self) -> Result { + let inner = self.inner.lock(); + if let Some(context) = &inner.context { + return Ok(context.attachment_manager.network_manager().routing_table()); + } + Err(VeilidAPIError::NotInitialized) + } //////////////////////////////////////////////////////////////// // Attach/Detach @@ -2732,6 +2740,64 @@ impl VeilidAPI { RoutingContext::new(self.clone()) } + //////////////////////////////////////////////////////////////// + // Private route allocation + + #[instrument(level = "debug", skip(self))] + pub async fn new_default_private_route(&self) -> Result<(DHTKey, Vec), VeilidAPIError> { + let config = self.config()?; + let c = config.get(); + self.new_private_route( + Stability::LowLatency, + Sequencing::NoPreference, + c.network.rpc.default_route_hop_count.into(), + ) + .await + } + + #[instrument(level = "debug", skip(self))] + pub async fn new_private_route( + &self, + stability: Stability, + sequencing: Sequencing, + hop_count: usize, + ) -> Result<(DHTKey, Vec), VeilidAPIError> { + let rss = self.routing_table()?.route_spec_store(); + let r = rss + .allocate_route( + stability, + sequencing, + hop_count, + Direction::Inbound.into(), + &[], + ) + .map_err(VeilidAPIError::internal)?; + let Some(pr_pubkey) = r else { + return Err(VeilidAPIError::generic("unable to allocate route")); + }; + if !rss + .test_route(&pr_pubkey) + .await + .map_err(VeilidAPIError::no_connection)? + { + rss.release_route(pr_pubkey) + .map_err(VeilidAPIError::generic)?; + return Err(VeilidAPIError::generic("allocated route failed to test")); + } + let private_route = rss + .assemble_private_route(&pr_pubkey, Some(true)) + .map_err(VeilidAPIError::generic)?; + let blob = match RouteSpecStore::private_route_to_blob(&private_route) { + Ok(v) => v, + Err(e) => { + rss.release_route(pr_pubkey) + .map_err(VeilidAPIError::generic)?; + return Err(VeilidAPIError::internal(e)); + } + }; + Ok((pr_pubkey, blob)) + } + //////////////////////////////////////////////////////////////// // App Calls diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 47efb369..8e17b7de 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -4,7 +4,7 @@ use super::*; #[derive(Clone, Debug)] pub enum Target { NodeId(NodeId), - PrivateRoute(PrivateRoute), + PrivateRoute(DHTKey), } pub struct RoutingContextInner {} @@ -115,7 +115,7 @@ impl RoutingContext { // Resolve node let mut nr = match rpc_processor.resolve_node(node_id.key).await { Ok(Some(nr)) => nr, - Ok(None) => return Err(VeilidAPIError::NodeNotFound { node_id }), + Ok(None) => return Err(VeilidAPIError::KeyNotFound { key: node_id.key }), Err(e) => return Err(e.into()), }; // Apply sequencing to match safety selection @@ -126,10 +126,17 @@ impl RoutingContext { safety_selection: self.unlocked_inner.safety_selection, }) } - Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute { - private_route: pr, - safety_selection: self.unlocked_inner.safety_selection, - }), + Target::PrivateRoute(pr) => { + // Get remote private route + let rss = self.api.routing_table()?.route_spec_store(); + let private_route = rss + .get_remote_private_route(&pr) + .map_err(|_| VeilidAPIError::KeyNotFound { key: pr })?; + Ok(rpc_processor::Destination::PrivateRoute { + private_route, + safety_selection: self.unlocked_inner.safety_selection, + }) + } } }