diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 9ec0a8cf..9099f038 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -937,6 +937,7 @@ impl NetworkManager { // should not be subject to our ability to decode it // Send receipt directly + log_net!(debug "send_out_of_band_receipt: dial_info={}", dial_info); network_result_value_or_log!(debug self .net() .send_data_unbound_to_dial_info(dial_info, rcpt_data) @@ -1652,7 +1653,7 @@ impl NetworkManager { // Determine if a local IP address has changed // this means we should restart the low level network and and recreate all of our dial info // Wait until we have received confirmation from N different peers - pub async fn report_local_socket_address( + pub fn report_local_socket_address( &self, _socket_address: SocketAddress, _connection_descriptor: ConnectionDescriptor, @@ -1664,7 +1665,7 @@ impl NetworkManager { // Determine if a global IP address has changed // this means we should recreate our public dial info if it is not static and rediscover it // Wait until we have received confirmation from N different peers - pub async fn report_global_socket_address( + pub fn report_global_socket_address( &self, socket_address: SocketAddress, // the socket address as seen by the remote peer connection_descriptor: ConnectionDescriptor, // the connection descriptor used @@ -1673,39 +1674,46 @@ impl NetworkManager { // debug code //info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); + // Ignore these reports if we are currently detecting public dial info + let inner = &mut *self.inner.lock(); + let net = inner.components.as_ref().unwrap().net.clone(); + if net.doing_public_dial_info_check() { + return; + } + let routing_table = inner.routing_table.as_ref().unwrap().clone(); + let c = self.config.get(); + let detect_address_changes = c.network.detect_address_changes; + + // Get the ip(block) this report is coming from + let ip6_prefix_size = c.network.max_connections_per_ip6_prefix_size as usize; + let ipblock = ip_to_ipblock( + ip6_prefix_size, + connection_descriptor.remote_address().to_ip_addr(), + ); + + // Store the reported address if it isn't denylisted let key = PublicAddressCheckCacheKey( connection_descriptor.protocol_type(), connection_descriptor.address_type(), ); - - let (net, routing_table, detect_address_changes) = { - let mut inner = self.inner.lock(); - let c = self.config.get(); - - // Get the ip(block) this report is coming from - let ip6_prefix_size = c.network.max_connections_per_ip6_prefix_size as usize; - let ipblock = ip_to_ipblock( - ip6_prefix_size, - connection_descriptor.remote_address().to_ip_addr(), - ); - - // Store the reported address - let pacc = inner - .public_address_check_cache - .entry(key) - .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); - pacc.insert(ipblock, socket_address); - - let net = inner.components.as_ref().unwrap().net.clone(); - let routing_table = inner.routing_table.as_ref().unwrap().clone(); - (net, routing_table, c.network.detect_address_changes) - }; - let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); + let pacc = inner + .public_address_check_cache + .entry(key) + .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); + let pait = inner + .public_address_inconsistencies_table + .entry(key) + .or_insert_with(|| HashMap::new()); + if pait.contains_key(&ipblock) { + return; + } + pacc.insert(ipblock, socket_address); // Determine if our external address has likely changed let mut bad_public_address_detection_punishment: Option< Box, > = None; + let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); let needs_public_address_detection = if matches!(network_class, NetworkClass::InboundCapable) { // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed @@ -1723,17 +1731,9 @@ impl NetworkManager { // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers // then we zap the network class and re-detect it - let inner = &mut *self.inner.lock(); let mut inconsistencies = Vec::new(); + // Iteration goes from most recent to least recent node/address pair - let pacc = inner - .public_address_check_cache - .entry(key) - .or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE)); - let pait = inner - .public_address_inconsistencies_table - .entry(key) - .or_insert_with(|| HashMap::new()); for (reporting_ip_block, a) in pacc { // If this address is not one of our current addresses (inconsistent) // and we haven't already denylisted the reporting source, @@ -1785,7 +1785,6 @@ impl NetworkManager { // but if we are starting to see consistent socket address from multiple reporting peers // then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info - let mut inner = self.inner.lock(); let mut consistencies = 0; let mut consistent = false; let mut current_address = Option::::None; @@ -1816,13 +1815,11 @@ impl NetworkManager { // Reset the address check cache now so we can start detecting fresh info!("Public address has changed, detecting public dial info"); - let mut inner = self.inner.lock(); inner.public_address_check_cache.clear(); // Re-detect the public dialinfo net.set_needs_public_dial_info_check(bad_public_address_detection_punishment); } else { - let inner = self.inner.lock(); warn!("Public address may have changed. Restarting the server may be required."); warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); warn!( diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index f6658079..fe06c70f 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -6,6 +6,7 @@ use std::net::UdpSocket; const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000; const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000; const UPNP_MAPPING_ATTEMPTS: u32 = 3; +const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] struct PortMapKey { @@ -299,18 +300,17 @@ impl IGDManager { // If an error is received, then return false to restart the local network let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new(); - let now = intf::get_timestamp(); - const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64; - { let inner = self.inner.lock(); + let now = intf::get_timestamp(); for (k, v) in &inner.port_maps { - if (now - v.timestamp) >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS { + let mapping_lifetime = now.saturating_sub(v.timestamp); + if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS { // Past expiration time or tried N times, do a full renew and fail out if we can't full_renews.push((*k, *v)); } - else if (now - v.timestamp) >= v.renewal_lifetime { + else if mapping_lifetime >= v.renewal_lifetime { // Attempt a normal renewal renews.push((*k, *v)); } diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 0d715080..5e18ffa8 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -1,5 +1,4 @@ use super::*; -//use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; use stop_token::future::FutureExt as StopTokenFutureExt; @@ -612,19 +611,24 @@ impl Network { _t: u64, ) -> EyreResult<()> { // Figure out if we can optimize TCP/WS checking since they are often on the same port - let protocol_config = self.inner.lock().protocol_config.unwrap_or_default(); - let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) - && protocol_config.inbound.contains(ProtocolType::WS) - { + let (protocol_config, existing_network_class, tcp_same_port) = { let inner = self.inner.lock(); - inner.tcp_port == inner.ws_port - } else { - false + let protocol_config = inner.protocol_config.unwrap_or_default(); + let existing_network_class = inner.network_class; + let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) + && protocol_config.inbound.contains(ProtocolType::WS) + { + inner.tcp_port == inner.ws_port + } else { + false + }; + (protocol_config, existing_network_class, tcp_same_port) }; + let routing_table = self.routing_table(); + let network_manager = self.network_manager(); + // Process all protocol and address combinations let mut futures = FuturesUnordered::new(); - //let mut futures = FuturesOrdered::new(); - // Do UDPv4+v6 at the same time as everything else if protocol_config.inbound.contains(ProtocolType::UDP) { // UDPv4 @@ -750,19 +754,19 @@ impl Network { // Wait for all discovery futures to complete and collect contexts let mut contexts = Vec::::new(); - let mut network_class = Option::::None; + let mut new_network_class = Option::::None; loop { match futures.next().timeout_at(stop_token.clone()).await { Ok(Some(ctxvec)) => { if let Some(ctxvec) = ctxvec { for ctx in ctxvec { if let Some(nc) = ctx.inner.lock().detected_network_class { - if let Some(last_nc) = network_class { + if let Some(last_nc) = new_network_class { if nc < last_nc { - network_class = Some(nc); + new_network_class = Some(nc); } } else { - network_class = Some(nc); + new_network_class = Some(nc); } } @@ -781,47 +785,89 @@ impl Network { } } - // Get best network class - if network_class.is_some() { - // Update public dial info - let routing_table = self.routing_table(); - let network_manager = self.network_manager(); + // If a network class could be determined + // see about updating our public dial info + let mut changed = false; + if new_network_class.is_some() { + // Get existing public dial info + let existing_public_dial_info: HashSet = routing_table + .all_filtered_dial_info_details( + Some(RoutingDomain::PublicInternet), + &DialInfoFilter::all(), + ) + .into_iter() + .collect(); + // Get new public dial info and ensure it is valid + let mut new_public_dial_info: HashSet = HashSet::new(); for ctx in contexts { let inner = ctx.inner.lock(); if let Some(pdi) = &inner.detected_public_dial_info { - if let Err(e) = routing_table.register_dial_info( - RoutingDomain::PublicInternet, - pdi.dial_info.clone(), - pdi.class, - ) { - log_net!(warn "Failed to register detected public dial info: {}", e); + if routing_table + .ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &pdi.dial_info) + { + new_public_dial_info.insert(DialInfoDetail { + class: pdi.class, + dial_info: pdi.dial_info.clone(), + }); } // duplicate for same port if tcp_same_port && pdi.dial_info.protocol_type() == ProtocolType::TCP { let ws_dial_info = ctx.make_dial_info(pdi.dial_info.socket_address(), ProtocolType::WS); - if let Err(e) = routing_table.register_dial_info( - RoutingDomain::PublicInternet, - ws_dial_info, - pdi.class, - ) { - log_net!(warn "Failed to register detected public dial info: {}", e); + if routing_table + .ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &ws_dial_info) + { + new_public_dial_info.insert(DialInfoDetail { + class: pdi.class, + dial_info: ws_dial_info, + }); } } } } - // Update network class - self.inner.lock().network_class = network_class; - log_net!(debug "network class changed to {:?}", network_class); + // Is the public dial info different? + if existing_public_dial_info != new_public_dial_info { + // If so, clear existing public dial info and re-register the new public dial info + routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + for did in new_public_dial_info { + if let Err(e) = routing_table.register_dial_info( + RoutingDomain::PublicInternet, + did.dial_info, + did.class, + ) { + log_net!(error "Failed to register detected public dial info: {}", e); + } + } + changed = true; + } + + // Is the network class different? + if existing_network_class != new_network_class { + self.inner.lock().network_class = new_network_class; + changed = true; + log_net!(debug "network class changed to {:?}", new_network_class); + } + } else if existing_network_class.is_some() { + // Network class could not be determined + routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + self.inner.lock().network_class = None; + changed = true; + log_net!(debug "network class cleared"); + } + + // Punish nodes that told us our public address had changed when it didn't + if !changed { + if let Some(punish) = self.inner.lock().public_dial_info_check_punishment.take() { + punish(); + } + } else { // Send updates to everyone network_manager.send_node_info_updates(true).await; } - if !changed {} - Ok(()) } #[instrument(level = "trace", skip(self), err)] @@ -832,10 +878,11 @@ impl Network { t: u64, ) -> EyreResult<()> { // Note that we are doing the public dial info check + // We don't have to check this for concurrency, since this routine is run in a TickTask/SingleFuture self.inner.lock().doing_public_dial_info_check = true; // Do the public dial info check - let out = self.do_public_dial_info_check(stop_token, l, t); + let out = self.do_public_dial_info_check(stop_token, l, t).await; // Done with public dial info check self.inner.lock().doing_public_dial_info_check = false; diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index a30a5a69..f79ff5fd 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -234,16 +234,7 @@ impl RoutingTable { ret } - pub fn register_dial_info( - &self, - domain: RoutingDomain, - dial_info: DialInfo, - class: DialInfoClass, - ) -> EyreResult<()> { - log_rtab!(debug - "Registering dial_info with:\n domain: {:?}\n dial_info: {:?}\n class: {:?}", - domain, dial_info, class - ); + pub fn ensure_dial_info_is_valid(&self, domain: RoutingDomain, dial_info: &DialInfo) -> bool { let enable_local_peer_scope = { let config = self.network_manager().config(); let c = config.get(); @@ -254,13 +245,28 @@ impl RoutingTable { && matches!(domain, RoutingDomain::PublicInternet) && dial_info.is_local() { - bail!("shouldn't be registering local addresses as public"); + log_rtab!(debug "shouldn't be registering local addresses as public"); + return false; } if !dial_info.is_valid() { - bail!( + log_rtab!(debug "shouldn't be registering invalid addresses: {:?}", dial_info ); + return false; + } + true + } + + #[instrument(level = "debug", skip(self), err)] + pub fn register_dial_info( + &self, + domain: RoutingDomain, + dial_info: DialInfo, + class: DialInfoClass, + ) -> EyreResult<()> { + if !self.ensure_dial_info_is_valid(domain, &dial_info) { + return Err(eyre!("dial info is not valid")); } let mut inner = self.inner.write(); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index d4041f3b..9eb85c84 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -47,24 +47,16 @@ impl RPCProcessor { match send_data_kind { SendDataKind::Direct(connection_descriptor) => { match connection_descriptor.peer_scope() { - PeerScope::Global => { - self.network_manager() - .report_global_socket_address( - socket_address, - connection_descriptor, - peer, - ) - .await; - } - PeerScope::Local => { - self.network_manager() - .report_local_socket_address( - socket_address, - connection_descriptor, - peer, - ) - .await; - } + PeerScope::Global => self.network_manager().report_global_socket_address( + socket_address, + connection_descriptor, + peer, + ), + PeerScope::Local => self.network_manager().report_local_socket_address( + socket_address, + connection_descriptor, + peer, + ), } } SendDataKind::Indirect => { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index afee0bbc..12c01697 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -323,7 +323,7 @@ pub struct SenderInfo { } // Keep member order appropriate for sorting < preference -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub enum DialInfoClass { Direct = 0, // D = Directly reachable with public IP and no firewall, with statically configured port Mapped = 1, // M = Directly reachable with via portmap behind any NAT or firewalled with dynamically negotiated port @@ -357,7 +357,7 @@ impl DialInfoClass { } // Keep member order appropriate for sorting < preference -#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoDetail { pub class: DialInfoClass, pub dial_info: DialInfo, @@ -369,7 +369,7 @@ impl MatchesDialInfoFilter for DialInfoDetail { } } -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub enum NetworkClass { InboundCapable = 0, // I = Inbound capable without relay, may require signal OutboundOnly = 1, // O = Outbound only, inbound relay required except with reverse connect signal @@ -856,29 +856,29 @@ pub trait MatchesDialInfoFilter { fn matches_filter(&self, filter: &DialInfoFilter) -> bool; } -#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoUDP { pub socket_address: SocketAddress, } -#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoTCP { pub socket_address: SocketAddress, } -#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoWS { pub socket_address: SocketAddress, pub request: String, } -#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoWSS { pub socket_address: SocketAddress, pub request: String, } -#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] #[serde(tag = "kind")] // Keep member order appropriate for sorting < preference // Must match ProtocolType order