diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index c36c4fa8..43087c27 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -127,7 +127,7 @@ impl ConnectionManager { if let Some(conn) = inner .connection_table - .get_last_connection_by_remote(descriptor.remote) + .get_last_connection_by_remote(descriptor.remote()) { log_net!( "== Returning existing connection local_addr={:?} peer_address={:?}", @@ -138,34 +138,39 @@ impl ConnectionManager { return Ok(conn); } - // Drop any other protocols connections that have the same local addr + // Drop any other protocols connections to this remote that have the same local addr // otherwise this connection won't succeed due to binding + let mut killed = false; if let Some(local_addr) = local_addr { if local_addr.port() != 0 { for pt in [ProtocolType::TCP, ProtocolType::WS, ProtocolType::WSS] { - let pa = PeerAddress::new(descriptor.remote.socket_address, pt); - for desc in inner + let pa = PeerAddress::new(descriptor.remote_address().clone(), pt); + for prior_descriptor in inner .connection_table .get_connection_descriptors_by_remote(pa) { let mut kill = false; - if let Some(conn_local) = desc.local { + // See if the local address would collide + if let Some(prior_local) = prior_descriptor.local() { if (local_addr.ip().is_unspecified() - || (local_addr.ip() == conn_local.to_ip_addr())) - && conn_local.port() == local_addr.port() + || prior_local.to_ip_addr().is_unspecified() + || (local_addr.ip() == prior_local.to_ip_addr())) + && prior_local.port() == local_addr.port() { kill = true; } } if kill { log_net!(debug - ">< Terminating connection local_addr={:?} peer_address={:?}", - local_addr.green(), - pa.green() + ">< Terminating connection prior_descriptor={:?}", + prior_descriptor ); - if let Err(e) = inner.connection_table.remove_connection(descriptor) { + if let Err(e) = + inner.connection_table.remove_connection(prior_descriptor) + { log_net!(error e); } + killed = true; } } } @@ -173,7 +178,21 @@ impl ConnectionManager { } // Attempt new connection - let conn = ProtocolNetworkConnection::connect(local_addr, dial_info).await?; + let mut retry_count = if killed { 2 } else { 0 }; + + let conn = loop { + match ProtocolNetworkConnection::connect(local_addr, dial_info.clone()).await { + Ok(v) => break Ok(v), + Err(e) => { + if retry_count == 0 { + break Err(e); + } + log_net!(debug "get_or_create_connection retries left: {}", retry_count); + retry_count -= 1; + intf::sleep(500).await; + } + } + }?; self.on_new_protocol_network_connection(&mut *inner, conn) } diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 21ce47ff..fa1f0c54 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -43,7 +43,7 @@ impl ConnectionTable { pub fn add_connection(&mut self, conn: NetworkConnection) -> Result<(), String> { let descriptor = conn.connection_descriptor(); - let ip_addr = descriptor.remote.socket_address.to_ip_addr(); + let ip_addr = descriptor.remote_address().to_ip_addr(); let index = protocol_to_index(descriptor.protocol_type()); if self.conn_by_descriptor[index].contains_key(&descriptor) { @@ -72,7 +72,7 @@ impl ConnectionTable { // add connection records let descriptors = self .descriptors_by_remote - .entry(descriptor.remote) + .entry(descriptor.remote()) .or_default(); warn!("add_connection: {:?}", descriptor); @@ -125,10 +125,10 @@ impl ConnectionTable { } fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) { - let ip_addr = descriptor.remote.socket_address.to_ip_addr(); + let ip_addr = descriptor.remote_address().to_ip_addr(); // conns_by_remote - match self.descriptors_by_remote.entry(descriptor.remote) { + match self.descriptors_by_remote.entry(descriptor.remote()) { Entry::Vacant(_) => { panic!("inconsistency in connection table") } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 978c6d06..9748642a 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -18,9 +18,9 @@ pub mod tests; pub use network_connection::*; //////////////////////////////////////////////////////////////////////////////////////// +use connection_handle::*; use connection_limits::*; use connection_manager::*; -use connection_handle::*; use dht::*; use hashlink::LruCache; use intf::*; @@ -1048,7 +1048,7 @@ impl NetworkManager { ); // Network accounting - self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64); + self.stats_packet_rcvd(descriptor.remote_address().to_ip_addr(), data.len() as u64); // Ensure we can read the magic number if data.len() < 4 { diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index e7d859ff..5994b36a 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -310,10 +310,10 @@ impl Network { // Handle connectionless protocol if descriptor.protocol_type() == ProtocolType::UDP { // send over the best udp socket we have bound since UDP is not connection oriented - let peer_socket_addr = descriptor.remote.to_socket_addr(); + let peer_socket_addr = descriptor.remote().to_socket_addr(); if let Some(ph) = self.find_best_udp_protocol_handler( &peer_socket_addr, - &descriptor.local.map(|sa| sa.to_socket_addr()), + &descriptor.local().map(|sa| sa.to_socket_addr()), ) { log_net!( "send_data_to_existing_connection connectionless to {:?}", @@ -345,7 +345,7 @@ impl Network { // Network accounting self.network_manager() - .stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64); + .stats_packet_sent(descriptor.remote().to_socket_addr().ip(), data_len as u64); // Data was consumed Ok(None) diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index e7017ffa..620f9583 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -150,6 +150,12 @@ impl DiscoveryContext { redirect: bool, ) -> bool { let rpc = self.routing_table.rpc_processor(); + + // asking for node validation doesn't have to use the dial info filter of the dial info we are validating + let mut node_ref = node_ref.clone(); + node_ref.set_filter(None); + + // ask the node to send us a dial info validation receipt rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect) .await .map_err(logthru_net!( @@ -229,7 +235,7 @@ impl DiscoveryContext { // If we know we are not behind NAT, check our firewall status pub async fn protocol_process_no_nat(&self) -> Result<(), String> { - let (node_b, external_1_dial_info) = { + let (node_1, external_1_dial_info) = { let inner = self.inner.lock(); ( inner.node_1.as_ref().unwrap().clone(), @@ -239,7 +245,7 @@ impl DiscoveryContext { // Do a validate_dial_info on the external address from a redirected node if self - .validate_dial_info(node_b.clone(), external_1_dial_info.clone(), true) + .validate_dial_info(node_1.clone(), external_1_dial_info.clone(), true) .await { // Add public dial info with Direct dialinfo class diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index c858255f..ec189e04 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -55,7 +55,7 @@ impl Network { // Network accounting network_manager.stats_packet_rcvd( - descriptor.remote.to_socket_addr().ip(), + descriptor.remote_address().to_ip_addr(), size as u64, ); diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 7bbf1014..ca6bafca 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -180,10 +180,10 @@ impl RawTcpProtocolHandler { // Wrap the stream in a network connection and return it let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor { - local: Some(SocketAddress::from_socket_addr(actual_local_address)), - remote: dial_info.to_peer_address(), - }, + ConnectionDescriptor::new( + dial_info.to_peer_address(), + SocketAddress::from_socket_addr(actual_local_address), + ), ps, ts, )); diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 4ad6d1b4..1718b537 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -243,10 +243,10 @@ impl WebsocketProtocolHandler { .map_err(logthru_net!())?; // Make our connection descriptor - let descriptor = ConnectionDescriptor { - local: Some(SocketAddress::from_socket_addr(actual_local_addr)), - remote: dial_info.to_peer_address(), - }; + let descriptor = ConnectionDescriptor::new( + dial_info.to_peer_address(), + SocketAddress::from_socket_addr(actual_local_addr), + ); // Negotiate TLS if this is WSS if tls { let connector = TlsConnector::default(); diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 87607521..6da7c35e 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -317,9 +317,8 @@ impl NetworkConnection { } log_net!( - "== Connection loop finished local_addr={:?} remote={:?}", - descriptor.local.green(), - descriptor.remote.green() + "== Connection loop finished descriptor={:?}", + descriptor.green() ); connection_manager diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index d638ff63..10154efd 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -799,7 +799,7 @@ impl RPCProcessor { let socket_address = peer_noderef .last_connection() .await - .map(|c| c.remote.socket_address); + .map(|c| c.remote_address().clone()); SenderInfo { socket_address } } @@ -905,27 +905,51 @@ impl RPCProcessor { // Redirect this request if we are asked to if redirect { + + // Find peers capable of validating this dial info + // We filter on the -outgoing- protocol capability status not the node's dial info + // Use the address type though, to ensure we reach an ipv6 capable node if this is + // an ipv6 address let routing_table = self.routing_table(); - let filter = dial_info.make_filter(true); + let filter = DialInfoFilter::global().with_address_type(dial_info.address_type()); let sender_id = rpcreader.header.envelope.get_sender_id(); - let peers = routing_table.find_fast_public_nodes_filtered(&filter); + let mut peers = routing_table.find_fast_public_nodes_filtered(&filter); if peers.is_empty() { return Err(rpc_error_internal(format!( "no peers matching filter '{:?}'", filter ))); } - for peer in peers { + for peer in &mut peers { // Ensure the peer is not the one asking for the validation if peer.node_id() == sender_id { continue; - } + } + + // Release the filter on the peer because we don't need to send the redirect with the filter + // we just wanted to make sure we only selected nodes that were capable of + // using the correct protocol for the dial info being validated + peer.set_filter(None); + + // Ensure the peer's status is known and that it is capable of + // making outbound connections for the dial info we want to verify + // and if this peer can validate dial info + let can_contact_dial_info = peer.operate(|e: &mut BucketEntry| { + if let Some(ni) = &e.node_info() { + ni.outbound_protocols.contains(dial_info.protocol_type()) && ni.can_validate_dial_info() + } else { + false + } + }); + if !can_contact_dial_info { + continue; + } // See if this peer will validate dial info let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| { - if let Some(ni) = &e.peer_stats().status { - ni.will_validate_dial_info + if let Some(status) = &e.peer_stats().status { + status.will_validate_dial_info } else { true } @@ -933,6 +957,7 @@ impl RPCProcessor { if !will_validate_dial_info { continue; } + // Make a copy of the request, without the redirect flag let vdi_msg_reader = { let mut vdi_msg = ::capnp::message::Builder::new_default(); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index b3f5110f..75d25167 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1315,8 +1315,8 @@ impl PeerInfo { #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] pub struct PeerAddress { - pub socket_address: SocketAddress, - pub protocol_type: ProtocolType, + socket_address: SocketAddress, + protocol_type: ProtocolType, } impl PeerAddress { @@ -1327,6 +1327,14 @@ impl PeerAddress { } } + pub fn socket_address(&self) -> &SocketAddress { + &self.socket_address + } + + pub fn protocol_type(&self) -> ProtocolType { + self.protocol_type + } + pub fn to_socket_addr(&self) -> SocketAddr { self.socket_address.to_socket_addr() } @@ -1338,8 +1346,8 @@ impl PeerAddress { #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct ConnectionDescriptor { - pub remote: PeerAddress, - pub local: Option, + remote: PeerAddress, + local: Option, } impl ConnectionDescriptor { @@ -1355,6 +1363,15 @@ impl ConnectionDescriptor { local: None, } } + pub fn remote(&self) -> PeerAddress { + self.remote + } + pub fn remote_address(&self) -> &SocketAddress { + self.remote.socket_address() + } + pub fn local(&self) -> Option { + self.local + } pub fn protocol_type(&self) -> ProtocolType { self.remote.protocol_type }