diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index eccac9e4..ac7c2af3 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -1,7 +1,7 @@ +mod network_class_discovery; mod network_tcp; mod network_udp; mod protocol; -mod public_dialinfo_discovery; mod start_protocols; use crate::connection_manager::*; @@ -41,9 +41,7 @@ struct NetworkInner { network_started: bool, network_needs_restart: bool, protocol_config: Option, - udp_static_public_dialinfo: bool, - tcp_static_public_dialinfo: bool, - ws_static_public_dialinfo: bool, + static_public_dialinfo: ProtocolSet, network_class: Option, join_handles: Vec>, udp_port: u16, @@ -64,7 +62,7 @@ struct NetworkInner { struct NetworkUnlockedInner { // Background processes - update_public_dialinfo_task: TickTask, + update_network_class_task: TickTask, } #[derive(Clone)] @@ -82,9 +80,7 @@ impl Network { network_started: false, network_needs_restart: false, protocol_config: None, - udp_static_public_dialinfo: false, - tcp_static_public_dialinfo: false, - ws_static_public_dialinfo: false, + static_public_dialinfo: ProtocolSet::empty(), network_class: None, join_handles: Vec::new(), udp_port: 0u16, @@ -104,7 +100,7 @@ impl Network { fn new_unlocked_inner() -> NetworkUnlockedInner { NetworkUnlockedInner { - update_public_dialinfo_task: TickTask::new(1), + update_network_class_task: TickTask::new(1), } } @@ -115,13 +111,13 @@ impl Network { unlocked_inner: Arc::new(Self::new_unlocked_inner()), }; - // Set public dialinfo tick task + // Set update network class tick task { let this2 = this.clone(); this.unlocked_inner - .update_public_dialinfo_task + .update_network_class_task .set_routine(move |l, t| { - Box::pin(this2.clone().update_public_dialinfo_task_routine(l, t)) + Box::pin(this2.clone().update_network_class_task_routine(l, t)) }); } @@ -239,6 +235,26 @@ impl Network { } } + pub fn with_interface_addresses(&self, f: F) -> R + where + F: FnOnce(&[IpAddr]) -> R, + { + let inner = self.inner.lock(); + inner.interfaces.with_best_addresses(f) + } + + // See if our interface addresses have changed, if so we need to punt the network + // and redo all our addresses. This is overkill, but anything more accurate + // would require inspection of routing tables that we dont want to bother with + pub async fn check_interface_addresses(&self) -> Result { + let mut inner = self.inner.lock(); + if !inner.interfaces.refresh().await? { + return Ok(false); + } + inner.network_needs_restart = true; + Ok(true) + } + //////////////////////////////////////////////////////////// // Send data to a dial info, unbound, using a new connection from a random port @@ -386,34 +402,50 @@ impl Network { // get protocol config let protocol_config = { let c = self.config.get(); - ProtocolConfig { - inbound: ProtocolSet { - udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, - tcp: c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp, - ws: c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws, - wss: c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss, - }, - outbound: ProtocolSet { - udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, - tcp: c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp, - ws: c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws, - wss: c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss, - }, + let mut inbound = ProtocolSet::new(); + + if c.network.protocol.udp.enabled && c.capabilities.protocol_udp { + inbound.insert(ProtocolType::UDP); } + if c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp { + inbound.insert(ProtocolType::TCP); + } + if c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws { + inbound.insert(ProtocolType::WS); + } + if c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss { + inbound.insert(ProtocolType::WSS); + } + + let mut outbound = ProtocolSet::new(); + if c.network.protocol.udp.enabled && c.capabilities.protocol_udp { + outbound.insert(ProtocolType::UDP); + } + if c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp { + outbound.insert(ProtocolType::TCP); + } + if c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws { + outbound.insert(ProtocolType::WS); + } + if c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss { + outbound.insert(ProtocolType::WSS); + } + + ProtocolConfig { inbound, outbound } }; self.inner.lock().protocol_config = Some(protocol_config); // start listeners - if protocol_config.inbound.udp { + if protocol_config.inbound.contains(ProtocolType::UDP) { self.start_udp_listeners().await?; } - if protocol_config.inbound.ws { + if protocol_config.inbound.contains(ProtocolType::WS) { self.start_ws_listeners().await?; } - if protocol_config.inbound.wss { + if protocol_config.inbound.contains(ProtocolType::WSS) { self.start_wss_listeners().await?; } - if protocol_config.inbound.tcp { + if protocol_config.inbound.contains(ProtocolType::TCP) { self.start_tcp_listeners().await?; } @@ -431,17 +463,21 @@ impl Network { self.inner.lock().network_needs_restart } + pub fn restart_network(&self) { + self.inner.lock().network_needs_restart = true; + } + pub async fn shutdown(&self) { info!("stopping network"); let network_manager = self.network_manager(); let routing_table = self.routing_table(); - // Reset state - // Drop all dial info - routing_table.clear_dial_info_details(); + routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + routing_table.clear_dial_info_details(RoutingDomain::LocalNetwork); + // Reset state including network class // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(network_manager); @@ -451,109 +487,22 @@ impl Network { ////////////////////////////////////////// pub fn get_network_class(&self) -> Option { let inner = self.inner.lock(); - if !inner.network_started { - return None; - } + return inner.network_class; + } - // If we've fixed the network class, return it rather than calculating it - if inner.network_class.is_some() { - return inner.network_class; - } - - // Go through our global dialinfo and see what our best network class is - let mut network_class = NetworkClass::Invalid; - for did in inner.routing_table.public_dial_info_details() { - if let Some(nc) = did.network_class { - if nc < network_class { - network_class = nc; - } - } - } - Some(network_class) + pub fn reset_network_class(&self) { + let inner = self.inner.lock(); + inner.network_class = None; } ////////////////////////////////////////// pub async fn tick(&self) -> Result<(), String> { - let ( - routing_table, - protocol_config, - udp_static_public_dialinfo, - tcp_static_public_dialinfo, - ws_static_public_dialinfo, - network_class, - ) = { - let inner = self.inner.lock(); - ( - inner.routing_table.clone(), - inner.protocol_config.unwrap_or_default(), - inner.udp_static_public_dialinfo, - inner.tcp_static_public_dialinfo, - inner.ws_static_public_dialinfo, - inner.network_class.unwrap_or(NetworkClass::Invalid), - ) - }; + let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); - // See if we have any UDPv4 public dialinfo, and if we should have it - // If we have statically configured public dialinfo, don't bother with this - // If we can have public dialinfo, or we haven't figured out our network class yet, - // and we're active for UDP, we should attempt to get our public dialinfo sorted out - // and assess our network class if we haven't already - if (network_class.inbound_capable() || network_class == NetworkClass::Invalid) - && - { - let filter = DialInfoFilter::global() - .with_protocol_type(ProtocolType::TCP) - .with_address_type(AddressType::IPV4); - let need_tcpv4_dialinfo = routing_table - .first_public_filtered_dial_info_detail(&filter) - .is_none(); - if need_tcpv4_dialinfo { - } - - self.unlocked_inner - .update_public_dialinfo_task - .tick() - .await?; - } - - // Same but for TCPv4 - if protocol_config.inbound.tcp - && !tcp_static_public_dialinfo - && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) - { - let filter = DialInfoFilter::all() - .with_protocol_type(ProtocolType::TCP) - .with_address_type(AddressType::IPV4); - let need_tcpv4_dialinfo = routing_table - .first_public_filtered_dial_info_detail(&filter) - .is_none(); - if need_tcpv4_dialinfo { - // If we have no public TCPv4 dialinfo, then we need to run a NAT check - // ensure the singlefuture is running for this - self.unlocked_inner - .update_tcpv4_dialinfo_task - .tick() - .await?; - } - } - - // Same but for WSv4 - if protocol_config.inbound.ws - && !ws_static_public_dialinfo - && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) - { - let filter = DialInfoFilter::all() - .with_protocol_type(ProtocolType::WS) - .with_address_type(AddressType::IPV4); - let need_wsv4_dialinfo = routing_table - .first_public_filtered_dial_info_detail(&filter) - .is_none(); - if need_wsv4_dialinfo { - // If we have no public TCPv4 dialinfo, then we need to run a NAT check - // ensure the singlefuture is running for this - self.unlocked_inner.update_wsv4_dialinfo_task.tick().await?; - } + // If we need to figure out our network class, tick the task for it + if network_class == NetworkClass::Invalid { + self.unlocked_inner.update_network_class_task.tick().await?; } Ok(()) diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/network_class_discovery.rs similarity index 92% rename from veilid-core/src/intf/native/network/public_dialinfo_discovery.rs rename to veilid-core/src/intf/native/network/network_class_discovery.rs index 94889a68..b10d27e9 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/network_class_discovery.rs @@ -60,7 +60,7 @@ impl Network { .with_protocol_type(protocol_type) .with_address_type(address_type); routing_table - .interface_dial_info_details() + .dial_info_details(RoutingDomain::LocalNetwork) .iter() .filter_map(|did| { if did.dial_info.matches_filter(&filter) { @@ -100,7 +100,7 @@ impl Network { None } - pub async fn update_udpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { + pub async fn update_udpv4_dialinfo(&self) -> Result<(), String> { log_net!("looking for udpv4 public dial info"); let routing_table = self.routing_table(); @@ -258,17 +258,42 @@ impl Network { Ok(()) } - pub async fn update_tcpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { + pub async fn update_tcpv4_dialinfo(&self) -> Result<(), String> { log_net!("looking for tcpv4 public dial info"); // xxx //Err("unimplemented".to_owned()) Ok(()) } - pub async fn update_wsv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { + pub async fn update_wsv4_dialinfo(&self) -> Result<(), String> { log_net!("looking for wsv4 public dial info"); // xxx //Err("unimplemented".to_owned()) Ok(()) } + + pub async fn update_network_class_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { + log_net!("updating network class"); + + let protocol_config = self + .inner + .lock() + .protocol_config + .clone() + .unwrap_or_default(); + + if protocol_config.inbound.contains(ProtocolType::UDP) { + self.update_udpv4_dialinfo().await?; + } + + if protocol_config.inbound.contains(ProtocolType::TCP) { + self.update_tcpv4_dialinfo().await?; + } + + if protocol_config.inbound.contains(ProtocolType::WS) { + self.update_wsv4_dialinfo().await?; + } + + Ok(()) + } } diff --git a/veilid-core/src/intf/native/network/start_protocols.rs b/veilid-core/src/intf/native/network/start_protocols.rs index 0a13a8c2..e1e48dd6 100644 --- a/veilid-core/src/intf/native/network/start_protocols.rs +++ b/veilid-core/src/intf/native/network/start_protocols.rs @@ -261,11 +261,12 @@ impl Network { pub(super) async fn start_udp_listeners(&self) -> Result<(), String> { trace!("starting udp listeners"); let routing_table = self.routing_table(); - let (listen_address, public_address) = { + let (listen_address, public_address, enable_local_peer_scope) = { let c = self.config.get(); ( c.network.protocol.udp.listen_address.clone(), c.network.protocol.udp.public_address.clone(), + c.network.enable_local_peer_scope, ) }; @@ -287,23 +288,28 @@ impl Network { "UDP: starting listeners on port {} at {:?}", udp_port, ip_addrs ); - let dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; + let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; let mut static_public = false; - for di in &dial_info_list { - // If the local interface address is global, + + // Register local dial info + for di in &local_dial_info_list { + // If the local interface address is global, or we are enabling local peer scope // register global dial info if no public address is specified - if public_address.is_none() && di.is_global() { - routing_table.register_public_dial_info( + if public_address.is_none() && (di.is_global() || enable_local_peer_scope) { + routing_table.register_dial_info( + RoutingDomain::PublicInternet, di.clone(), DialInfoOrigin::Static, - Some(NetworkClass::Server), ); - static_public = true; } // Register interface dial info as well since the address is on the local interface - routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + di.clone(), + DialInfoOrigin::Static, + ); } // Add static public dialinfo if it's configured @@ -316,16 +322,43 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_public_dial_info( - DialInfo::udp_from_socketaddr(pdi_addr), + let pdi = DialInfo::udp_from_socketaddr(pdi_addr); + + // Register the public address + routing_table.register_dial_info( + RoutingDomain::PublicInternet, + pdi.clone(), DialInfoOrigin::Static, - Some(NetworkClass::Server), ); + + // See if this public address is also a local interface address + if !local_dial_info_list.contains(&pdi) + && self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if pdi_addr.ip() == *ip_addr { + return true; + } + } + false + }) + { + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + DialInfo::udp_from_socketaddr(pdi_addr), + DialInfoOrigin::Static, + ); + } + static_public = true; } } - self.inner.lock().udp_static_public_dialinfo = static_public; + if static_public { + self.inner + .lock() + .static_public_dialinfo + .insert(ProtocolType::UDP); + } // Now create tasks for udp listeners self.create_udp_listener_tasks().await @@ -334,12 +367,13 @@ impl Network { pub(super) async fn start_ws_listeners(&self) -> Result<(), String> { trace!("starting ws listeners"); let routing_table = self.routing_table(); - let (listen_address, url, path) = { + let (listen_address, url, path, enable_local_peer_scope) = { let c = self.config.get(); ( c.network.protocol.ws.listen_address.clone(), c.network.protocol.ws.url.clone(), c.network.protocol.ws.path.clone(), + c.network.enable_local_peer_scope, ) }; @@ -367,31 +401,7 @@ impl Network { trace!("WS: listener started"); let mut static_public = false; - for socket_address in socket_addresses { - if url.is_none() && socket_address.address().is_global() { - // Build global dial info request url - let global_url = format!("ws://{}/{}", socket_address, path); - - // Create global dial info - let di = DialInfo::try_ws(socket_address, global_url) - .map_err(map_to_string) - .map_err(logthru_net!(error))?; - routing_table.register_public_dial_info( - di, - DialInfoOrigin::Static, - Some(NetworkClass::Server), - ); - static_public = true; - } - // Build interface dial info request url - let interface_url = format!("ws://{}/{}", socket_address, path); - - // Create interface dial info - let di = DialInfo::try_ws(socket_address, interface_url) - .map_err(map_to_string) - .map_err(logthru_net!(error))?; - routing_table.register_interface_dial_info(di, DialInfoOrigin::Static); - } + let mut registered_addresses: HashSet = HashSet::new(); // Add static public dialinfo if it's configured if let Some(url) = url.as_ref() { @@ -410,17 +420,74 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_public_dial_info( - DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) - .map_err(map_to_string) - .map_err(logthru_net!(error))?, + let pdi = DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + routing_table.register_dial_info( + RoutingDomain::PublicInternet, + pdi.clone(), DialInfoOrigin::Static, - Some(NetworkClass::Server), ); + static_public = true; + + // See if this public address is also a local interface address + if !registered_addresses.contains(&gsa.ip()) + && self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if gsa.ip() == *ip_addr { + return true; + } + } + false + }) + { + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + pdi, + DialInfoOrigin::Static, + ); + } + + registered_addresses.insert(gsa.ip()); } - static_public = true; } - self.inner.lock().ws_static_public_dialinfo = static_public; + + for socket_address in socket_addresses { + // Skip addresses we already did + if registered_addresses.contains(&socket_address.to_ip_addr()) { + continue; + } + // Build dial info request url + let local_url = format!("ws://{}/{}", socket_address, path); + let local_di = DialInfo::try_ws(socket_address, local_url) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + if url.is_none() && (socket_address.address().is_global() || enable_local_peer_scope) { + // Register public dial info + routing_table.register_dial_info( + RoutingDomain::PublicInternet, + local_di.clone(), + DialInfoOrigin::Static, + ); + static_public = true; + } + + // Register local dial info + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + local_di, + DialInfoOrigin::Static, + ); + } + + if static_public { + self.inner + .lock() + .static_public_dialinfo + .insert(ProtocolType::WS); + } Ok(()) } @@ -429,11 +496,12 @@ impl Network { trace!("starting wss listeners"); let routing_table = self.routing_table(); - let (listen_address, url) = { + let (listen_address, url, enable_local_peer_scope) = { let c = self.config.get(); ( c.network.protocol.wss.listen_address.clone(), c.network.protocol.wss.url.clone(), + c.network.enable_local_peer_scope, ) }; @@ -465,6 +533,9 @@ impl Network { // is specified, then TLS won't validate, so no local dialinfo is possible. // This is not the case with unencrypted websockets, which can be specified solely by an IP address + let mut static_public = false; + let mut registered_addresses: HashSet = HashSet::new(); + // Add static public dialinfo if it's configured if let Some(url) = url.as_ref() { // Add static public dialinfo if it's configured @@ -483,18 +554,48 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_public_dial_info( - DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) - .map_err(map_to_string) - .map_err(logthru_net!(error))?, + let pdi = DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + routing_table.register_dial_info( + RoutingDomain::PublicInternet, + pdi.clone(), DialInfoOrigin::Static, - Some(NetworkClass::Server), ); + static_public = true; + + // See if this public address is also a local interface address + if !registered_addresses.contains(&gsa.ip()) + && self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if gsa.ip() == *ip_addr { + return true; + } + } + false + }) + { + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + pdi, + DialInfoOrigin::Static, + ); + } + + registered_addresses.insert(gsa.ip()); } } else { return Err("WSS URL must be specified due to TLS requirements".to_owned()); } + if static_public { + self.inner + .lock() + .static_public_dialinfo + .insert(ProtocolType::WSS); + } + Ok(()) } @@ -502,11 +603,12 @@ impl Network { trace!("starting tcp listeners"); let routing_table = self.routing_table(); - let (listen_address, public_address) = { + let (listen_address, public_address, enable_local_peer_scope) = { let c = self.config.get(); ( c.network.protocol.tcp.listen_address.clone(), c.network.protocol.tcp.public_address.clone(), + c.network.enable_local_peer_scope, ) }; @@ -534,20 +636,27 @@ impl Network { trace!("TCP: listener started"); let mut static_public = false; + let mut registered_addresses: HashSet = HashSet::new(); + for socket_address in socket_addresses { let di = DialInfo::tcp(socket_address); // Register global dial info if no public address is specified - if public_address.is_none() && di.is_global() { - routing_table.register_public_dial_info( + if public_address.is_none() && (di.is_global() || enable_local_peer_scope) { + routing_table.register_dial_info( + RoutingDomain::PublicInternet, di.clone(), DialInfoOrigin::Static, - Some(NetworkClass::Server), ); static_public = true; } // Register interface dial info - routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + di.clone(), + DialInfoOrigin::Static, + ); + registered_addresses.insert(socket_address.to_ip_addr()); } // Add static public dialinfo if it's configured @@ -560,16 +669,45 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_public_dial_info( - DialInfo::tcp_from_socketaddr(pdi_addr), + // Skip addresses we already did + if registered_addresses.contains(&pdi_addr.ip()) { + continue; + } + let pdi = DialInfo::tcp_from_socketaddr(pdi_addr); + + routing_table.register_dial_info( + RoutingDomain::PublicInternet, + pdi.clone(), DialInfoOrigin::Static, - None, ); static_public = true; + + // See if this public address is also a local interface address + if self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if pdi_addr.ip() == *ip_addr { + return true; + } + } + false + }) { + routing_table.register_dial_info( + RoutingDomain::LocalNetwork, + pdi, + DialInfoOrigin::Static, + ); + } + + static_public = true; } } - self.inner.lock().tcp_static_public_dialinfo = static_public; + if static_public { + self.inner + .lock() + .static_public_dialinfo + .insert(ProtocolType::TCP); + } Ok(()) } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 100aa6ac..1b4cdfb8 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -295,6 +295,7 @@ impl NetworkInterface { pub struct NetworkInterfaces { valid: bool, interfaces: BTreeMap, + interface_address_cache: Vec, } impl fmt::Debug for NetworkInterfaces { @@ -317,6 +318,7 @@ impl NetworkInterfaces { Self { valid: false, interfaces: BTreeMap::new(), + interface_address_cache: Vec::new(), } } pub fn is_valid(&self) -> bool { @@ -324,6 +326,7 @@ impl NetworkInterfaces { } pub fn clear(&mut self) { self.interfaces.clear(); + self.interface_address_cache.clear(); self.valid = false; } // returns Ok(false) if refresh had no changes, Ok(true) if changes were present @@ -341,6 +344,8 @@ impl NetworkInterfaces { let changed = last_interfaces != self.interfaces; if changed { trace!("NetworkInterfaces refreshed: {:#?}?", self); + + self.cache_best_addresses(); } Ok(changed) } @@ -351,7 +356,7 @@ impl NetworkInterfaces { self.interfaces.iter() } - pub fn best_addresses(&self) -> Vec { + fn cache_best_addresses(&mut self) { // Reduce interfaces to their best routable ip addresses let mut intf_addrs = Vec::new(); for intf in self.interfaces.values() { @@ -370,6 +375,17 @@ impl NetworkInterfaces { intf_addrs.sort(); // Now export just the addresses - intf_addrs.iter().map(|x| x.if_addr().ip()).collect() + self.interface_address_cache = intf_addrs.iter().map(|x| x.if_addr().ip()).collect() + } + + pub fn best_addresses(&self) -> Vec { + self.interface_address_cache.clone() + } + + pub fn with_best_addresses(&self, f: F) -> R + where + F: FnOnce(&[IpAddr]) -> R, + { + f(&self.interface_address_cache) } } diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 7f7a20ea..ff945a27 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -162,6 +162,10 @@ impl Network { self.inner.lock().network_needs_restart } + pub fn restart_network(&self) { + self.inner.lock().network_needs_restart = true; + } + pub async fn shutdown(&self) { trace!("stopping network"); @@ -178,6 +182,18 @@ impl Network { trace!("network stopped"); } + pub fn with_interface_addresses(&self, f: F) -> R + where + F: FnOnce(&[IpAddr]) -> R, + { + f(&[]) + } + + pub async fn check_interface_addresses(&self) -> Result { + Ok(false) + } + + ////////////////////////////////////////// pub fn get_network_class(&self) -> Option { // xxx eventually detect tor browser? diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index b9adebc7..25541026 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -14,6 +14,7 @@ pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes +pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; #[derive(Copy, Clone, Debug, Default)] pub struct ProtocolConfig { @@ -80,18 +81,19 @@ enum ContactMethod { #[derive(Copy, Clone, Debug)] pub enum SendDataKind { - Direct, - Indirect, + LocalDirect, + GlobalDirect, + GlobalIndirect, } // The mutable state of the network manager struct NetworkManagerInner { routing_table: Option, components: Option, - network_class: Option, stats: NetworkManagerStats, client_whitelist: LruCache, relay_node: Option, + global_address_check_cache: LruCache, } struct NetworkManagerUnlockedInner { @@ -114,10 +116,10 @@ impl NetworkManager { NetworkManagerInner { routing_table: None, components: None, - network_class: None, stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), relay_node: None, + global_address_check_cache: LruCache::new(8), } } fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner { @@ -272,7 +274,6 @@ impl NetworkManager { // reset the state let mut inner = self.inner.lock(); inner.components = None; - inner.network_class = None; trace!("NetworkManager::shutdown end"); } @@ -551,11 +552,18 @@ impl NetworkManager { pub async fn send_envelope>( &self, node_ref: NodeRef, - node_id: Option, + envelope_node_id: Option, body: B, - ) -> Result<(), String> { - if let Some(node_id) = node_id { - log_net!("sending envelope to {:?} via {:?}", node_id, node_ref); + ) -> Result { + let via_node_id = node_ref.node_id(); + let envelope_node_id = envelope_node_id.unwrap_or(via_node_id); + + if envelope_node_id != via_node_id { + log_net!( + "sending envelope to {:?} via {:?}", + envelope_node_id, + node_ref + ); } else { log_net!("sending envelope to {:?}", node_ref); } @@ -567,9 +575,7 @@ impl NetworkManager { if node_min > MAX_VERSION || node_max < MIN_VERSION { return Err(format!( "can't talk to this node {} because version is unsupported: ({},{})", - node_ref.node_id(), - node_min, - node_max + via_node_id, node_min, node_max )) .map_err(logthru_rpc!(warn)); } @@ -580,11 +586,17 @@ impl NetworkManager { // Build the envelope to send let out = self - .build_envelope(node_id.unwrap_or_else(|| node_ref.node_id()), version, body) + .build_envelope(envelope_node_id, version, body) .map_err(logthru_rpc!(error))?; // Send the envelope via whatever means necessary - self.send_data(node_ref, out).await + let send_data_kind = self.send_data(node_ref.clone(), out).await?; + + // If we asked to relay from the start, then this is always indirect + if envelope_node_id != via_node_id { + return Ok(SendDataKind::GlobalIndirect); + } + return Ok(send_data_kind); } // Called by the RPC handler when we want to issue an direct receipt @@ -867,7 +879,7 @@ impl NetworkManager { &self, node_ref: NodeRef, data: Vec, - ) -> SystemPinBoxFuture> { + ) -> SystemPinBoxFuture> { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on @@ -879,7 +891,11 @@ impl NetworkManager { .map_err(logthru_net!())? { None => { - return Ok(()); + return Ok(if descriptor.matches_peer_scope(PeerScope::Local) { + SendDataKind::LocalDirect + } else { + SendDataKind::GlobalDirect + }); } Some(d) => d, } @@ -890,19 +906,30 @@ impl NetworkManager { // If we don't have last_connection, try to reach out to the peer via its dial info match this.get_contact_method(node_ref).map_err(logthru_net!())? { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { - this.send_data(relay_nr, data).await - } - ContactMethod::Direct(dial_info) => { - this.net().send_data_to_dial_info(dial_info, data).await - } - ContactMethod::SignalReverse(relay_nr, target_node_ref) => { - this.do_reverse_connect(relay_nr, target_node_ref, data) + this.send_data(relay_nr, data) .await + .map(|_| SendDataKind::GlobalIndirect) } - ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { - this.do_hole_punch(relay_nr, target_node_ref, data).await - } - ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()), + ContactMethod::Direct(dial_info) => this + .net() + .send_data_to_dial_info(dial_info, data) + .await + .map(|_| { + if dial_info.is_local() { + SendDataKind::LocalDirect + } else { + SendDataKind::GlobalDirect + } + }), + ContactMethod::SignalReverse(relay_nr, target_node_ref) => this + .do_reverse_connect(relay_nr, target_node_ref, data) + .await + .map(|_| SendDataKind::GlobalDirect), + ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => this + .do_hole_punch(relay_nr, target_node_ref, data) + .await + .map(|_| SendDataKind::GlobalDirect), + ContactMethod::Unreachable => Err("Can't send to this node".to_owned()), } .map_err(logthru_net!()) }) @@ -1156,4 +1183,49 @@ impl NetworkManager { .transfer_stats_accounting .add_down(bytes); } + + // Determine if a local IP address has changed + // this means we should restart the low level network and and recreate all of our dial info + // Wait until we have received confirmation from N different peers + pub async fn report_local_socket_address( + &self, + _socket_address: SocketAddress, + _reporting_peer: NodeRef, + ) { + // XXX: Nothing here yet. + } + + // Determine if a global IP address has changed + // this means we should recreate our public dial info if it is not static and rediscover it + // Wait until we have received confirmation from N different peers + pub async fn report_global_socket_address( + &self, + socket_address: SocketAddress, + reporting_peer: NodeRef, + ) { + let mut inner = self.inner.lock(); + inner + .global_address_check_cache + .insert(reporting_peer.node_id(), socket_address); + + let network_class = inner + .components + .unwrap() + .net + .get_network_class() + .unwrap_or(NetworkClass::Invalid); + + if network_class.inbound_capable() { + // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers + // then we zap the network class and re-detect it + + // If we are inbound capable but start to see consistently different socket addresses from multiple reporting peers + // then we zap the network class and global dial info and re-detect it + } else { + // If we are currently outbound only, we don't have any public dial info + // but if we are starting to see consistent socket address from multiple reporting peers + // then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info + inner.components.unwrap().net.reset_network_class(); + } + } } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 4f82e2ba..629586b4 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -23,15 +23,15 @@ impl RoutingTable { out } pub fn debug_info_dialinfo(&self) -> String { - let ldis = self.interface_dial_info_details(); - let gdis = self.public_dial_info_details(); + let ldis = self.dial_info_details(RoutingDomain::LocalNetwork); + let gdis = self.dial_info_details(RoutingDomain::PublicInternet); let mut out = String::new(); - out += "Interface Dial Info Details:\n"; + out += "Local Network Dial Info Details:\n"; for (n, ldi) in ldis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, ldi); } - out += "Public Dial Info Details:\n"; + out += "Public Internet Dial Info Details:\n"; for (n, gdi) in gdis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, gdi); } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index dc7c29d7..f0ed3308 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -55,22 +55,11 @@ impl RoutingTable { node_info: NodeInfo { network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound, - dial_info_list: if !enable_local_peer_scope { - self.public_dial_info_details() - .iter() - .map(|did| did.dial_info.clone()) - .collect() - } else { - self.public_dial_info_details() - .iter() - .map(|did| did.dial_info.clone()) - .chain( - self.interface_dial_info_details() - .iter() - .map(|did| did.dial_info.clone()), - ) - .collect() - }, + dial_info_list: self + .dial_info_details(RoutingDomain::PublicInternet) + .iter() + .map(|did| did.dial_info.clone()) + .collect(), relay_peer_info: relay_node.map(|rn| Box::new(rn.peer_info())), }, } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index e96276bc..0b74aa80 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -29,11 +29,21 @@ pub enum DialInfoOrigin { Mapped, } +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq)] +pub enum RoutingDomain { + PublicInternet, + LocalNetwork, +} + +#[derive(Debug, Default)] +pub struct RoutingDomainDetail { + dial_info_details: Vec, +} + #[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)] pub struct DialInfoDetail { pub dial_info: DialInfo, pub origin: DialInfoOrigin, - pub network_class: Option, pub timestamp: u64, } @@ -48,13 +58,10 @@ struct RoutingTableInner { node_id: DHTKey, node_id_secret: DHTKeySecret, buckets: Vec, - public_dial_info_details: Vec, - interface_dial_info_details: Vec, + public_internet_routing_domain: RoutingDomainDetail, + local_network_routing_domain: RoutingDomainDetail, bucket_entry_count: usize, - // Waiters - eventual_changed_dial_info: Eventual, - // Transfer stats for this node self_latency_stats_accounting: LatencyStatsAccounting, self_transfer_stats_accounting: TransferStatsAccounting, @@ -90,10 +97,9 @@ impl RoutingTable { node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), - public_dial_info_details: Vec::new(), - interface_dial_info_details: Vec::new(), + public_internet_routing_domain: RoutingDomainDetail::default(), + local_network_routing_domain: RoutingDomainDetail::default(), bucket_entry_count: 0, - eventual_changed_dial_info: Eventual::new(), self_latency_stats_accounting: LatencyStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), @@ -165,81 +171,78 @@ impl RoutingTable { self.inner.lock().node_id_secret } - pub fn has_interface_dial_info(&self) -> bool { - !self.inner.lock().interface_dial_info_details.is_empty() + fn with_routing_domain(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R + where + F: FnOnce(&RoutingDomainDetail) -> R, + { + match domain { + RoutingDomain::PublicInternet => f(&inner.public_internet_routing_domain), + RoutingDomain::LocalNetwork => f(&inner.local_network_routing_domain), + } } - pub fn has_public_dial_info(&self) -> bool { - !self.inner.lock().public_dial_info_details.is_empty() + fn with_routing_domain_mut( + inner: &mut RoutingTableInner, + domain: RoutingDomain, + f: F, + ) -> R + where + F: FnOnce(&mut RoutingDomainDetail) -> R, + { + match domain { + RoutingDomain::PublicInternet => f(&mut inner.public_internet_routing_domain), + RoutingDomain::LocalNetwork => f(&mut inner.local_network_routing_domain), + } } - pub fn public_dial_info_details(&self) -> Vec { - self.inner.lock().public_dial_info_details.clone() + pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { + let inner = self.inner.lock(); + Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details.is_empty()) } - pub fn interface_dial_info_details(&self) -> Vec { - self.inner.lock().interface_dial_info_details.clone() + pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { + let inner = self.inner.lock(); + Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details.clone()) } - pub fn first_public_filtered_dial_info_detail( + pub fn first_filtered_dial_info_detail( &self, + domain: RoutingDomain, filter: &DialInfoFilter, ) -> Option { let inner = self.inner.lock(); - for did in &inner.public_dial_info_details { - if did.matches_filter(filter) { - return Some(did.clone()); + Self::with_routing_domain(&*inner, domain, |rd| { + for did in rd.dial_info_details { + if did.matches_filter(filter) { + return Some(did.clone()); + } } - } - None + None + }) } - pub fn all_public_filtered_dial_info_details( + pub fn all_filtered_dial_info_details( &self, + domain: RoutingDomain, filter: &DialInfoFilter, ) -> Vec { let inner = self.inner.lock(); - let mut ret = Vec::new(); - for did in &inner.public_dial_info_details { - if did.matches_filter(filter) { - ret.push(did.clone()); + Self::with_routing_domain(&*inner, domain, |rd| { + let mut ret = Vec::new(); + for did in rd.dial_info_details { + if did.matches_filter(filter) { + ret.push(did.clone()); + } } - } - ret + ret + }) } - pub fn first_interface_filtered_dial_info_detail( - &self, - filter: &DialInfoFilter, - ) -> Option { - let inner = self.inner.lock(); - for did in &inner.interface_dial_info_details { - if did.matches_filter(filter) { - return Some(did.clone()); - } - } - None - } - - pub fn all_interface_filtered_dial_info_details( - &self, - filter: &DialInfoFilter, - ) -> Vec { - let inner = self.inner.lock(); - let mut ret = Vec::new(); - for did in &inner.interface_dial_info_details { - if did.matches_filter(filter) { - ret.push(did.clone()); - } - } - ret - } - - pub fn register_public_dial_info( + pub fn register_dial_info( &self, + domain: RoutingDomain, dial_info: DialInfo, origin: DialInfoOrigin, - network_class: Option, ) { let timestamp = get_timestamp(); let enable_local_peer_scope = { @@ -248,7 +251,10 @@ impl RoutingTable { c.network.enable_local_peer_scope }; - if !enable_local_peer_scope && dial_info.is_local() { + if !enable_local_peer_scope + && matches!(domain, RoutingDomain::PublicInternet) + && dial_info.is_local() + { error!("shouldn't be registering local addresses as public"); return; } @@ -258,21 +264,21 @@ impl RoutingTable { } let mut inner = self.inner.lock(); - - inner.public_dial_info_details.push(DialInfoDetail { - dial_info: dial_info.clone(), - origin, - network_class, - timestamp, + Self::with_routing_domain_mut(&mut *inner, domain, |rd| { + rd.dial_info_details.push(DialInfoDetail { + dial_info: dial_info.clone(), + origin, + timestamp, + }); }); - // Re-sort dial info to endure preference ordering - inner - .public_dial_info_details - .sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); - + let domain_str = match domain { + RoutingDomain::PublicInternet => "Public", + RoutingDomain::LocalNetwork => "Local", + }; info!( - "Public Dial Info: {}", + "{} Dial Info: {}", + domain_str, NodeDialInfo { node_id: NodeId::new(inner.node_id), dial_info @@ -280,74 +286,13 @@ impl RoutingTable { .to_string(), ); debug!(" Origin: {:?}", origin); - debug!(" Network Class: {:?}", network_class); - - Self::trigger_changed_dial_info(&mut *inner); } - pub fn register_interface_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { - if !dial_info.is_valid() { - error!("shouldn't be registering invalid interface addresses"); - return; - } - - let timestamp = get_timestamp(); + pub fn clear_dial_info_details(&self, domain: RoutingDomain) { let mut inner = self.inner.lock(); - - inner.interface_dial_info_details.push(DialInfoDetail { - dial_info: dial_info.clone(), - origin, - network_class: None, - timestamp, - }); - - // Re-sort dial info to endure preference ordering - inner - .interface_dial_info_details - .sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); - - info!( - "Interface Dial Info: {}", - NodeDialInfo { - node_id: NodeId::new(inner.node_id), - dial_info - } - .to_string(), - ); - debug!(" Origin: {:?}", origin); - - Self::trigger_changed_dial_info(&mut *inner); - } - - pub fn clear_dial_info_details(&self) { - let mut inner = self.inner.lock(); - inner.public_dial_info_details.clear(); - inner.interface_dial_info_details.clear(); - Self::trigger_changed_dial_info(&mut *inner); - } - - pub async fn wait_changed_dial_info(&self) { - let inst = self - .inner - .lock() - .eventual_changed_dial_info - .instance_empty(); - inst.await; - } - - fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { - // Clear 'seen node info' bits on routing table entries so we know to ping them - for b in &mut inner.buckets { - for e in b.entries_mut() { - e.1.set_seen_our_node_info(false); - } - } - // - - // Release any waiters - let mut new_eventual = Eventual::new(); - core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual); - spawn(new_eventual.resolve()).detach(); + Self::with_routing_domain_mut(&mut *inner, domain, |rd| { + rd.dial_info_details.clear(); + }) } fn bucket_depth(index: usize) -> usize { @@ -688,7 +633,7 @@ impl RoutingTable { k, NodeInfo { network_class: NetworkClass::Server, // Bootstraps are always full servers - outbound_protocols: ProtocolSet::default(), // Bootstraps do not participate in relaying and will not make outbound requests + outbound_protocols: ProtocolSet::empty(), // Bootstraps do not participate in relaying and will not make outbound requests dial_info_list: v, // Dial info is as specified in the bootstrap list relay_peer_info: None, // Bootstraps never require a relay themselves }, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 1f8d114b..c613c477 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -124,6 +124,7 @@ struct WaitableReply { timeout: u64, node_ref: NodeRef, send_ts: u64, + send_data_kind: SendDataKind, } ///////////////////////////////////////////////////////////////////// @@ -389,9 +390,6 @@ impl RPCProcessor { } // Issue a request over the network, possibly using an anonymized route - // If the request doesn't want a reply, returns immediately - // If the request wants a reply then it waits for one asynchronously - // If it doesn't receive a response in a sufficient time, then it returns a timeout error async fn request( &self, dest: Destination, @@ -411,11 +409,13 @@ impl RPCProcessor { (op_id, wants_answer) }; - let out_node_id; - let mut out_noderef: Option = None; - let hopcount: usize; + let out_node_id; // Envelope Node Id + let mut out_noderef: Option = None; // Node to send envelope to + let hopcount: usize; // Total safety + private route hop count + + // Create envelope data let out = { - let out; + let out; // Envelope data // To where are we sending the request match &dest { @@ -539,19 +539,22 @@ impl RPCProcessor { // send question let bytes = out.len() as u64; - if let Err(e) = self + let send_data_kind = match self .network_manager() .send_envelope(node_ref.clone(), Some(out_node_id), out) .await .map_err(logthru_rpc!(error)) .map_err(RPCError::Internal) { - // Make sure to clean up op id waiter in case of error - if eventual.is_some() { - self.cancel_op_id_waiter(op_id); + Ok(v) => v, + Err(e) => { + // Make sure to clean up op id waiter in case of error + if eventual.is_some() { + self.cancel_op_id_waiter(op_id); + } + return Err(e); } - return Err(e); - } + }; // Successfully sent let send_ts = get_timestamp(); @@ -570,12 +573,13 @@ impl RPCProcessor { timeout, node_ref, send_ts, + send_data_kind, })), } } // Issue a reply over the network, possibly using an anonymized route - // If the request doesn't want a reply, this routine does nothing + // The request must want a response, or this routine fails async fn reply( &self, request_rpcreader: RPCMessageReader, @@ -1379,6 +1383,9 @@ impl RPCProcessor { .await? .unwrap(); + // Note what kind of ping this was and to what peer scope + let send_data_kind = waitable_reply.send_data_kind; + // Wait for reply let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; @@ -1423,6 +1430,26 @@ impl RPCProcessor { e.update_node_status(node_status.clone()); }); + // Report sender_info IP addresses to network manager + + if let Some(socket_address) = sender_info.socket_address { + match send_data_kind { + SendDataKind::LocalDirect => { + self.network_manager() + .report_local_socket_address(socket_address, peer) + .await; + } + SendDataKind::GlobalDirect => { + self.network_manager() + .report_global_socket_address(socket_address, peer) + .await; + } + SendDataKind::GlobalIndirect => { + // Do nothing in this case, as the socket address returned here would be for any node other than ours + } + } + } + // Return the answer for anyone who may care let out = InfoAnswer { latency,