diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index cf6744ef..febad51c 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -173,6 +173,7 @@ impl ConnectionManager { Ok(Some(conn)) => { // Connection added and a different one LRU'd out // Send it to be terminated + log_net!(debug "== LRU kill connection due to limit: {:?}", conn); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); } Err(ConnectionTableAddError::AddressFilter(conn, e)) => { @@ -205,40 +206,6 @@ impl ConnectionManager { .get_connection_by_descriptor(descriptor) } - // Terminate any connections that would collide with a new connection - // using different protocols to the same remote address and port. Used to ensure - // that we can switch quickly between TCP and WS if necessary to the same node - // Returns true if we killed off colliding connections - async fn kill_off_colliding_connections(&self, dial_info: &DialInfo) -> bool { - let protocol_type = dial_info.protocol_type(); - let socket_address = dial_info.socket_address(); - - let killed = self.arc.connection_table.drain_filter(|prior_descriptor| { - // If the protocol types aren't the same, then this is a candidate to be killed off - // If they are the same, then we would just return the exact same connection from get_or_create_connection() - if prior_descriptor.protocol_type() == protocol_type { - return false; - } - // If the prior remote is not the same address, then we're not going to collide - if *prior_descriptor.remote().socket_address() != socket_address { - return false; - } - - log_net!(debug - ">< Terminating connection prior_descriptor={:?}", - prior_descriptor - ); - true - }); - // Wait for the killed connections to end their recv loops - let did_kill = !killed.is_empty(); - for mut k in killed { - k.close(); - k.await; - } - did_kill - } - /// Called when we want to create a new connection or get the current one that already exists /// This will kill off any connections that are in conflict with the new connection to be made /// in order to make room for the new connection in the system's connection table @@ -246,45 +213,53 @@ impl ConnectionManager { #[instrument(level = "trace", skip(self), ret, err)] pub async fn get_or_create_connection( &self, - local_addr: Option, dial_info: DialInfo, ) -> EyreResult> { - // Async lock on the remote address for atomicity per remote let peer_address = dial_info.to_peer_address(); let remote_addr = peer_address.to_socket_addr(); + let mut preferred_local_address = self + .network_manager() + .net() + .get_preferred_local_address(&dial_info); + let best_port = preferred_local_address.map(|pla| pla.port()); + // Async lock on the remote address for atomicity per remote let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await; - log_net!( - "== get_or_create_connection local_addr={:?} dial_info={:?}", - local_addr, - dial_info - ); - - // Kill off any possibly conflicting connections - let did_kill = self.kill_off_colliding_connections(&dial_info).await; - let mut retry_count = if did_kill { 2 } else { 0 }; + log_net!("== get_or_create_connection dial_info={:?}", dial_info); // If any connection to this remote exists that has the same protocol, return it - // Any connection will do, we don't have to match the local address - if let Some(conn) = self + // Any connection will do, we don't have to match the local address but if we can + // match the preferred port do it + if let Some(best_existing_conn) = self .arc .connection_table - .get_last_connection_by_remote(peer_address) + .get_best_connection_by_remote(best_port, peer_address) { log_net!( - "== Returning existing connection local_addr={:?} peer_address={:?}", - local_addr, - peer_address + "== Returning best existing connection {:?}", + best_existing_conn ); - return Ok(NetworkResult::Value(conn)); + return Ok(NetworkResult::Value(best_existing_conn)); + } + + // If there is a low-level connection collision here, then we release the 'preferred local address' + // so we can make a second connection with an ephemeral port + if self + .arc + .connection_table + .check_for_colliding_connection(&dial_info) + { + preferred_local_address = None; } // Attempt new connection + let mut retry_count = 0; // Someday, if we need this + let prot_conn = network_result_try!(loop { let result_net_res = ProtocolNetworkConnection::connect( - local_addr, + preferred_local_address, &dial_info, self.arc.connection_initial_timeout_ms, self.network_manager().address_filter(), @@ -292,24 +267,28 @@ impl ConnectionManager { .await; match result_net_res { Ok(net_res) => { - // If the connection 'already exists', then try one last time to return a connection from the table, in case - // an 'accept' happened at literally the same time as our connect - if net_res.is_already_exists() { - if let Some(conn) = self - .arc - .connection_table - .get_last_connection_by_remote(peer_address) - { - log_net!( - "== Returning existing connection in race local_addr={:?} peer_address={:?}", - local_addr, - peer_address - ); - - return Ok(NetworkResult::Value(conn)); - } - } + // // If the connection 'already exists', then try one last time to return a connection from the table, in case + // // an 'accept' happened at literally the same time as our connect. A preferred local address must have been + // // specified otherwise we would have picked a different ephemeral port and this could not have happened + // if net_res.is_already_exists() && preferred_local_address.is_some() { + // // Make 'already existing' connection descriptor + // let conn_desc = ConnectionDescriptor::new( + // dial_info.to_peer_address(), + // SocketAddress::from_socket_addr(preferred_local_address.unwrap()), + // ); + // // Return the connection for this if we have it + // if let Some(conn) = self + // .arc + // .connection_table + // .get_connection_by_descriptor(conn_desc) + // { + // // Should not really happen, lets make sure we see this if it does + // log_net!(warn "== Returning existing connection in race: {:?}", conn_desc); + // return Ok(NetworkResult::Value(conn)); + // } + // } if net_res.is_value() || retry_count == 0 { + // Successful new connection, return it break net_res; } } diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 3e8f11e8..2ece5f05 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -92,6 +92,37 @@ impl ConnectionTable { while unord.next().await.is_some() {} } + // Return true if there is another connection in the table using a different protocol type + // to the same address and port with the same low level protocol type. + // Specifically right now this checks for a TCP connection that exists to the same + // low level TCP remote as a WS or WSS connection, since they are all low-level TCP + #[instrument(level = "trace", skip(self), ret)] + pub fn check_for_colliding_connection(&self, dial_info: &DialInfo) -> bool { + let inner = self.inner.lock(); + + let protocol_type = dial_info.protocol_type(); + let low_level_protocol_type = protocol_type.low_level_protocol_type(); + + // check protocol types + let mut check_protocol_types = ProtocolTypeSet::empty(); + for check_pt in ProtocolTypeSet::all().iter() { + if check_pt != protocol_type + && check_pt.low_level_protocol_type() == low_level_protocol_type + { + check_protocol_types.insert(check_pt); + } + } + let socket_address = dial_info.socket_address(); + + for check_pt in check_protocol_types { + let check_pa = PeerAddress::new(socket_address, check_pt); + if inner.ids_by_remote.contains_key(&check_pa) { + return true; + } + } + false + } + #[instrument(level = "trace", skip(self), ret, err)] pub fn add_connection( &self, @@ -183,14 +214,42 @@ impl ConnectionTable { Some(out.get_handle()) } - //#[instrument(level = "trace", skip(self), ret)] - pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option { + // #[instrument(level = "trace", skip(self), ret)] + pub fn get_best_connection_by_remote( + &self, + best_port: Option, + remote: PeerAddress, + ) -> Option { let mut inner = self.inner.lock(); - let id = inner.ids_by_remote.get(&remote).map(|v| v[v.len() - 1])?; + let all_ids_by_remote = inner.ids_by_remote.get(&remote)?; let protocol_index = Self::protocol_to_index(remote.protocol_type()); - let out = inner.conn_by_id[protocol_index].get(&id).unwrap(); - Some(out.get_handle()) + if all_ids_by_remote.len() == 0 { + // no connections + return None; + } + if all_ids_by_remote.len() == 1 { + // only one connection + let id = all_ids_by_remote[0]; + let nc = inner.conn_by_id[protocol_index].get(&id).unwrap(); + return Some(nc.get_handle()); + } + // multiple connections, find the one that matches the best port, or the most recent + if let Some(best_port) = best_port { + for id in all_ids_by_remote.iter().copied() { + let nc = inner.conn_by_id[protocol_index].peek(&id).unwrap(); + if let Some(local_addr) = nc.connection_descriptor().local() { + if local_addr.port() == best_port { + let nc = inner.conn_by_id[protocol_index].get(&id).unwrap(); + return Some(nc.get_handle()); + } + } + } + } + // just return most recent network connection if a best port match can not be found + let best_id = *all_ids_by_remote.last().unwrap(); + let nc = inner.conn_by_id[protocol_index].get(&best_id).unwrap(); + Some(nc.get_handle()) } //#[instrument(level = "trace", skip(self), ret)] @@ -204,26 +263,26 @@ impl ConnectionTable { .unwrap_or_default() } - pub fn drain_filter(&self, mut filter: F) -> Vec - where - F: FnMut(ConnectionDescriptor) -> bool, - { - let mut inner = self.inner.lock(); - let mut filtered_ids = Vec::new(); - for cbi in &mut inner.conn_by_id { - for (id, conn) in cbi { - if filter(conn.connection_descriptor()) { - filtered_ids.push(*id); - } - } - } - let mut filtered_connections = Vec::new(); - for id in filtered_ids { - let conn = Self::remove_connection_records(&mut *inner, id); - filtered_connections.push(conn) - } - filtered_connections - } + // pub fn drain_filter(&self, mut filter: F) -> Vec + // where + // F: FnMut(ConnectionDescriptor) -> bool, + // { + // let mut inner = self.inner.lock(); + // let mut filtered_ids = Vec::new(); + // for cbi in &mut inner.conn_by_id { + // for (id, conn) in cbi { + // if filter(conn.connection_descriptor()) { + // filtered_ids.push(*id); + // } + // } + // } + // let mut filtered_connections = Vec::new(); + // for id in filtered_ids { + // let conn = Self::remove_connection_records(&mut *inner, id); + // filtered_connections.push(conn) + // } + // filtered_connections + // } pub fn connection_count(&self) -> usize { let inner = self.inner.lock(); diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index b7a55dad..4611dc3b 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -333,7 +333,7 @@ impl Network { } } - pub fn get_local_port(&self, protocol_type: ProtocolType) -> u16 { + pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option { let inner = self.inner.lock(); let local_port = match protocol_type { ProtocolType::UDP => inner.udp_port, @@ -341,10 +341,10 @@ impl Network { ProtocolType::WS => inner.ws_port, ProtocolType::WSS => inner.wss_port, }; - local_port + Some(local_port) } - fn get_preferred_local_address(&self, dial_info: &DialInfo) -> SocketAddr { + pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option { let inner = self.inner.lock(); let local_port = match dial_info.protocol_type() { @@ -354,10 +354,10 @@ impl Network { ProtocolType::WSS => inner.wss_port, }; - match dial_info.address_type() { + Some(match dial_info.address_type() { AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port), AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port), - } + }) } pub fn is_usable_interface_address(&self, addr: IpAddr) -> bool { @@ -631,10 +631,9 @@ impl Network { .wrap_err("failed to send data to dial info")?); } else { // Handle connection-oriented protocols - let local_addr = self.get_preferred_local_address(&dial_info); let conn = network_result_try!( self.connection_manager() - .get_or_create_connection(Some(local_addr), dial_info.clone()) + .get_or_create_connection(dial_info.clone()) .await? ); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 0b13c521..d729f231 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -256,7 +256,7 @@ impl DiscoveryContext { let at = inner.address_type.unwrap(); let external_address_1 = inner.external_1_address.unwrap(); let node_1 = inner.node_1.as_ref().unwrap().clone(); - let local_port = self.net.get_local_port(pt); + let local_port = self.net.get_local_port(pt).unwrap(); (pt, llpt, at, external_address_1, node_1, local_port) }; diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 5c529df7..7c3b40e0 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -284,7 +284,7 @@ impl Network { // Handle connection-oriented protocols let conn = network_result_try!( self.connection_manager() - .get_or_create_connection(None, dial_info.clone()) + .get_or_create_connection(dial_info.clone()) .await? ); @@ -405,6 +405,14 @@ impl Network { Vec::new() } + pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option { + None + } + + pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option { + None + } + ////////////////////////////////////////// pub fn set_needs_public_dial_info_check( diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 32c9d366..8f3875d9 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -90,19 +90,16 @@ impl RoutingTable { for (n, gdi) in gdis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, gdi); } + out + } - out += "LocalNetwork PeerInfo:\n"; + pub(crate) fn debug_info_peerinfo(&self, routing_domain: RoutingDomain) -> String { + let mut out = String::new(); out += &format!( - " {:#?}\n", - self.get_own_peer_info(RoutingDomain::LocalNetwork) + "{:?} PeerInfo:\n {:#?}\n", + routing_domain, + self.get_own_peer_info(routing_domain) ); - - out += "PublicInternet PeerInfo:\n"; - out += &format!( - " {:#?}\n", - self.get_own_peer_info(RoutingDomain::PublicInternet) - ); - out } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index eacc3d85..3dfa1b5f 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -467,6 +467,23 @@ impl VeilidAPI { let routing_table = self.network_manager()?.routing_table(); Ok(routing_table.debug_info_dialinfo()) } + async fn debug_peerinfo(&self, args: String) -> VeilidAPIResult { + // Dump routing table peerinfo + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); + let routing_table = self.network_manager()?.routing_table(); + + let routing_domain = get_debug_argument_at( + &args, + 0, + "debug_peerinfo", + "routing_domain", + get_routing_domain, + ) + .ok() + .unwrap_or(RoutingDomain::PublicInternet); + + Ok(routing_table.debug_info_peerinfo(routing_domain)) + } async fn debug_txtrecord(&self, _args: String) -> VeilidAPIResult { // Dump routing table txt record @@ -1327,6 +1344,7 @@ impl VeilidAPI { pub async fn debug_help(&self, _args: String) -> VeilidAPIResult { Ok(r#"buckets [dead|reliable] dialinfo +peerinfo [routingdomain] entries [dead|reliable] entry nodeinfo @@ -1400,6 +1418,8 @@ record list self.debug_buckets(rest).await } else if arg == "dialinfo" { self.debug_dialinfo(rest).await + } else if arg == "peerinfo" { + self.debug_peerinfo(rest).await } else if arg == "txtrecord" { self.debug_txtrecord(rest).await } else if arg == "keypair" {