diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 0d834561..8c304e44 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -342,7 +342,7 @@ impl AttachmentManager { #[instrument(level = "trace", skip(self))] fn attach(&self) { // Create long-running connection maintenance routine - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); if inner.attachment_maintainer_jh.is_some() { return; } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 5a771d95..8ae5256f 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -788,6 +788,7 @@ impl NetworkManager { pub async fn handle_private_receipt>( &self, receipt_data: R, + private_route: DHTKey, ) -> NetworkResult<()> { let receipt_manager = self.receipt_manager(); @@ -799,7 +800,7 @@ impl NetworkManager { }; receipt_manager - .handle_receipt(receipt, ReceiptReturned::Private) + .handle_receipt(receipt, ReceiptReturned::Private { private_route }) .await } @@ -1023,7 +1024,8 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedOutOfBand => { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand => { return Ok(NetworkResult::invalid_message( "reverse connect receipt should be returned in-band", )); @@ -1124,7 +1126,8 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedOutOfBand => { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand => { return Ok(NetworkResult::invalid_message( "hole punch receipt should be returned in-band", )); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index aff9bd28..cd4989ce 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -125,22 +125,24 @@ impl DiscoveryContext { RoutingDomain::PublicInternet, dial_info_filter.clone(), ); - let disallow_relays_filter = move |_rti, e: &BucketEntryInner| { - if let Some(n) = e.node_info(RoutingDomain::PublicInternet) { - n.relay_peer_info.is_none() - } else { - false - } - }; - let filter = RoutingTable::combine_entry_filters( - inbound_dial_info_entry_filter, - disallow_relays_filter, - ); + let disallow_relays_filter = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { + let v = v.unwrap(); + v.with(rti, |_rti, e| { + if let Some(n) = e.node_info(RoutingDomain::PublicInternet) { + n.relay_peer_info.is_none() + } else { + false + } + }) + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([inbound_dial_info_entry_filter, disallow_relays_filter]); // Find public nodes matching this filter let peers = self .routing_table - .find_fast_public_nodes_filtered(node_count, filter); + .find_fast_public_nodes_filtered(node_count, filters); if peers.is_empty() { log_net!( "no external address detection peers of type {:?}:{:?}", diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 5de6568c..ddcc065d 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -495,7 +495,7 @@ impl NetworkManager { // even the unreliable ones, and ask them to find nodes close to our node too let noderefs = routing_table.find_fastest_nodes( min_peer_count, - |_rti, _k, _v| true, + VecDeque::new(), |_rti, k: DHTKey, v: Option>| { NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) }, diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index 28ba23d2..4ea815e8 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -11,7 +11,7 @@ use xx::*; pub enum ReceiptEvent { ReturnedOutOfBand, ReturnedInBand { inbound_noderef: NodeRef }, - ReturnedPrivate, + ReturnedPrivate { private_route: DHTKey }, Expired, Cancelled, } @@ -20,7 +20,7 @@ pub enum ReceiptEvent { pub enum ReceiptReturned { OutOfBand, InBand { inbound_noderef: NodeRef }, - Private, + Private { private_route: DHTKey }, } pub trait ReceiptCallback: Send + 'static { @@ -412,7 +412,7 @@ impl ReceiptManager { match receipt_returned { ReceiptReturned::OutOfBand => "OutOfBand".to_owned(), ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef), - ReceiptReturned::Private => "Private".to_owned(), + ReceiptReturned::Private { ref private_route } => format!("Private({})", private_route), }, if extra_data.is_empty() { "".to_owned() @@ -450,7 +450,9 @@ impl ReceiptManager { } => ReceiptEvent::ReturnedInBand { inbound_noderef: inbound_noderef.clone(), }, - ReceiptReturned::Private => ReceiptEvent::ReturnedPrivate, + ReceiptReturned::Private { ref private_route } => ReceiptEvent::ReturnedPrivate { + private_route: private_route.clone(), + }, }; let callback_future = Self::perform_callback(receipt_event, &mut record_mut); diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index ef6837d1..96fb913b 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -25,8 +25,6 @@ pub use routing_domains::*; pub use routing_table_inner::*; pub use stats_accounting::*; -const RECENT_PEERS_TABLE_SIZE: usize = 64; - ////////////////////////////////////////////////////////////////////////// pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; @@ -36,6 +34,8 @@ pub struct LowLevelPortInfo { pub low_level_protocol_ports: LowLevelProtocolPorts, pub protocol_to_port: ProtocolToPortMapping, } +pub type RoutingTableEntryFilter = + Box FnMut(&'r RoutingTableInner, DHTKey, Option>) -> bool + Send>; #[derive(Clone, Debug, Default)] pub struct RoutingTableHealth { @@ -47,7 +47,7 @@ pub struct RoutingTableHealth { pub dead_entry_count: usize, } -struct RoutingTableUnlockedInner { +pub(super) struct RoutingTableUnlockedInner { // Accessors config: VeilidConfig, network_manager: NetworkManager, @@ -164,7 +164,7 @@ impl RoutingTable { debug!("finished route spec store init"); let mut inner = self.inner.write(); - inner.init(self.clone()); + inner.init(self.clone())?; inner.route_spec_store = Some(route_spec_store); @@ -192,7 +192,9 @@ impl RoutingTable { inner.route_spec_store.take() }; if let Some(rss) = rss { - rss.save().await; + if let Err(e) = rss.save().await { + error!("couldn't save route spec store: {}", e); + } } debug!("shutting down routing table"); @@ -545,23 +547,30 @@ impl RoutingTable { } /// Makes a filter that finds nodes with a matching inbound dialinfo - pub fn make_inbound_dial_info_entry_filter( + pub fn make_inbound_dial_info_entry_filter<'a>( routing_domain: RoutingDomain, dial_info_filter: DialInfoFilter, - ) -> Box bool> { + ) -> RoutingTableEntryFilter { // does it have matching public dial info? - Box::new(move |_rti, e| { - if let Some(ni) = e.node_info(routing_domain) { - if ni - .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { - did.matches_filter(&dial_info_filter) - }) + Box::new(move |rti, _k, e| { + if let Some(e) = e { + e.with(rti, |_rti, e| { + if let Some(ni) = e.node_info(routing_domain) { + if ni + .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { + did.matches_filter(&dial_info_filter) + }) + .is_some() + { + return true; + } + } + false + }) + } else { + rti.first_filtered_dial_info_detail(routing_domain.into(), &dial_info_filter) .is_some() - { - return true; - } } - false }) } @@ -569,48 +578,36 @@ impl RoutingTable { pub fn make_outbound_dial_info_entry_filter( routing_domain: RoutingDomain, dial_info: DialInfo, - ) -> Box bool> { + ) -> RoutingTableEntryFilter { // does the node's outbound capabilities match the dialinfo? - Box::new(move |_rti, e| { - if let Some(ni) = e.node_info(routing_domain) { - let dif = DialInfoFilter::all() - .with_protocol_type_set(ni.outbound_protocols) - .with_address_type_set(ni.address_types); - if dial_info.matches_filter(&dif) { - return true; - } + Box::new(move |rti, _k, e| { + if let Some(e) = e { + e.with(rti, |_rti, e| { + if let Some(ni) = e.node_info(routing_domain) { + let dif = DialInfoFilter::all() + .with_protocol_type_set(ni.outbound_protocols) + .with_address_type_set(ni.address_types); + if dial_info.matches_filter(&dif) { + return true; + } + } + false + }) + } else { + let dif = rti.get_outbound_dial_info_filter(routing_domain); + dial_info.matches_filter(&dif) } - false }) } - /// Make a filter that wraps another filter - pub fn combine_entry_filters( - mut f1: Box bool>, - mut f2: Box bool>, - ) -> Box bool> { - Box::new(move |rti, e| { - if !f1(rti, e) { - return false; - } - if !f2(rti, e) { - return false; - } - true - }) - } - - pub fn find_fast_public_nodes_filtered( + pub fn find_fast_public_nodes_filtered( &self, node_count: usize, - mut entry_filter: F, - ) -> Vec - where - F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool, - { + filters: VecDeque, + ) -> Vec { self.inner .read() - .find_fast_public_nodes_filtered(self.clone(), node_count, entry_filter) + .find_fast_public_nodes_filtered(self.clone(), node_count, filters) } /// Retrieve up to N of each type of protocol capable nodes @@ -621,14 +618,12 @@ impl RoutingTable { ProtocolType::WS, ProtocolType::WSS, ]; + let protocol_types_len = protocol_types.len(); let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; - self.find_fastest_nodes( - // count - protocol_types.len() * 2 * max_per_type, - // filter - move |rti, _k: DHTKey, v: Option>| { + let filter = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { let entry = v.unwrap(); entry.with(rti, |_rti, e| { // skip nodes on our local network here @@ -668,64 +663,68 @@ impl RoutingTable { .unwrap_or(false) }) }, - // transform + ) as RoutingTableEntryFilter; + + let filters = VecDeque::from([filter]); + + self.find_fastest_nodes( + protocol_types_len * 2 * max_per_type, + filters, |_rti, k: DHTKey, v: Option>| { NodeRef::new(self.clone(), k, v.unwrap().clone(), None) }, ) } - pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( + pub fn find_peers_with_sort_and_filter( &self, node_count: usize, cur_ts: u64, - mut filter: F, + filters: VecDeque, compare: C, - mut transform: T, + transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - C: FnMut( + C: for<'a, 'b> FnMut( &'a RoutingTableInner, &'b (DHTKey, Option>), &'b (DHTKey, Option>), ) -> core::cmp::Ordering, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O + Send, { self.inner .read() - .find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform) + .find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform) } - pub fn find_fastest_nodes<'a, T, F, O>( + pub fn find_fastest_nodes<'a, T, O>( &self, node_count: usize, - mut filter: F, + filters: VecDeque, transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O + Send, { self.inner .read() - .find_fastest_nodes(node_count, filter, transform) + .find_fastest_nodes(node_count, filters, transform) } - pub fn find_closest_nodes<'a, F, T, O>( + pub fn find_closest_nodes<'a, T, O>( &self, node_id: DHTKey, - filter: F, - mut transform: T, + filters: VecDeque, + transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O + Send, { self.inner .read() - .find_closest_nodes(node_id, filter, transform) + .find_closest_nodes(node_id, filters, transform) } + #[instrument(level = "trace", skip(self), ret)] pub fn register_find_node_answer(&self, peers: Vec) -> Vec { let node_id = self.node_id(); diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 9d49d7fa..12dddeed 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -124,7 +124,7 @@ fn route_permutation_to_hop_cache(nodes: &[(DHTKey, NodeInfo)], perm: &[usize]) /// number of route permutations is the number of unique orderings /// for a set of nodes, given that the first node is fixed -fn get_route_permutation_count(hop_count: usize) -> usize { +fn _get_route_permutation_count(hop_count: usize) -> usize { if hop_count == 0 { unreachable!(); } @@ -374,39 +374,43 @@ impl RouteSpecStore { // Get list of all nodes, and sort them for selection let cur_ts = intf::get_timestamp(); - let filter = |rti, _k: DHTKey, v: Option>| -> bool { - // Exclude our own node from routes - if v.is_none() { - return false; - } - let v = v.unwrap(); + let filter = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| -> bool { + // Exclude our own node from routes + if v.is_none() { + return false; + } + let v = v.unwrap(); - // Exclude nodes on our local network - let on_local_network = v.with(rti, |_rti, e| { - e.node_info(RoutingDomain::LocalNetwork).is_some() - }); - if on_local_network { - return false; - } + // Exclude nodes on our local network + let on_local_network = v.with(rti, |_rti, e| { + e.node_info(RoutingDomain::LocalNetwork).is_some() + }); + if on_local_network { + return false; + } - // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route - v.with(rti, |_rti, e| { - let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { - ni.has_sequencing_matched_dial_info(sequencing) - } else { - false - }; - let node_status_ok = if let Some(ns) = e.node_status(RoutingDomain::PublicInternet) - { - ns.will_route() - } else { - false - }; + // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route + v.with(rti, move |_rti, e| { + let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) + { + ni.has_sequencing_matched_dial_info(sequencing) + } else { + false + }; + let node_status_ok = + if let Some(ns) = e.node_status(RoutingDomain::PublicInternet) { + ns.will_route() + } else { + false + }; - node_info_ok && node_status_ok - }) - }; - let compare = |rti, + node_info_ok && node_status_ok + }) + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + let compare = |rti: &RoutingTableInner, v1: &(DHTKey, Option>), v2: &(DHTKey, Option>)| -> Ordering { @@ -461,7 +465,10 @@ impl RouteSpecStore { }); cmpout }; - let transform = |rti, k: DHTKey, v: Option>| -> (DHTKey, NodeInfo) { + let transform = |rti: &RoutingTableInner, + k: DHTKey, + v: Option>| + -> (DHTKey, NodeInfo) { // Return the key and the nodeinfo for that key ( k, @@ -479,7 +486,7 @@ impl RouteSpecStore { BucketEntryState::Unreliable, ); let nodes = - rti.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform); + rti.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform); // If we couldn't find enough nodes, wait until we have more nodes in the routing table if nodes.len() < hop_count { @@ -606,12 +613,49 @@ impl RouteSpecStore { Ok(Some(public_key)) } - pub fn with_route_spec_detail(&self, public_key: &DHTKey, f: F) -> Option - where - F: FnOnce(&RouteSpecDetail) -> R, - { - let inner = self.inner.lock(); - Self::detail(&*inner, &public_key).map(f) + pub fn validate_signatures( + &self, + public_key: &DHTKey, + signatures: &[DHTSignature], + data: &[u8], + last_hop_id: DHTKey, + ) -> EyreResult> { + let inner = &*self.inner.lock(); + let rsd = Self::detail(inner, &public_key).ok_or_else(|| eyre!("route does not exist"))?; + + // Ensure we have the right number of signatures + if signatures.len() != rsd.hops.len() - 1 { + // Wrong number of signatures + log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", signatures.len(), rsd.hops.len() - 1, public_key); + return Ok(None); + } + // Validate signatures to ensure the route was handled by the nodes and not messed with + for (hop_n, hop_public_key) in rsd.hops.iter().enumerate() { + // The last hop is not signed, as the whole packet is signed + if hop_n == signatures.len() { + // Verify the node we received the routed operation from is the last hop in our route + if *hop_public_key != last_hop_id { + log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), last_hop_id.encode(), public_key); + return Ok(None); + } + } else { + // Verify a signature for a hop node along the route + if let Err(e) = verify(hop_public_key, data, &signatures[hop_n]) { + log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); + return Ok(None); + } + } + } + // We got the correct signatures, return a key ans + Ok(Some(( + rsd.secret_key, + SafetySelection::Safe(SafetySpec { + preferred_route: Some(*public_key), + hop_count: rsd.hops.len(), + stability: rsd.stability, + sequencing: rsd.sequencing, + }), + ))) } pub fn release_route(&self, public_key: DHTKey) { diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index 6729667a..25efeddd 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -2,7 +2,7 @@ use super::*; /// Mechanism required to contact another node #[derive(Clone, Debug)] -pub(crate) enum ContactMethod { +pub enum ContactMethod { /// Node is not reachable by any means Unreachable, /// Connection should have already existed diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 579c5c55..2e7c9989 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -34,7 +34,7 @@ pub struct RoutingTableInner { } impl RoutingTableInner { - pub fn new(unlocked_inner: Arc) -> RoutingTableInner { + pub(super) fn new(unlocked_inner: Arc) -> RoutingTableInner { RoutingTableInner { unlocked_inner, buckets: Vec::new(), @@ -794,22 +794,16 @@ impl RoutingTableInner { // Find Nodes // Retrieve the fastest nodes in the routing table matching an entry filter - pub fn find_fast_public_nodes_filtered( + pub fn find_fast_public_nodes_filtered( &self, outer_self: RoutingTable, node_count: usize, - mut entry_filter: F, - ) -> Vec - where - F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool, - { - self.find_fastest_nodes( - // count - node_count, - // filter - |rti, _k: DHTKey, v: Option>| { + mut filters: VecDeque, + ) -> Vec { + let public_node_filter = Box::new( + |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { let entry = v.unwrap(); - entry.with(rti, |rti, e| { + entry.with(rti, |_rti, e| { // skip nodes on local network if e.node_info(RoutingDomain::LocalNetwork).is_some() { return false; @@ -818,12 +812,16 @@ impl RoutingTableInner { if e.node_info(RoutingDomain::PublicInternet).is_none() { return false; } - // skip nodes that dont match entry filter - entry_filter(rti, e) + true }) }, - // transform - |_rti, k: DHTKey, v: Option>| { + ) as RoutingTableEntryFilter; + filters.push_front(public_node_filter); + + self.find_fastest_nodes( + node_count, + filters, + |_rti: &RoutingTableInner, k: DHTKey, v: Option>| { NodeRef::new(outer_self.clone(), k, v.unwrap().clone(), None) }, ) @@ -858,37 +856,42 @@ impl RoutingTableInner { } } - pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( + pub fn find_peers_with_sort_and_filter( &self, node_count: usize, cur_ts: u64, - mut filter: F, - compare: C, + mut filters: VecDeque, + mut compare: C, mut transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - C: FnMut( + C: for<'a, 'b> FnMut( &'a RoutingTableInner, &'b (DHTKey, Option>), &'b (DHTKey, Option>), ) -> core::cmp::Ordering, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O, { // collect all the nodes for sorting let mut nodes = Vec::<(DHTKey, Option>)>::with_capacity(self.bucket_entry_count + 1); // add our own node (only one of there with the None entry) - if filter(self, self.unlocked_inner.node_id, None) { - nodes.push((self.unlocked_inner.node_id, None)); + for filter in &mut filters { + if filter(self, self.unlocked_inner.node_id, None) { + nodes.push((self.unlocked_inner.node_id, None)); + break; + } } // add all nodes from buckets self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { // Apply filter - if filter(rti, k, Some(v.clone())) { - nodes.push((k, Some(v.clone()))); + for filter in &mut filters { + if filter(rti, k, Some(v.clone())) { + nodes.push((k, Some(v.clone()))); + break; + } } Option::<()>::None }); @@ -907,97 +910,99 @@ impl RoutingTableInner { out } - pub fn find_fastest_nodes<'a, T, F, O>( + pub fn find_fastest_nodes( &self, node_count: usize, - mut filter: F, + mut filters: VecDeque, transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); - let out = self.find_peers_with_sort_and_filter( - node_count, - cur_ts, - // filter - |rti, k, v| { + + // Add filter to remove dead nodes always + let filter_dead = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { if let Some(entry) = &v { // always filter out dead nodes if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) { false } else { - filter(rti, k, v) + true } } else { // always filter out self peer, as it is irrelevant to the 'fastest nodes' search false } }, - // sort - |rti, (a_key, a_entry), (b_key, b_entry)| { - // same nodes are always the same - if a_key == b_key { - return core::cmp::Ordering::Equal; - } - // our own node always comes last (should not happen, here for completeness) - if a_entry.is_none() { - return core::cmp::Ordering::Greater; - } - if b_entry.is_none() { - return core::cmp::Ordering::Less; - } - // reliable nodes come first - let ae = a_entry.as_ref().unwrap(); - let be = b_entry.as_ref().unwrap(); - ae.with(rti, |rti, ae| { - be.with(rti, |_rti, be| { - let ra = ae.check_reliable(cur_ts); - let rb = be.check_reliable(cur_ts); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } + ) as RoutingTableEntryFilter; + filters.push_front(filter_dead); - // latency is the next metric, closer nodes first - let a_latency = match ae.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Greater; - } - Some(l) => l, - }; - let b_latency = match be.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Less; - } - Some(l) => l, - }; - // Sort by average latency - a_latency.average.cmp(&b_latency.average) - }) + // Fastest sort + let sort = |rti: &RoutingTableInner, + (a_key, a_entry): &(DHTKey, Option>), + (b_key, b_entry): &(DHTKey, Option>)| { + // same nodes are always the same + if a_key == b_key { + return core::cmp::Ordering::Equal; + } + // our own node always comes last (should not happen, here for completeness) + if a_entry.is_none() { + return core::cmp::Ordering::Greater; + } + if b_entry.is_none() { + return core::cmp::Ordering::Less; + } + // reliable nodes come first + let ae = a_entry.as_ref().unwrap(); + let be = b_entry.as_ref().unwrap(); + ae.with(rti, |rti, ae| { + be.with(rti, |_rti, be| { + let ra = ae.check_reliable(cur_ts); + let rb = be.check_reliable(cur_ts); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } + + // latency is the next metric, closer nodes first + let a_latency = match ae.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Greater; + } + Some(l) => l, + }; + let b_latency = match be.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Less; + } + Some(l) => l, + }; + // Sort by average latency + a_latency.average.cmp(&b_latency.average) }) - }, - // transform, - transform, - ); + }) + }; + + let out = + self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform); out } - pub fn find_closest_nodes<'a, F, T, O>( + pub fn find_closest_nodes( &self, node_id: DHTKey, - filter: F, - mut transform: T, + filters: VecDeque, + transform: T, ) -> Vec where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O, { let cur_ts = intf::get_timestamp(); let node_count = { @@ -1005,41 +1010,39 @@ impl RoutingTableInner { let c = config.get(); c.network.dht.max_find_node_count as usize }; - let out = self.find_peers_with_sort_and_filter( - node_count, - cur_ts, - // filter - filter, - // sort - |rti, (a_key, a_entry), (b_key, b_entry)| { - // same nodes are always the same - if a_key == b_key { - return core::cmp::Ordering::Equal; - } - // reliable nodes come first, pessimistically treating our own node as unreliable - let ra = a_entry - .as_ref() - .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); - let rb = b_entry - .as_ref() - .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } + // closest sort + let sort = |rti: &RoutingTableInner, + (a_key, a_entry): &(DHTKey, Option>), + (b_key, b_entry): &(DHTKey, Option>)| { + // same nodes are always the same + if a_key == b_key { + return core::cmp::Ordering::Equal; + } - // distance is the next metric, closer nodes first - let da = distance(a_key, &node_id); - let db = distance(b_key, &node_id); - da.cmp(&db) - }, - // transform, - &mut transform, - ); + // reliable nodes come first, pessimistically treating our own node as unreliable + let ra = a_entry + .as_ref() + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); + let rb = b_entry + .as_ref() + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } + + // distance is the next metric, closer nodes first + let da = distance(a_key, &node_id); + let db = distance(b_key, &node_id); + da.cmp(&db) + }; + + let out = + self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform); log_rtab!(">> find_closest_nodes: node count = {}", out.len()); out } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b68dac80..e020aa45 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -171,7 +171,6 @@ pub struct RPCProcessorUnlockedInner { queue_size: u32, concurrency: u32, max_route_hop_count: usize, - default_route_hop_count: usize, validate_dial_info_receipt_time_ms: u32, update_callback: UpdateCallback, waiting_rpc_table: OperationWaiter, @@ -208,7 +207,6 @@ impl RPCProcessor { let queue_size = c.network.rpc.queue_size; let timeout = ms_to_us(c.network.rpc.timeout_ms); let max_route_hop_count = c.network.rpc.max_route_hop_count as usize; - let default_route_hop_count = c.network.rpc.default_route_hop_count as usize; if concurrency == 0 { concurrency = intf::get_concurrency() / 2; if concurrency == 0 { @@ -222,7 +220,6 @@ impl RPCProcessor { queue_size, concurrency, max_route_hop_count, - default_route_hop_count, validate_dial_info_receipt_time_ms, update_callback, waiting_rpc_table: OperationWaiter::new(), @@ -418,7 +415,7 @@ impl RPCProcessor { } // Wrap an operation with a private route inside a safety route - pub(super) fn wrap_with_route( + fn wrap_with_route( &self, safety_selection: SafetySelection, private_route: PrivateRoute, @@ -540,6 +537,7 @@ impl RPCProcessor { match safety_selection { SafetySelection::Unsafe(sequencing) => { // Apply safety selection sequencing requirement if it is more strict than the node_ref's sequencing requirement + let mut node_ref = node_ref.clone(); if sequencing > node_ref.sequencing() { node_ref.set_sequencing(sequencing) } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index b5674b70..65dde614 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -62,22 +62,26 @@ impl RPCProcessor { // add node information for the requesting node to our routing table let routing_table = self.routing_table(); - let network_manager = self.network_manager(); let has_valid_own_node_info = routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet); let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); // find N nodes closest to the target node in our routing table - let closest_nodes = routing_table.find_closest_nodes( - find_node_q.node_id, - // filter - |rti, _k, v| { + + let filter = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { rti.filter_has_valid_signed_node_info( RoutingDomain::PublicInternet, has_valid_own_node_info, v, ) }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let closest_nodes = routing_table.find_closest_nodes( + find_node_q.node_id, + filters, // transform |rti, k, v| { rti.transform_to_peer_info( diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 667ca81e..3087dd0f 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -45,7 +45,7 @@ impl RPCProcessor { RPCMessageHeaderDetail::PrivateRoute(detail) => { network_result_value_or_log!(debug network_manager - .handle_private_receipt(receipt) + .handle_private_receipt(receipt, detail.private_route) .await => {} ); } diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index b12fa50b..6c3f4384 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -121,7 +121,8 @@ impl RPCProcessor { if next_private_route.hop_count != 0 { let node_id = self.routing_table.node_id(); let node_id_secret = self.routing_table.node_id_secret(); - let sig = sign(&node_id, &node_id_secret, &route.operation.data).map_err(RPCError::internal)?; + let sig = sign(&node_id, &node_id_secret, &route.operation.data) + .map_err(RPCError::internal)?; route.operation.signatures.push(sig); } @@ -169,14 +170,16 @@ impl RPCProcessor { // If the private route public key is our node id, then this was sent via safety route to our node directly // so there will be no signatures to validate - let opt_pr_info = if private_route.public_key == self.routing_table.node_id() { + let (secret_key, safety_selection) = if private_route.public_key + == self.routing_table.node_id() + { // The private route was a stub // Return our secret key and an appropriate safety selection // // Note: it is important that we never respond with a safety route to questions that come - // in without a private route. Giving away a safety route when the node id is known is + // in without a private route. Giving away a safety route when the node id is known is // a privacy violation! - + // Get sequencing preference let sequencing = if detail .connection_descriptor @@ -187,65 +190,25 @@ impl RPCProcessor { } else { Sequencing::NoPreference }; - Some(( - self.routing_table.node_id_secret(), + ( + self.routing_table.node_id_secret(), SafetySelection::Unsafe(sequencing), - )) + ) } else { // Get sender id let sender_id = detail.envelope.get_sender_id(); // Look up the private route and ensure it's one in our spec store - let rss= self.routing_table.route_spec_store(); - let opt_signatures_valid = rss.with_route_spec_detail(&private_route.public_key, |rsd| { - // Ensure we have the right number of signatures - if routed_operation.signatures.len() != rsd.hops.len() - 1 { - // Wrong number of signatures - log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", routed_operation.signatures.len(), rsd.hops.len() - 1, private_route.public_key); - return None; - } - // Validate signatures to ensure the route was handled by the nodes and not messed with - for (hop_n, hop_public_key) in rsd.hops.iter().enumerate() { - // The last hop is not signed, as the whole packet is signed - if hop_n == routed_operation.signatures.len() { - // Verify the node we received the routed operation from is the last hop in our route - if *hop_public_key != sender_id { - log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), sender_id.encode(), private_route.public_key); - return None; - } - } else { - // Verify a signature for a hop node along the route - if let Err(e) = verify( - hop_public_key, - &routed_operation.data, - &routed_operation.signatures[hop_n], - ) { - log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}", hop_n, hop_public_key, private_route.public_key); - return None; - } - } - } - // We got the correct signatures, return a key ans - Some(( - rsd.secret_key, - SafetySelection::Safe(SafetySpec { - preferred_route: Some(private_route.public_key), - hop_count: rsd.hops.len(), - stability: rsd.stability, - sequencing: rsd.sequencing, - }) - )) - }); - opt_signatures_valid.ok_or_else(|| { - RPCError::protocol("routed operation received on unallocated private route") - })? + let rss = self.routing_table.route_spec_store(); + rss.validate_signatures( + &private_route.public_key, + &routed_operation.signatures, + &routed_operation.data, + sender_id, + ) + .map_err(RPCError::protocol)? + .ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))? }; - if opt_pr_info.is_none() { - return Err(RPCError::protocol( - "signatures did not validate for private route", - )); - } - let (secret_key, safety_selection) = opt_pr_info.unwrap(); // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) // xxx: punish nodes that send messages that fail to decrypt eventually @@ -344,8 +307,13 @@ impl RPCProcessor { .await?; } else { // Private route is empty, process routed operation - self.process_routed_operation(detail, route.operation, &route.safety_route, &private_route) - .await?; + self.process_routed_operation( + detail, + route.operation, + &route.safety_route, + &private_route, + ) + .await?; } } else if blob_tag == 0 { // RouteHop @@ -410,8 +378,13 @@ impl RPCProcessor { .await?; } else { // No hops left, time to process the routed operation - self.process_routed_operation(detail, route.operation, &route.safety_route, private_route) - .await?; + self.process_routed_operation( + detail, + route.operation, + &route.safety_route, + private_route, + ) + .await?; } } } diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 72b5f2f3..167f9527 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -34,7 +34,8 @@ impl RPCProcessor { // Wait for receipt match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { log_net!(debug "validate_dial_info receipt should be returned out-of-band".green()); Ok(false) } @@ -94,20 +95,26 @@ impl RPCProcessor { routing_domain, dial_info.clone(), ); - let will_validate_dial_info_filter = Box::new(move |_rti, e: &BucketEntryInner| { - if let Some(status) = &e.node_status(routing_domain) { - status.will_validate_dial_info() - } else { - true - } - }); - let filter = RoutingTable::combine_entry_filters( + let will_validate_dial_info_filter = Box::new( + move |rti: &RoutingTableInner, _k: DHTKey, v: Option>| { + let entry = v.unwrap(); + entry.with(rti, move |_rti, e| { + if let Some(status) = &e.node_status(routing_domain) { + status.will_validate_dial_info() + } else { + true + } + }) + }, + ) as RoutingTableEntryFilter; + + let filters = VecDeque::from([ outbound_dial_info_entry_filter, - will_validate_dial_info_filter, fuck this shit. do it tomorrow. - ); + will_validate_dial_info_filter, + ]); // Find nodes matching filter to redirect this to - let peers = routing_table.find_fast_public_nodes_filtered(node_count, filter); + let peers = routing_table.find_fast_public_nodes_filtered(node_count, filters); if peers.is_empty() { return Err(RPCError::internal(format!( "no peers able to reach dialinfo '{:?}'", diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 9f14bd68..16a7ef4f 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -647,7 +647,7 @@ impl NodeInfo { .as_ref() .map(|rpi| { let relay_ni = &rpi.signed_node_info.node_info; - for did in relay_ni.dial_info_detail_list { + for did in &relay_ni.dial_info_detail_list { match sequencing { Sequencing::NoPreference | Sequencing::PreferOrdered => return true, Sequencing::EnsureOrdered => {