From a06c2fb5a395c6f93528abca711b7ca997c23eff Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 12 Oct 2022 15:52:19 -0400 Subject: [PATCH] checkpoint --- veilid-core/src/network_manager/tasks.rs | 4 +- veilid-core/src/routing_table/find_nodes.rs | 28 +-- .../src/routing_table/route_spec_store.rs | 176 ++++++++++++------ 3 files changed, 137 insertions(+), 71 deletions(-) diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index e41c8c06..c34b9cb7 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -224,7 +224,7 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), err)] pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { let (bootstrap, bootstrap_nodes) = { - let c = self.config.get(); + let c = self.unlocked_inner.config.get(); ( c.network.bootstrap.clone(), c.network.bootstrap_nodes.clone(), @@ -487,7 +487,7 @@ impl NetworkManager { let routing_table = self.routing_table(); let mut ord = FuturesOrdered::new(); let min_peer_count = { - let c = self.config.get(); + let c = self.unlocked_inner.config.get(); c.network.dht.min_peer_count as usize }; diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 3e3866d4..0e304d91 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -74,13 +74,13 @@ impl RoutingTable { } // Retrieve the fastest nodes in the routing table matching an entry filter - pub fn find_fast_public_nodes_filtered<'r, 'e, F>( + pub fn find_fast_public_nodes_filtered<'a, 'b, F>( &self, node_count: usize, mut entry_filter: F, ) -> Vec where - F: FnMut(&'r RoutingTableInner, &'e BucketEntryInner) -> bool, + F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, { self.find_fastest_nodes( // count @@ -201,7 +201,7 @@ impl RoutingTable { } } - pub fn find_peers_with_sort_and_filter( + pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( &self, node_count: usize, cur_ts: u64, @@ -210,13 +210,13 @@ impl RoutingTable { mut transform: T, ) -> Vec where - F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, C: FnMut( - &RoutingTableInner, - &(DHTKey, Option>), - &(DHTKey, Option>), + &'a RoutingTableInner, + &'b (DHTKey, Option>), + &'b (DHTKey, Option>), ) -> core::cmp::Ordering, - T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, { let inner = self.inner.read(); let inner = &*inner; @@ -259,15 +259,15 @@ impl RoutingTable { out } - pub fn find_fastest_nodes( + pub fn find_fastest_nodes<'a, T, F, O>( &self, node_count: usize, mut filter: F, transform: T, ) -> Vec where - F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let out = self.find_peers_with_sort_and_filter( @@ -341,15 +341,15 @@ impl RoutingTable { out } - pub fn find_closest_nodes( + pub fn find_closest_nodes<'a, F, T, O>( &self, node_id: DHTKey, filter: F, mut transform: T, ) -> Vec where - F: FnMut(&RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&RoutingTableInner, DHTKey, Option>) -> O, + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let node_count = { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 46368ef3..bc75825c 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -37,7 +37,7 @@ struct RouteSpecDetail { } /// The core representation of the RouteSpecStore that can be serialized -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct RouteSpecStoreContent { /// All of the routes we have allocated so far details: HashMap, @@ -154,32 +154,95 @@ impl RouteSpecStore { } } - pub fn load(routing_table: RoutingTable) -> Result { + pub async fn load(routing_table: RoutingTable) -> EyreResult { // Get cbor blob from table store - let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor) - .map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?; - let rss = RouteSpecStore { + let table_store = routing_table.network_manager().table_store(); + let rsstdb = table_store.open("RouteSpecStore", 1).await?; + let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); + let mut rss = RouteSpecStore { content, cache: Default::default(), }; + + // Load secrets from pstore + let pstore = routing_table.network_manager().protected_store(); + let mut dead_keys = Vec::new(); + for (k, v) in &mut rss.content.details { + if let Some(secret_key) = pstore + .load_user_secret(&format!("RouteSpecStore_{}", k.encode())) + .await? + { + match secret_key.try_into() { + Ok(s) => { + v.secret_key = DHTKeySecret::new(s); + } + Err(_) => { + dead_keys.push(*k); + } + } + } else { + dead_keys.push(*k); + } + } + for k in dead_keys { + log_rtab!(debug "killing off private route: {}", k.encode()); + rss.content.details.remove(&k); + } + + // Rebuild the routespecstore cache rss.rebuild_cache(routing_table); Ok(rss) } - pub fn save(&self, routing_table: RoutingTable) -> Result<(), VeilidAPIError> { + pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> { // Save all the fields we care about to the cbor blob in table storage - let cbor = serde_cbor::to_vec(&self.content).unwrap(); let table_store = routing_table.network_manager().table_store(); - table_store.open("") + let rsstdb = table_store.open("RouteSpecStore", 1).await?; + rsstdb.store_cbor(0, b"content", &self.content).await?; + + // Keep secrets in protected store as well + let pstore = routing_table.network_manager().protected_store(); + for (k, v) in &self.content.details { + if pstore + .save_user_secret( + &format!("RouteSpecStore_{}", k.encode()), + &v.secret_key.bytes, + ) + .await? + { + panic!("route spec should not already have secret key saved"); + } + } + + Ok(()) + } + + fn add_to_cache(&mut self, cache_key: Vec, rsd: &RouteSpecDetail) { + if !self.cache.hop_cache.insert(cache_key) { + panic!("route should never be inserted twice"); + } + for h in &rsd.hops { + self.cache + .used_nodes + .entry(*h) + .and_modify(|e| *e += 1) + .or_insert(1); + } + self.cache + .used_end_nodes + .entry(*rsd.hops.last().unwrap()) + .and_modify(|e| *e += 1) + .or_insert(1); } fn rebuild_cache(&mut self, routing_table: RoutingTable) { - // - // xxx also load secrets from pstore - let pstore = routing_table.network_manager().protected_store(); + for v in self.content.details.values() { + let cache_key = route_hops_to_hop_cache(&v.hops); + self.add_to_cache(cache_key, &v); + } } - fn detail_mut(&mut self, public_key: DHTKey) -> &mut RouteSpecDetail { + fn detail_mut(&mut self, public_key: &DHTKey) -> &mut RouteSpecDetail { self.content.details.get_mut(&public_key).unwrap() } @@ -187,13 +250,13 @@ impl RouteSpecStore { /// Prefers nodes that are not currently in use by another route /// The route is not yet tested for its reachability /// Returns None if no route could be allocated at this time - pub fn allocate_route( + pub async fn allocate_route( &mut self, routing_table: RoutingTable, reliable: bool, hop_count: usize, directions: DirectionSet, - ) -> Option { + ) -> EyreResult> { use core::cmp::Ordering; let max_route_hop_count = { @@ -204,13 +267,11 @@ impl RouteSpecStore { }; if hop_count < 2 { - log_rtab!(error "Not allocating route less than two hops in length"); - return None; + bail!("Not allocating route less than two hops in length"); } if hop_count > max_route_hop_count { - log_rtab!(error "Not allocating route longer than max route hop count"); - return None; + bail!("Not allocating route longer than max route hop count"); } // Get list of all nodes, and sort them for selection @@ -293,8 +354,8 @@ impl RouteSpecStore { } // always prioritize reliable nodes, but sort by oldest or fastest - let cmpout = v1.1.unwrap().with(rti, |rti, e1| { - v2.1.unwrap().with(rti, |_rti, e2| { + let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { + v2.1.as_ref().unwrap().with(rti, |_rti, e2| { if reliable { BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) } else { @@ -321,13 +382,13 @@ impl RouteSpecStore { RoutingDomain::PublicInternet.into(), BucketEntryState::Unreliable, ); - let mut nodes = routing_table + let nodes = routing_table .find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform); // If we couldn't find enough nodes, wait until we have more nodes in the routing table if nodes.len() < hop_count { - log_rtab!(debug "Not enough nodes to construct route at this time. Try again later."); - return None; + log_rtab!(debug "not enough nodes to construct route at this time"); + return Ok(None); } // Now go through nodes and try to build a route we haven't seen yet @@ -396,7 +457,8 @@ impl RouteSpecStore { } } if route_nodes.is_empty() { - return None; + log_rtab!(debug "unable to find unique route at this time"); + return Ok(None); } // Got a unique route, lets build the detail, register it, and return it @@ -423,25 +485,13 @@ impl RouteSpecStore { reliable, }; + // Add to cache + self.add_to_cache(cache_key, &rsd); + // Keep route in spec store self.content.details.insert(public_key, rsd); - if !self.cache.hop_cache.insert(cache_key) { - panic!("route should never be inserted twice"); - } - for h in &hops { - self.cache - .used_nodes - .entry(*h) - .and_modify(|e| *e += 1) - .or_insert(1); - } - self.cache - .used_end_nodes - .entry(*hops.last().unwrap()) - .and_modify(|e| *e += 1) - .or_insert(1); - Some(public_key) + Ok(Some(public_key)) } pub fn release_route(&mut self, public_key: DHTKey) { @@ -482,7 +532,7 @@ impl RouteSpecStore { } } - pub fn best_route( + pub fn first_unpublished_route( &mut self, reliable: bool, min_hop_count: usize, @@ -494,6 +544,7 @@ impl RouteSpecStore { && detail.1.hops.len() >= min_hop_count && detail.1.hops.len() <= max_hop_count && detail.1.directions.is_subset(directions) + && !detail.1.published { return Some(*detail.0); } @@ -507,31 +558,46 @@ impl RouteSpecStore { /// Mark route as published /// When first deserialized, routes must be re-published in order to ensure they remain /// in the RouteSpecStore. - pub fn mark_route_published(&mut self, spec: Arc) { - self.detail_mut(spec).published = true; + pub fn mark_route_published(&mut self, key: &DHTKey) { + self.detail_mut(&key).published = true; } /// Mark route as checked - pub fn touch_route_checked(&mut self, spec: Arc, cur_ts: u64) { - self.detail_mut(spec).last_checked_ts = cur_ts; + pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) { + self.detail_mut(&key).last_checked_ts = Some(cur_ts); } - pub fn record_latency(&mut self, spec: Arc, latency: u64) { - let lsa = self.detail_mut(spec).latency_stats_accounting; - self.detail_mut(spec).latency_stats = lsa.record_latency(latency); + /// Record latency on the route + pub fn record_latency(&mut self, key: &DHTKey, latency: u64) { + let lsa = &mut self.detail_mut(&key).latency_stats_accounting; + self.detail_mut(&key).latency_stats = lsa.record_latency(latency); } - pub fn latency_stats(&self, spec: Arc) -> LatencyStats { - self.detail_mut(spec).latency_stats.clone() + /// Get the calculated latency stats + pub fn latency_stats(&self, key: &DHTKey) -> LatencyStats { + self.detail_mut(&key).latency_stats.clone() } - pub fn add_down(&mut self, spec: Arc, bytes: u64) { - self.current_transfer.down += bytes; + /// Add download transfers to route + pub fn add_down(&mut self, key: &DHTKey, bytes: u64) { + let tsa = &mut self.detail_mut(&key).transfer_stats_accounting; + tsa.add_down(bytes); } - pub fn add_up(&mut self, spec: Arc, bytes: u64) {} + /// Add upload transfers to route + pub fn add_up(&mut self, key: &DHTKey, bytes: u64) { + let tsa = &mut self.detail_mut(&key).transfer_stats_accounting; + tsa.add_up(bytes); + } - pub fn roll_transfers(&mut self) { - // + /// Process transfer statistics to get averages + pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { + for rsd in self.content.details.values_mut() { + rsd.transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut rsd.transfer_stats_down_up, + ); + } } }