From 6d5df71ac1e195f172a621acf32bc8f7c05d4ff0 Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 18 Oct 2022 21:53:45 -0400 Subject: [PATCH] routing table refactor --- veilid-core/src/network_manager/mod.rs | 141 +- veilid-core/src/routing_table/bucket_entry.rs | 17 +- veilid-core/src/routing_table/find_nodes.rs | 647 --------- veilid-core/src/routing_table/mod.rs | 1287 +++++++---------- veilid-core/src/routing_table/node_ref.rs | 14 +- .../src/routing_table/route_spec_store.rs | 67 +- .../routing_table/routing_domain_editor.rs | 7 +- .../src/routing_table/routing_table_inner.rs | 1033 +++++++++++++ veilid-core/src/routing_table/tasks.rs | 2 +- .../coders/private_safety_route.rs | 158 +- veilid-core/src/rpc_processor/destination.rs | 44 +- veilid-core/src/rpc_processor/mod.rs | 6 +- .../src/rpc_processor/rpc_find_node.rs | 9 +- veilid-core/src/veilid_api/debug.rs | 2 +- veilid-core/src/veilid_api/privacy.rs | 4 +- veilid-core/src/veilid_api/routing_context.rs | 48 +- veilid-server/src/settings.rs | 4 +- 17 files changed, 1904 insertions(+), 1586 deletions(-) delete mode 100644 veilid-core/src/routing_table/find_nodes.rs create mode 100644 veilid-core/src/routing_table/routing_table_inner.rs diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 00d8ac63..6bca250c 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -644,7 +644,7 @@ impl NetworkManager { Ok(()) } - // Get our node's capabilities + /// Get our node's capabilities in the PublicInternet routing domain fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus { let node_info = self .routing_table() @@ -664,6 +664,7 @@ impl NetworkManager { will_validate_dial_info, } } + /// Get our node's capabilities in the LocalNetwork routing domain fn generate_local_network_node_status(&self) -> LocalNetworkNodeStatus { let node_info = self .routing_table() @@ -689,7 +690,7 @@ impl NetworkManager { } } - // 
Generates a multi-shot/normal receipt + /// Generates a multi-shot/normal receipt #[instrument(level = "trace", skip(self, extra_data, callback), err)] pub fn generate_receipt>( &self, @@ -715,7 +716,7 @@ impl NetworkManager { Ok(out) } - // Generates a single-shot/normal receipt + /// Generates a single-shot/normal receipt #[instrument(level = "trace", skip(self, extra_data), err)] pub fn generate_single_shot_receipt>( &self, @@ -741,7 +742,7 @@ impl NetworkManager { Ok((out, instance)) } - // Process a received out-of-band receipt + /// Process a received out-of-band receipt #[instrument(level = "trace", skip(self, receipt_data), ret)] pub async fn handle_out_of_band_receipt>( &self, @@ -759,7 +760,7 @@ impl NetworkManager { receipt_manager.handle_receipt(receipt, None).await } - // Process a received in-band receipt + /// Process a received in-band receipt #[instrument(level = "trace", skip(self, receipt_data), ret)] pub async fn handle_in_band_receipt>( &self, @@ -871,7 +872,7 @@ impl NetworkManager { } } - // Builds an envelope for sending over the network + /// Builds an envelope for sending over the network #[instrument(level = "trace", skip(self, body), err)] fn build_envelope>( &self, @@ -895,10 +896,10 @@ impl NetworkManager { .wrap_err("envelope failed to encode") } - // Called by the RPC handler when we want to issue an RPC request or response - // node_ref is the direct destination to which the envelope will be sent - // If 'node_id' is specified, it can be different than node_ref.node_id() - // which will cause the envelope to be relayed + /// Called by the RPC handler when we want to issue an RPC request or response + /// node_ref is the direct destination to which the envelope will be sent + /// If 'node_id' is specified, it can be different than node_ref.node_id() + /// which will cause the envelope to be relayed #[instrument(level = "trace", skip(self, body), ret, err)] pub async fn send_envelope>( &self, @@ -942,7 +943,7 @@ impl NetworkManager { 
self.send_data(node_ref.clone(), out).await } - // Called by the RPC handler when we want to issue an direct receipt + /// Called by the RPC handler when we want to issue an direct receipt #[instrument(level = "trace", skip(self, rcpt_data), err)] pub async fn send_out_of_band_receipt( &self, @@ -967,9 +968,9 @@ impl NetworkManager { Ok(()) } - // Send a reverse connection signal and wait for the return receipt over it - // Then send the data across the new connection - // Only usable for PublicInternet routing domain + /// Send a reverse connection signal and wait for the return receipt over it + /// Then send the data across the new connection + /// Only usable for PublicInternet routing domain #[instrument(level = "trace", skip(self, data), err)] pub async fn do_reverse_connect( &self, @@ -1041,9 +1042,9 @@ impl NetworkManager { } } - // Send a hole punch signal and do a negotiating ping and wait for the return receipt - // Then send the data across the new connection - // Only usable for PublicInternet routing domain + /// Send a hole punch signal and do a negotiating ping and wait for the return receipt + /// Then send the data across the new connection + /// Only usable for PublicInternet routing domain #[instrument(level = "trace", skip(self, data), err)] pub async fn do_hole_punch( &self, @@ -1146,15 +1147,99 @@ impl NetworkManager { } } - // Send raw data to a node - // - // We may not have dial info for a node, but have an existing connection for it - // because an inbound connection happened first, and no FindNodeQ has happened to that - // node yet to discover its dial info. The existing connection should be tried first - // in this case. 
- // - // Sending to a node requires determining a NetworkClass compatible mechanism - // + /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access + /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not + /// allowed to use NodeRefs due to recursive locking + #[instrument(level = "trace", skip(self), ret)] + pub(crate) fn get_node_contact_method( + &self, + target_node_ref: NodeRef, + ) -> EyreResult { + let routing_table = self.routing_table(); + + // Figure out the best routing domain to get the contact method over + let routing_domain = match target_node_ref.best_routing_domain() { + Some(rd) => rd, + None => { + log_net!("no routing domain for node {:?}", target_node_ref); + return Ok(NodeContactMethod::Unreachable); + } + }; + + // Node A is our own node + let node_a = routing_table.get_own_node_info(routing_domain); + let node_a_id = routing_table.node_id(); + + // Node B is the target node + let node_b = match target_node_ref.node_info(routing_domain) { + Some(ni) => ni, + None => { + log_net!("no node info for node {:?}", target_node_ref); + return Ok(NodeContactMethod::Unreachable); + } + }; + let node_b_id = target_node_ref.node_id(); + + // Dial info filter comes from the target node ref + let dial_info_filter = target_node_ref.dial_info_filter(); + let reliable = target_node_ref.reliable(); + + let cm = routing_table.get_contact_method( + routing_domain, + &node_a_id, + &node_a, + &node_b_id, + &node_b, + dial_info_filter, + reliable, + ); + + // Translate the raw contact method to a referenced contact method + Ok(match cm { + ContactMethod::Unreachable => NodeContactMethod::Unreachable, + ContactMethod::Existing => NodeContactMethod::Existing, + ContactMethod::Direct(di) => NodeContactMethod::Direct(di), + ContactMethod::SignalReverse(relay_key, target_key) => { + let relay_nr = routing_table + 
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .ok_or_else(|| eyre!("couldn't look up relay"))?; + if target_node_ref.node_id() != target_key { + bail!("target noderef didn't match target key"); + } + NodeContactMethod::SignalReverse(relay_nr, target_node_ref) + } + ContactMethod::SignalHolePunch(relay_key, target_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .ok_or_else(|| eyre!("couldn't look up relay"))?; + if target_node_ref.node_id() != target_key { + bail!("target noderef didn't match target key"); + } + NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) + } + ContactMethod::InboundRelay(relay_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .ok_or_else(|| eyre!("couldn't look up relay"))?; + NodeContactMethod::InboundRelay(relay_nr) + } + ContactMethod::OutboundRelay(relay_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter) + .ok_or_else(|| eyre!("couldn't look up relay"))?; + NodeContactMethod::OutboundRelay(relay_nr) + } + }) + } + + /// Send raw data to a node + /// + /// We may not have dial info for a node, but have an existing connection for it + /// because an inbound connection happened first, and no FindNodeQ has happened to that + /// node yet to discover its dial info. The existing connection should be tried first + /// in this case. 
+ /// + /// Sending to a node requires determining a NetworkClass compatible mechanism pub fn send_data( &self, node_ref: NodeRef, @@ -1201,9 +1286,7 @@ impl NetworkManager { // info!("{}", "no existing connection".red()); // If we don't have last_connection, try to reach out to the peer via its dial info - let contact_method = this - .routing_table() - .get_node_contact_method(node_ref.clone())?; + let contact_method = this.get_node_contact_method(node_ref.clone())?; log_net!( "send_data via {:?} to dialinfo {:?}", contact_method, diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index cb60d8ec..fd563f7c 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -286,20 +286,10 @@ impl BucketEntryInner { self.last_connections.clear(); } - // Gets the 'last connection' that matches a specific connection key - // pub(super) fn last_connection( - // &self, - // protocol_type: ProtocolType, - // address_type: AddressType, - // ) -> Option<(ConnectionDescriptor, u64)> { - // let key = LastConnectionKey(protocol_type, address_type); - // self.last_connections.get(&key).cloned() - // } - // Gets all the 'last connections' that match a particular filter pub(super) fn last_connections( &self, - routing_table_inner: &RoutingTableInner, + rti: &RoutingTableInner, filter: Option, ) -> Vec<(ConnectionDescriptor, u64)> { let mut out: Vec<(ConnectionDescriptor, u64)> = self @@ -308,10 +298,7 @@ impl BucketEntryInner { .filter_map(|(k, v)| { let include = if let Some(filter) = &filter { let remote_address = v.0.remote_address().address(); - if let Some(routing_domain) = RoutingTable::routing_domain_for_address_inner( - routing_table_inner, - remote_address, - ) { + if let Some(routing_domain) = rti.routing_domain_for_address(remote_address) { if filter.routing_domain_set.contains(routing_domain) && filter.dial_info_filter.protocol_type_set.contains(k.0) && 
filter.dial_info_filter.address_type_set.contains(k.1) diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs deleted file mode 100644 index 952c4b14..00000000 --- a/veilid-core/src/routing_table/find_nodes.rs +++ /dev/null @@ -1,647 +0,0 @@ -use super::*; - -use crate::dht::*; -use crate::xx::*; -use crate::*; - -pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; -pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; -#[derive(Clone, Debug)] -pub struct LowLevelPortInfo { - pub low_level_protocol_ports: LowLevelProtocolPorts, - pub protocol_to_port: ProtocolToPortMapping, -} - -impl RoutingTable { - // Makes a filter that finds nodes with a matching inbound dialinfo - pub fn make_inbound_dial_info_entry_filter( - routing_domain: RoutingDomain, - dial_info_filter: DialInfoFilter, - ) -> impl FnMut(&RoutingTableInner, &BucketEntryInner) -> bool { - // does it have matching public dial info? - move |_rti, e| { - if let Some(ni) = e.node_info(routing_domain) { - if ni - .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { - did.matches_filter(&dial_info_filter) - }) - .is_some() - { - return true; - } - } - false - } - } - - // Makes a filter that finds nodes capable of dialing a particular outbound dialinfo - pub fn make_outbound_dial_info_entry_filter<'s>( - routing_domain: RoutingDomain, - dial_info: DialInfo, - ) -> impl FnMut(&RoutingTableInner, &'s BucketEntryInner) -> bool { - // does the node's outbound capabilities match the dialinfo? 
- move |_rti, e| { - if let Some(ni) = e.node_info(routing_domain) { - let dif = DialInfoFilter::all() - .with_protocol_type_set(ni.outbound_protocols) - .with_address_type_set(ni.address_types); - if dial_info.matches_filter(&dif) { - return true; - } - } - false - } - } - - // Make a filter that wraps another filter - pub fn combine_entry_filters<'a, 'b, F, G>( - mut f1: F, - mut f2: G, - ) -> impl FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool - where - F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, - G: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, - { - move |rti, e| { - if !f1(rti, e) { - return false; - } - if !f2(rti, e) { - return false; - } - true - } - } - - // Retrieve the fastest nodes in the routing table matching an entry filter - pub fn find_fast_public_nodes_filtered<'a, 'b, F>( - &self, - node_count: usize, - mut entry_filter: F, - ) -> Vec - where - F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, - { - self.find_fastest_nodes( - // count - node_count, - // filter - |rti, _k: DHTKey, v: Option>| { - let entry = v.unwrap(); - entry.with(rti, |rti, e| { - // skip nodes on local network - if e.node_info(RoutingDomain::LocalNetwork).is_some() { - return false; - } - // skip nodes not on public internet - if e.node_info(RoutingDomain::PublicInternet).is_none() { - return false; - } - // skip nodes that dont match entry filter - entry_filter(rti, e) - }) - }, - // transform - |_rti, k: DHTKey, v: Option>| { - NodeRef::new(self.clone(), k, v.unwrap().clone(), None) - }, - ) - } - - // Retrieve up to N of each type of protocol capable nodes - pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec { - let protocol_types = vec![ - ProtocolType::UDP, - ProtocolType::TCP, - ProtocolType::WS, - ProtocolType::WSS, - ]; - let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; - let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; - - self.find_fastest_nodes( - // count - 
protocol_types.len() * 2 * max_per_type, - // filter - move |rti, _k: DHTKey, v: Option>| { - let entry = v.unwrap(); - entry.with(rti, |_rti, e| { - // skip nodes on our local network here - if e.has_node_info(RoutingDomain::LocalNetwork.into()) { - return false; - } - - // does it have some dial info we need? - let filter = |n: &NodeInfo| { - let mut keep = false; - for did in &n.dial_info_detail_list { - if matches!(did.dial_info.address_type(), AddressType::IPV4) { - for (n, protocol_type) in protocol_types.iter().enumerate() { - if nodes_proto_v4[n] < max_per_type - && did.dial_info.protocol_type() == *protocol_type - { - nodes_proto_v4[n] += 1; - keep = true; - } - } - } else if matches!(did.dial_info.address_type(), AddressType::IPV6) { - for (n, protocol_type) in protocol_types.iter().enumerate() { - if nodes_proto_v6[n] < max_per_type - && did.dial_info.protocol_type() == *protocol_type - { - nodes_proto_v6[n] += 1; - keep = true; - } - } - } - } - keep - }; - - e.node_info(RoutingDomain::PublicInternet) - .map(filter) - .unwrap_or(false) - }) - }, - // transform - |_rti, k: DHTKey, v: Option>| { - NodeRef::new(self.clone(), k, v.unwrap().clone(), None) - }, - ) - } - - pub fn filter_has_valid_signed_node_info_inner( - inner: &RoutingTableInner, - routing_domain: RoutingDomain, - has_valid_own_node_info: bool, - v: Option>, - ) -> bool { - match v { - None => has_valid_own_node_info, - Some(entry) => entry.with(inner, |_rti, e| { - e.signed_node_info(routing_domain.into()) - .map(|sni| sni.has_valid_signature()) - .unwrap_or(false) - }), - } - } - - pub fn transform_to_peer_info_inner( - inner: &RoutingTableInner, - routing_domain: RoutingDomain, - own_peer_info: PeerInfo, - k: DHTKey, - v: Option>, - ) -> PeerInfo { - match v { - None => own_peer_info, - Some(entry) => entry.with(inner, |_rti, e| { - e.make_peer_info(k, routing_domain).unwrap() - }), - } - } - - pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( - &self, - node_count: usize, - 
cur_ts: u64, - mut filter: F, - compare: C, - mut transform: T, - ) -> Vec - where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - C: FnMut( - &'a RoutingTableInner, - &'b (DHTKey, Option>), - &'b (DHTKey, Option>), - ) -> core::cmp::Ordering, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, - { - let inner = &*self.inner.read(); - Self::find_peers_with_sort_and_filter_inner( - inner, node_count, cur_ts, filter, compare, transform, - ) - } - - pub fn find_peers_with_sort_and_filter_inner<'a, 'b, F, C, T, O>( - inner: &RoutingTableInner, - node_count: usize, - cur_ts: u64, - mut filter: F, - compare: C, - mut transform: T, - ) -> Vec - where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - C: FnMut( - &'a RoutingTableInner, - &'b (DHTKey, Option>), - &'b (DHTKey, Option>), - ) -> core::cmp::Ordering, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, - { - // collect all the nodes for sorting - let mut nodes = - Vec::<(DHTKey, Option>)>::with_capacity(inner.bucket_entry_count + 1); - - // add our own node (only one of there with the None entry) - if filter(inner, inner.node_id, None) { - nodes.push((inner.node_id, None)); - } - - // add all nodes from buckets - Self::with_entries( - &*inner, - cur_ts, - BucketEntryState::Unreliable, - |rti, k, v| { - // Apply filter - if filter(rti, k, Some(v.clone())) { - nodes.push((k, Some(v.clone()))); - } - Option::<()>::None - }, - ); - - // sort by preference for returning nodes - nodes.sort_by(|a, b| compare(inner, a, b)); - - // return transformed vector for filtered+sorted nodes - let cnt = usize::min(node_count, nodes.len()); - let mut out = Vec::::with_capacity(cnt); - for node in nodes { - let val = transform(inner, node.0, node.1); - out.push(val); - } - - out - } - - pub fn find_fastest_nodes<'a, T, F, O>( - &self, - node_count: usize, - mut filter: F, - transform: T, - ) -> Vec - where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a 
RoutingTableInner, DHTKey, Option>) -> O, - { - let cur_ts = intf::get_timestamp(); - let out = self.find_peers_with_sort_and_filter( - node_count, - cur_ts, - // filter - |rti, k, v| { - if let Some(entry) = &v { - // always filter out dead nodes - if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) { - false - } else { - filter(rti, k, v) - } - } else { - // always filter out self peer, as it is irrelevant to the 'fastest nodes' search - false - } - }, - // sort - |rti, (a_key, a_entry), (b_key, b_entry)| { - // same nodes are always the same - if a_key == b_key { - return core::cmp::Ordering::Equal; - } - // our own node always comes last (should not happen, here for completeness) - if a_entry.is_none() { - return core::cmp::Ordering::Greater; - } - if b_entry.is_none() { - return core::cmp::Ordering::Less; - } - // reliable nodes come first - let ae = a_entry.as_ref().unwrap(); - let be = b_entry.as_ref().unwrap(); - ae.with(rti, |rti, ae| { - be.with(rti, |_rti, be| { - let ra = ae.check_reliable(cur_ts); - let rb = be.check_reliable(cur_ts); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } - - // latency is the next metric, closer nodes first - let a_latency = match ae.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Greater; - } - Some(l) => l, - }; - let b_latency = match be.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Less; - } - Some(l) => l, - }; - // Sort by average latency - a_latency.average.cmp(&b_latency.average) - }) - }) - }, - // transform, - transform, - ); - out - } - - pub fn find_closest_nodes<'a, F, T, O>( - &self, - node_id: DHTKey, - filter: F, - mut transform: T, - ) -> Vec - where - F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, - T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, - { - let cur_ts = 
intf::get_timestamp(); - let node_count = { - let c = self.unlocked_inner.config.get(); - c.network.dht.max_find_node_count as usize - }; - let out = self.find_peers_with_sort_and_filter( - node_count, - cur_ts, - // filter - filter, - // sort - |rti, (a_key, a_entry), (b_key, b_entry)| { - // same nodes are always the same - if a_key == b_key { - return core::cmp::Ordering::Equal; - } - - // reliable nodes come first, pessimistically treating our own node as unreliable - let ra = a_entry - .as_ref() - .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); - let rb = b_entry - .as_ref() - .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } - - // distance is the next metric, closer nodes first - let da = distance(a_key, &node_id); - let db = distance(b_key, &node_id); - da.cmp(&db) - }, - // transform, - &mut transform, - ); - log_rtab!(">> find_closest_nodes: node count = {}", out.len()); - out - } - - // Build a map of protocols to low level ports - // This way we can get the set of protocols required to keep our NAT mapping alive for keepalive pings - // Only one protocol per low level protocol/port combination is required - // For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required - // and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too. 
- pub fn get_low_level_port_info(&self) -> LowLevelPortInfo { - let mut low_level_protocol_ports = - BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new(); - let mut protocol_to_port = - BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new(); - let our_dids = self.all_filtered_dial_info_details( - RoutingDomain::PublicInternet.into(), - &DialInfoFilter::all(), - ); - for did in our_dids { - low_level_protocol_ports.insert(( - did.dial_info.protocol_type().low_level_protocol_type(), - did.dial_info.address_type(), - did.dial_info.socket_address().port(), - )); - protocol_to_port.insert( - (did.dial_info.protocol_type(), did.dial_info.address_type()), - ( - did.dial_info.protocol_type().low_level_protocol_type(), - did.dial_info.socket_address().port(), - ), - ); - } - LowLevelPortInfo { - low_level_protocol_ports, - protocol_to_port, - } - } - - fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { - // Get all our outbound protocol/address types - let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); - let mapped_port_info = self.get_low_level_port_info(); - - move |e: &BucketEntryInner| { - // Ensure this node is not on the local network - if e.has_node_info(RoutingDomain::LocalNetwork.into()) { - return false; - } - - // Disqualify nodes that don't cover all our inbound ports for tcp and udp - // as we need to be able to use the relay for keepalives for all nat mappings - let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); - - let can_serve_as_relay = e - .node_info(RoutingDomain::PublicInternet) - .map(|n| { - let dids = n.all_filtered_dial_info_details( - Some(DialInfoDetail::reliable_sort), // By default, choose reliable protocol for relay - |did| did.matches_filter(&outbound_dif), - ); - for did in &dids { - let pt = did.dial_info.protocol_type(); - let at = did.dial_info.address_type(); - if let Some((llpt, port)) = 
mapped_port_info.protocol_to_port.get(&(pt, at)) - { - low_level_protocol_ports.remove(&(*llpt, at, *port)); - } - } - low_level_protocol_ports.is_empty() - }) - .unwrap_or(false); - if !can_serve_as_relay { - return false; - } - - true - } - } - - #[instrument(level = "trace", skip(self), ret)] - pub fn find_inbound_relay( - &self, - routing_domain: RoutingDomain, - cur_ts: u64, - ) -> Option { - // Get relay filter function - let relay_node_filter = match routing_domain { - RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(), - RoutingDomain::LocalNetwork => { - unimplemented!(); - } - }; - - // Go through all entries and find fastest entry that matches filter function - let inner = self.inner.read(); - let inner = &*inner; - let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; - - // Iterate all known nodes for candidates - Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |rti, k, v| { - let v2 = v.clone(); - v.with(rti, |rti, e| { - // Ensure we have the node's status - if let Some(node_status) = e.node_status(routing_domain) { - // Ensure the node will relay - if node_status.will_relay() { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - let better = best_inbound_relay.1.with(rti, |_rti, best| { - BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) - == std::cmp::Ordering::Less - }); - // Now apply filter function and see if this node should be included - if better && relay_node_filter(e) { - *best_inbound_relay = (k, v2); - } - } else if relay_node_filter(e) { - // Always store the first candidate - best_inbound_relay = Some((k, v2)); - } - } - } - }); - // Don't end early, iterate through all entries - Option::<()>::None - }); - // Return the best inbound relay noderef - best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), k, e, None)) - } - - #[instrument(level = "trace", skip(self), ret)] - pub fn 
register_find_node_answer(&self, peers: Vec) -> Vec { - let node_id = self.node_id(); - - // register nodes we'd found - let mut out = Vec::::with_capacity(peers.len()); - for p in peers { - // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table - if p.node_id.key == node_id { - continue; - } - - // node can not be its own relay - if let Some(rpi) = &p.signed_node_info.node_info.relay_peer_info { - if rpi.node_id == p.node_id { - continue; - } - } - - // register the node if it's new - if let Some(nr) = self.register_node_with_signed_node_info( - RoutingDomain::PublicInternet, - p.node_id.key, - p.signed_node_info.clone(), - false, - ) { - out.push(nr); - } - } - out - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_node( - &self, - node_ref: NodeRef, - node_id: DHTKey, - ) -> EyreResult>> { - let rpc_processor = self.rpc_processor(); - - let res = network_result_try!( - rpc_processor - .clone() - .rpc_call_find_node(Destination::direct(node_ref), node_id) - .await? 
- ); - - // register nodes we'd found - Ok(NetworkResult::value( - self.register_find_node_answer(res.answer), - )) - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult>> { - let node_id = self.node_id(); - self.find_node(node_ref, node_id).await - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult>> { - let node_id = node_ref.node_id(); - self.find_node(node_ref, node_id).await - } - - #[instrument(level = "trace", skip(self))] - pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { - // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves - // and then contact those nodes to inform -them- that we exist - - // Ask bootstrap server for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(debug match self.find_self(node_ref.clone()).await { - Err(e) => { - log_rtab!(error - "find_self failed for {:?}: {:?}", - &node_ref, e - ); - return; - } - Ok(v) => v, - } => { - return; - }); - - // Ask each node near us to find us as well - if wide { - for closest_nr in closest_nodes { - network_result_value_or_log!(debug match self.find_self(closest_nr.clone()).await { - Err(e) => { - log_rtab!(error - "find_self failed for {:?}: {:?}", - &closest_nr, e - ); - continue; - } - Ok(v) => v, - } => { - // Do nothing with non-values - continue; - }); - } - } - } -} diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 2bccb4fb..efbdd077 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1,11 +1,11 @@ mod bucket; mod bucket_entry; mod debug; -mod find_nodes; mod node_ref; mod route_spec_store; mod routing_domain_editor; mod routing_domains; +mod routing_table_inner; mod stats_accounting; mod tasks; @@ -17,43 +17,24 @@ use crate::*; use bucket::*; pub use bucket_entry::*; pub 
use debug::*; -pub use find_nodes::*; use hashlink::LruCache; pub use node_ref::*; pub use route_spec_store::*; pub use routing_domain_editor::*; pub use routing_domains::*; +pub use routing_table_inner::*; pub use stats_accounting::*; const RECENT_PEERS_TABLE_SIZE: usize = 64; ////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Copy)] -pub struct RecentPeersEntry { - pub last_connection: ConnectionDescriptor, -} - -/// RoutingTable rwlock-internal data -struct RoutingTableInner { - /// Routing table buckets that hold entries - buckets: Vec, - /// A fast counter for the number of entries in the table, total - bucket_entry_count: usize, - /// The public internet routing domain - public_internet_routing_domain: PublicInternetRoutingDomainDetail, - /// The dial info we use on the local network - local_network_routing_domain: LocalNetworkRoutingDomainDetail, - /// Interim accounting mechanism for this node's RPC latency to any other node - self_latency_stats_accounting: LatencyStatsAccounting, - /// Interim accounting mechanism for the total bandwidth to/from this node - self_transfer_stats_accounting: TransferStatsAccounting, - /// Statistics about the total bandwidth to/from this node - self_transfer_stats: TransferStatsDownUp, - /// Peers we have recently communicated with - recent_peers: LruCache, - /// Storage for private/safety RouteSpecs - route_spec_store: RouteSpecStore, +pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; +pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; +#[derive(Clone, Debug)] +pub struct LowLevelPortInfo { + pub low_level_protocol_ports: LowLevelProtocolPorts, + pub protocol_to_port: ProtocolToPortMapping, } #[derive(Clone, Debug, Default)] @@ -90,19 +71,6 @@ pub struct RoutingTable { } impl RoutingTable { - fn new_inner(config: VeilidConfig) -> RoutingTableInner { - RoutingTableInner { - buckets: Vec::new(), 
- public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), - local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), - bucket_entry_count: 0, - self_latency_stats_accounting: LatencyStatsAccounting::new(), - self_transfer_stats_accounting: TransferStatsAccounting::new(), - self_transfer_stats: TransferStatsDownUp::default(), - recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), - route_spec_store: RouteSpecStore::new(config), - } - } fn new_unlocked_inner( config: VeilidConfig, network_manager: NetworkManager, @@ -120,10 +88,13 @@ impl RoutingTable { } pub fn new(network_manager: NetworkManager) -> Self { let config = network_manager.config(); + let unlocked_inner = Arc::new(Self::new_unlocked_inner(config, network_manager)); + let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone()))); let this = Self { - inner: Arc::new(RwLock::new(Self::new_inner(config.clone()))), - unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)), + inner, + unlocked_inner, }; + // Set rolling transfers tick task { let this2 = this.clone(); @@ -174,82 +145,76 @@ impl RoutingTable { self.unlocked_inner.node_id_secret } - fn routing_domain_for_address_inner( - inner: &RoutingTableInner, - address: Address, - ) -> Option { - for rd in RoutingDomain::all() { - let can_contain = - Self::with_routing_domain(inner, rd, |rdd| rdd.can_contain_address(address)); - if can_contain { - return Some(rd); - } - } - None + ///////////////////////////////////// + /// Initialization + + /// Called to initialize the routing table after it is created + pub async fn init(&self) -> EyreResult<()> { + let mut inner = self.inner.write(); + inner.init(self.clone()); + Ok(()) } + /// Called to shut down the routing table + pub async fn terminate(&self) { + debug!("starting routing table terminate"); + + // Cancel all tasks being ticked + debug!("stopping rolling transfers task"); + if let Err(e) = 
self.unlocked_inner.rolling_transfers_task.stop().await { + error!("rolling_transfers_task not stopped: {}", e); + } + debug!("stopping kick buckets task"); + if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { + error!("kick_buckets_task not stopped: {}", e); + } + + let mut inner = self.inner.write(); + inner.terminate(); + *inner = RoutingTableInner::new(self.unlocked_inner.clone()); + + debug!("finished routing table terminate"); + } + + /// Set up the local network routing domain with our local routing table configuration + pub fn configure_local_network_routing_domain(&self, local_networks: Vec<(IpAddr, IpAddr)>) { + log_net!(debug "configure_local_network_routing_domain: {:#?}", local_networks); + self.inner + .write() + .configure_local_network_routing_domain(local_networks); + } + + ///////////////////////////////////// + /// Locked operations + pub fn routing_domain_for_address(&self, address: Address) -> Option { - let inner = self.inner.read(); - Self::routing_domain_for_address_inner(&*inner, address) - } - - fn with_routing_domain(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R - where - F: FnOnce(&dyn RoutingDomainDetail) -> R, - { - match domain { - RoutingDomain::PublicInternet => f(&inner.public_internet_routing_domain), - RoutingDomain::LocalNetwork => f(&inner.local_network_routing_domain), - } - } - - fn with_routing_domain_mut( - inner: &mut RoutingTableInner, - domain: RoutingDomain, - f: F, - ) -> R - where - F: FnOnce(&mut dyn RoutingDomainDetail) -> R, - { - match domain { - RoutingDomain::PublicInternet => f(&mut inner.public_internet_routing_domain), - RoutingDomain::LocalNetwork => f(&mut inner.local_network_routing_domain), - } + self.inner.read().routing_domain_for_address(address) } pub fn with_route_spec_store_mut(&self, f: F) -> R where F: FnOnce(&mut RouteSpecStore, &mut RoutingTableInner) -> R, { - let inner = &mut *self.inner.write(); - f(&mut inner.route_spec_store, inner) + 
self.inner.write().with_route_spec_store_mut(f) } pub fn with_route_spec_store(&self, f: F) -> R where F: FnOnce(&RouteSpecStore, &RoutingTableInner) -> R, { - let inner = &*self.inner.read(); - f(&inner.route_spec_store, inner) + self.inner.read().with_route_spec_store(f) } pub fn relay_node(&self, domain: RoutingDomain) -> Option { - let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| rd.common().relay_node()) + self.inner.read().relay_node(domain) } pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { - let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| { - !rd.common().dial_info_details().is_empty() - }) + self.inner.read().has_dial_info(domain) } pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { - let inner = self.inner.read(); - Self::with_routing_domain(&*inner, domain, |rd| { - rd.common().dial_info_details().clone() - }) + self.inner.read().dial_info_details(domain) } pub fn first_filtered_dial_info_detail( @@ -257,21 +222,9 @@ impl RoutingTable { routing_domain_set: RoutingDomainSet, filter: &DialInfoFilter, ) -> Option { - let inner = self.inner.read(); - for routing_domain in routing_domain_set { - let did = Self::with_routing_domain(&*inner, routing_domain, |rd| { - for did in rd.common().dial_info_details() { - if did.matches_filter(filter) { - return Some(did.clone()); - } - } - None - }); - if did.is_some() { - return did; - } - } - None + self.inner + .read() + .first_filtered_dial_info_detail(routing_domain_set, filter) } pub fn all_filtered_dial_info_details( @@ -279,39 +232,15 @@ impl RoutingTable { routing_domain_set: RoutingDomainSet, filter: &DialInfoFilter, ) -> Vec { - let inner = self.inner.read(); - let mut ret = Vec::new(); - for routing_domain in routing_domain_set { - Self::with_routing_domain(&*inner, routing_domain, |rd| { - for did in rd.common().dial_info_details() { - if did.matches_filter(filter) { - ret.push(did.clone()); - } - } - }); - } - 
ret.remove_duplicates(); - ret + self.inner + .read() + .all_filtered_dial_info_details(routing_domain_set, filter) } pub fn ensure_dial_info_is_valid(&self, domain: RoutingDomain, dial_info: &DialInfo) -> bool { - let address = dial_info.socket_address().address(); - let inner = self.inner.read(); - let can_contain_address = - Self::with_routing_domain(&*inner, domain, |rd| rd.can_contain_address(address)); - - if !can_contain_address { - log_rtab!(debug "can not add dial info to this routing domain"); - return false; - } - if !dial_info.is_valid() { - log_rtab!(debug - "shouldn't be registering invalid addresses: {:?}", - dial_info - ); - return false; - } - true + self.inner + .read() + .ensure_dial_info_is_valid(domain, dial_info) } pub fn node_info_is_valid_in_routing_domain( @@ -319,48 +248,9 @@ impl RoutingTable { routing_domain: RoutingDomain, node_info: &NodeInfo, ) -> bool { - // Should not be passing around nodeinfo with an invalid network class - if matches!(node_info.network_class, NetworkClass::Invalid) { - return false; - } - // Ensure all of the dial info works in this routing domain - for did in &node_info.dial_info_detail_list { - if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { - return false; - } - } - // Ensure the relay is also valid in this routing domain if it is provided - if let Some(relay_peer_info) = node_info.relay_peer_info.as_ref() { - let relay_ni = &relay_peer_info.signed_node_info.node_info; - if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) { - return false; - } - } - true - } - - #[instrument(level = "trace", skip(inner), ret)] - fn get_contact_method_inner( - inner: &RoutingTableInner, - routing_domain: RoutingDomain, - node_a_id: &DHTKey, - node_a: &NodeInfo, - node_b_id: &DHTKey, - node_b: &NodeInfo, - dial_info_filter: DialInfoFilter, - reliable: bool, - ) -> ContactMethod { - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.get_contact_method( - inner, - node_a_id, - 
node_a, - node_b_id, - node_b, - dial_info_filter, - reliable, - ) - }) + self.inner + .read() + .node_info_is_valid_in_routing_domain(routing_domain, node_info) } /// Look up the best way for two nodes to reach each other over a specific routing domain @@ -375,9 +265,7 @@ impl RoutingTable { dial_info_filter: DialInfoFilter, reliable: bool, ) -> ContactMethod { - let inner = &*self.inner.read(); - Self::get_contact_method_inner( - inner, + self.inner.read().get_contact_method( routing_domain, node_a_id, node_a, @@ -388,307 +276,72 @@ impl RoutingTable { ) } - // Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access - #[instrument(level = "trace", skip(self), ret)] - pub(crate) fn get_node_contact_method( - &self, - target_node_ref: NodeRef, - ) -> EyreResult { - // Lock the routing table for read to ensure the table doesn't change - let inner = &*self.inner.read(); - - // Figure out the best routing domain to get the contact method over - let routing_domain = match target_node_ref.best_routing_domain() { - Some(rd) => rd, - None => { - log_net!("no routing domain for node {:?}", target_node_ref); - return Ok(NodeContactMethod::Unreachable); - } - }; - - // Node A is our own node - let node_a = get_own_node_info_inner(inner, routing_domain); - let node_a_id = self.node_id(); - - // Node B is the target node - let node_b = target_node_ref.xxx operate(|_rti, e| e.node_info(routing_domain).unwrap()); - let node_b_id = target_node_ref.node_id(); - - // Dial info filter comes from the target node ref - let dial_info_filter = target_node_ref.dial_info_filter(); - let reliable = target_node_ref.reliable(); - - let cm = self.get_contact_method( - routing_domain, - &node_a_id, - &node_a, - &node_b_id, - node_b, - dial_info_filter, - reliable, - ); - - // Translate the raw contact method to a referenced contact method - Ok(match cm { - ContactMethod::Unreachable => NodeContactMethod::Unreachable, - 
ContactMethod::Existing => NodeContactMethod::Existing, - ContactMethod::Direct(di) => NodeContactMethod::Direct(di), - ContactMethod::SignalReverse(relay_key, target_key) => { - let relay_nr = Self::lookup_and_filter_noderef_inner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter) - .ok_or_else(|| eyre!("couldn't look up relay"))?; - if target_node_ref.node_id() != target_key { - bail!("target noderef didn't match target key"); - } - NodeContactMethod::SignalReverse(relay_nr, target_node_ref) - } - ContactMethod::SignalHolePunch(relay_key, target_key) => { - let relay_nr = Self::lookup_and_filter_noderef_inner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter) - .ok_or_else(|| eyre!("couldn't look up relay"))?; - if target_node_ref.node_id() != target_key { - bail!("target noderef didn't match target key"); - } - NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) - } - ContactMethod::InboundRelay(relay_key) => { - let relay_nr = Self::lookup_and_filter_noderef_nner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter) - .ok_or_else(|| eyre!("couldn't look up relay"))?; - NodeContactMethod::InboundRelay(relay_nr) - } - ContactMethod::OutboundRelay(relay_key) => { - let relay_nr = Self::lookup_and_filter_noderef(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter) - .ok_or_else(|| eyre!("couldn't look up relay"))?; - NodeContactMethod::OutboundRelay(relay_nr) - } - }) - } - #[instrument(level = "debug", skip(self))] pub fn edit_routing_domain(&self, domain: RoutingDomain) -> RoutingDomainEditor { RoutingDomainEditor::new(self.clone(), domain) } - fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) { - let cur_ts = intf::get_timestamp(); - Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| { - v.with_mut(rti, |_rti, e| { - e.set_seen_our_node_info(routing_domain, false); - }); - Option::<()>::None - }); - } - - 
fn reset_all_updated_since_last_network_change(inner: &mut RoutingTableInner) { - let cur_ts = intf::get_timestamp(); - Self::with_entries_mut(inner, cur_ts, BucketEntryState::Dead, |rti, _, v| { - v.with_mut(rti, |_rti, e| { - e.set_updated_since_last_network_change(false) - }); - Option::<()>::None - }); - } - /// Return a copy of our node's peerinfo pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common().with_peer_info(|pi| pi.clone()) - }) + self.inner.read().get_own_peer_info(routing_domain) } /// Return a copy of our node's signednodeinfo pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common() - .with_peer_info(|pi| pi.signed_node_info.clone()) - }) + self.inner.read().get_own_signed_node_info(routing_domain) } /// Return a copy of our node's nodeinfo - fn get_own_node_info_inner( - inner: &RoutingTableInner, - routing_domain: RoutingDomain, - ) -> NodeInfo { - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common() - .with_peer_info(|pi| pi.signed_node_info.node_info.clone()) - }) - } pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { - let inner = &*self.inner.read(); - Self::get_own_node_info_inner(inner, routing_domain) + self.inner.read().get_own_node_info(routing_domain) } - /// Return our currently registered network class + /// If we have a valid network class in this routing domain, then our 'NodeInfo' is valid pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common().has_valid_own_node_info() - }) + self.inner.read().has_valid_own_node_info(routing_domain) } /// Return the domain's currently registered network 
class pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| rdd.common().network_class()) + self.inner.read().get_network_class(routing_domain) } /// Return the domain's filter for what we can receivein the form of a dial info filter pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common().inbound_dial_info_filter() - }) + self.inner + .read() + .get_inbound_dial_info_filter(routing_domain) } /// Return the domain's filter for what we can receive in the form of a node ref filter pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { - let dif = self.get_inbound_dial_info_filter(routing_domain); - NodeRefFilter::new() - .with_routing_domain(routing_domain) - .with_dial_info_filter(dif) + self.inner + .read() + .get_inbound_node_ref_filter(routing_domain) } /// Return the domain's filter for what we can send out in the form of a dial info filter pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { - let inner = &*self.inner.read(); - Self::with_routing_domain(inner, routing_domain, |rdd| { - rdd.common().outbound_dial_info_filter() - }) + self.inner + .read() + .get_outbound_dial_info_filter(routing_domain) } /// Return the domain's filter for what we can receive in the form of a node ref filter pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { - let dif = self.get_outbound_dial_info_filter(routing_domain); - NodeRefFilter::new() - .with_routing_domain(routing_domain) - .with_dial_info_filter(dif) - } - - fn bucket_depth(index: usize) -> usize { - match index { - 0 => 256, - 1 => 128, - 2 => 64, - 3 => 32, - 4 => 16, - 5 => 8, - 6 => 4, - 7 => 4, - 8 => 4, - 9 => 4, - _ => 4, - } - } - - pub 
async fn init(&self) -> EyreResult<()> { - let mut inner = self.inner.write(); - // Size the buckets (one per bit) - inner.buckets.reserve(DHT_KEY_LENGTH * 8); - for _ in 0..DHT_KEY_LENGTH * 8 { - let bucket = Bucket::new(self.clone()); - inner.buckets.push(bucket); - } - - Ok(()) - } - - pub async fn terminate(&self) { - debug!("starting routing table terminate"); - - // Cancel all tasks being ticked - debug!("stopping rolling transfers task"); - if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { - error!("rolling_transfers_task not stopped: {}", e); - } - debug!("stopping kick buckets task"); - if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { - error!("kick_buckets_task not stopped: {}", e); - } - - *self.inner.write() = Self::new_inner(self.unlocked_inner.config.clone()); - - debug!("finished routing table terminate"); - } - - pub fn configure_local_network_routing_domain(&self, local_networks: Vec<(IpAddr, IpAddr)>) { - log_net!(debug "configure_local_network_routing_domain: {:#?}", local_networks); - - let mut inner = self.inner.write(); - let changed = inner - .local_network_routing_domain - .set_local_networks(local_networks); - - // If the local network topology has changed, nuke the existing local node info and let new local discovery happen - if changed { - let cur_ts = intf::get_timestamp(); - Self::with_entries_mut(&mut *inner, cur_ts, BucketEntryState::Dead, |rti, _, e| { - e.with_mut(rti, |_rti, e| { - e.clear_signed_node_info(RoutingDomain::LocalNetwork); - e.set_seen_our_node_info(RoutingDomain::LocalNetwork, false); - e.set_updated_since_last_network_change(false); - }); - Option::<()>::None - }); - } + self.inner + .read() + .get_outbound_node_ref_filter(routing_domain) } /// Attempt to empty the routing table /// should only be performed when there are no node_refs (detached) pub fn purge_buckets(&self) { - let mut inner = self.inner.write(); - let inner = &mut *inner; - log_rtab!( - "Starting routing 
table buckets purge. Table currently has {} nodes", - inner.bucket_entry_count - ); - for bucket in &inner.buckets { - bucket.kick(inner, 0); - } - log_rtab!(debug - "Routing table buckets purge complete. Routing table now has {} nodes", - inner.bucket_entry_count - ); + self.inner.write().purge_buckets(); } /// Attempt to remove last_connections from entries pub fn purge_last_connections(&self) { - let mut inner = self.inner.write(); - let inner = &mut *inner; - log_rtab!( - "Starting routing table last_connections purge. Table currently has {} nodes", - inner.bucket_entry_count - ); - for bucket in &inner.buckets { - for entry in bucket.entries() { - entry.1.with_mut(inner, |_rti, e| { - e.clear_last_connections(); - }); - } - } - log_rtab!(debug - "Routing table last_connections purge complete. Routing table now has {} nodes", - inner.bucket_entry_count - ); - } - - /// Attempt to settle buckets and remove entries down to the desired number - /// which may not be possible due extant NodeRefs - fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) { - let bucket = &mut inner.buckets[idx]; - let bucket_depth = Self::bucket_depth(idx); - - if let Some(dead_node_ids) = bucket.kick(inner, bucket_depth) { - // Remove counts - inner.bucket_entry_count -= dead_node_ids.len(); - log_rtab!(debug "Routing table now has {} nodes", inner.bucket_entry_count); - - // Now purge the routing table inner vectors - //let filter = |k: &DHTKey| dead_node_ids.contains(k); - //inner.closest_reliable_nodes.retain(filter); - //inner.fastest_reliable_nodes.retain(filter); - //inner.closest_nodes.retain(filter); - //inner.fastest_nodes.retain(filter); - } + self.inner.write().purge_last_connections(); } fn find_bucket_index(&self, node_id: DHTKey) -> usize { @@ -702,65 +355,9 @@ impl RoutingTable { routing_domain_set: RoutingDomainSet, min_state: BucketEntryState, ) -> usize { - let inner = self.inner.read(); - Self::get_entry_count_inner(&*inner, routing_domain_set, min_state) - } - - 
fn get_entry_count_inner( - inner: &RoutingTableInner, - routing_domain_set: RoutingDomainSet, - min_state: BucketEntryState, - ) -> usize { - let mut count = 0usize; - let cur_ts = intf::get_timestamp(); - Self::with_entries(inner, cur_ts, min_state, |rti, _, e| { - if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set)) - .is_some() - { - count += 1; - } - Option::<()>::None - }); - count - } - - fn with_entries) -> Option>( - inner: &RoutingTableInner, - cur_ts: u64, - min_state: BucketEntryState, - mut f: F, - ) -> Option { - for bucket in &inner.buckets { - for entry in bucket.entries() { - if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { - if let Some(out) = f(inner, *entry.0, entry.1.clone()) { - return Some(out); - } - } - } - } - None - } - - fn with_entries_mut< - T, - F: FnMut(&mut RoutingTableInner, DHTKey, Arc) -> Option, - >( - inner: &mut RoutingTableInner, - cur_ts: u64, - min_state: BucketEntryState, - mut f: F, - ) -> Option { - for bucket in &inner.buckets { - for entry in bucket.entries() { - if entry.1.with(inner, |_rti, e| e.state(cur_ts) >= min_state) { - if let Some(out) = f(inner, *entry.0, entry.1.clone()) { - return Some(out); - } - } - } - } - None + self.inner + .read() + .get_entry_count(routing_domain_set, min_state) } pub fn get_nodes_needing_updates( @@ -769,26 +366,9 @@ impl RoutingTable { cur_ts: u64, all: bool, ) -> Vec { - let inner = self.inner.read(); - let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries( - &*inner, - cur_ts, - BucketEntryState::Unreliable, - |rti, k, v| { - // Only update nodes that haven't seen our node info yet - if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new( - self.clone(), - k, - v, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }, - ); - node_refs + self.inner + .read() + .get_nodes_needing_updates(self.clone(), routing_domain, 
cur_ts, all) } pub fn get_nodes_needing_ping( @@ -796,50 +376,14 @@ impl RoutingTable { routing_domain: RoutingDomain, cur_ts: u64, ) -> Vec { - let inner = self.inner.read(); - - // Collect relay nodes - let opt_relay_id = Self::with_routing_domain(&*inner, routing_domain, |rd| { - rd.common().relay_node().map(|rn| rn.node_id()) - }); - - // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow - let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries( - &*inner, - cur_ts, - BucketEntryState::Unreliable, - |rti, k, v| { - if v.with(rti, |_rti, e| { - e.has_node_info(routing_domain.into()) - && e.needs_ping(cur_ts, opt_relay_id == Some(k)) - }) { - node_refs.push(NodeRef::new( - self.clone(), - k, - v, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }, - ); - node_refs + self.inner + .read() + .get_nodes_needing_ping(self.clone(), routing_domain, cur_ts) } pub fn get_all_nodes(&self, cur_ts: u64) -> Vec { let inner = self.inner.read(); - let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); - Self::with_entries( - &*inner, - cur_ts, - BucketEntryState::Unreliable, - |_rti, k, v| { - node_refs.push(NodeRef::new(self.clone(), k, v, None)); - Option::<()>::None - }, - ); - node_refs + inner.get_all_nodes(self.clone(), cur_ts) } fn queue_bucket_kick(&self, node_id: DHTKey) { @@ -854,77 +398,14 @@ impl RoutingTable { where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), { - // Ensure someone isn't trying register this node itself - if node_id == self.node_id() { - log_rtab!(debug "can't register own node"); - return None; - } - - // Lock this entire operation - let mut inner = self.inner.write(); - let inner = &mut *inner; - - // Look up existing entry - let idx = self.find_bucket_index(node_id); - let noderef = { - let bucket = &inner.buckets[idx]; - let entry = bucket.entry(&node_id); - entry.map(|e| 
NodeRef::new(self.clone(), node_id, e, None)) - }; - - // If one doesn't exist, insert into bucket, possibly evicting a bucket member - let noderef = match noderef { - None => { - // Make new entry - inner.bucket_entry_count += 1; - let cnt = inner.bucket_entry_count; - let bucket = &mut inner.buckets[idx]; - let nr = bucket.add_entry(node_id); - - // Update the entry - let entry = bucket.entry(&node_id).unwrap(); - entry.with_mut(inner, update_func); - - // Kick the bucket - self.unlocked_inner.kick_queue.lock().insert(idx); - log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count_inner(&mut *inner, RoutingDomainSet::all(), BucketEntryState::Unreliable)); - - nr - } - Some(nr) => { - // Update the entry - let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry(&node_id).unwrap(); - entry.with_mut(inner, update_func); - - nr - } - }; - - Some(noderef) + self.inner + .write() + .create_node_ref(self.clone(), node_id, update_func) } /// Resolve an existing routing table entry and return a reference to it - fn lookup_node_ref_inner(inner: &RoutingTableInner, routing_table: RoutingTable, node_id: DHTKey) -> Option { - { - let idx = routing_table.find_bucket_index(node_id); - let bucket = &inner.buckets[idx]; - bucket - .entry(&node_id) - .map(|e| NodeRef::new(routing_table, node_id, e, None)) - } - pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option { - if node_id == self.unlocked_inner.node_id { - log_rtab!(debug "can't look up own node id in routing table"); - return None; - } - let idx = self.find_bucket_index(node_id); - let inner = self.inner.read(); - let bucket = &inner.buckets[idx]; - bucket - .entry(&node_id) - .map(|e| NodeRef::new(self.clone(), node_id, e, None)) + self.inner.read().lookup_node_ref(self.clone(), node_id) } /// Resolve an existing routing table entry and return a filtered reference to it @@ -934,13 +415,11 @@ impl RoutingTable { routing_domain_set: RoutingDomainSet, dial_info_filter: 
DialInfoFilter, ) -> Option { - let nr = self.lookup_node_ref(node_id)?; - Some( - nr.filtered_clone( - NodeRefFilter::new() - .with_dial_info_filter(dial_info_filter) - .with_routing_domain_set(routing_domain_set), - ), + self.inner.read().lookup_and_filter_noderef( + self.clone(), + node_id, + routing_domain_set, + dial_info_filter, ) } @@ -954,43 +433,13 @@ impl RoutingTable { signed_node_info: SignedNodeInfo, allow_invalid: bool, ) -> Option { - //log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid ); - - // validate signed node info is not something malicious - if node_id == self.node_id() { - log_rtab!(debug "can't register own node id in routing table"); - return None; - } - if let Some(rpi) = &signed_node_info.node_info.relay_peer_info { - if rpi.node_id.key == node_id { - log_rtab!(debug "node can not be its own relay"); - return None; - } - } - if !allow_invalid { - // verify signature - if !signed_node_info.has_valid_signature() { - log_rtab!(debug "signed node info for {} has invalid signature", node_id); - return None; - } - // verify signed node info is valid in this routing domain - if !self - .node_info_is_valid_in_routing_domain(routing_domain, &signed_node_info.node_info) - { - log_rtab!(debug "signed node info for {} not valid in the {:?} routing domain", node_id, routing_domain); - return None; - } - } - - self.create_node_ref(node_id, |_rti, e| { - e.update_signed_node_info(routing_domain, signed_node_info); - }) - .map(|mut nr| { - nr.set_filter(Some( - NodeRefFilter::new().with_routing_domain(routing_domain), - )); - nr - }) + self.inner.write().register_node_with_signed_node_info( + self.clone(), + routing_domain, + node_id, + signed_node_info, + allow_invalid, + ) } /// Shortcut function to add a node to our routing table if it doesn't exist @@ -1001,15 +450,12 @@ impl RoutingTable { descriptor: 
ConnectionDescriptor, timestamp: u64, ) -> Option { - let out = self.create_node_ref(node_id, |_rti, e| { - // this node is live because it literally just connected to us - e.touch_last_seen(timestamp); - }); - if let Some(nr) = &out { - // set the most recent node address for connection finding and udp replies - nr.set_last_connection(descriptor, timestamp); - } - out + self.inner.write().register_node_with_existing_connection( + self.clone(), + node_id, + descriptor, + timestamp, + ) } /// Ticks about once per second @@ -1031,71 +477,436 @@ impl RoutingTable { // Routing Table Health Metrics pub fn get_routing_table_health(&self) -> RoutingTableHealth { - let mut health = RoutingTableHealth::default(); - let cur_ts = intf::get_timestamp(); - let inner = self.inner.read(); - let inner = &*inner; - for bucket in &inner.buckets { - for (_, v) in bucket.entries() { - match v.with(inner, |_rti, e| e.state(cur_ts)) { - BucketEntryState::Reliable => { - health.reliable_entry_count += 1; - } - BucketEntryState::Unreliable => { - health.unreliable_entry_count += 1; - } - BucketEntryState::Dead => { - health.dead_entry_count += 1; - } - } - } - } - health + self.inner.read().get_routing_table_health() } pub fn get_recent_peers(&self) -> Vec<(DHTKey, RecentPeersEntry)> { - let mut recent_peers = Vec::new(); - let mut dead_peers = Vec::new(); - let mut out = Vec::new(); - - // collect all recent peers - { - let inner = self.inner.read(); - for (k, _v) in &inner.recent_peers { - recent_peers.push(*k); - } - } - - // look up each node and make sure the connection is still live - // (uses same logic as send_data, ensuring last_connection works for UDP) - for e in &recent_peers { - let mut dead = true; - if let Some(nr) = self.lookup_node_ref(*e) { - if let Some(last_connection) = nr.last_connection() { - out.push((*e, RecentPeersEntry { last_connection })); - dead = false; - } - } - if dead { - dead_peers.push(e); - } - } - - // purge dead recent peers - { - let mut inner = 
self.inner.write(); - for d in dead_peers { - inner.recent_peers.remove(d); - } - } - - out + self.inner.write().get_recent_peers(self.clone()) } pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) { - let mut inner = self.inner.write(); - inner - .recent_peers - .insert(node_id, RecentPeersEntry { last_connection }); + self.inner + .write() + .touch_recent_peer(node_id, last_connection) + } + + ////////////////////////////////////////////////////////////////////// + // Find Nodes + + /// Build a map of protocols to low level ports + /// This way we can get the set of protocols required to keep our NAT mapping alive for keepalive pings + /// Only one protocol per low level protocol/port combination is required + /// For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required + /// and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too. 
+ pub fn get_low_level_port_info(&self) -> LowLevelPortInfo { + let mut low_level_protocol_ports = + BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new(); + let mut protocol_to_port = + BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new(); + let our_dids = self.all_filtered_dial_info_details( + RoutingDomain::PublicInternet.into(), + &DialInfoFilter::all(), + ); + for did in our_dids { + low_level_protocol_ports.insert(( + did.dial_info.protocol_type().low_level_protocol_type(), + did.dial_info.address_type(), + did.dial_info.socket_address().port(), + )); + protocol_to_port.insert( + (did.dial_info.protocol_type(), did.dial_info.address_type()), + ( + did.dial_info.protocol_type().low_level_protocol_type(), + did.dial_info.socket_address().port(), + ), + ); + } + LowLevelPortInfo { + low_level_protocol_ports, + protocol_to_port, + } + } + + /// Makes a filter that finds nodes with a matching inbound dialinfo + pub fn make_inbound_dial_info_entry_filter( + routing_domain: RoutingDomain, + dial_info_filter: DialInfoFilter, + ) -> impl FnMut(&RoutingTableInner, &BucketEntryInner) -> bool { + // does it have matching public dial info? + move |_rti, e| { + if let Some(ni) = e.node_info(routing_domain) { + if ni + .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { + did.matches_filter(&dial_info_filter) + }) + .is_some() + { + return true; + } + } + false + } + } + + /// Makes a filter that finds nodes capable of dialing a particular outbound dialinfo + pub fn make_outbound_dial_info_entry_filter<'s>( + routing_domain: RoutingDomain, + dial_info: DialInfo, + ) -> impl FnMut(&RoutingTableInner, &'s BucketEntryInner) -> bool { + // does the node's outbound capabilities match the dialinfo? 
+ move |_rti, e| { + if let Some(ni) = e.node_info(routing_domain) { + let dif = DialInfoFilter::all() + .with_protocol_type_set(ni.outbound_protocols) + .with_address_type_set(ni.address_types); + if dial_info.matches_filter(&dif) { + return true; + } + } + false + } + } + + /// Make a filter that wraps another filter + pub fn combine_entry_filters<'a, 'b, F, G>( + mut f1: F, + mut f2: G, + ) -> impl FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool + where + F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + G: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + { + move |rti, e| { + if !f1(rti, e) { + return false; + } + if !f2(rti, e) { + return false; + } + true + } + } + + pub fn find_fast_public_nodes_filtered<'a, 'b, F>( + &self, + node_count: usize, + mut entry_filter: F, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + { + self.inner + .read() + .find_fast_public_nodes_filtered(self.clone(), node_count, entry_filter) + } + + /// Retrieve up to N of each type of protocol capable nodes + pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec { + let protocol_types = vec![ + ProtocolType::UDP, + ProtocolType::TCP, + ProtocolType::WS, + ProtocolType::WSS, + ]; + let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; + let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; + + self.find_fastest_nodes( + // count + protocol_types.len() * 2 * max_per_type, + // filter + move |rti, _k: DHTKey, v: Option>| { + let entry = v.unwrap(); + entry.with(rti, |_rti, e| { + // skip nodes on our local network here + if e.has_node_info(RoutingDomain::LocalNetwork.into()) { + return false; + } + + // does it have some dial info we need? 
+ let filter = |n: &NodeInfo| { + let mut keep = false; + for did in &n.dial_info_detail_list { + if matches!(did.dial_info.address_type(), AddressType::IPV4) { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v4[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v4[n] += 1; + keep = true; + } + } + } else if matches!(did.dial_info.address_type(), AddressType::IPV6) { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v6[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v6[n] += 1; + keep = true; + } + } + } + } + keep + }; + + e.node_info(RoutingDomain::PublicInternet) + .map(filter) + .unwrap_or(false) + }) + }, + // transform + |_rti, k: DHTKey, v: Option>| { + NodeRef::new(self.clone(), k, v.unwrap().clone(), None) + }, + ) + } + + pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( + &self, + node_count: usize, + cur_ts: u64, + mut filter: F, + compare: C, + mut transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + C: FnMut( + &'a RoutingTableInner, + &'b (DHTKey, Option>), + &'b (DHTKey, Option>), + ) -> core::cmp::Ordering, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + self.inner + .read() + .find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform) + } + + pub fn find_fastest_nodes<'a, T, F, O>( + &self, + node_count: usize, + mut filter: F, + transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + self.inner + .read() + .find_fastest_nodes(node_count, filter, transform) + } + + pub fn find_closest_nodes<'a, F, T, O>( + &self, + node_id: DHTKey, + filter: F, + mut transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + self.inner + 
.read() + .find_closest_nodes(node_id, filter, transform) + } + #[instrument(level = "trace", skip(self), ret)] + pub fn register_find_node_answer(&self, peers: Vec) -> Vec { + let node_id = self.node_id(); + + // register nodes we'd found + let mut out = Vec::::with_capacity(peers.len()); + for p in peers { + // if our own node id is in the list then ignore it, as we don't add ourselves to our own routing table + if p.node_id.key == node_id { + continue; + } + + // node can not be its own relay + if let Some(rpi) = &p.signed_node_info.node_info.relay_peer_info { + if rpi.node_id == p.node_id { + continue; + } + } + + // register the node if it's new + if let Some(nr) = self.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, + p.node_id.key, + p.signed_node_info.clone(), + false, + ) { + out.push(nr); + } + } + out + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_node( + &self, + node_ref: NodeRef, + node_id: DHTKey, + ) -> EyreResult>> { + let rpc_processor = self.rpc_processor(); + + let res = network_result_try!( + rpc_processor + .clone() + .rpc_call_find_node(Destination::direct(node_ref), node_id) + .await?
+ ); + + // register nodes we'd found + Ok(NetworkResult::value( + self.register_find_node_answer(res.answer), + )) + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult>> { + let node_id = self.node_id(); + self.find_node(node_ref, node_id).await + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult>> { + let node_id = node_ref.node_id(); + self.find_node(node_ref, node_id).await + } + + #[instrument(level = "trace", skip(self))] + pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { + // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves + // and then contact those nodes to inform -them- that we exist + + // Ask bootstrap server for nodes closest to our own node + let closest_nodes = network_result_value_or_log!(debug match self.find_self(node_ref.clone()).await { + Err(e) => { + log_rtab!(error + "find_self failed for {:?}: {:?}", + &node_ref, e + ); + return; + } + Ok(v) => v, + } => { + return; + }); + + // Ask each node near us to find us as well + if wide { + for closest_nr in closest_nodes { + network_result_value_or_log!(debug match self.find_self(closest_nr.clone()).await { + Err(e) => { + log_rtab!(error + "find_self failed for {:?}: {:?}", + &closest_nr, e + ); + continue; + } + Ok(v) => v, + } => { + // Do nothing with non-values + continue; + }); + } + } + } + + pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { + // Get all our outbound protocol/address types + let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + let mapped_port_info = self.get_low_level_port_info(); + + move |e: &BucketEntryInner| { + // Ensure this node is not on the local network + if e.has_node_info(RoutingDomain::LocalNetwork.into()) { + return false; + } + + // Disqualify nodes that don't cover all our 
inbound ports for tcp and udp + // as we need to be able to use the relay for keepalives for all nat mappings + let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); + + let can_serve_as_relay = e + .node_info(RoutingDomain::PublicInternet) + .map(|n| { + let dids = n.all_filtered_dial_info_details( + Some(DialInfoDetail::reliable_sort), // By default, choose reliable protocol for relay + |did| did.matches_filter(&outbound_dif), + ); + for did in &dids { + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) + { + low_level_protocol_ports.remove(&(*llpt, at, *port)); + } + } + low_level_protocol_ports.is_empty() + }) + .unwrap_or(false); + if !can_serve_as_relay { + return false; + } + + true + } + } + + #[instrument(level = "trace", skip(self), ret)] + pub fn find_inbound_relay( + &self, + routing_domain: RoutingDomain, + cur_ts: u64, + ) -> Option { + // Get relay filter function + let relay_node_filter = match routing_domain { + RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(), + RoutingDomain::LocalNetwork => { + unimplemented!(); + } + }; + + // Go through all entries and find fastest entry that matches filter function + let inner = self.inner.read(); + let inner = &*inner; + let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; + + // Iterate all known nodes for candidates + inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { + let v2 = v.clone(); + v.with(rti, |rti, e| { + // Ensure we have the node's status + if let Some(node_status) = e.node_status(routing_domain) { + // Ensure the node will relay + if node_status.will_relay() { + // Compare against previous candidate + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + // Less is faster + let better = best_inbound_relay.1.with(rti, |_rti, best| { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e, 
best) + == std::cmp::Ordering::Less + }); + // Now apply filter function and see if this node should be included + if better && relay_node_filter(e) { + *best_inbound_relay = (k, v2); + } + } else if relay_node_filter(e) { + // Always store the first candidate + best_inbound_relay = Some((k, v2)); + } + } + } + }); + // Don't end early, iterate through all entries + Option::<()>::None + }); + // Return the best inbound relay noderef + best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), k, e, None)) } } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index d3aa3ac5..19c14f9e 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -97,7 +97,6 @@ impl NodeRef { } // Operate on entry accessors - pub(super) fn operate(&self, f: F) -> T where F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, @@ -217,6 +216,9 @@ impl NodeRef { pub fn make_peer_info(&self, routing_domain: RoutingDomain) -> Option { self.operate(|_rti, e| e.make_peer_info(self.node_id(), routing_domain)) } + pub fn node_info(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rti, e| e.node_info(routing_domain).cloned()) + } pub fn signed_node_info_has_valid_signature(&self, routing_domain: RoutingDomain) -> bool { self.operate(|_rti, e| { e.signed_node_info(routing_domain) @@ -371,26 +373,26 @@ impl NodeRef { pub fn stats_question_sent(&self, ts: u64, bytes: u64, expects_answer: bool) { self.operate_mut(|rti, e| { - rti.self_transfer_stats_accounting.add_up(bytes); + rti.transfer_stats_accounting().add_up(bytes); e.question_sent(ts, bytes, expects_answer); }) } pub fn stats_question_rcvd(&self, ts: u64, bytes: u64) { self.operate_mut(|rti, e| { - rti.self_transfer_stats_accounting.add_down(bytes); + rti.transfer_stats_accounting().add_down(bytes); e.question_rcvd(ts, bytes); }) } pub fn stats_answer_sent(&self, bytes: u64) { self.operate_mut(|rti, e| { - 
rti.self_transfer_stats_accounting.add_up(bytes); + rti.transfer_stats_accounting().add_up(bytes); e.answer_sent(bytes); }) } pub fn stats_answer_rcvd(&self, send_ts: u64, recv_ts: u64, bytes: u64) { self.operate_mut(|rti, e| { - rti.self_transfer_stats_accounting.add_down(bytes); - rti.self_latency_stats_accounting + rti.transfer_stats_accounting().add_down(bytes); + rti.latency_stats_accounting() .record_latency(recv_ts - send_ts); e.answer_rcvd(send_ts, recv_ts, bytes); }) diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 8228acb9..5dc639d8 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -14,7 +14,7 @@ pub struct SafetySpec { } /// Compiled route (safety route + private route) -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub struct CompiledRoute { /// The safety route attached to the private route safety_route: SafetyRoute, @@ -78,10 +78,6 @@ pub struct RouteSpecStoreCache { #[derive(Debug)] pub struct RouteSpecStore { - /// Our node id - node_id: DHTKey, - /// Our node id secret - node_id_secret: DHTKeySecret, /// Maximum number of hops in a route max_route_hop_count: usize, /// Default number of hops in a route @@ -179,8 +175,6 @@ impl RouteSpecStore { let c = config.get(); Self { - node_id: c.network.node_id, - node_id_secret: c.network.node_id_secret, max_route_hop_count: c.network.rpc.max_route_hop_count.into(), default_route_hop_count: c.network.rpc.default_route_hop_count.into(), content: RouteSpecStoreContent { @@ -198,8 +192,6 @@ impl RouteSpecStore { let rsstdb = table_store.open("RouteSpecStore", 1).await?; let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); let mut rss = RouteSpecStore { - node_id: c.network.node_id, - node_id_secret: c.network.node_id_secret, max_route_hop_count: c.network.rpc.max_route_hop_count.into(), default_route_hop_count: 
c.network.rpc.default_route_hop_count.into(), content, @@ -295,6 +287,7 @@ impl RouteSpecStore { pub fn allocate_route( &mut self, rti: &RoutingTableInner, + routing_table: RoutingTable, reliable: bool, hop_count: usize, directions: DirectionSet, @@ -413,20 +406,12 @@ impl RouteSpecStore { }; // Pull the whole routing table in sorted order - let node_count = RoutingTable::get_entry_count_inner( - rti, + let node_count = rti.get_entry_count( RoutingDomain::PublicInternet.into(), BucketEntryState::Unreliable, ); - let nodes = RoutingTable::find_peers_with_sort_and_filter_inner( - rti, - self.node_id, - node_count, - cur_ts, - filter, - compare, - transform, - ); + let nodes = + rti.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform); // If we couldn't find enough nodes, wait until we have more nodes in the routing table if nodes.len() < hop_count { @@ -450,15 +435,13 @@ impl RouteSpecStore { // Ensure this route is viable by checking that each node can contact the next one if directions.contains(Direction::Outbound) { - let our_node_info = - RoutingTable::get_own_node_info_inner(rti, RoutingDomain::PublicInternet); - let our_node_id = self.node_id; + let our_node_info = rti.get_own_node_info(RoutingDomain::PublicInternet); + let our_node_id = rti.node_id(); let mut previous_node = &(our_node_id, our_node_info); let mut reachable = true; for n in permutation { let current_node = nodes.get(*n).unwrap(); - let cm = RoutingTable::get_contact_method_inner( - rti, + let cm = rti.get_contact_method( RoutingDomain::PublicInternet, &previous_node.0, &previous_node.1, @@ -478,15 +461,13 @@ impl RouteSpecStore { } } if directions.contains(Direction::Inbound) { - let our_node_info = - RoutingTable::get_own_node_info_inner(rti, RoutingDomain::PublicInternet); - let our_node_id = self.node_id; + let our_node_info = rti.get_own_node_info(RoutingDomain::PublicInternet); + let our_node_id = rti.node_id(); let mut next_node = &(our_node_id, our_node_info); 
let mut reachable = true; for n in permutation.iter().rev() { let current_node = nodes.get(*n).unwrap(); - let cm = RoutingTable::get_contact_method_inner( - rti, + let cm = rti.get_contact_method( RoutingDomain::PublicInternet, &next_node.0, &next_node.1, @@ -522,7 +503,7 @@ impl RouteSpecStore { let hops = route_nodes.iter().map(|v| nodes[*v].0).collect(); let hop_node_refs = route_nodes .iter() - .map(|v| routing_table.lookup_node_ref(nodes[*v].0).unwrap()) + .map(|v| rti.lookup_node_ref(routing_table, nodes[*v].0).unwrap()) .collect(); let (public_key, secret_key) = generate_secret(); @@ -613,12 +594,15 @@ impl RouteSpecStore { ////////////////////////////////////////////////////////////////////// /// Compiles a safety route to the private route, with caching + /// Returns an Err() if the parameters are wrong + /// Returns Ok(None) if no allocation could happen at this time (not an error) pub fn compile_safety_route( &mut self, rti: &RoutingTableInner, + routing_table: RoutingTable, safety_spec: SafetySpec, private_route: PrivateRoute, - ) -> Result { + ) -> Result, RPCError> { let pr_hopcount = private_route.hop_count as usize; if pr_hopcount > self.max_route_hop_count { return Err(RPCError::internal("private route hop count too long")); @@ -647,7 +631,20 @@ impl RouteSpecStore { self.detail_mut(&sr_pubkey).unwrap() } else { // No route found, gotta allocate one - self.allocate_route(rti) + let sr_pubkey = match self + .allocate_route( + rti, + routing_table, + safety_spec.reliable, + safety_spec.hop_count, + Direction::Outbound.into(), + ) + .map_err(RPCError::internal)? + { + Some(pk) => pk, + None => return Ok(None), + }; + self.detail_mut(&sr_pubkey).unwrap() } }; @@ -787,7 +784,9 @@ impl RouteSpecStore { .detail_mut(&key) .ok_or_else(|| eyre!("route does not exist"))? .latency_stats_accounting; - self.detail_mut(&key).latency_stats = lsa.record_latency(latency); + self.detail_mut(&key) + .ok_or_else(|| eyre!("route does not exist"))? 
+ .latency_stats = lsa.record_latency(latency); Ok(()) } diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index c507b3fa..65001c09 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -122,8 +122,7 @@ impl RoutingDomainEditor { let node_id = self.routing_table.node_id(); let mut inner = self.routing_table.inner.write(); - let inner = &mut *inner; - RoutingTable::with_routing_domain_mut(inner, self.routing_domain, |detail| { + inner.with_routing_domain_mut(self.routing_domain, |detail| { for change in self.changes { match change { RoutingDomainChange::ClearDialInfoDetails => { @@ -225,8 +224,8 @@ impl RoutingDomainEditor { } }); if changed { - RoutingTable::reset_all_seen_our_node_info(inner, self.routing_domain); - RoutingTable::reset_all_updated_since_last_network_change(inner); + inner.reset_all_seen_our_node_info(self.routing_domain); + inner.reset_all_updated_since_last_network_change(); } } if changed && self.send_node_info_updates { diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs new file mode 100644 index 00000000..f0bf9a24 --- /dev/null +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -0,0 +1,1033 @@ +use super::*; + +const RECENT_PEERS_TABLE_SIZE: usize = 64; + +////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Clone, Copy)] +pub struct RecentPeersEntry { + pub last_connection: ConnectionDescriptor, +} + +/// RoutingTable rwlock-internal data +pub struct RoutingTableInner { + /// Extra pointer to unlocked members to simplify access + pub(super) unlocked_inner: Arc, + /// Routing table buckets that hold entries + pub(super) buckets: Vec, + /// A fast counter for the number of entries in the table, total + pub(super) bucket_entry_count: usize, + /// The public internet routing 
domain + pub(super) public_internet_routing_domain: PublicInternetRoutingDomainDetail, + /// The dial info we use on the local network + pub(super) local_network_routing_domain: LocalNetworkRoutingDomainDetail, + /// Interim accounting mechanism for this node's RPC latency to any other node + pub(super) self_latency_stats_accounting: LatencyStatsAccounting, + /// Interim accounting mechanism for the total bandwidth to/from this node + pub(super) self_transfer_stats_accounting: TransferStatsAccounting, + /// Statistics about the total bandwidth to/from this node + pub(super) self_transfer_stats: TransferStatsDownUp, + /// Peers we have recently communicated with + pub(super) recent_peers: LruCache, + /// Storage for private/safety RouteSpecs + pub(super) route_spec_store: RouteSpecStore, +} + +impl RoutingTableInner { + pub fn new(unlocked_inner: Arc) -> RoutingTableInner { + RoutingTableInner { + unlocked_inner, + buckets: Vec::new(), + public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), + local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), + bucket_entry_count: 0, + self_latency_stats_accounting: LatencyStatsAccounting::new(), + self_transfer_stats_accounting: TransferStatsAccounting::new(), + self_transfer_stats: TransferStatsDownUp::default(), + recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), + route_spec_store: RouteSpecStore::new(unlocked_inner.config.clone()), + } + } + + pub fn network_manager(&self) -> NetworkManager { + self.unlocked_inner.network_manager.clone() + } + pub fn rpc_processor(&self) -> RPCProcessor { + self.network_manager().rpc_processor() + } + + pub fn node_id(&self) -> DHTKey { + self.unlocked_inner.node_id + } + + pub fn node_id_secret(&self) -> DHTKeySecret { + self.unlocked_inner.node_id_secret + } + + pub fn config(&self) -> VeilidConfig { + self.unlocked_inner.config.clone() + } + + pub fn transfer_stats_accounting(&mut self) -> &TransferStatsAccounting { + &mut 
self.self_transfer_stats_accounting + } + pub fn latency_stats_accounting(&mut self) -> &LatencyStatsAccounting { + &mut self.self_latency_stats_accounting + } + + pub fn routing_domain_for_address(&self, address: Address) -> Option { + for rd in RoutingDomain::all() { + let can_contain = self.with_routing_domain(rd, |rdd| rdd.can_contain_address(address)); + if can_contain { + return Some(rd); + } + } + None + } + + pub fn with_routing_domain(&self, domain: RoutingDomain, f: F) -> R + where + F: FnOnce(&dyn RoutingDomainDetail) -> R, + { + match domain { + RoutingDomain::PublicInternet => f(&self.public_internet_routing_domain), + RoutingDomain::LocalNetwork => f(&self.local_network_routing_domain), + } + } + + pub fn with_routing_domain_mut(&mut self, domain: RoutingDomain, f: F) -> R + where + F: FnOnce(&mut dyn RoutingDomainDetail) -> R, + { + match domain { + RoutingDomain::PublicInternet => f(&mut self.public_internet_routing_domain), + RoutingDomain::LocalNetwork => f(&mut self.local_network_routing_domain), + } + } + + pub fn with_route_spec_store_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut RouteSpecStore, &mut RoutingTableInner) -> R, + { + f(&mut self.route_spec_store, self) + } + + pub fn with_route_spec_store(&self, f: F) -> R + where + F: FnOnce(&RouteSpecStore, &RoutingTableInner) -> R, + { + f(&self.route_spec_store, self) + } + + pub fn relay_node(&self, domain: RoutingDomain) -> Option { + self.with_routing_domain(domain, |rd| rd.common().relay_node()) + } + + pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { + self.with_routing_domain(domain, |rd| !rd.common().dial_info_details().is_empty()) + } + + pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { + self.with_routing_domain(domain, |rd| rd.common().dial_info_details().clone()) + } + + pub fn first_filtered_dial_info_detail( + &self, + routing_domain_set: RoutingDomainSet, + filter: &DialInfoFilter, + ) -> Option { + for routing_domain in routing_domain_set { + let 
did = self.with_routing_domain(routing_domain, |rd| { + for did in rd.common().dial_info_details() { + if did.matches_filter(filter) { + return Some(did.clone()); + } + } + None + }); + if did.is_some() { + return did; + } + } + None + } + + pub fn all_filtered_dial_info_details( + &self, + routing_domain_set: RoutingDomainSet, + filter: &DialInfoFilter, + ) -> Vec { + let mut ret = Vec::new(); + for routing_domain in routing_domain_set { + self.with_routing_domain(routing_domain, |rd| { + for did in rd.common().dial_info_details() { + if did.matches_filter(filter) { + ret.push(did.clone()); + } + } + }); + } + ret.remove_duplicates(); + ret + } + + pub fn ensure_dial_info_is_valid(&self, domain: RoutingDomain, dial_info: &DialInfo) -> bool { + let address = dial_info.socket_address().address(); + let can_contain_address = + self.with_routing_domain(domain, |rd| rd.can_contain_address(address)); + + if !can_contain_address { + log_rtab!(debug "can not add dial info to this routing domain"); + return false; + } + if !dial_info.is_valid() { + log_rtab!(debug + "shouldn't be registering invalid addresses: {:?}", + dial_info + ); + return false; + } + true + } + + pub fn node_info_is_valid_in_routing_domain( + &self, + routing_domain: RoutingDomain, + node_info: &NodeInfo, + ) -> bool { + // Should not be passing around nodeinfo with an invalid network class + if matches!(node_info.network_class, NetworkClass::Invalid) { + return false; + } + // Ensure all of the dial info works in this routing domain + for did in &node_info.dial_info_detail_list { + if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { + return false; + } + } + // Ensure the relay is also valid in this routing domain if it is provided + if let Some(relay_peer_info) = node_info.relay_peer_info.as_ref() { + let relay_ni = &relay_peer_info.signed_node_info.node_info; + if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) { + return false; + } + } + true + } + + 
#[instrument(level = "trace", skip(self), ret)] + pub fn get_contact_method( + &self, + routing_domain: RoutingDomain, + node_a_id: &DHTKey, + node_a: &NodeInfo, + node_b_id: &DHTKey, + node_b: &NodeInfo, + dial_info_filter: DialInfoFilter, + reliable: bool, + ) -> ContactMethod { + self.with_routing_domain(routing_domain, |rdd| { + rdd.get_contact_method( + self, + node_a_id, + node_a, + node_b_id, + node_b, + dial_info_filter, + reliable, + ) + }) + } + + pub fn reset_all_seen_our_node_info(&mut self, routing_domain: RoutingDomain) { + let cur_ts = intf::get_timestamp(); + self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, v| { + v.with_mut(rti, |_rti, e| { + e.set_seen_our_node_info(routing_domain, false); + }); + Option::<()>::None + }); + } + + pub fn reset_all_updated_since_last_network_change(&mut self) { + let cur_ts = intf::get_timestamp(); + self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, v| { + v.with_mut(rti, |_rti, e| { + e.set_updated_since_last_network_change(false) + }); + Option::<()>::None + }); + } + + /// Return a copy of our node's peerinfo + pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { + self.with_routing_domain(routing_domain, |rdd| { + rdd.common().with_peer_info(|pi| pi.clone()) + }) + } + + /// Return a copy of our node's signednodeinfo + pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { + self.with_routing_domain(routing_domain, |rdd| { + rdd.common() + .with_peer_info(|pi| pi.signed_node_info.clone()) + }) + } + + /// Return a copy of our node's nodeinfo + pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { + self.with_routing_domain(routing_domain, |rdd| { + rdd.common() + .with_peer_info(|pi| pi.signed_node_info.node_info.clone()) + }) + } + + /// Return whether we have valid own node info in this routing domain + pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool { + self.with_routing_domain(routing_domain, 
|rdd| rdd.common().has_valid_own_node_info()) + } + + /// Return the domain's currently registered network class + pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { + self.with_routing_domain(routing_domain, |rdd| rdd.common().network_class()) + } + + /// Return the domain's filter for what we can receive in the form of a dial info filter + pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { + self.with_routing_domain(routing_domain, |rdd| { + rdd.common().inbound_dial_info_filter() + }) + } + + /// Return the domain's filter for what we can receive in the form of a node ref filter + pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_inbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) + } + + /// Return the domain's filter for what we can send out in the form of a dial info filter + pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { + self.with_routing_domain(routing_domain, |rdd| { + rdd.common().outbound_dial_info_filter() + }) + } + /// Return the domain's filter for what we can send out in the form of a node ref filter + pub fn get_outbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { + let dif = self.get_outbound_dial_info_filter(routing_domain); + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_dial_info_filter(dif) + } + + fn bucket_depth(index: usize) -> usize { + match index { + 0 => 256, + 1 => 128, + 2 => 64, + 3 => 32, + 4 => 16, + 5 => 8, + 6 => 4, + 7 => 4, + 8 => 4, + 9 => 4, + _ => 4, + } + } + + pub fn init(&mut self, routing_table: RoutingTable) -> EyreResult<()> { + // Size the buckets (one per bit) + self.buckets.reserve(DHT_KEY_LENGTH * 8); + for _ in 0..DHT_KEY_LENGTH * 8 { + let bucket = Bucket::new(routing_table.clone()); + 
self.buckets.push(bucket); + } + Ok(()) + } + + pub fn terminate(&mut self) { + // + } + + pub fn configure_local_network_routing_domain( + &mut self, + local_networks: Vec<(IpAddr, IpAddr)>, + ) { + log_net!(debug "configure_local_network_routing_domain: {:#?}", local_networks); + + let changed = self + .local_network_routing_domain + .set_local_networks(local_networks); + + // If the local network topology has changed, nuke the existing local node info and let new local discovery happen + if changed { + let cur_ts = intf::get_timestamp(); + self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, e| { + e.with_mut(rti, |_rti, e| { + e.clear_signed_node_info(RoutingDomain::LocalNetwork); + e.set_seen_our_node_info(RoutingDomain::LocalNetwork, false); + e.set_updated_since_last_network_change(false); + }); + Option::<()>::None + }); + } + } + + /// Attempt to empty the routing table + /// should only be performed when there are no node_refs (detached) + pub fn purge_buckets(&mut self) { + log_rtab!( + "Starting routing table buckets purge. Table currently has {} nodes", + self.bucket_entry_count + ); + for bucket in &self.buckets { + bucket.kick(self, 0); + } + log_rtab!(debug + "Routing table buckets purge complete. Routing table now has {} nodes", + self.bucket_entry_count + ); + } + + /// Attempt to remove last_connections from entries + pub fn purge_last_connections(&mut self) { + log_rtab!( + "Starting routing table last_connections purge. Table currently has {} nodes", + self.bucket_entry_count + ); + for bucket in &self.buckets { + for entry in bucket.entries() { + entry.1.with_mut(self, |_rti, e| { + e.clear_last_connections(); + }); + } + } + log_rtab!(debug + "Routing table last_connections purge complete. 
Routing table now has {} nodes", + self.bucket_entry_count + ); + } + + /// Attempt to settle buckets and remove entries down to the desired number + /// which may not be possible due extant NodeRefs + pub fn kick_bucket(&mut self, idx: usize) { + let bucket = &mut self.buckets[idx]; + let bucket_depth = Self::bucket_depth(idx); + + if let Some(dead_node_ids) = bucket.kick(self, bucket_depth) { + // Remove counts + self.bucket_entry_count -= dead_node_ids.len(); + log_rtab!(debug "Routing table now has {} nodes", self.bucket_entry_count); + + // Now purge the routing table inner vectors + //let filter = |k: &DHTKey| dead_node_ids.contains(k); + //inner.closest_reliable_nodes.retain(filter); + //inner.fastest_reliable_nodes.retain(filter); + //inner.closest_nodes.retain(filter); + //inner.fastest_nodes.retain(filter); + } + } + + pub fn find_bucket_index(&self, node_id: DHTKey) -> usize { + distance(&node_id, &self.unlocked_inner.node_id) + .first_nonzero_bit() + .unwrap() + } + + pub fn get_entry_count( + &self, + routing_domain_set: RoutingDomainSet, + min_state: BucketEntryState, + ) -> usize { + let mut count = 0usize; + let cur_ts = intf::get_timestamp(); + self.with_entries(cur_ts, min_state, |rti, _, e| { + if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set)) + .is_some() + { + count += 1; + } + Option::<()>::None + }); + count + } + + pub fn with_entries) -> Option>( + &self, + cur_ts: u64, + min_state: BucketEntryState, + mut f: F, + ) -> Option { + for bucket in &self.buckets { + for entry in bucket.entries() { + if entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { + if let Some(out) = f(self, *entry.0, entry.1.clone()) { + return Some(out); + } + } + } + } + None + } + + pub fn with_entries_mut< + T, + F: FnMut(&mut RoutingTableInner, DHTKey, Arc) -> Option, + >( + &mut self, + cur_ts: u64, + min_state: BucketEntryState, + mut f: F, + ) -> Option { + for bucket in &self.buckets { + for entry in bucket.entries() { + if 
entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { + if let Some(out) = f(self, *entry.0, entry.1.clone()) { + return Some(out); + } + } + } + } + None + } + + pub fn get_nodes_needing_updates( + &self, + outer_self: RoutingTable, + routing_domain: RoutingDomain, + cur_ts: u64, + all: bool, + ) -> Vec { + let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { + // Only update nodes that haven't seen our node info yet + if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) { + node_refs.push(NodeRef::new( + outer_self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); + } + Option::<()>::None + }); + node_refs + } + + pub fn get_nodes_needing_ping( + &self, + outer_self: RoutingTable, + routing_domain: RoutingDomain, + cur_ts: u64, + ) -> Vec { + // Collect relay nodes + let opt_relay_id = self.with_routing_domain(routing_domain, |rd| { + rd.common().relay_node().map(|rn| rn.node_id()) + }); + + // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow + let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { + if v.with(rti, |_rti, e| { + e.has_node_info(routing_domain.into()) + && e.needs_ping(cur_ts, opt_relay_id == Some(k)) + }) { + node_refs.push(NodeRef::new( + outer_self.clone(), + k, + v, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + )); + } + Option::<()>::None + }); + node_refs + } + + pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: u64) -> Vec { + let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); + self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, k, v| { + node_refs.push(NodeRef::new(outer_self.clone(), k, v, None)); + Option::<()>::None + }); + node_refs + } + + /// Create a node reference, possibly creating a 
bucket entry + /// the 'update_func' closure is called on the node, and, if created, + /// in a locked fashion as to ensure the bucket entry state is always valid + pub fn create_node_ref( + &mut self, + outer_self: RoutingTable, + node_id: DHTKey, + update_func: F, + ) -> Option + where + F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), + { + // Ensure someone isn't trying register this node itself + if node_id == self.node_id() { + log_rtab!(debug "can't register own node"); + return None; + } + + // Look up existing entry + let idx = self.find_bucket_index(node_id); + let noderef = { + let bucket = &self.buckets[idx]; + let entry = bucket.entry(&node_id); + entry.map(|e| NodeRef::new(outer_self.clone(), node_id, e, None)) + }; + + // If one doesn't exist, insert into bucket, possibly evicting a bucket member + let noderef = match noderef { + None => { + // Make new entry + self.bucket_entry_count += 1; + let cnt = self.bucket_entry_count; + let bucket = &mut self.buckets[idx]; + let nr = bucket.add_entry(node_id); + + // Update the entry + let entry = bucket.entry(&node_id).unwrap(); + entry.with_mut(self, update_func); + + // Kick the bucket + self.unlocked_inner.kick_queue.lock().insert(idx); + log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable)); + + nr + } + Some(nr) => { + // Update the entry + let bucket = &mut self.buckets[idx]; + let entry = bucket.entry(&node_id).unwrap(); + entry.with_mut(self, update_func); + + nr + } + }; + + Some(noderef) + } + + /// Resolve an existing routing table entry and return a reference to it + pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: DHTKey) -> Option { + if node_id == self.unlocked_inner.node_id { + log_rtab!(debug "can't look up own node id in routing table"); + return None; + } + let idx = self.find_bucket_index(node_id); + let bucket = &self.buckets[idx]; + bucket + .entry(&node_id) + .map(|e| 
NodeRef::new(outer_self, node_id, e, None)) + } + + /// Resolve an existing routing table entry and return a filtered reference to it + pub fn lookup_and_filter_noderef( + &self, + outer_self: RoutingTable, + node_id: DHTKey, + routing_domain_set: RoutingDomainSet, + dial_info_filter: DialInfoFilter, + ) -> Option { + let nr = self.lookup_node_ref(outer_self, node_id)?; + Some( + nr.filtered_clone( + NodeRefFilter::new() + .with_dial_info_filter(dial_info_filter) + .with_routing_domain_set(routing_domain_set), + ), + ) + } + + /// Shortcut function to add a node to our routing table if it doesn't exist + /// and add the dial info we have for it. Returns a noderef filtered to + /// the routing domain in which this node was registered for convenience. + pub fn register_node_with_signed_node_info( + &mut self, + outer_self: RoutingTable, + routing_domain: RoutingDomain, + node_id: DHTKey, + signed_node_info: SignedNodeInfo, + allow_invalid: bool, + ) -> Option { + // validate signed node info is not something malicious + if node_id == self.node_id() { + log_rtab!(debug "can't register own node id in routing table"); + return None; + } + if let Some(rpi) = &signed_node_info.node_info.relay_peer_info { + if rpi.node_id.key == node_id { + log_rtab!(debug "node can not be its own relay"); + return None; + } + } + if !allow_invalid { + // verify signature + if !signed_node_info.has_valid_signature() { + log_rtab!(debug "signed node info for {} has invalid signature", node_id); + return None; + } + // verify signed node info is valid in this routing domain + if !self + .node_info_is_valid_in_routing_domain(routing_domain, &signed_node_info.node_info) + { + log_rtab!(debug "signed node info for {} not valid in the {:?} routing domain", node_id, routing_domain); + return None; + } + } + + self.create_node_ref(outer_self, node_id, |_rti, e| { + e.update_signed_node_info(routing_domain, signed_node_info); + }) + .map(|mut nr| { + nr.set_filter(Some( + 
NodeRefFilter::new().with_routing_domain(routing_domain), + )); + nr + }) + } + + /// Shortcut function to add a node to our routing table if it doesn't exist + /// and add the last peer address we have for it, since that's pretty common + pub fn register_node_with_existing_connection( + &mut self, + outer_self: RoutingTable, + node_id: DHTKey, + descriptor: ConnectionDescriptor, + timestamp: u64, + ) -> Option { + let out = self.create_node_ref(outer_self, node_id, |_rti, e| { + // this node is live because it literally just connected to us + e.touch_last_seen(timestamp); + }); + if let Some(nr) = &out { + // set the most recent node address for connection finding and udp replies + nr.set_last_connection(descriptor, timestamp); + } + out + } + + ////////////////////////////////////////////////////////////////////// + // Routing Table Health Metrics + + pub fn get_routing_table_health(&self) -> RoutingTableHealth { + let mut health = RoutingTableHealth::default(); + let cur_ts = intf::get_timestamp(); + for bucket in &self.buckets { + for (_, v) in bucket.entries() { + match v.with(self, |_rti, e| e.state(cur_ts)) { + BucketEntryState::Reliable => { + health.reliable_entry_count += 1; + } + BucketEntryState::Unreliable => { + health.unreliable_entry_count += 1; + } + BucketEntryState::Dead => { + health.dead_entry_count += 1; + } + } + } + } + health + } + + pub fn get_recent_peers( + &mut self, + outer_self: RoutingTable, + ) -> Vec<(DHTKey, RecentPeersEntry)> { + let mut recent_peers = Vec::new(); + let mut dead_peers = Vec::new(); + let mut out = Vec::new(); + + // collect all recent peers + for (k, _v) in &self.recent_peers { + recent_peers.push(*k); + } + + // look up each node and make sure the connection is still live + // (uses same logic as send_data, ensuring last_connection works for UDP) + for e in &recent_peers { + let mut dead = true; + if let Some(nr) = self.lookup_node_ref(outer_self.clone(), *e) { + if let Some(last_connection) = 
nr.last_connection() { + out.push((*e, RecentPeersEntry { last_connection })); + dead = false; + } + } + if dead { + dead_peers.push(e); + } + } + + // purge dead recent peers + for d in dead_peers { + self.recent_peers.remove(d); + } + + out + } + + pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) { + self.recent_peers + .insert(node_id, RecentPeersEntry { last_connection }); + } + + ////////////////////////////////////////////////////////////////////// + // Find Nodes + + // Retrieve the fastest nodes in the routing table matching an entry filter + pub fn find_fast_public_nodes_filtered<'a, 'b, F>( + &self, + outer_self: RoutingTable, + node_count: usize, + mut entry_filter: F, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, &'b BucketEntryInner) -> bool, + { + self.find_fastest_nodes( + // count + node_count, + // filter + |rti, _k: DHTKey, v: Option>| { + let entry = v.unwrap(); + entry.with(rti, |rti, e| { + // skip nodes on local network + if e.node_info(RoutingDomain::LocalNetwork).is_some() { + return false; + } + // skip nodes not on public internet + if e.node_info(RoutingDomain::PublicInternet).is_none() { + return false; + } + // skip nodes that dont match entry filter + entry_filter(rti, e) + }) + }, + // transform + |_rti, k: DHTKey, v: Option>| { + NodeRef::new(outer_self.clone(), k, v.unwrap().clone(), None) + }, + ) + } + + pub fn filter_has_valid_signed_node_info( + &self, + routing_domain: RoutingDomain, + has_valid_own_node_info: bool, + v: Option>, + ) -> bool { + match v { + None => has_valid_own_node_info, + Some(entry) => entry.with(self, |_rti, e| { + e.signed_node_info(routing_domain.into()) + .map(|sni| sni.has_valid_signature()) + .unwrap_or(false) + }), + } + } + + pub fn transform_to_peer_info( + &self, + routing_domain: RoutingDomain, + own_peer_info: PeerInfo, + k: DHTKey, + v: Option>, + ) -> PeerInfo { + match v { + None => own_peer_info, + Some(entry) => entry.with(self, |_rti, e| 
e.make_peer_info(k, routing_domain).unwrap()), + } + } + + pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>( + &self, + node_count: usize, + cur_ts: u64, + mut filter: F, + compare: C, + mut transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + C: FnMut( + &'a RoutingTableInner, + &'b (DHTKey, Option>), + &'b (DHTKey, Option>), + ) -> core::cmp::Ordering, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + // collect all the nodes for sorting + let mut nodes = + Vec::<(DHTKey, Option>)>::with_capacity(self.bucket_entry_count + 1); + + // add our own node (only one of there with the None entry) + if filter(self, self.unlocked_inner.node_id, None) { + nodes.push((self.unlocked_inner.node_id, None)); + } + + // add all nodes from buckets + self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { + // Apply filter + if filter(rti, k, Some(v.clone())) { + nodes.push((k, Some(v.clone()))); + } + Option::<()>::None + }); + + // sort by preference for returning nodes + nodes.sort_by(|a, b| compare(self, a, b)); + + // return transformed vector for filtered+sorted nodes + let cnt = usize::min(node_count, nodes.len()); + let mut out = Vec::::with_capacity(cnt); + for node in nodes { + let val = transform(self, node.0, node.1); + out.push(val); + } + + out + } + + pub fn find_fastest_nodes<'a, T, F, O>( + &self, + node_count: usize, + mut filter: F, + transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + let cur_ts = intf::get_timestamp(); + let out = self.find_peers_with_sort_and_filter( + node_count, + cur_ts, + // filter + |rti, k, v| { + if let Some(entry) = &v { + // always filter out dead nodes + if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) { + false + } else { + filter(rti, k, v) + } + } else { + // always filter out self peer, as it is irrelevant to the 'fastest 
nodes' search + false + } + }, + // sort + |rti, (a_key, a_entry), (b_key, b_entry)| { + // same nodes are always the same + if a_key == b_key { + return core::cmp::Ordering::Equal; + } + // our own node always comes last (should not happen, here for completeness) + if a_entry.is_none() { + return core::cmp::Ordering::Greater; + } + if b_entry.is_none() { + return core::cmp::Ordering::Less; + } + // reliable nodes come first + let ae = a_entry.as_ref().unwrap(); + let be = b_entry.as_ref().unwrap(); + ae.with(rti, |rti, ae| { + be.with(rti, |_rti, be| { + let ra = ae.check_reliable(cur_ts); + let rb = be.check_reliable(cur_ts); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } + + // latency is the next metric, closer nodes first + let a_latency = match ae.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Greater; + } + Some(l) => l, + }; + let b_latency = match be.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Less; + } + Some(l) => l, + }; + // Sort by average latency + a_latency.average.cmp(&b_latency.average) + }) + }) + }, + // transform, + transform, + ); + out + } + + pub fn find_closest_nodes<'a, F, T, O>( + &self, + node_id: DHTKey, + filter: F, + mut transform: T, + ) -> Vec + where + F: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> bool, + T: FnMut(&'a RoutingTableInner, DHTKey, Option>) -> O, + { + let cur_ts = intf::get_timestamp(); + let node_count = { + let config = self.config(); + let c = config.get(); + c.network.dht.max_find_node_count as usize + }; + let out = self.find_peers_with_sort_and_filter( + node_count, + cur_ts, + // filter + filter, + // sort + |rti, (a_key, a_entry), (b_key, b_entry)| { + // same nodes are always the same + if a_key == b_key { + return core::cmp::Ordering::Equal; + } + + // reliable nodes come first, pessimistically treating 
our own node as unreliable + let ra = a_entry + .as_ref() + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); + let rb = b_entry + .as_ref() + .map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts))); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } + + // distance is the next metric, closer nodes first + let da = distance(a_key, &node_id); + let db = distance(b_key, &node_id); + da.cmp(&db) + }, + // transform, + &mut transform, + ); + log_rtab!(">> find_closest_nodes: node count = {}", out.len()); + out + } +} diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index d43ed5a4..f651d30e 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -47,7 +47,7 @@ impl RoutingTable { .collect(); let mut inner = self.inner.write(); for idx in kick_queue { - Self::kick_bucket(&mut *inner, idx) + inner.kick_bucket(idx) } Ok(()) } diff --git a/veilid-core/src/rpc_processor/coders/private_safety_route.rs b/veilid-core/src/rpc_processor/coders/private_safety_route.rs index f96261cb..50604970 100644 --- a/veilid-core/src/rpc_processor/coders/private_safety_route.rs +++ b/veilid-core/src/rpc_processor/coders/private_safety_route.rs @@ -24,14 +24,42 @@ pub fn encode_route_hop_data( Ok(()) } +pub fn decode_route_hop_data( + reader: &veilid_capnp::route_hop_data::Reader, +) -> Result { + let nonce = decode_nonce( + &reader + .reborrow() + .get_nonce() + .map_err(RPCError::map_protocol("invalid nonce in route hop data"))?, + ); + + let blob = reader + .reborrow() + .get_blob() + .map_err(RPCError::map_protocol("invalid blob in route hop data"))? 
+ .to_vec(); + + Ok(RouteHopData { nonce, blob }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + pub fn encode_route_hop( route_hop: &RouteHop, builder: &mut veilid_capnp::route_hop::Builder, ) -> Result<(), RPCError> { - encode_node_dial_info( - &route_hop.dial_info, - &mut builder.reborrow().init_dial_info(), - )?; + let node_builder = builder.reborrow().init_node(); + match &route_hop.node { + RouteNode::NodeId(ni) => { + let ni_builder = node_builder.init_node_id(); + encode_public_key(&ni.key, &mut ni_builder)?; + } + RouteNode::PeerInfo(pi) => { + let pi_builder = node_builder.init_peer_info(); + encode_peer_info(&pi, &mut pi_builder)?; + } + } if let Some(rhd) = &route_hop.next_hop { let mut rhd_builder = builder.reborrow().init_next_hop(); encode_route_hop_data(rhd, &mut rhd_builder)?; @@ -39,6 +67,36 @@ pub fn encode_route_hop( Ok(()) } +pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result { + let n_reader = reader.reborrow().get_node(); + let node = match n_reader.which().map_err(RPCError::protocol)? { + veilid_capnp::route_hop::node::Which::NodeId(ni) => { + let ni_reader = ni.map_err(RPCError::protocol)?; + RouteNode::NodeId(NodeId::new(decode_public_key(&ni_reader))) + } + veilid_capnp::route_hop::node::Which::PeerInfo(pi) => { + let pi_reader = pi.map_err(RPCError::protocol)?; + RouteNode::PeerInfo( + decode_peer_info(&pi_reader, true) + .map_err(RPCError::map_protocol("invalid peer info in route hop"))?, + ) + } + }; + + let next_hop = if reader.has_next_hop() { + let rhd_reader = reader + .get_next_hop() + .map_err(RPCError::map_protocol("invalid next hop in route hop"))?; + Some(decode_route_hop_data(&rhd_reader)?) 
+ } else { + None + }; + + Ok(RouteHop { node, next_hop }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + pub fn encode_private_route( private_route: &PrivateRoute, builder: &mut veilid_capnp::private_route::Builder, @@ -48,7 +106,7 @@ pub fn encode_private_route( &mut builder.reborrow().init_public_key(), )?; builder.set_hop_count(private_route.hop_count); - if let Some(rh) = &private_route.hops { + if let Some(rh) = &private_route.first_hop { let mut rh_builder = builder.reborrow().init_first_hop(); encode_route_hop(rh, &mut rh_builder)?; }; @@ -56,6 +114,31 @@ pub fn encode_private_route( Ok(()) } +pub fn decode_private_route( + reader: &veilid_capnp::private_route::Reader, +) -> Result { + let public_key = decode_public_key(&reader.get_public_key().map_err( + RPCError::map_protocol("invalid public key in private route"), + )?); + let hop_count = reader.get_hop_count(); + let first_hop = if reader.has_first_hop() { + let rh_reader = reader + .get_first_hop() + .map_err(RPCError::map_protocol("invalid first hop in private route"))?; + Some(decode_route_hop(&rh_reader)?) + } else { + None + }; + + Ok(PrivateRoute { + public_key, + hop_count, + first_hop, + }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + pub fn encode_safety_route( safety_route: &SafetyRoute, builder: &mut veilid_capnp::safety_route::Builder, @@ -80,71 +163,6 @@ pub fn encode_safety_route( Ok(()) } -pub fn decode_route_hop_data( - reader: &veilid_capnp::route_hop_data::Reader, -) -> Result { - let nonce = decode_nonce( - &reader - .reborrow() - .get_nonce() - .map_err(RPCError::map_protocol("invalid nonce in route hop data"))?, - ); - - let blob = reader - .reborrow() - .get_blob() - .map_err(RPCError::map_protocol("invalid blob in route hop data"))? 
- .to_vec(); - - Ok(RouteHopData { nonce, blob }) -} - -pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result { - let dial_info = decode_node_dial_info( - &reader - .reborrow() - .get_dial_info() - .map_err(RPCError::map_protocol("invalid dial info in route hop"))?, - )?; - - let next_hop = if reader.has_next_hop() { - let rhd_reader = reader - .get_next_hop() - .map_err(RPCError::map_protocol("invalid next hop in route hop"))?; - Some(decode_route_hop_data(&rhd_reader)?) - } else { - None - }; - - Ok(RouteHop { - dial_info, - next_hop, - }) -} - -pub fn decode_private_route( - reader: &veilid_capnp::private_route::Reader, -) -> Result { - let public_key = decode_public_key(&reader.get_public_key().map_err( - RPCError::map_protocol("invalid public key in private route"), - )?); - let hop_count = reader.get_hop_count(); - let hops = if reader.has_first_hop() { - let rh_reader = reader - .get_first_hop() - .map_err(RPCError::map_protocol("invalid first hop in private route"))?; - Some(decode_route_hop(&rh_reader)?) 
- } else { - None - }; - - Ok(PrivateRoute { - public_key, - hop_count, - hops, - }) -} - pub fn decode_safety_route( reader: &veilid_capnp::safety_route::Reader, ) -> Result { diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 76d86990..152923cf 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -8,7 +8,7 @@ pub enum Destination { /// The node to send to target: NodeRef, /// Require safety route or not - safety: Option, + safety_spec: Option, }, /// Send to node for relay purposes Relay { @@ -17,14 +17,14 @@ pub enum Destination { /// The final destination the relay should send to target: DHTKey, /// Require safety route or not - safety: Option, + safety_spec: Option, }, /// Send to private route (privateroute) PrivateRoute { /// A private route to send to private_route: PrivateRoute, /// Require safety route or not - safety: Option, + safety_spec: Option, /// Prefer reliability or not reliable: bool, }, @@ -34,46 +34,49 @@ impl Destination { pub fn direct(target: NodeRef) -> Self { Self::Direct { target, - safety: None, + safety_spec: None, } } pub fn relay(relay: NodeRef, target: DHTKey) -> Self { Self::Relay { relay, target, - safety: None, + safety_spec: None, } } pub fn private_route(private_route: PrivateRoute, reliable: bool) -> Self { Self::PrivateRoute { private_route, - safety: None, + safety_spec: None, reliable, } } - pub fn with_safety(self, spec: SafetySpec) -> Self { + pub fn with_safety(self, safety_spec: SafetySpec) -> Self { match self { - Destination::Direct { target, safety: _ } => Self::Direct { + Destination::Direct { target, - safety: Some(spec), + safety_spec: _, + } => Self::Direct { + target, + safety_spec: Some(safety_spec), }, Destination::Relay { relay, target, - safety: _, + safety_spec: _, } => Self::Relay { relay, target, - safety: Some(spec), + safety_spec: Some(safety_spec), }, Destination::PrivateRoute { 
private_route,
- safety: _,
+ safety_spec: _,
reliable,
} => Self::PrivateRoute {
private_route,
- safety: Some(spec),
+ safety_spec: Some(safety_spec),
reliable,
},
}
@@ -83,26 +86,29 @@ impl Destination {
impl fmt::Display for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
- Destination::Direct { target, safety } => {
- let sr = if safety.is_some() { "+SR" } else { "" };
+ Destination::Direct {
+ target,
+ safety_spec,
+ } => {
+ let sr = if safety_spec.is_some() { "+SR" } else { "" };
write!(f, "{}{}", target, sr)
}
Destination::Relay {
relay,
target,
- safety,
+ safety_spec,
} => {
- let sr = if safety.is_some() { "+SR" } else { "" };
+ let sr = if safety_spec.is_some() { "+SR" } else { "" };
write!(f, "{}@{}{}", target.encode(), relay, sr)
}
Destination::PrivateRoute {
private_route,
- safety,
+ safety_spec,
reliable,
} => {
- let sr = if safety.is_some() { "+SR" } else { "" };
+ let sr = if safety_spec.is_some() { "+SR" } else { "" };
let rl = if *reliable { "+RL" } else { "" };
write!(f, "{}{}{}", private_route, sr, rl)
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index dab096a6..0edc535c 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -1,7 +1,6 @@
mod coders;
mod destination;
mod operation_waiter;
-mod private_route;
mod rpc_app_call;
mod rpc_app_message;
mod rpc_cancel_tunnel;
@@ -24,7 +23,6 @@
mod rpc_watch_value;
pub use destination::*;
pub use operation_waiter::*;
-pub use private_route::*;
pub use rpc_error::*;
use super::*;
@@ -398,7 +396,7 @@ impl RPCProcessor {
}
// Wrap an operation with a private route inside a safety route
- pub(super) fn wrap_with_route(
+ pub(super) fn wrap_with_route(
&self,
safety_spec: SafetySpec,
private_route: PrivateRoute,
) -> Result {
let compiled_route: CompiledRoute = self.routing_table().with_route_spec_store(|rss| {
// Compile the
safety route with the private route
- rss.compile_safety_route(safety_spec, private_route)
+ rss.compile_safety_route(safety_spec, private_route)
})?;
// Encrypt routed operation
diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs
index 2448b9b8..b5674b70 100644
--- a/veilid-core/src/rpc_processor/rpc_find_node.rs
+++ b/veilid-core/src/rpc_processor/rpc_find_node.rs
@@ -72,8 +72,7 @@ impl RPCProcessor {
find_node_q.node_id,
// filter
|rti, _k, v| {
- RoutingTable::filter_has_valid_signed_node_info_inner(
- rti,
+ rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
has_valid_own_node_info,
v,
@@ -81,11 +80,9 @@
},
// transform
|rti, k, v| {
- let own_peer_info = own_peer_info.clone();
- RoutingTable::transform_to_peer_info_inner(
- rti,
+ rti.transform_to_peer_info(
RoutingDomain::PublicInternet,
- own_peer_info,
+ own_peer_info.clone(),
k,
v,
)
diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs
index 96f46ae1..e990ebfc 100644
--- a/veilid-core/src/veilid_api/debug.rs
+++ b/veilid-core/src/veilid_api/debug.rs
@@ -341,7 +341,7 @@ impl VeilidAPI {
nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain))
}
- let cm = routing_table
+ let cm = network_manager
.get_node_contact_method(nr)
.map_err(VeilidAPIError::internal)?;
diff --git a/veilid-core/src/veilid_api/privacy.rs b/veilid-core/src/veilid_api/privacy.rs
index ea37ecfd..dafd0c25 100644
--- a/veilid-core/src/veilid_api/privacy.rs
+++ b/veilid-core/src/veilid_api/privacy.rs
@@ -11,7 +11,7 @@ pub struct RouteHopData {
#[derive(Clone, Debug)]
pub enum RouteNode {
- NodeId(DHTKey),
+ NodeId(NodeId),
PeerInfo(PeerInfo),
}
impl fmt::Display for RouteNode {
@@ -20,7 +20,7 @@ impl fmt::Display for RouteNode {
f,
"{}",
match self {
- RouteNode::NodeId(x) => x.encode(),
+ RouteNode::NodeId(x) => x.key.encode(),
RouteNode::PeerInfo(pi) => pi.node_id.key.encode(),
}
)
diff
--git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 215a8b2c..e0c3a8b4 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -11,7 +11,7 @@ pub struct RoutingContextInner {} pub struct RoutingContextUnlockedInner { /// Enforce use of private routing - privacy: bool, + privacy: usize, /// Choose reliable protocols over unreliable/faster protocols when available reliable: bool, } @@ -41,21 +41,45 @@ impl RoutingContext { api, inner: Arc::new(Mutex::new(RoutingContextInner {})), unlocked_inner: Arc::new(RoutingContextUnlockedInner { - privacy: false, + privacy: 0, reliable: false, }), } } - pub fn with_privacy(self) -> Self { - Self { + pub fn with_default_privacy(self) -> Result { + let config = self.api.config()?; + let c = config.get(); + Ok(Self { api: self.api.clone(), inner: Arc::new(Mutex::new(RoutingContextInner {})), unlocked_inner: Arc::new(RoutingContextUnlockedInner { - privacy: true, + privacy: c.network.rpc.default_route_hop_count as usize, reliable: self.unlocked_inner.reliable, }), - } + }) + } + pub fn with_privacy(self, hops: usize) -> Result { + let config = self.api.config()?; + let c = config.get(); + + let privacy = if hops > 0 && hops <= c.network.rpc.max_route_hop_count as usize { + hops + } else { + return Err(VeilidAPIError::invalid_argument( + "hops value is too large", + "hops", + hops, + )); + }; + Ok(Self { + api: self.api.clone(), + inner: Arc::new(Mutex::new(RoutingContextInner {})), + unlocked_inner: Arc::new(RoutingContextUnlockedInner { + privacy, + reliable: self.unlocked_inner.reliable, + }), + }) } pub fn with_reliability(self) -> Self { @@ -93,12 +117,20 @@ impl RoutingContext { } Ok(rpc_processor::Destination::Direct { target: nr, - safety: self.unlocked_inner.privacy, + safety_spec: Some(routing_table::SafetySpec { + preferred_route: None, + hop_count: self.unlocked_inner.privacy, + reliable: 
self.unlocked_inner.reliable, + }), }) } Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute { private_route: pr, - safety: self.unlocked_inner.privacy, + safety_spec: Some(routing_table::SafetySpec { + preferred_route: None, + hop_count: self.unlocked_inner.privacy, + reliable: self.unlocked_inner.reliable, + }), reliable: self.unlocked_inner.reliable, }), } diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index f40dd2ea..6aed0875 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -81,7 +81,7 @@ core: max_timestamp_behind_ms: 10000 max_timestamp_ahead_ms: 10000 timeout_ms: 10000 - max_route_hop_count: 7 + max_route_hop_count: 4 default_route_hop_count: 2 dht: resolve_node_timeout: @@ -1491,7 +1491,7 @@ mod tests { assert_eq!(s.core.network.rpc.max_timestamp_behind_ms, Some(10_000u32)); assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32)); assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32); - assert_eq!(s.core.network.rpc.max_route_hop_count, 7); + assert_eq!(s.core.network.rpc.max_route_hop_count, 4); assert_eq!(s.core.network.rpc.default_route_hop_count, 2); // assert_eq!(s.core.network.dht.resolve_node_timeout_ms, None);