From 9b0ab866f1877b3ae425e207ee0c151647f79de1 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 4 Aug 2022 20:21:03 -0400 Subject: [PATCH] network --- veilid-core/src/api_tracing_layer.rs | 20 +- .../src/network_manager/connection_manager.rs | 33 +++- .../src/network_manager/connection_table.rs | 36 ++-- veilid-core/src/network_manager/mod.rs | 11 +- .../native/network_class_discovery.rs | 180 ++++++++++-------- .../tests/test_connection_table.rs | 20 +- veilid-server/src/settings.rs | 6 +- 7 files changed, 179 insertions(+), 127 deletions(-) diff --git a/veilid-core/src/api_tracing_layer.rs b/veilid-core/src/api_tracing_layer.rs index c2d6d3a5..13fe64bb 100644 --- a/veilid-core/src/api_tracing_layer.rs +++ b/veilid-core/src/api_tracing_layer.rs @@ -103,13 +103,13 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa struct StringRecorder { display: String, - is_following_args: bool, + //is_following_args: bool, } impl StringRecorder { fn new() -> Self { StringRecorder { display: String::new(), - is_following_args: false, + // is_following_args: false, } } } @@ -123,14 +123,14 @@ impl tracing::field::Visit for StringRecorder { self.display = format!("{:?}", value) } } else { - if self.is_following_args { - // following args - writeln!(self.display).unwrap(); - } else { - // first arg - write!(self.display, " ").unwrap(); - self.is_following_args = true; - } + //if self.is_following_args { + // following args + // writeln!(self.display).unwrap(); + //} else { + // first arg + write!(self.display, " ").unwrap(); + //self.is_following_args = true; + //} write!(self.display, "{} = {:?};", field.name(), value).unwrap(); } } diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 2abf2d09..5aff1fb3 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -10,6 +10,7 @@ use stop_token::future::FutureExt; #[derive(Debug)] enum ConnectionManagerEvent { Accepted(ProtocolNetworkConnection), + Dead(NetworkConnection), Finished(ConnectionDescriptor), } @@ -141,9 +142,9 @@ impl ConnectionManager { fn on_new_protocol_network_connection( &self, inner: &mut ConnectionManagerInner, - conn: ProtocolNetworkConnection, + prot_conn: ProtocolNetworkConnection, ) -> EyreResult> { - log_net!("on_new_protocol_network_connection: {:?}", conn); + log_net!("on_new_protocol_network_connection: {:?}", prot_conn); // Wrap with NetworkConnection object to start the connection processing loop let stop_token = match &inner.stop_source { @@ -151,10 +152,30 @@ impl ConnectionManager { None => bail!("not creating connection because we are stopping"), }; - let conn = NetworkConnection::from_protocol(self.clone(), stop_token, conn); + let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn); let handle = conn.get_handle(); // Add to the connection table - inner.connection_table.add_connection(conn)?; + match inner.connection_table.add_connection(conn) { + Ok(None) => { + // Connection added + } + Ok(Some(conn)) => { + // Connection added and a different one LRU'd out + let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); + } + Err(ConnectionTableAddError::AddressFilter(conn, e)) => { + // Connection filtered + let desc = conn.connection_descriptor(); + let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); + return Err(eyre!("connection filtered: {:?} ({})", desc, e)); + } + Err(ConnectionTableAddError::AlreadyExists(conn)) => { + // Connection already exists + let desc = conn.connection_descriptor(); + let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); + return Err(eyre!("connection already exists: {:?}", desc)); + } + }; Ok(NetworkResult::Value(handle)) } @@ -319,6 +340,10 @@ impl ConnectionManager { } }; } + ConnectionManagerEvent::Dead(mut conn) => { + conn.close(); + conn.await; + } ConnectionManagerEvent::Finished(desc) => { let conn = { let mut inner_lock = self.arc.inner.lock(); diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index d2e6d094..b8b9db93 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -4,25 +4,25 @@ use futures_util::StreamExt; use hashlink::LruCache; /////////////////////////////////////////////////////////////////////////////// -#[derive(ThisError, Debug, Clone, Eq, PartialEq)] +#[derive(ThisError, Debug)] pub enum ConnectionTableAddError { #[error("Connection already added to table")] - AlreadyExists, + AlreadyExists(NetworkConnection), #[error("Connection address was filtered")] - AddressFilter(AddressFilterError), + AddressFilter(NetworkConnection, AddressFilterError), } impl ConnectionTableAddError { - pub fn already_exists() -> Self { - ConnectionTableAddError::AlreadyExists + pub fn already_exists(conn: NetworkConnection) -> Self { + ConnectionTableAddError::AlreadyExists(conn) } - pub fn address_filter(err: AddressFilterError) -> Self { - ConnectionTableAddError::AddressFilter(err) + pub fn address_filter(conn: NetworkConnection, err: AddressFilterError) -> Self { + ConnectionTableAddError::AddressFilter(conn, err) } } /////////////////////////////////////////////////////////////////////////////// -#[derive(ThisError, Debug, Clone, Eq, PartialEq)] +#[derive(ThisError, Debug)] pub enum ConnectionTableRemoveError { #[error("Connection not in table")] NotInTable, @@ -89,19 +89,23 @@ impl ConnectionTable { pub fn add_connection( &mut self, conn: NetworkConnection, - ) -> Result<(), ConnectionTableAddError> { + ) -> Result, ConnectionTableAddError> { let descriptor = conn.connection_descriptor(); let ip_addr = descriptor.remote_address().to_ip_addr(); let index = protocol_to_index(descriptor.protocol_type()); if self.conn_by_descriptor[index].contains_key(&descriptor) { - return Err(ConnectionTableAddError::already_exists()); + return Err(ConnectionTableAddError::already_exists(conn)); } // Filter by ip for connection limits - self.address_filter - .add(ip_addr) - .map_err(ConnectionTableAddError::address_filter)?; + match self.address_filter.add(ip_addr) { + Ok(()) => {} + Err(e) => { + // send connection to get cleaned up cleanly + return Err(ConnectionTableAddError::address_filter(conn, e)); + } + }; // Add the connection to the table let res = self.conn_by_descriptor[index].insert(descriptor.clone(), conn); @@ -109,9 +113,11 @@ impl ConnectionTable { // if we have reached the maximum number of connections per protocol type // then drop the least recently used connection + let mut out_conn = None; if self.conn_by_descriptor[index].len() > self.max_connections[index] { - if let Some((lruk, _)) = self.conn_by_descriptor[index].remove_lru() { + if let Some((lruk, lru_conn)) = self.conn_by_descriptor[index].remove_lru() { debug!("connection lru out: {:?}", lruk); + out_conn = Some(lru_conn); self.remove_connection_records(lruk); } } @@ -124,7 +130,7 @@ impl ConnectionTable { descriptors.push(descriptor); - Ok(()) + Ok(out_conn) } pub fn get_connection(&mut self, descriptor: ConnectionDescriptor) -> Option { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index dbdd9210..8cfa4f7d 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1097,11 +1097,12 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedOutOfBand => { - bail!("reverse connect receipt should be returned in-band"); + return Ok(NetworkResult::invalid_message( + "reverse connect receipt should be returned in-band", + )); } ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, ReceiptEvent::Expired => { - //bail!("reverse connect receipt expired from {:?}", target_nr); return Ok(NetworkResult::timeout()); } ReceiptEvent::Cancelled => { @@ -1180,11 +1181,13 @@ impl NetworkManager { // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedOutOfBand => { - bail!("hole punch receipt should be returned in-band"); + return Ok(NetworkResult::invalid_message( + "hole punch receipt should be returned in-band", + )); } ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, ReceiptEvent::Expired => { - bail!("hole punch receipt expired from {}", target_nr); + return Ok(NetworkResult::timeout()); } ReceiptEvent::Cancelled => { bail!("hole punch receipt cancelled from {}", target_nr); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 18f96c2e..bd06082b 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -496,105 +496,119 @@ impl Network { // Do UDPv4+v6 at the same time as everything else if protocol_config.inbound.contains(ProtocolType::UDP) { // UDPv4 - unord.push( - async { - let udpv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv4_protocol_dialinfo(&udpv4_context, ProtocolType::UDP) - .await - { - log_net!(debug "Failed UDPv4 dialinfo discovery: {}", e); - return None; + if protocol_config.family_global.contains(AddressType::IPV4) { + unord.push( + async { + let udpv4_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv4_protocol_dialinfo(&udpv4_context, ProtocolType::UDP) + .await + { + log_net!(debug "Failed UDPv4 dialinfo discovery: {}", e); + return None; + } + Some(vec![udpv4_context]) } - Some(vec![udpv4_context]) - } - .boxed(), - ); + .boxed(), + ); + } // UDPv6 + if protocol_config.family_global.contains(AddressType::IPV6) { + unord.push( + async { + let udpv6_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv6_protocol_dialinfo(&udpv6_context, ProtocolType::UDP) + .await + { + log_net!(debug "Failed UDPv6 dialinfo discovery: {}", e); + return None; + } + Some(vec![udpv6_context]) + } + .boxed(), + ); + } + } + + // Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple + if protocol_config.family_global.contains(AddressType::IPV4) { unord.push( async { - let udpv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv6_protocol_dialinfo(&udpv6_context, ProtocolType::UDP) - .await - { - log_net!(debug "Failed UDPv6 dialinfo discovery: {}", e); - return None; + // TCPv4 + let mut out = Vec::::new(); + if protocol_config.inbound.contains(ProtocolType::TCP) { + let tcpv4_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv4_protocol_dialinfo(&tcpv4_context, ProtocolType::TCP) + .await + { + log_net!(debug "Failed TCPv4 dialinfo discovery: {}", e); + return None; + } + out.push(tcpv4_context); } - Some(vec![udpv6_context]) + + // WSv4 + if protocol_config.inbound.contains(ProtocolType::WS) { + let wsv4_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv4_protocol_dialinfo(&wsv4_context, ProtocolType::WS) + .await + { + log_net!(debug "Failed WSv4 dialinfo discovery: {}", e); + return None; + } + out.push(wsv4_context); + } + Some(out) } .boxed(), ); } - // Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple - unord.push( - async { - // TCPv4 - let mut out = Vec::::new(); - if protocol_config.inbound.contains(ProtocolType::TCP) { - let tcpv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv4_protocol_dialinfo(&tcpv4_context, ProtocolType::TCP) - .await - { - log_net!(debug "Failed TCPv4 dialinfo discovery: {}", e); - return None; - } - out.push(tcpv4_context); - } - - // WSv4 - if protocol_config.inbound.contains(ProtocolType::WS) { - let wsv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv4_protocol_dialinfo(&wsv4_context, ProtocolType::WS) - .await - { - log_net!(debug "Failed WSv4 dialinfo discovery: {}", e); - return None; - } - out.push(wsv4_context); - } - Some(out) - } - .boxed(), - ); - // Do TCPv6 + WSv6 in series because they may use the same connection 5-tuple - unord.push( - async { - // TCPv6 - let mut out = Vec::::new(); - if protocol_config.inbound.contains(ProtocolType::TCP) { - let tcpv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv6_protocol_dialinfo(&tcpv6_context, ProtocolType::TCP) - .await - { - log_net!(debug "Failed TCPv6 dialinfo discovery: {}", e); - return None; + if protocol_config.family_global.contains(AddressType::IPV6) { + unord.push( + async { + // TCPv6 + let mut out = Vec::::new(); + if protocol_config.inbound.contains(ProtocolType::TCP) { + let tcpv6_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv6_protocol_dialinfo(&tcpv6_context, ProtocolType::TCP) + .await + { + log_net!(debug "Failed TCPv6 dialinfo discovery: {}", e); + return None; + } + out.push(tcpv6_context); } - out.push(tcpv6_context); - } - // WSv6 - if protocol_config.inbound.contains(ProtocolType::WS) { - let wsv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); - if let Err(e) = self - .update_ipv6_protocol_dialinfo(&wsv6_context, ProtocolType::WS) - .await - { - log_net!(debug "Failed WSv6 dialinfo discovery: {}", e); - return None; + // WSv6 + if protocol_config.inbound.contains(ProtocolType::WS) { + let wsv6_context = + DiscoveryContext::new(self.routing_table(), self.clone()); + if let Err(e) = self + .update_ipv6_protocol_dialinfo(&wsv6_context, ProtocolType::WS) + .await + { + log_net!(debug "Failed WSv6 dialinfo discovery: {}", e); + return None; + } + out.push(wsv6_context); } - out.push(wsv6_context); + Some(out) } - Some(out) - } - .boxed(), - ); + .boxed(), + ); + } // Wait for all discovery futures to complete and collect contexts let mut contexts = Vec::::new(); diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index cc04628b..6e4243c2 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -85,8 +85,9 @@ pub async fn test_add_get_remove() { assert_eq!( table .remove_connection(a2) - .map(|c| c.connection_descriptor()), - Ok(a1) + .map(|c| c.connection_descriptor()) + .unwrap(), + a1 ); assert_eq!(table.connection_count(), 0); assert_err!(table.remove_connection(a2)); @@ -106,20 +107,23 @@ pub async fn test_add_get_remove() { assert_eq!( table .remove_connection(a2) - .map(|c| c.connection_descriptor()), - Ok(a2) + .map(|c| c.connection_descriptor()) + .unwrap(), + a2 ); assert_eq!( table .remove_connection(a3) - .map(|c| c.connection_descriptor()), - Ok(a3) + .map(|c| c.connection_descriptor()) + .unwrap(), + a3 ); assert_eq!( table .remove_connection(a4) - .map(|c| c.connection_descriptor()), - Ok(a4) + .map(|c| c.connection_descriptor()) + .unwrap(), + a4 ); assert_eq!(table.connection_count(), 0); } diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 4c1ee145..2183815a 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -58,10 +58,10 @@ core: network: connection_initial_timeout_ms: 2000 connection_inactivity_timeout_ms: 60000 - max_connections_per_ip4: 8 - max_connections_per_ip6_prefix: 8 + max_connections_per_ip4: 32 + max_connections_per_ip6_prefix: 32 max_connections_per_ip6_prefix_size: 56 - max_connection_frequency_per_min: 8 + max_connection_frequency_per_min: 128 client_whitelist_timeout_ms: 300000 reverse_connection_receipt_time_ms: 5000 hole_punch_receipt_time_ms: 5000