diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 6bca250c..1204a4a1 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -197,7 +197,7 @@ impl NetworkManager { block_store: BlockStore, crypto: Crypto, ) -> NetworkManagerUnlockedInner { - let c = config.get(); + let min_peer_refresh_time_ms = config.get().network.dht.min_peer_refresh_time_ms; NetworkManagerUnlockedInner { config, protected_store, @@ -211,7 +211,7 @@ impl NetworkManager { relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), bootstrap_task: TickTask::new(1), - peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), + peer_minimum_refresh_task: TickTask::new_ms(min_peer_refresh_time_ms), ping_validator_task: TickTask::new(1), public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), node_info_update_single_future: MustJoinSingleFuture::new(), diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 88051c0c..ba48563c 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -57,7 +57,7 @@ impl Bucket { } pub(super) fn kick( - &self, + &mut self, inner: &mut RoutingTableInner, bucket_depth: usize, ) -> Option> { diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index fd563f7c..94428da9 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -658,7 +658,7 @@ impl BucketEntry { // immutable reference to RoutingTableInner must be passed in to get this // This ensures that an operation on the routing table can not change entries // while it is being read from - pub(super) fn with(&self, rti: &RoutingTableInner, f: F) -> R + pub fn with(&self, rti: &RoutingTableInner, f: F) -> R where F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> R, { @@ -668,7 +668,7 @@ impl BucketEntry { // Note, that this requires -also- holding the RoutingTable write lock, as a // mutable reference to RoutingTableInner must be passed in to get this - pub(super) fn with_mut(&self, rti: &mut RoutingTableInner, f: F) -> R + pub fn with_mut(&self, rti: &mut RoutingTableInner, f: F) -> R where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> R, { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index ef0854b2..fd1d80ab 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -20,6 +20,8 @@ pub struct CompiledRoute { pub safety_route: SafetyRoute, /// The secret used to encrypt the message payload pub secret: DHTKeySecret, + /// The node ref to the first hop in the compiled route + pub first_hop: NodeRef, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -46,6 +48,9 @@ struct RouteSpecDetail { /// Not serialized because all routes should be re-published when restarting #[serde(skip)] published: bool, + // Can optimize the rendering of this route, using node ids only instead of full peer info + #[serde(skip)] + reachable: bool, /// Timestamp of when the route was created created_ts: u64, /// Timestamp of when the route was last checked for validity @@ -124,7 +129,7 @@ fn get_route_permutation_count(hop_count: usize) -> usize { /// get the route permutation at particular 'perm' index, starting at the 'start' index /// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum /// number of permutations is given by get_route_permutation_count() -fn with_route_permutations(hop_count: usize, start: usize, f: F) -> bool +fn with_route_permutations(hop_count: usize, start: usize, mut f: F) -> bool where F: FnMut(&[usize]) -> bool, { @@ -142,7 +147,7 @@ where } // heaps algorithm, but skipping the first element - fn heaps_permutation(permutation: &mut [usize], size: usize, f: F) -> bool + fn heaps_permutation(permutation: &mut [usize], size: usize, mut f: F) -> bool where F: FnMut(&[usize]) -> bool, { @@ -154,7 +159,7 @@ where } for i in 0..size { - if heaps_permutation(permutation, size - 1, f) { + if heaps_permutation(permutation, size - 1, &mut f) { return true; } if size % 2 == 1 { @@ -224,7 +229,7 @@ impl RouteSpecStore { } // Rebuild the routespecstore cache - rss.rebuild_cache(routing_table); + rss.rebuild_cache(); Ok(rss) } @@ -251,28 +256,28 @@ impl RouteSpecStore { Ok(()) } - fn add_to_cache(&mut self, cache_key: Vec, rsd: &RouteSpecDetail) { - if !self.cache.hop_cache.insert(cache_key) { + fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec, rsd: &RouteSpecDetail) { + if !cache.hop_cache.insert(cache_key) { panic!("route should never be inserted twice"); } for h in &rsd.hops { - self.cache + cache .used_nodes .entry(*h) .and_modify(|e| *e += 1) .or_insert(1); } - self.cache + cache .used_end_nodes .entry(*rsd.hops.last().unwrap()) .and_modify(|e| *e += 1) .or_insert(1); } - fn rebuild_cache(&mut self, routing_table: RoutingTable) { + fn rebuild_cache(&mut self) { for v in self.content.details.values() { let cache_key = route_hops_to_hop_cache(&v.hops); - self.add_to_cache(cache_key, &v); + Self::add_to_cache(&mut self.cache, cache_key, &v); } } @@ -304,12 +309,7 @@ impl RouteSpecStore { // Get list of all nodes, and sort them for selection let cur_ts = intf::get_timestamp(); - let dial_info_sort = if reliable { - Some(DialInfoDetail::reliable_sort) - } else { - None - }; - let filter = |rti, k: DHTKey, v: Option>| -> bool { + let filter = |rti, _k: DHTKey, v: Option>| -> bool { // Exclude our own node from routes if v.is_none() { return false; @@ -503,7 +503,10 @@ impl RouteSpecStore { let hops = route_nodes.iter().map(|v| nodes[*v].0).collect(); let hop_node_refs = route_nodes .iter() - .map(|v| rti.lookup_node_ref(routing_table, nodes[*v].0).unwrap()) + .map(|v| { + rti.lookup_node_ref(routing_table.clone(), nodes[*v].0) + .unwrap() + }) .collect(); let (public_key, secret_key) = generate_secret(); @@ -517,6 +520,7 @@ impl RouteSpecStore { latency_stats_accounting: Default::default(), transfer_stats_accounting: Default::default(), published: false, + reachable: false, created_ts: cur_ts, last_checked_ts: None, last_used_ts: None, @@ -525,7 +529,7 @@ impl RouteSpecStore { }; // Add to cache - self.add_to_cache(cache_key, &rsd); + Self::add_to_cache(&mut self.cache, cache_key, &rsd); // Keep route in spec store self.content.details.insert(public_key, rsd); @@ -543,7 +547,7 @@ impl RouteSpecStore { // Remove from used nodes cache for h in &detail.hops { match self.cache.used_nodes.entry(*h) { - std::collections::hash_map::Entry::Occupied(o) => { + std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { o.remove(); @@ -556,7 +560,7 @@ impl RouteSpecStore { } // Remove from end nodes cache match self.cache.used_nodes.entry(*detail.hops.last().unwrap()) { - std::collections::hash_map::Entry::Occupied(o) => { + std::collections::hash_map::Entry::Occupied(mut o) => { *o.get_mut() -= 1; if *o.get() == 0 { o.remove(); @@ -611,23 +615,44 @@ impl RouteSpecStore { // See if we are using a safety route, if not, short circuit this operation if safety_spec.is_none() { // Safety route stub with the node's public key as the safety route key since it's the 0th hop + if private_route.first_hop.is_none() { + return Err(RPCError::internal("can't compile zero length route")); + } + let first_hop = private_route.first_hop.as_ref().unwrap(); + let opt_first_hop_noderef = match &first_hop.node { + RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table, id.key), + RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info( + routing_table.clone(), + RoutingDomain::PublicInternet, + pi.node_id.key, + pi.signed_node_info, + false, + ), + }; + if opt_first_hop_noderef.is_none() { + // Can't reach this private route any more + log_rtab!(debug "can't reach private route any more"); + return Ok(None); + } + return Ok(Some(CompiledRoute { safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route), secret: routing_table.node_id_secret(), + first_hop: opt_first_hop_noderef.unwrap(), })); } let safety_spec = safety_spec.unwrap(); // See if the preferred route is here - let safety_route_public_key; - let opt_safety_rsd: Option<&mut RouteSpecDetail> = + let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> = if let Some(preferred_route) = safety_spec.preferred_route { self.detail_mut(&preferred_route) + .map(|rsd| (rsd, preferred_route)) } else { // Preferred safety route was not requested None }; - let safety_rsd: &mut RouteSpecDetail = if let Some(safety_rsd) = opt_safety_rsd { + let (safety_rsd, sr_pubkey) = if let Some(safety_rsd) = opt_safety_rsd { // Safety route exists safety_rsd } else { @@ -639,13 +664,13 @@ impl RouteSpecStore { Direction::Outbound.into(), ) { // Found a route to use - self.detail_mut(&sr_pubkey).unwrap() + (self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey) } else { // No route found, gotta allocate one let sr_pubkey = match self .allocate_route( rti, - routing_table, + routing_table.clone(), safety_spec.reliable, safety_spec.hop_count, Direction::Outbound.into(), @@ -655,12 +680,10 @@ impl RouteSpecStore { Some(pk) => pk, None => return Ok(None), }; - self.detail_mut(&sr_pubkey).unwrap() + (self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey) } }; - // xxx implement caching first! - // Ensure the total hop count isn't too long for our config let sr_hopcount = safety_spec.hop_count; if sr_hopcount == 0 { @@ -672,7 +695,9 @@ impl RouteSpecStore { // See if we can optimize this compilation yet // We don't want to include full nodeinfo if we don't have to - //let optimize = safety_rsd. + let optimize = safety_rsd.reachable; + + // xxx implement caching here! // Create hops let hops = { @@ -714,7 +739,27 @@ impl RouteSpecStore { // Make route hop let route_hop = RouteHop { - node: safety_route_spec.hops[h].dial_info.clone(), + node: match optimize { + // Optimized, no peer info, just the dht key + true => RouteNode::NodeId(NodeId::new(safety_rsd.hops[h])), + // Full peer info, required until we are sure the route has been fully established + false => { + let node_id = safety_rsd.hops[h]; + let pi = rti + .with_node_entry(node_id, |entry| { + entry.with(rti, |rti, e| { + e.make_peer_info(node_id, RoutingDomain::PublicInternet) + }) + }) + .flatten(); + if pi.is_none() { + return Err(RPCError::internal( + "peer info should exist for route but doesn't", + )); + } + RouteNode::PeerInfo(pi.unwrap()) + } + }, next_hop: Some(route_hop_data), }; @@ -734,12 +779,8 @@ impl RouteSpecStore { } // Encode first RouteHopData - let dh_secret = self - .crypto - .cached_dh( - &safety_route_spec.hops[0].dial_info.node_id.key, - &safety_route_spec.secret_key, - ) + let dh_secret = crypto + .cached_dh(&safety_rsd.hops[0], &safety_rsd.secret_key) .map_err(RPCError::map_internal("dh failed"))?; let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) .map_err(RPCError::map_internal("encryption failed"))?; @@ -754,15 +795,20 @@ impl RouteSpecStore { // Build safety route let safety_route = SafetyRoute { - public_key: safety_rsd. + public_key: sr_pubkey, hop_count: safety_spec.hop_count as u8, hops, }; - Ok(Some(CompiledRoute { + let compiled_route = CompiledRoute { safety_route, - secret: todo!(), - })) + secret: safety_rsd.secret_key, + first_hop: safety_rsd.hop_node_refs.first().unwrap().clone(), + }; + + // xxx: add cache here + + Ok(Some(compiled_route)) } /// Mark route as published @@ -775,6 +821,16 @@ impl RouteSpecStore { Ok(()) } + /// Mark route as reachable + /// When first deserialized, routes must be re-tested for reachability + /// This can be used to determine if routes need to be sent with full peerinfo or can just use a node id + pub fn mark_route_reachable(&mut self, key: &DHTKey) -> EyreResult<()> { + self.detail_mut(&key) + .ok_or_else(|| eyre!("route does not exist"))? + .published = true; + Ok(()) + } + /// Mark route as checked pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> { self.detail_mut(&key) diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index f0bf9a24..aded6846 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -68,10 +68,10 @@ impl RoutingTableInner { self.unlocked_inner.config.clone() } - pub fn transfer_stats_accounting(&mut self) -> &TransferStatsAccounting { + pub fn transfer_stats_accounting(&mut self) -> &mut TransferStatsAccounting { &mut self.self_transfer_stats_accounting } - pub fn latency_stats_accounting(&mut self) -> &LatencyStatsAccounting { + pub fn latency_stats_accounting(&mut self) -> &mut LatencyStatsAccounting { &mut self.self_latency_stats_accounting } @@ -616,7 +616,7 @@ impl RoutingTableInner { /// Resolve an existing routing table entry and return a reference to it pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: DHTKey) -> Option { if node_id == self.unlocked_inner.node_id { - log_rtab!(debug "can't look up own node id in routing table"); + log_rtab!(error "can't look up own node id in routing table"); return None; } let idx = self.find_bucket_index(node_id); @@ -644,6 +644,23 @@ impl RoutingTableInner { ) } + /// Resolve an existing routing table entry and call a function on its entry without using a noderef + pub fn with_node_entry(&self, node_id: DHTKey, f: F) -> Option + where + F: FnOnce(Arc) -> R, + { + if node_id == self.unlocked_inner.node_id { + log_rtab!(error "can't look up own node id in routing table"); + return None; + } + let idx = self.find_bucket_index(node_id); + let bucket = &self.buckets[idx]; + if let Some(e) = bucket.entry(&node_id) { + return Some(f(e)); + } + None + } + /// Shortcut function to add a node to our routing table if it doesn't exist /// and add the dial info we have for it. Returns a noderef filtered to /// the routing domain in which this node was registered for convenience. diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 152923cf..a611275b 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -81,6 +81,25 @@ impl Destination { }, } } + + pub fn get_safety_spec(&self) -> &Option { + match self { + Destination::Direct { + target: _, + safety_spec, + } => safety_spec, + Destination::Relay { + relay: _, + target: _, + safety_spec, + } => safety_spec, + Destination::PrivateRoute { + private_route: _, + safety_spec, + reliable: _, + } => safety_spec, + } + } } impl fmt::Display for Destination { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 4947a0a3..499fc543 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -55,8 +55,8 @@ struct RPCMessageHeader { connection_descriptor: ConnectionDescriptor, /// The routing domain the message was sent through routing_domain: RoutingDomain, - /// The private route the message was received through - private_route: Option, + // The private route the message was received through + //private_route: Option, } #[derive(Debug)] @@ -87,7 +87,7 @@ pub(crate) struct RPCMessage { opt_sender_nr: Option, } -fn builder_to_vec<'a, T>(builder: capnp::message::Builder) -> Result, RPCError> +pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder) -> Result, RPCError> where T: capnp::message::Allocator + 'a, { @@ -185,10 +185,10 @@ impl RPCProcessor { // set up channel let mut concurrency = c.network.rpc.concurrency; - let mut queue_size = c.network.rpc.queue_size; - let mut timeout = ms_to_us(c.network.rpc.timeout_ms); - let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize; - let mut default_route_hop_count = c.network.rpc.default_route_hop_count as usize; + let queue_size = c.network.rpc.queue_size; + let timeout = ms_to_us(c.network.rpc.timeout_ms); + let max_route_hop_count = c.network.rpc.max_route_hop_count as usize; + let default_route_hop_count = c.network.rpc.default_route_hop_count as usize; if concurrency == 0 { concurrency = intf::get_concurrency() / 2; if concurrency == 0 { @@ -405,8 +405,11 @@ impl RPCProcessor { message_data: Vec, ) -> Result, RPCError> { let routing_table = self.routing_table(); + let pr_hop_count = private_route.hop_count; + let pr_pubkey = private_route.public_key; + let compiled_route: CompiledRoute = - match self.routing_table().with_route_spec_store(|rss, rti| { + match self.routing_table().with_route_spec_store_mut(|rss, rti| { // Compile the safety route with the private route rss.compile_safety_route(rti, routing_table, safety_spec, private_route) })? { @@ -423,7 +426,7 @@ impl RPCProcessor { let nonce = Crypto::get_random_nonce(); let dh_secret = self .crypto - .cached_dh(&private_route.public_key, &compiled_route.secret) + .cached_dh(&pr_pubkey, &compiled_route.secret) .map_err(RPCError::map_internal("dh failed"))?; let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None) .map_err(RPCError::map_internal("encryption failed"))?; @@ -432,6 +435,7 @@ impl RPCProcessor { let operation = RoutedOperation::new(nonce, enc_msg_data); // Prepare route operation + let sr_hop_count = compiled_route.safety_route.hop_count; let route_operation = RPCOperationRoute { safety_route: compiled_route.safety_route, operation, @@ -448,17 +452,8 @@ impl RPCProcessor { let out_message = builder_to_vec(route_msg)?; // Get the first hop this is going to - let out_node_id = compiled_route - .safety_route - .hops - .first() - .ok_or_else(RPCError::else_internal("no hop in safety route"))? - .dial_info - .node_id - .key; - - let out_hop_count = - (1 + compiled_route.safety_route.hop_count + private_route.hop_count) as usize; + let out_node_id = compiled_route.first_hop.node_id(); + let out_hop_count = (1 + sr_hop_count + pr_hop_count) as usize; let out = RenderedOperation { message: out_message, @@ -560,20 +555,21 @@ impl RPCProcessor { // as far as we can tell this is the only domain that will really benefit fn get_sender_signed_node_info(&self, dest: &Destination) -> Option { // Don't do this if the sender is to remain private - if dest.safety_route_spec().is_some() { + // Otherwise we would be attaching the original sender's identity to the final destination, + // thus defeating the purpose of the safety route entirely :P + if dest.get_safety_spec().is_some() { return None; } // Don't do this if our own signed node info isn't valid yet let routing_table = self.routing_table(); - let network_manager = self.network_manager(); - if !RoutingTable::has_valid_own_node_info(network_manager, RoutingDomain::PublicInternet) { + if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) { return None; } match dest { Destination::Direct { target, - safety_route_spec: _, + safety_spec: _, } => { // If the target has seen our node info already don't do this if target.has_seen_our_node_info(RoutingDomain::PublicInternet) { @@ -584,7 +580,7 @@ impl RPCProcessor { Destination::Relay { relay: _, target, - safety_route_spec: _, + safety_spec: _, } => { if let Some(target) = routing_table.lookup_node_ref(*target) { if target.has_seen_our_node_info(RoutingDomain::PublicInternet) { @@ -597,7 +593,8 @@ impl RPCProcessor { } Destination::PrivateRoute { private_route: _, - safety_route_spec: _, + safety_spec: _, + reliable: _, } => None, } } @@ -742,7 +739,14 @@ impl RPCProcessor { Destination::relay(peer_noderef, sender_id) } } - RespondTo::PrivateRoute(pr) => Destination::private_route(pr.clone()), + RespondTo::PrivateRoute(pr) => Destination::private_route( + pr.clone(), + request + .header + .connection_descriptor + .protocol_type() + .is_connection_oriented(), + ), } } diff --git a/veilid-core/src/veilid_api/privacy.rs b/veilid-core/src/veilid_api/privacy.rs index 9bdd05f8..a47721a0 100644 --- a/veilid-core/src/veilid_api/privacy.rs +++ b/veilid-core/src/veilid_api/privacy.rs @@ -116,5 +116,3 @@ impl fmt::Display for SafetyRoute { ) } } - -// xxx impl to_blob and from_blob using capnp here