diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 5aff1fb3..a704caf0 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -203,12 +203,15 @@ impl ConnectionManager { ); let peer_address = dial_info.to_peer_address(); + + // Make a connection to the address + // reject connections to addresses with an unknown or unsupported peer scope let descriptor = match local_addr { Some(la) => { ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la)) } None => ConnectionDescriptor::new_no_local(peer_address), - }; + }?; // If any connection to this remote exists that has the same protocol, return it // Any connection will do, we don't have to match the local address diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index bad3033f..3ec829e0 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -123,11 +123,13 @@ pub(crate) enum ContactMethod { #[derive(Copy, Clone, Debug)] pub enum SendDataKind { - LocalDirect, - GlobalDirect, - GlobalIndirect, + Direct(ConnectionDescriptor), + Indirect, } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] +struct PublicAddressCheckCacheKey(ProtocolType, AddressType); + // The mutable state of the network manager struct NetworkManagerInner { routing_table: Option, @@ -136,7 +138,8 @@ struct NetworkManagerInner { stats: NetworkManagerStats, client_whitelist: LruCache, relay_node: Option, - public_address_check_cache: LruCache, + public_address_check_cache: + BTreeMap>, protocol_config: Option, public_inbound_dial_info_filter: Option, local_inbound_dial_info_filter: Option, @@ -172,7 +175,7 @@ impl NetworkManager { stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), relay_node: None, - public_address_check_cache: LruCache::new(8), + public_address_check_cache: BTreeMap::new(), protocol_config: None, public_inbound_dial_info_filter: None, local_inbound_dial_info_filter: None, @@ -760,9 +763,6 @@ impl NetworkManager { Some(nr) => nr, }; - // Remove any 'last connection' to this peer to ensure we start a new connection with the reverse connection - peer_nr.clear_last_connection(); - // Make a reverse connection to the peer and send the receipt to it rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) .await @@ -786,9 +786,6 @@ impl NetworkManager { Some(nr) => nr, }; - // Remove any 'last connection' to this peer to ensure we start a new connection with the hole punch - peer_nr.clear_last_connection(); - // Get the udp direct dialinfo for the hole punch let outbound_dif = self .get_outbound_dial_info_filter(RoutingDomain::PublicInternet) @@ -897,14 +894,7 @@ impl NetworkManager { let out = self.build_envelope(envelope_node_id, version, body)?; // Send the envelope via whatever means necessary - let send_data_kind = network_result_try!(self.send_data(node_ref.clone(), out).await?); - - // If we asked to relay from the start, then this is always indirect - Ok(NetworkResult::value(if envelope_node_id != via_node_id { - SendDataKind::GlobalIndirect - } else { - send_data_kind - })) + self.send_data(node_ref.clone(), out).await } // Called by the RPC handler when we want to issue an direct receipt @@ -1094,7 +1084,7 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { // Build a return receipt for the signal let receipt_timeout = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); @@ -1143,7 +1133,7 @@ impl NetworkManager { .send_data_to_existing_connection(descriptor, data) .await? { - None => Ok(NetworkResult::value(())), + None => Ok(NetworkResult::value(descriptor)), Some(_) => Ok(NetworkResult::no_connection_other( "unable to send over reverse connection", )), @@ -1161,11 +1151,11 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult> { + ) -> EyreResult> { // Ensure we are filtered down to UDP (the only hole punch protocol supported today) assert!(target_nr .filter_ref() - .map(|dif| dif.protocol_set == ProtocolTypeSet::only(ProtocolType::UDP)) + .map(|dif| dif.protocol_type_set == ProtocolTypeSet::only(ProtocolType::UDP)) .unwrap_or_default()); // Build a return receipt for the signal @@ -1233,7 +1223,7 @@ impl NetworkManager { .send_data_to_existing_connection(descriptor, data) .await? { - None => Ok(NetworkResult::value(())), + None => Ok(NetworkResult::value(descriptor)), Some(_) => Ok(NetworkResult::no_connection_other( "unable to send over hole punch", )), @@ -1260,20 +1250,19 @@ impl NetworkManager { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on - let data = if let Some(descriptor) = node_ref.last_connection().await { + let data = if let Some(connection_descriptor) = node_ref.last_connection().await { match this .net() - .send_data_to_existing_connection(descriptor, data) + .send_data_to_existing_connection(connection_descriptor, data) .await? { None => { - return Ok( - if descriptor.matches_peer_scope(PeerScopeSet::only(PeerScope::Local)) { - NetworkResult::value(SendDataKind::LocalDirect) - } else { - NetworkResult::value(SendDataKind::GlobalDirect) - }, - ); + // Update timestamp for this last connection since we just sent to it + node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))); } Some(d) => d, } @@ -1291,32 +1280,35 @@ impl NetworkManager { match contact_method { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { network_result_try!(this.send_data(relay_nr, data).await?); - Ok(NetworkResult::value(SendDataKind::GlobalIndirect)) + Ok(NetworkResult::value(SendDataKind::Indirect)) } ContactMethod::Direct(dial_info) => { - let send_data_kind = if dial_info.is_local() { - SendDataKind::LocalDirect - } else { - SendDataKind::GlobalDirect - }; let connection_descriptor = network_result_try!( this.net().send_data_to_dial_info(dial_info, data).await? ); // If we connected to this node directly, save off the last connection so we can use it again node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); - Ok(NetworkResult::value(send_data_kind)) + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) } ContactMethod::SignalReverse(relay_nr, target_node_ref) => { - network_result_try!( + let connection_descriptor = network_result_try!( this.do_reverse_connect(relay_nr, target_node_ref, data) .await? ); - Ok(NetworkResult::value(SendDataKind::GlobalDirect)) + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) } ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { - network_result_try!(this.do_hole_punch(relay_nr, target_node_ref, data).await?); - Ok(NetworkResult::value(SendDataKind::GlobalDirect)) + let connection_descriptor = network_result_try!( + this.do_hole_punch(relay_nr, target_node_ref, data).await? + ); + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) } ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other( "Can't send to this node", @@ -1626,6 +1618,7 @@ impl NetworkManager { pub async fn report_local_socket_address( &self, _socket_address: SocketAddress, + _connection_descriptor: ConnectionDescriptor, _reporting_peer: NodeRef, ) { // XXX: Nothing here yet. @@ -1636,16 +1629,24 @@ impl NetworkManager { // Wait until we have received confirmation from N different peers pub async fn report_global_socket_address( &self, - socket_address: SocketAddress, - reporting_peer: NodeRef, + socket_address: SocketAddress, // the socket address as seen by the remote peer + connection_descriptor: ConnectionDescriptor, // the connection descriptor used + reporting_peer: NodeRef, // the peer's noderef reporting the socket address ) { + let key = PublicAddressCheckCacheKey( + connection_descriptor.protocol_type(), + connection_descriptor.address_type(), + ); + let (net, routing_table) = { let mut inner = self.inner.lock(); // Store the reported address - inner + let pacc = inner .public_address_check_cache - .insert(reporting_peer.node_id(), socket_address); + .entry(key) + .or_insert_with(|| LruCache::new(8)); + pacc.insert(reporting_peer.node_id(), socket_address); let net = inner.components.as_ref().unwrap().net.clone(); let routing_table = inner.routing_table.as_ref().unwrap().clone(); @@ -1660,11 +1661,14 @@ impl NetworkManager { // Determine if our external address has likely changed let needs_public_address_detection = if matches!(network_class, NetworkClass::InboundCapable) { + // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed + let dial_info_filter = connection_descriptor.make_dial_info_filter(); + // Get current external ip/port from registered global dialinfo let current_addresses: BTreeSet = routing_table .all_filtered_dial_info_details( Some(RoutingDomain::PublicInternet), - &DialInfoFilter::all(), + &dial_info_filter, ) .iter() .map(|did| did.dial_info.socket_address()) @@ -1672,11 +1676,15 @@ impl NetworkManager { // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers // then we zap the network class and re-detect it - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); let mut inconsistencies = 0; let mut changed = false; // Iteration goes from most recent to least recent node/address pair - for (_, a) in &inner.public_address_check_cache { + let pacc = inner + .public_address_check_cache + .entry(key) + .or_insert_with(|| LruCache::new(8)); + for (_, a) in pacc { if !current_addresses.contains(a) { inconsistencies += 1; if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { @@ -1691,12 +1699,17 @@ impl NetworkManager { // but if we are starting to see consistent socket address from multiple reporting peers // then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); let mut consistencies = 0; let mut consistent = false; let mut current_address = Option::::None; // Iteration goes from most recent to least recent node/address pair - for (_, a) in &inner.public_address_check_cache { + let pacc = inner + .public_address_check_cache + .entry(key) + .or_insert_with(|| LruCache::new(8)); + + for (_, a) in pacc { if let Some(current_address) = current_address { if current_address == *a { consistencies += 1; @@ -1721,11 +1734,8 @@ impl NetworkManager { inner.public_address_check_cache.clear(); // Reset the network class and dial info so we can re-detect it - //routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); - //net.reset_network_class(); - - // Do a full network reset since this doesn't take that long and ensures we get the local network stuff correct too - net.restart_network(); + routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); + net.reset_network_class(); } else { warn!("Public address may have changed. Restarting the server may be required."); } diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index dde6ce19..b1c4d14c 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -80,9 +80,6 @@ impl DiscoveryContext { async fn request_public_address(&self, node_ref: NodeRef) -> Option { let rpc = self.routing_table.rpc_processor(); - // Ensure we ask for a fresh connection - node_ref.clear_last_connection(); - let res = network_result_value_or_log!(debug match rpc.rpc_call_status(node_ref.clone()).await { Ok(v) => v, Err(e) => { diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 8608a2ad..ca610468 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -134,7 +134,8 @@ impl RawTcpProtocolHandler { ); let local_address = self.inner.lock().local_address; let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), + ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, stream, )); @@ -174,7 +175,8 @@ impl RawTcpProtocolHandler { ProtocolType::TCP, ), SocketAddress::from_socket_addr(actual_local_address), - ), + ) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, ps, )); diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index a9599aed..22a3c7ae 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -13,24 +13,31 @@ impl RawUdpProtocolHandler { #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.from))] pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> { - let (size, remote_addr) = loop { + let (size, descriptor) = loop { let (size, remote_addr) = network_result_value_or_log!(debug self.socket.recv_from(data).await.into_network_result()? => continue); if size > MAX_MESSAGE_SIZE { log_net!(debug "{}({}) at {}@{}:{}", "Invalid message".green(), "received too large UDP message", file!(), line!(), column!()); continue; } - break (size, remote_addr); - }; - let peer_addr = PeerAddress::new( - SocketAddress::from_socket_addr(remote_addr), - ProtocolType::UDP, - ); - let local_socket_addr = self.socket.local_addr()?; - let descriptor = ConnectionDescriptor::new( - peer_addr, - SocketAddress::from_socket_addr(local_socket_addr), - ); + let peer_addr = PeerAddress::new( + SocketAddress::from_socket_addr(remote_addr), + ProtocolType::UDP, + ); + let local_socket_addr = self.socket.local_addr()?; + let descriptor = match ConnectionDescriptor::new( + peer_addr, + SocketAddress::from_socket_addr(local_socket_addr), + ) { + Ok(d) => d, + Err(_) => { + log_net!(debug "{}({}) at {}@{}:{}: {:?}", "Invalid peer scope".green(), "received message from invalid peer scope", file!(), line!(), column!(), peer_addr); + continue; + } + }; + + break (size, descriptor); + }; tracing::Span::current().record("ret.len", &size); tracing::Span::current().record("ret.from", &format!("{:?}", descriptor).as_str()); @@ -46,6 +53,17 @@ impl RawUdpProtocolHandler { if data.len() > MAX_MESSAGE_SIZE { bail_io_error_other!("sending too large UDP message"); } + let peer_addr = PeerAddress::new( + SocketAddress::from_socket_addr(socket_addr), + ProtocolType::UDP, + ); + let local_socket_addr = self.socket.local_addr()?; + + let descriptor = ConnectionDescriptor::new( + peer_addr, + SocketAddress::from_socket_addr(local_socket_addr), + ) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?; let len = network_result_try!(self .socket @@ -56,16 +74,6 @@ impl RawUdpProtocolHandler { bail_io_error_other!("UDP partial send") } - let peer_addr = PeerAddress::new( - SocketAddress::from_socket_addr(socket_addr), - ProtocolType::UDP, - ); - let local_socket_addr = self.socket.local_addr()?; - let descriptor = ConnectionDescriptor::new( - peer_addr, - SocketAddress::from_socket_addr(local_socket_addr), - ); - Ok(NetworkResult::value(descriptor)) } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 76c1a435..69ff5d8a 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -204,7 +204,8 @@ impl WebsocketProtocolHandler { ConnectionDescriptor::new( peer_addr, SocketAddress::from_socket_addr(self.arc.local_address), - ), + ) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, ws_stream, )); @@ -259,7 +260,9 @@ impl WebsocketProtocolHandler { let descriptor = ConnectionDescriptor::new( dial_info.to_peer_address(), SocketAddress::from_socket_addr(actual_local_addr), - ); + ) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?; + // Negotiate TLS if this is WSS if tls { let connector = TlsConnector::default(); diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index 41b1ac38..687408b5 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -383,10 +383,9 @@ impl Network { // Save the bound ws port for use later on self.inner.lock().ws_port = ws_port; - trace!( + info!( "WS: starting listener on port {} at {:?}", - ws_port, - ip_addrs + ws_port, ip_addrs ); let socket_addresses = self .start_tcp_listener( @@ -510,10 +509,9 @@ impl Network { // Save the bound wss port for use later on self.inner.lock().wss_port = wss_port; - trace!( + info!( "WSS: starting listener on port {} at {:?}", - wss_port, - ip_addrs + wss_port, ip_addrs ); let socket_addresses = self .start_tcp_listener( @@ -615,10 +613,9 @@ impl Network { // Save the bound tcp port for use later on self.inner.lock().tcp_port = tcp_port; - trace!( + info!( "TCP: starting listener on port {} at {:?}", - tcp_port, - ip_addrs + tcp_port, ip_addrs ); let socket_addresses = self .start_tcp_listener( diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 09cab642..b609e87c 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -2,7 +2,8 @@ use super::*; use crate::dht::*; use crate::xx::*; -use stop_token::future::FutureExt; +use futures_util::FutureExt; +use stop_token::future::FutureExt as StopFutureExt; impl NetworkManager { // Bootstrap lookup process @@ -328,13 +329,28 @@ impl NetworkManager { let routing_table = self.routing_table(); let relay_node_id = self.relay_node().map(|nr| nr.node_id()); - + let dids = routing_table.all_filtered_dial_info_details( + Some(RoutingDomain::PublicInternet), + &DialInfoFilter::global(), + ); let mut unord = FuturesUnordered::new(); let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); for nr in node_refs { let rpc = rpc.clone(); - unord.push(async move { rpc.rpc_call_status(nr).await }); + if Some(nr.node_id()) == relay_node_id { + // Relay nodes get pinged over all protocols we have inbound dialinfo for + // This is so we can preserve the inbound NAT mappings at our router + for did in &dids { + let rpc = rpc.clone(); + let dif = did.dial_info.make_filter(true); + let nr_filtered = nr.filtered_clone(dif); + unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); + } + } else { + // Just do a single ping with the best protocol for all the other nodes + unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); + } } // Wait for futures to complete diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 6e4243c2..0618af12 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -12,7 +12,8 @@ pub async fn test_add_get_remove() { let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( SocketAddress::new(Address::IPV4(Ipv4Addr::new(127, 0, 0, 1)), 8080), ProtocolType::TCP, - )); + )) + .unwrap(); let a2 = a1; let a3 = ConnectionDescriptor::new( PeerAddress::new( @@ -25,7 +26,8 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ); + ) + .unwrap(); let a4 = ConnectionDescriptor::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8090), @@ -37,7 +39,8 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ); + ) + .unwrap(); let a5 = ConnectionDescriptor::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8090), @@ -49,7 +52,8 @@ pub async fn test_add_get_remove() { 0, 0, ))), - ); + ) + .unwrap(); let c1 = NetworkConnection::dummy(a1); let c1h = c1.get_handle(); diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 2b7bfdc1..977b97d5 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -106,7 +106,8 @@ impl WebsocketProtocolHandler { // Make our connection descriptor Ok(WebsocketNetworkConnection::new( - ConnectionDescriptor::new_no_local(dial_info.to_peer_address()), + ConnectionDescriptor::new_no_local(dial_info.to_peer_address()) + .map_err(|e| io::Error::new(io::ErrorKind::AddrNotAvailable, e))?, wsmeta, wsio, )) diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index b3b340df..2b50257a 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -35,11 +35,14 @@ pub enum BucketEntryState { Reliable, } +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] +struct LastConnectionKey(PeerScope, ProtocolType, AddressType); + #[derive(Debug)] pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, seen_our_node_info: bool, - last_connection: Option<(ConnectionDescriptor, u64)>, + last_connections: BTreeMap, opt_signed_node_info: Option, opt_local_node_info: Option, peer_stats: PeerStats, @@ -162,16 +165,45 @@ impl BucketEntryInner { }) } - pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) { - self.last_connection = Some((last_connection, timestamp)); - } - pub fn clear_last_connection(&mut self) { - self.last_connection = None; - } - pub fn last_connection(&self) -> Option<(ConnectionDescriptor, u64)> { - self.last_connection + fn descriptor_to_key(last_connection: ConnectionDescriptor) -> LastConnectionKey { + LastConnectionKey( + last_connection.peer_scope(), + last_connection.protocol_type(), + last_connection.address_type(), + ) } + // Stores a connection descriptor in this entry's table of last connections + pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) { + let key = Self::descriptor_to_key(last_connection); + self.last_connections + .insert(key, (last_connection, timestamp)); + } + + // Clears the table of last connections to ensure we create new ones and drop any existing ones + pub fn clear_last_connections(&mut self) { + self.last_connections.clear(); + } + + // Gets the best 'last connection' that matches a set of protocol types and address types + pub fn last_connection( + &self, + dial_info_filter: Option, + ) -> Option<(ConnectionDescriptor, u64)> { + // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same + let dif = dial_info_filter.unwrap_or_default(); + for ps in dif.peer_scope_set { + for pt in dif.protocol_type_set { + for at in dif.address_type_set { + let key = LastConnectionKey(ps, pt, at); + if let Some(v) = self.last_connections.get(&key) { + return Some(*v); + } + } + } + } + None + } pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { self.min_max_version = Some(min_max_version); } @@ -429,7 +461,7 @@ impl BucketEntry { inner: RwLock::new(BucketEntryInner { min_max_version: None, seen_our_node_info: false, - last_connection: None, + last_connections: BTreeMap::new(), opt_signed_node_info: None, opt_local_node_info: None, peer_stats: PeerStats { diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index b4cdd58a..7e879c91 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -383,45 +383,93 @@ impl RoutingTable { out } + fn make_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { + // Get all our outbound protocol/address types + let protocol_config = self.network_manager().get_protocol_config(); + let outbound_dif = self + .network_manager() + .get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + + move |e: &BucketEntryInner| { + // Ensure this node is not on our local network + let has_local_dial_info = e + .local_node_info() + .map(|l| l.has_dial_info()) + .unwrap_or(false); + if has_local_dial_info { + return false; + } + + // Disqualify nodes that don't have all our outbound protocol types + let can_serve_as_relay = e + .node_info() + .map(|n| { + let dids = + n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif)); + for pt in protocol_config.outbound { + for at in protocol_config.family_global { + let mut found = false; + for did in &dids { + if did.dial_info.protocol_type() == pt + && did.dial_info.address_type() == at + { + found = true; + break; + } + } + if !found { + return false; + } + } + } + true + }) + .unwrap_or(false); + if !can_serve_as_relay { + return false; + } + + true + } + } + #[instrument(level = "trace", skip(self), ret)] pub fn find_inbound_relay(&self, cur_ts: u64) -> Option { + // Get relay filter function + let relay_node_filter = self.make_relay_node_filter(); + + // Go through all entries and find fastest entry that matches filter function let inner = self.inner.read(); let inner = &*inner; let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; // Iterate all known nodes for candidates Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - // Ensure this node is not on our local network - if v.with(|e| { - e.local_node_info() - .map(|l| l.has_dial_info()) - .unwrap_or(false) - }) { - return Option::<()>::None; - } - - // Ensure we have the node's status - if let Some(node_status) = v.with(|e| e.peer_stats().status.clone()) { - // Ensure the node will relay - if node_status.will_relay { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - let better = v.with(|e| { - best_inbound_relay.1.with(|best| { + let v2 = v.clone(); + v.with(|e| { + // Ensure we have the node's status + if let Some(node_status) = e.peer_stats().status.clone() { + // Ensure the node will relay + if node_status.will_relay { + // Compare against previous candidate + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + // Less is faster + let better = best_inbound_relay.1.with(|best| { BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) == std::cmp::Ordering::Less - }) - }); - if better { - *best_inbound_relay = (k, v); + }); + // Now apply filter function and see if this node should be included + if better && relay_node_filter(e) { + *best_inbound_relay = (k, v2); + } + } else if relay_node_filter(e) { + // Always store the first candidate + best_inbound_relay = Some((k, v2)); } - } else { - // Always store the first candidate - best_inbound_relay = Some((k, v)); } } - } + }); + // Don't end early, iterate through all entries Option::<()>::None }); // Return the best inbound relay noderef diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 584df8ab..c9f977dd 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -395,7 +395,7 @@ impl RoutingTable { for bucket in &mut inner.buckets { for entry in bucket.entries() { entry.1.with_mut(|e| { - e.clear_last_connection(); + e.clear_last_connections(); }); } } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 86e489fd..34924680 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -228,24 +228,8 @@ impl NodeRef { pub async fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection - let (last_connection, last_seen) = self.operate(|e| { - if let Some((last_connection, connection_ts)) = e.last_connection() { - if let Some(last_seen_ts) = e.peer_stats().rpc_stats.last_seen_ts { - Some((last_connection, u64::max(last_seen_ts, connection_ts))) - } else { - Some((last_connection, connection_ts)) - } - } else { - None - } - })?; - - // Verify this connection matches the noderef filter - if let Some(filter) = &self.filter { - if !last_connection.matches_filter(filter) { - return None; - } - } + let (last_connection, last_seen) = + self.operate(|e| e.last_connection(self.filter.clone()))?; // Should we check the connection table? if last_connection.protocol_type().is_connection_oriented() { @@ -263,8 +247,8 @@ impl NodeRef { Some(last_connection) } - pub fn clear_last_connection(&self) { - self.operate_mut(|e| e.clear_last_connection()) + pub fn clear_last_connections(&self) { + self.operate_mut(|e| e.clear_last_connections()) } pub fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: u64) { diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 643ba932..47ca02ec 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -45,17 +45,29 @@ impl RPCProcessor { // Report sender_info IP addresses to network manager if let Some(socket_address) = status_a.sender_info.socket_address { match send_data_kind { - SendDataKind::LocalDirect => { - self.network_manager() - .report_local_socket_address(socket_address, peer) - .await; + SendDataKind::Direct(connection_descriptor) => { + match connection_descriptor.peer_scope() { + PeerScope::Global => { + self.network_manager() + .report_global_socket_address( + socket_address, + connection_descriptor, + peer, + ) + .await; + } + PeerScope::Local => { + self.network_manager() + .report_local_socket_address( + socket_address, + connection_descriptor, + peer, + ) + .await; + } + } } - SendDataKind::GlobalDirect => { - self.network_manager() - .report_global_socket_address(socket_address, peer) - .await; - } - SendDataKind::GlobalIndirect => { + SendDataKind::Indirect => { // Do nothing in this case, as the socket address returned here would be for any node other than ours } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 9d1b3d7e..bc163f9a 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -732,16 +732,16 @@ impl FromStr for SocketAddress { #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct DialInfoFilter { pub peer_scope_set: PeerScopeSet, - pub protocol_set: ProtocolTypeSet, - pub address_set: AddressTypeSet, + pub protocol_type_set: ProtocolTypeSet, + pub address_type_set: AddressTypeSet, } impl Default for DialInfoFilter { fn default() -> Self { Self { peer_scope_set: PeerScopeSet::all(), - protocol_set: ProtocolTypeSet::all(), - address_set: AddressTypeSet::all(), + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::all(), } } } @@ -750,57 +750,57 @@ impl DialInfoFilter { pub fn all() -> Self { Self { peer_scope_set: PeerScopeSet::all(), - protocol_set: ProtocolTypeSet::all(), - address_set: AddressTypeSet::all(), + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::all(), } } pub fn global() -> Self { Self { peer_scope_set: PeerScopeSet::only(PeerScope::Global), - protocol_set: ProtocolTypeSet::all(), - address_set: AddressTypeSet::all(), + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::all(), } } pub fn local() -> Self { Self { peer_scope_set: PeerScopeSet::only(PeerScope::Local), - protocol_set: ProtocolTypeSet::all(), - address_set: AddressTypeSet::all(), + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::all(), } } pub fn scoped(peer_scope: PeerScope) -> Self { Self { peer_scope_set: PeerScopeSet::only(peer_scope), - protocol_set: ProtocolTypeSet::all(), - address_set: AddressTypeSet::all(), + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::all(), } } pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self { - self.protocol_set = ProtocolTypeSet::only(protocol_type); + self.protocol_type_set = ProtocolTypeSet::only(protocol_type); self } pub fn with_protocol_type_set(mut self, protocol_set: ProtocolTypeSet) -> Self { - self.protocol_set = protocol_set; + self.protocol_type_set = protocol_set; self } pub fn with_address_type(mut self, address_type: AddressType) -> Self { - self.address_set = AddressTypeSet::only(address_type); + self.address_type_set = AddressTypeSet::only(address_type); self } pub fn with_address_type_set(mut self, address_set: AddressTypeSet) -> Self { - self.address_set = address_set; + self.address_type_set = address_set; self } pub fn filtered(mut self, other_dif: DialInfoFilter) -> Self { self.peer_scope_set &= other_dif.peer_scope_set; - self.protocol_set &= other_dif.protocol_set; - self.address_set &= other_dif.address_set; + self.protocol_type_set &= other_dif.protocol_type_set; + self.address_type_set &= other_dif.address_type_set; self } pub fn is_dead(&self) -> bool { self.peer_scope_set.is_empty() - || self.protocol_set.is_empty() - || self.address_set.is_empty() + || self.protocol_type_set.is_empty() + || self.address_type_set.is_empty() } } @@ -810,11 +810,11 @@ impl fmt::Debug for DialInfoFilter { if self.peer_scope_set != PeerScopeSet::all() { out += &format!("+{:?}", self.peer_scope_set); } - if self.protocol_set != ProtocolTypeSet::all() { - out += &format!("+{:?}", self.protocol_set); + if self.protocol_type_set != ProtocolTypeSet::all() { + out += &format!("+{:?}", self.protocol_type_set); } - if self.address_set != AddressTypeSet::all() { - out += &format!("+{:?}", self.address_set); + if self.address_type_set != AddressTypeSet::all() { + out += &format!("+{:?}", self.address_type_set); } write!(f, "[{}]", out) } @@ -1171,8 +1171,8 @@ impl DialInfo { } else { PeerScopeSet::all() }, - protocol_set: ProtocolTypeSet::only(self.protocol_type()), - address_set: AddressTypeSet::only(self.address_type()), + protocol_type_set: ProtocolTypeSet::only(self.protocol_type()), + address_type_set: AddressTypeSet::only(self.address_type()), } } @@ -1360,10 +1360,10 @@ impl MatchesDialInfoFilter for DialInfo { if !self.matches_peer_scope(filter.peer_scope_set) { return false; } - if !filter.protocol_set.contains(self.protocol_type()) { + if !filter.protocol_type_set.contains(self.protocol_type()) { return false; } - if !filter.address_set.contains(self.address_type()) { + if !filter.address_type_set.contains(self.address_type()) { return false; } true @@ -1490,17 +1490,28 @@ pub struct ConnectionDescriptor { } impl ConnectionDescriptor { - pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { - Self { + fn validate_peer_scope(remote: PeerAddress) -> Result<(), VeilidAPIError> { + // Verify address is in one of our peer scopes we care about + let addr = remote.socket_address.address(); + if !addr.is_global() && !addr.is_local() { + return Err(VeilidAPIError::generic("not a valid peer scope")); + } + Ok(()) + } + + pub fn new(remote: PeerAddress, local: SocketAddress) -> Result { + Self::validate_peer_scope(remote)?; + Ok(Self { remote, local: Some(local), - } + }) } - pub fn new_no_local(remote: PeerAddress) -> Self { - Self { + pub fn new_no_local(remote: PeerAddress) -> Result { + Self::validate_peer_scope(remote)?; + Ok(Self { remote, local: None, - } + }) } pub fn remote(&self) -> PeerAddress { self.remote @@ -1517,22 +1528,20 @@ impl ConnectionDescriptor { pub fn address_type(&self) -> AddressType { self.remote.address_type() } - pub fn peer_scope(&self) -> Option { + pub fn peer_scope(&self) -> PeerScope { let addr = self.remote.socket_address.address(); if addr.is_global() { - return Some(PeerScope::Global); + return PeerScope::Global; } - if addr.is_local() { - return Some(PeerScope::Local); - } - None + PeerScope::Local + } + pub fn make_dial_info_filter(&self) -> DialInfoFilter { + DialInfoFilter::scoped(self.peer_scope()) + .with_protocol_type(self.protocol_type()) + .with_address_type(self.address_type()) } pub fn matches_peer_scope(&self, scope: PeerScopeSet) -> bool { - if let Some(ps) = self.peer_scope() { - scope.contains(ps) - } else { - false - } + scope.contains(self.peer_scope()) } } @@ -1541,10 +1550,10 @@ impl MatchesDialInfoFilter for ConnectionDescriptor { if !self.matches_peer_scope(filter.peer_scope_set) { return false; } - if !filter.protocol_set.contains(self.protocol_type()) { + if !filter.protocol_type_set.contains(self.protocol_type()) { return false; } - if !filter.address_set.contains(self.address_type()) { + if !filter.address_type_set.contains(self.address_type()) { return false; } true