diff --git a/veilid-core/src/network_manager/connection_limits.rs b/veilid-core/src/network_manager/connection_limits.rs index 2102bfe5..9c2121c0 100644 --- a/veilid-core/src/network_manager/connection_limits.rs +++ b/veilid-core/src/network_manager/connection_limits.rs @@ -41,30 +41,6 @@ impl ConnectionLimits { } } - // Converts an ip to a ip block by applying a netmask - // to the host part of the ip address - // ipv4 addresses are treated as single hosts - // ipv6 addresses are treated as prefix allocated blocks - fn ip_to_ipblock(&self, addr: IpAddr) -> IpAddr { - match addr { - IpAddr::V4(_) => addr, - IpAddr::V6(v6) => { - let mut hostlen = 128usize.saturating_sub(self.max_connections_per_ip6_prefix_size); - let mut out = v6.octets(); - for i in (0..16).rev() { - if hostlen >= 8 { - out[i] = 0xFF; - hostlen -= 8; - } else { - out[i] |= !(0xFFu8 << hostlen); - break; - } - } - IpAddr::V6(Ipv6Addr::from(out)) - } - } - } - fn purge_old_timestamps(&mut self, cur_ts: u64) { // v4 { @@ -101,7 +77,7 @@ impl ConnectionLimits { } pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> { - let ipblock = self.ip_to_ipblock(addr); + let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); let ts = intf::get_timestamp(); self.purge_old_timestamps(ts); @@ -156,7 +132,7 @@ impl ConnectionLimits { } pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> { - let ipblock = self.ip_to_ipblock(addr); + let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); let ts = intf::get_timestamp(); self.purge_old_timestamps(ts); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 65677e28..54baadab 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -41,9 +41,11 @@ pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes -pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; +pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; +pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8; +pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; +pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: u64 = 300_000_000u64; // 5 minutes pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; - pub const BOOTSTRAP_TXT_VERSION: u8 = 0; #[derive(Clone, Debug)] @@ -110,15 +112,21 @@ struct ClientWhitelistEntry { last_seen_ts: u64, } -// Mechanism required to contact another node +/// Mechanism required to contact another node #[derive(Clone, Debug)] pub(crate) enum ContactMethod { - Unreachable, // Node is not reachable by any means - Direct(DialInfo), // Contact the node directly - SignalReverse(NodeRef, NodeRef), // Request via signal the node connect back directly - SignalHolePunch(NodeRef, NodeRef), // Request via signal the node negotiate a hole punch - InboundRelay(NodeRef), // Must use an inbound relay to reach the node - OutboundRelay(NodeRef), // Must use outbound relay to reach the node + /// Node is not reachable by any means + Unreachable, + /// Contact the node directly + Direct(DialInfo), + /// Request via signal the node connect back directly (relay_nr, target_node_ref) + SignalReverse(NodeRef, NodeRef), + /// Request via signal the node negotiate a hole punch (relay_nr, target_node_ref) + SignalHolePunch(NodeRef, NodeRef), + /// Must use an inbound relay to reach the node + InboundRelay(NodeRef), + /// Must use outbound relay to reach the node + OutboundRelay(NodeRef), } #[derive(Copy, Clone, Debug)] @@ -140,7 +148,9 @@ struct NetworkManagerInner { client_whitelist: LruCache, relay_node: Option, public_address_check_cache: - BTreeMap>, + BTreeMap>, + public_address_inconsistencies_table: + BTreeMap>, protocol_config: Option, public_inbound_dial_info_filter: Option, local_inbound_dial_info_filter: Option, @@ -155,6 +165,7 @@ struct NetworkManagerUnlockedInner { bootstrap_task: TickTask, peer_minimum_refresh_task: TickTask, ping_validator_task: TickTask, + public_address_check_task: TickTask, node_info_update_single_future: MustJoinSingleFuture<()>, } @@ -177,6 +188,7 @@ impl NetworkManager { client_whitelist: LruCache::new_unbounded(), relay_node: None, public_address_check_cache: BTreeMap::new(), + public_address_inconsistencies_table: BTreeMap::new(), protocol_config: None, public_inbound_dial_info_filter: None, local_inbound_dial_info_filter: None, @@ -192,6 +204,7 @@ impl NetworkManager { bootstrap_task: TickTask::new(1), peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), ping_validator_task: TickTask::new(1), + public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), node_info_update_single_future: MustJoinSingleFuture::new(), } } @@ -247,6 +260,15 @@ impl NetworkManager { Box::pin(this2.clone().ping_validator_task_routine(s, l, t)) }); } + // Set public address check task + { + let this2 = this.clone(); + this.unlocked_inner + .public_address_check_task + .set_routine(move |s, l, t| { + Box::pin(this2.clone().public_address_check_task_routine(s, l, t)) + }); + } this } pub fn config(&self) -> VeilidConfig { @@ -755,6 +777,7 @@ impl NetworkManager { let peer_nr = match routing_table.register_node_with_signed_node_info( peer_info.node_id.key, peer_info.signed_node_info, + false, ) { None => { return Ok(NetworkResult::invalid_message( @@ -777,6 +800,7 @@ impl NetworkManager { let mut peer_nr = match routing_table.register_node_with_signed_node_info( peer_info.node_id.key, peer_info.signed_node_info, + false, ) { None => { return Ok(NetworkResult::invalid_message( @@ -1653,23 +1677,27 @@ impl NetworkManager { connection_descriptor.address_type(), ); - let (net, routing_table) = { + let (net, routing_table, detect_address_changes) = { let mut inner = self.inner.lock(); + let c = self.config.get(); + + // Get the ip(block) this report is coming from + let ip6_prefix_size = c.network.max_connections_per_ip6_prefix_size as usize; + let ipblock = ip_to_ipblock( + ip6_prefix_size, + connection_descriptor.remote_address().to_ip_addr(), + ); // Store the reported address let pacc = inner .public_address_check_cache .entry(key) - .or_insert_with(|| LruCache::new(8)); - pacc.insert(reporting_peer.node_id(), socket_address); + .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); + pacc.insert(ipblock, socket_address); let net = inner.components.as_ref().unwrap().net.clone(); let routing_table = inner.routing_table.as_ref().unwrap().clone(); - (net, routing_table) - }; - let detect_address_changes = { - let c = self.config.get(); - c.network.detect_address_changes + (net, routing_table, c.network.detect_address_changes) }; let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); @@ -1691,31 +1719,47 @@ impl NetworkManager { // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers // then we zap the network class and re-detect it - let mut inner = self.inner.lock(); - let mut inconsistencies = 0; - let mut changed = false; + let inner = &mut *self.inner.lock(); + let mut inconsistencies = Vec::new(); + let mut inconsistent = false; // Iteration goes from most recent to least recent node/address pair let pacc = inner .public_address_check_cache .entry(key) - .or_insert_with(|| LruCache::new(8)); - for (_, a) in pacc { - if !current_addresses.contains(a) { - inconsistencies += 1; - if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { - changed = true; + .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); + let pait = inner + .public_address_inconsistencies_table + .entry(key) + .or_insert_with(|| HashMap::new()); + for (reporting_ip_block, a) in pacc { + // If this address is not one of our current addresses (inconsistent) + // and we haven't already denylisted the reporting source, + if !current_addresses.contains(a) && !pait.contains_key(reporting_ip_block) { + // Record the origin of the inconsistency + inconsistencies.push(*reporting_ip_block); + + // If we have enough inconsistencies to consider changing our public dial info, + // add them to our denylist (throttling) and go ahead and check for new + // public dialinfo + if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT { + let exp_ts = + intf::get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US; + for i in inconsistencies { + pait.insert(i, exp_ts); + } + + inconsistent = true; break; } } } - // // debug code // if changed { // trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner // .public_address_check_cache, current_addresses, inconsistencies); // } - changed + inconsistent } else { // If we are currently outbound only, we don't have any public dial info // but if we are starting to see consistent socket address from multiple reporting peers @@ -1729,13 +1773,13 @@ impl NetworkManager { let pacc = inner .public_address_check_cache .entry(key) - .or_insert_with(|| LruCache::new(8)); + .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); for (_, a) in pacc { if let Some(current_address) = current_address { if current_address == *a { consistencies += 1; - if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { + if consistencies >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT { consistent = true; break; } @@ -1811,7 +1855,7 @@ impl NetworkManager { return; } - // Mark the node as updated + // Mark the node as having seen our node info nr.set_seen_our_node_info(); }); } diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index 6c4f4a7f..f6658079 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -188,7 +188,7 @@ impl IGDManager { } } let pmk = found?; - let pmv = inner.port_maps.remove(&pmk).unwrap(); + let _pmv = inner.port_maps.remove(&pmk).expect("key found but remove failed"); // Find gateway let gw = Self::find_gateway(&mut *inner, at)?; diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 89b85829..afc2b3dc 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -213,7 +213,7 @@ impl DiscoveryContext { #[instrument(level = "trace", skip(self), ret)] async fn try_port_mapping(&self) -> Option { - let (enable_upnp, enable_natpmp) = { + let (enable_upnp, _enable_natpmp) = { let c = self.net.config.get(); (c.network.upnp, c.network.natpmp) }; diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index de0a3d36..ca2e3792 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -113,7 +113,7 @@ impl Network { let addr = match tcp_stream.peer_addr() { Ok(addr) => addr, Err(e) => { - log_net!(error "failed to get peer address: {}", e); + log_net!(debug "failed to get peer address: {}", e); return; } }; @@ -139,7 +139,7 @@ impl Network { { // If we fail to get a packet within the connection initial timeout // then we punt this connection - log_net!(warn "connection initial timeout from: {:?}", addr); + log_net!("connection initial timeout from: {:?}", addr); return; } @@ -169,12 +169,12 @@ impl Network { } Ok(None) => { // No protocol handlers matched? drop it. - log_net!(warn "no protocol handler for connection from {:?}", addr); + log_net!(debug "no protocol handler for connection from {:?}", addr); return; } Err(e) => { // Failed to negotiate connection? drop it. - log_net!(warn "failed to negotiate connection from {:?}: {}", addr, e); + log_net!(debug "failed to negotiate connection from {:?}: {}", addr, e); return; } }; diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 1237044d..1ea67296 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -69,11 +69,15 @@ impl RawTcpNetworkConnection { network_result_try!(stream.read_exact(&mut header).await.into_network_result()?); if header[0] != b'V' || header[1] != b'L' { - bail_io_error_other!("received invalid TCP frame header"); + return Ok(NetworkResult::invalid_message( + "received invalid TCP frame header", + )); } let len = ((header[3] as usize) << 8) | (header[2] as usize); if len > MAX_MESSAGE_SIZE { - bail_io_error_other!("received too large TCP frame"); + return Ok(NetworkResult::invalid_message( + "received too large TCP frame", + )); } let mut out: Vec = vec![0u8; len]; diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 0b8748c5..1a8ae96d 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -188,7 +188,7 @@ impl NetworkManager { let k = pi.node_id.key; // Register the node if let Some(nr) = - routing_table.register_node_with_signed_node_info(k, pi.signed_node_info) + routing_table.register_node_with_signed_node_info(k, pi.signed_node_info, false) { // Add this our futures to process in parallel let routing_table = routing_table.clone(); @@ -288,6 +288,7 @@ impl NetworkManager { dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list relay_peer_info: None, // Bootstraps never require a relay themselves }), + true, ) { // Add this our futures to process in parallel let routing_table = routing_table.clone(); @@ -458,6 +459,7 @@ impl NetworkManager { if let Some(nr) = routing_table.register_node_with_signed_node_info( outbound_relay_peerinfo.node_id.key, outbound_relay_peerinfo.signed_node_info, + false, ) { info!("Outbound relay node selected: {}", nr); inner.relay_node = Some(nr); @@ -531,4 +533,28 @@ impl NetworkManager { Ok(()) } + + // Clean up the public address check tables, removing entries that have timed out + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn public_address_check_task_routine( + self, + stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // go through public_address_inconsistencies_table and time out things that have expired + let mut inner = self.inner.lock(); + for (_, pait_v) in &mut inner.public_address_inconsistencies_table { + let mut expired = Vec::new(); + for (addr, exp_ts) in pait_v.iter() { + if *exp_ts <= cur_ts { + expired.push(*addr); + } + } + for exp in expired { + pait_v.remove(&exp); + } + } + Ok(()) + } } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 2b50257a..97960ea5 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1,33 +1,36 @@ use super::*; use core::sync::atomic::{AtomicU32, Ordering}; -// Reliable pings are done with increased spacing between pings -// - Start secs is the number of seconds between the first two pings -// - Max secs is the maximum number of seconds between consecutive pings -// - Multiplier changes the number of seconds between pings over time -// making it longer as the node becomes more reliable +/// Reliable pings are done with increased spacing between pings + +/// - Start secs is the number of seconds between the first two pings const RELIABLE_PING_INTERVAL_START_SECS: u32 = 10; +/// - Max secs is the maximum number of seconds between consecutive pings const RELIABLE_PING_INTERVAL_MAX_SECS: u32 = 10 * 60; +/// - Multiplier changes the number of seconds between pings over time +/// making it longer as the node becomes more reliable const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0; -// Unreliable pings are done for a fixed amount of time while the -// node is given a chance to come back online before it is made dead -// If a node misses a single ping, it is marked unreliable and must -// return reliable pings for the duration of the span before being -// marked reliable again -// - Span is the number of seconds total to attempt to validate the node -// - Interval is the number of seconds between each ping +/// Unreliable pings are done for a fixed amount of time while the +/// node is given a chance to come back online before it is made dead +/// If a node misses a single ping, it is marked unreliable and must +/// return reliable pings for the duration of the span before being +/// marked reliable again +/// +/// - Span is the number of seconds total to attempt to validate the node const UNRELIABLE_PING_SPAN_SECS: u32 = 60; +/// - Interval is the number of seconds between each ping const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; -// Keepalive pings are done occasionally to ensure holepunched public dialinfo -// remains valid, as well as to make sure we remain in any relay node's routing table +/// Keepalive pings are done occasionally to ensure holepunched public dialinfo +/// remains valid, as well as to make sure we remain in any relay node's routing table const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; -// How many times do we try to ping a never-reached node before we call it dead +/// How many times do we try to ping a never-reached node before we call it dead const NEVER_REACHED_PING_COUNT: u32 = 3; // Do not change order here, it will mess up other sorts + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum BucketEntryState { Dead, @@ -42,6 +45,7 @@ struct LastConnectionKey(PeerScope, ProtocolType, AddressType); pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, seen_our_node_info: bool, + updated_since_last_network_change: bool, last_connections: BTreeMap, opt_signed_node_info: Option, opt_local_node_info: Option, @@ -112,22 +116,44 @@ impl BucketEntryInner { } // Retuns true if the node info changed - pub fn update_node_info(&mut self, signed_node_info: SignedNodeInfo) -> bool { - // Don't update with older node info, or something less valid + pub fn update_signed_node_info( + &mut self, + signed_node_info: SignedNodeInfo, + allow_invalid_signature: bool, + ) -> bool { + // Don't allow invalid signatures unless we are explicitly allowing it + if !allow_invalid_signature && !signed_node_info.signature.valid { + return false; + } + + // See if we have an existing signed_node_info to update or not if let Some(current_sni) = &self.opt_signed_node_info { - if current_sni.signature.valid && !signed_node_info.signature.valid { - return false; - } - if signed_node_info.timestamp < current_sni.timestamp { + // If the timestamp hasn't changed or is less, ignore this update + if signed_node_info.timestamp <= current_sni.timestamp { + // If we received a node update with the same timestamp + // we can try again, but only if our network hasn't changed + if !self.updated_since_last_network_change + && signed_node_info.timestamp == current_sni.timestamp + { + // No need to update the signednodeinfo though since the timestamp is the same + // Just return true so we can make the node not dead + self.updated_since_last_network_change = true; + return true; + } return false; } } + + // Update the protocol min/max version we have self.min_max_version = Some(( signed_node_info.node_info.min_version, signed_node_info.node_info.max_version, )); + + // Update the signed node info self.opt_signed_node_info = Some(signed_node_info); + self.updated_since_last_network_change = true; true } pub fn update_local_node_info(&mut self, local_node_info: LocalNodeInfo) { @@ -238,6 +264,14 @@ impl BucketEntryInner { self.seen_our_node_info } + pub fn set_updated_since_last_network_change(&mut self, updated: bool) { + self.updated_since_last_network_change = updated; + } + + pub fn has_updated_since_last_network_change(&self) -> bool { + self.updated_since_last_network_change + } + ///// stats methods // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { @@ -461,6 +495,7 @@ impl BucketEntry { inner: RwLock::new(BucketEntryInner { min_max_version: None, seen_our_node_info: false, + updated_since_last_network_change: false, last_connections: BTreeMap::new(), opt_signed_node_info: None, opt_local_node_info: None, diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 54ce6c95..eed17590 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -533,9 +533,11 @@ impl RoutingTable { } // register the node if it's new - if let Some(nr) = - self.register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone()) - { + if let Some(nr) = self.register_node_with_signed_node_info( + p.node_id.key, + p.signed_node_info.clone(), + false, + ) { out.push(nr); } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index c9f977dd..a30a5a69 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -290,6 +290,7 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { Self::reset_all_seen_our_node_info(&*inner); + Self::reset_all_updated_since_last_network_change(&*inner); } Ok(()) @@ -303,6 +304,14 @@ impl RoutingTable { }); } + fn reset_all_updated_since_last_network_change(inner: &RoutingTableInner) { + let cur_ts = intf::get_timestamp(); + Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { + v.with_mut(|e| e.set_updated_since_last_network_change(false)); + Option::<()>::None + }); + } + pub fn clear_dial_info_details(&self, domain: RoutingDomain) { trace!("clearing dial info domain: {:?}", domain); @@ -587,6 +596,7 @@ impl RoutingTable { &self, node_id: DHTKey, signed_node_info: SignedNodeInfo, + allow_invalid_signature: bool, ) -> Option { // validate signed node info is not something malicious if node_id == self.node_id() { @@ -601,7 +611,7 @@ impl RoutingTable { } self.create_node_ref(node_id, |e| { - if e.update_node_info(signed_node_info) { + if e.update_signed_node_info(signed_node_info, allow_invalid_signature) { // at least someone thought this node was live and its node info changed so lets try to contact it e.touch_last_seen(intf::get_timestamp()); } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 34924680..f9b224ef 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -108,6 +108,12 @@ impl NodeRef { pub fn set_seen_our_node_info(&self) { self.operate_mut(|e| e.set_seen_our_node_info(true)); } + pub fn has_updated_since_last_network_change(&self) -> bool { + self.operate(|e| e.has_updated_since_last_network_change()) + } + pub fn set_updated_since_last_network_change(&self) { + self.operate_mut(|e| e.set_updated_since_last_network_change(true)); + } pub fn network_class(&self) -> Option { self.operate(|e| e.node_info().map(|n| n.network_class)) } @@ -139,7 +145,7 @@ impl NodeRef { // Register relay node and return noderef self.routing_table - .register_node_with_signed_node_info(t.node_id.key, t.signed_node_info) + .register_node_with_signed_node_info(t.node_id.key, t.signed_node_info, false) .map(|mut nr| { nr.set_filter(self.filter_ref().cloned()); nr diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 59461fd4..f849486f 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -253,8 +253,8 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - // Search the DHT for a single node closest to a key and add it to the routing table and return the node reference - // If no node was found in the timeout, this returns None + /// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference + /// If no node was found in the timeout, this returns None pub async fn search_dht_single_key( &self, _node_id: DHTKey, @@ -269,7 +269,7 @@ impl RPCProcessor { Err(RPCError::unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error)) } - // Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references + /// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references pub async fn search_dht_multi_key( &self, _node_id: DHTKey, @@ -281,8 +281,8 @@ impl RPCProcessor { Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error)) } - // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference - // Note: This routine can possible be recursive, hence the SendPinBoxFuture async form + /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference + /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form pub fn resolve_node( &self, node_id: DHTKey, @@ -393,7 +393,7 @@ impl RPCProcessor { .stats_question_lost(waitable_reply.node_ref.clone()); } Ok(TimeoutOr::Value((rpcreader, _))) => { - // Note that we definitely received this node info since we got a reply + // Note that the remote node definitely received this node info since we got a reply waitable_reply.node_ref.set_seen_our_node_info(); // Reply received @@ -410,9 +410,9 @@ impl RPCProcessor { out } - // Gets a 'RespondTo::Sender' that contains either our dial info, - // or None if the peer has seen our dial info before or our node info is not yet valid - // because of an unknown network class + /// Gets a 'RespondTo::Sender' that contains either our dial info, + /// or None if the peer has seen our dial info before or our node info is not yet valid + /// because of an unknown network class pub fn make_respond_to_sender(&self, peer: NodeRef) -> RespondTo { if peer.has_seen_our_node_info() || matches!( @@ -429,9 +429,9 @@ impl RPCProcessor { } } - // Produce a byte buffer that represents the wire encoding of the entire - // unencrypted envelope body for a RPC message. This incorporates - // wrapping a private and/or safety route if they are specified. + /// Produce a byte buffer that represents the wire encoding of the entire + /// unencrypted envelope body for a RPC message. This incorporates + /// wrapping a private and/or safety route if they are specified. #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] fn render_operation( &self, @@ -800,9 +800,11 @@ impl RPCProcessor { "respond_to_sender_signed_node_info has invalid peer scope", )); } - opt_sender_nr = self - .routing_table() - .register_node_with_signed_node_info(sender_node_id, sender_ni.clone()); + opt_sender_nr = self.routing_table().register_node_with_signed_node_info( + sender_node_id, + sender_ni.clone(), + false, + ); } _ => {} } diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 7ef1c6d9..73cc6fa4 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -40,8 +40,11 @@ impl RPCProcessor { return Ok(()); } - self.routing_table() - .register_node_with_signed_node_info(sender_node_id, node_info_update.signed_node_info); + self.routing_table().register_node_with_signed_node_info( + sender_node_id, + node_info_update.signed_node_info, + false, + ); Ok(()) } diff --git a/veilid-core/src/xx/ip_extra.rs b/veilid-core/src/xx/ip_extra.rs index 5cbb2af5..2623a27c 100644 --- a/veilid-core/src/xx/ip_extra.rs +++ b/veilid-core/src/xx/ip_extra.rs @@ -191,3 +191,27 @@ pub fn ipv6addr_multicast_scope(addr: &Ipv6Addr) -> Option { pub fn ipv6addr_is_multicast(addr: &Ipv6Addr) -> bool { (addr.segments()[0] & 0xff00) == 0xff00 } + +// Converts an ip to a ip block by applying a netmask +// to the host part of the ip address +// ipv4 addresses are treated as single hosts +// ipv6 addresses are treated as prefix allocated blocks +pub fn ip_to_ipblock(ip6_prefix_size: usize, addr: IpAddr) -> IpAddr { + match addr { + IpAddr::V4(_) => addr, + IpAddr::V6(v6) => { + let mut hostlen = 128usize.saturating_sub(ip6_prefix_size); + let mut out = v6.octets(); + for i in (0..16).rev() { + if hostlen >= 8 { + out[i] = 0xFF; + hostlen -= 8; + } else { + out[i] |= !(0xFFu8 << hostlen); + break; + } + } + IpAddr::V6(Ipv6Addr::from(out)) + } + } +}