network
This commit is contained in:
		| @@ -103,13 +103,13 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa | ||||
|  | ||||
| struct StringRecorder { | ||||
|     display: String, | ||||
|     is_following_args: bool, | ||||
|     //is_following_args: bool, | ||||
| } | ||||
| impl StringRecorder { | ||||
|     fn new() -> Self { | ||||
|         StringRecorder { | ||||
|             display: String::new(), | ||||
|             is_following_args: false, | ||||
|             //      is_following_args: false, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -123,14 +123,14 @@ impl tracing::field::Visit for StringRecorder { | ||||
|                 self.display = format!("{:?}", value) | ||||
|             } | ||||
|         } else { | ||||
|             if self.is_following_args { | ||||
|                 // following args | ||||
|                 writeln!(self.display).unwrap(); | ||||
|             } else { | ||||
|                 // first arg | ||||
|                 write!(self.display, " ").unwrap(); | ||||
|                 self.is_following_args = true; | ||||
|             } | ||||
|             //if self.is_following_args { | ||||
|             // following args | ||||
|             //    writeln!(self.display).unwrap(); | ||||
|             //} else { | ||||
|             // first arg | ||||
|             write!(self.display, " ").unwrap(); | ||||
|             //self.is_following_args = true; | ||||
|             //} | ||||
|             write!(self.display, "{} = {:?};", field.name(), value).unwrap(); | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -10,6 +10,7 @@ use stop_token::future::FutureExt; | ||||
| #[derive(Debug)] | ||||
| enum ConnectionManagerEvent { | ||||
|     Accepted(ProtocolNetworkConnection), | ||||
|     Dead(NetworkConnection), | ||||
|     Finished(ConnectionDescriptor), | ||||
| } | ||||
|  | ||||
| @@ -141,9 +142,9 @@ impl ConnectionManager { | ||||
|     fn on_new_protocol_network_connection( | ||||
|         &self, | ||||
|         inner: &mut ConnectionManagerInner, | ||||
|         conn: ProtocolNetworkConnection, | ||||
|         prot_conn: ProtocolNetworkConnection, | ||||
|     ) -> EyreResult<NetworkResult<ConnectionHandle>> { | ||||
|         log_net!("on_new_protocol_network_connection: {:?}", conn); | ||||
|         log_net!("on_new_protocol_network_connection: {:?}", prot_conn); | ||||
|  | ||||
|         // Wrap with NetworkConnection object to start the connection processing loop | ||||
|         let stop_token = match &inner.stop_source { | ||||
| @@ -151,10 +152,30 @@ impl ConnectionManager { | ||||
|             None => bail!("not creating connection because we are stopping"), | ||||
|         }; | ||||
|  | ||||
|         let conn = NetworkConnection::from_protocol(self.clone(), stop_token, conn); | ||||
|         let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn); | ||||
|         let handle = conn.get_handle(); | ||||
|         // Add to the connection table | ||||
|         inner.connection_table.add_connection(conn)?; | ||||
|         match inner.connection_table.add_connection(conn) { | ||||
|             Ok(None) => { | ||||
|                 // Connection added | ||||
|             } | ||||
|             Ok(Some(conn)) => { | ||||
|                 // Connection added and a different one LRU'd out | ||||
|                 let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); | ||||
|             } | ||||
|             Err(ConnectionTableAddError::AddressFilter(conn, e)) => { | ||||
|                 // Connection filtered | ||||
|                 let desc = conn.connection_descriptor(); | ||||
|                 let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); | ||||
|                 return Err(eyre!("connection filtered: {:?} ({})", desc, e)); | ||||
|             } | ||||
|             Err(ConnectionTableAddError::AlreadyExists(conn)) => { | ||||
|                 // Connection already exists | ||||
|                 let desc = conn.connection_descriptor(); | ||||
|                 let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); | ||||
|                 return Err(eyre!("connection already exists: {:?}", desc)); | ||||
|             } | ||||
|         }; | ||||
|         Ok(NetworkResult::Value(handle)) | ||||
|     } | ||||
|  | ||||
| @@ -319,6 +340,10 @@ impl ConnectionManager { | ||||
|                         } | ||||
|                     }; | ||||
|                 } | ||||
|                 ConnectionManagerEvent::Dead(mut conn) => { | ||||
|                     conn.close(); | ||||
|                     conn.await; | ||||
|                 } | ||||
|                 ConnectionManagerEvent::Finished(desc) => { | ||||
|                     let conn = { | ||||
|                         let mut inner_lock = self.arc.inner.lock(); | ||||
|   | ||||
| @@ -4,25 +4,25 @@ use futures_util::StreamExt; | ||||
| use hashlink::LruCache; | ||||
|  | ||||
| /////////////////////////////////////////////////////////////////////////////// | ||||
| #[derive(ThisError, Debug, Clone, Eq, PartialEq)] | ||||
| #[derive(ThisError, Debug)] | ||||
| pub enum ConnectionTableAddError { | ||||
|     #[error("Connection already added to table")] | ||||
|     AlreadyExists, | ||||
|     AlreadyExists(NetworkConnection), | ||||
|     #[error("Connection address was filtered")] | ||||
|     AddressFilter(AddressFilterError), | ||||
|     AddressFilter(NetworkConnection, AddressFilterError), | ||||
| } | ||||
|  | ||||
| impl ConnectionTableAddError { | ||||
|     pub fn already_exists() -> Self { | ||||
|         ConnectionTableAddError::AlreadyExists | ||||
|     pub fn already_exists(conn: NetworkConnection) -> Self { | ||||
|         ConnectionTableAddError::AlreadyExists(conn) | ||||
|     } | ||||
|     pub fn address_filter(err: AddressFilterError) -> Self { | ||||
|         ConnectionTableAddError::AddressFilter(err) | ||||
|     pub fn address_filter(conn: NetworkConnection, err: AddressFilterError) -> Self { | ||||
|         ConnectionTableAddError::AddressFilter(conn, err) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /////////////////////////////////////////////////////////////////////////////// | ||||
| #[derive(ThisError, Debug, Clone, Eq, PartialEq)] | ||||
| #[derive(ThisError, Debug)] | ||||
| pub enum ConnectionTableRemoveError { | ||||
|     #[error("Connection not in table")] | ||||
|     NotInTable, | ||||
| @@ -89,19 +89,23 @@ impl ConnectionTable { | ||||
|     pub fn add_connection( | ||||
|         &mut self, | ||||
|         conn: NetworkConnection, | ||||
|     ) -> Result<(), ConnectionTableAddError> { | ||||
|     ) -> Result<Option<NetworkConnection>, ConnectionTableAddError> { | ||||
|         let descriptor = conn.connection_descriptor(); | ||||
|         let ip_addr = descriptor.remote_address().to_ip_addr(); | ||||
|  | ||||
|         let index = protocol_to_index(descriptor.protocol_type()); | ||||
|         if self.conn_by_descriptor[index].contains_key(&descriptor) { | ||||
|             return Err(ConnectionTableAddError::already_exists()); | ||||
|             return Err(ConnectionTableAddError::already_exists(conn)); | ||||
|         } | ||||
|  | ||||
|         // Filter by ip for connection limits | ||||
|         self.address_filter | ||||
|             .add(ip_addr) | ||||
|             .map_err(ConnectionTableAddError::address_filter)?; | ||||
|         match self.address_filter.add(ip_addr) { | ||||
|             Ok(()) => {} | ||||
|             Err(e) => { | ||||
|                 // send connection to get cleaned up cleanly | ||||
|                 return Err(ConnectionTableAddError::address_filter(conn, e)); | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         // Add the connection to the table | ||||
|         let res = self.conn_by_descriptor[index].insert(descriptor.clone(), conn); | ||||
| @@ -109,9 +113,11 @@ impl ConnectionTable { | ||||
|  | ||||
|         // if we have reached the maximum number of connections per protocol type | ||||
|         // then drop the least recently used connection | ||||
|         let mut out_conn = None; | ||||
|         if self.conn_by_descriptor[index].len() > self.max_connections[index] { | ||||
|             if let Some((lruk, _)) = self.conn_by_descriptor[index].remove_lru() { | ||||
|             if let Some((lruk, lru_conn)) = self.conn_by_descriptor[index].remove_lru() { | ||||
|                 debug!("connection lru out: {:?}", lruk); | ||||
|                 out_conn = Some(lru_conn); | ||||
|                 self.remove_connection_records(lruk); | ||||
|             } | ||||
|         } | ||||
| @@ -124,7 +130,7 @@ impl ConnectionTable { | ||||
|  | ||||
|         descriptors.push(descriptor); | ||||
|  | ||||
|         Ok(()) | ||||
|         Ok(out_conn) | ||||
|     } | ||||
|  | ||||
|     pub fn get_connection(&mut self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> { | ||||
|   | ||||
| @@ -1097,11 +1097,12 @@ impl NetworkManager { | ||||
|         // Wait for the return receipt | ||||
|         let inbound_nr = match eventual_value.await.take_value().unwrap() { | ||||
|             ReceiptEvent::ReturnedOutOfBand => { | ||||
|                 bail!("reverse connect receipt should be returned in-band"); | ||||
|                 return Ok(NetworkResult::invalid_message( | ||||
|                     "reverse connect receipt should be returned in-band", | ||||
|                 )); | ||||
|             } | ||||
|             ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, | ||||
|             ReceiptEvent::Expired => { | ||||
|                 //bail!("reverse connect receipt expired from {:?}", target_nr); | ||||
|                 return Ok(NetworkResult::timeout()); | ||||
|             } | ||||
|             ReceiptEvent::Cancelled => { | ||||
| @@ -1180,11 +1181,13 @@ impl NetworkManager { | ||||
|         // Wait for the return receipt | ||||
|         let inbound_nr = match eventual_value.await.take_value().unwrap() { | ||||
|             ReceiptEvent::ReturnedOutOfBand => { | ||||
|                 bail!("hole punch receipt should be returned in-band"); | ||||
|                 return Ok(NetworkResult::invalid_message( | ||||
|                     "hole punch receipt should be returned in-band", | ||||
|                 )); | ||||
|             } | ||||
|             ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, | ||||
|             ReceiptEvent::Expired => { | ||||
|                 bail!("hole punch receipt expired from {}", target_nr); | ||||
|                 return Ok(NetworkResult::timeout()); | ||||
|             } | ||||
|             ReceiptEvent::Cancelled => { | ||||
|                 bail!("hole punch receipt cancelled from {}", target_nr); | ||||
|   | ||||
| @@ -496,105 +496,119 @@ impl Network { | ||||
|         // Do UDPv4+v6 at the same time as everything else | ||||
|         if protocol_config.inbound.contains(ProtocolType::UDP) { | ||||
|             // UDPv4 | ||||
|             unord.push( | ||||
|                 async { | ||||
|                     let udpv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv4_protocol_dialinfo(&udpv4_context, ProtocolType::UDP) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed UDPv4 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|             if protocol_config.family_global.contains(AddressType::IPV4) { | ||||
|                 unord.push( | ||||
|                     async { | ||||
|                         let udpv4_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv4_protocol_dialinfo(&udpv4_context, ProtocolType::UDP) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed UDPv4 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         Some(vec![udpv4_context]) | ||||
|                     } | ||||
|                     Some(vec![udpv4_context]) | ||||
|                 } | ||||
|                 .boxed(), | ||||
|             ); | ||||
|                     .boxed(), | ||||
|                 ); | ||||
|             } | ||||
|  | ||||
|             // UDPv6 | ||||
|             if protocol_config.family_global.contains(AddressType::IPV6) { | ||||
|                 unord.push( | ||||
|                     async { | ||||
|                         let udpv6_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv6_protocol_dialinfo(&udpv6_context, ProtocolType::UDP) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed UDPv6 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         Some(vec![udpv6_context]) | ||||
|                     } | ||||
|                     .boxed(), | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple | ||||
|         if protocol_config.family_global.contains(AddressType::IPV4) { | ||||
|             unord.push( | ||||
|                 async { | ||||
|                     let udpv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv6_protocol_dialinfo(&udpv6_context, ProtocolType::UDP) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed UDPv6 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|                     // TCPv4 | ||||
|                     let mut out = Vec::<DiscoveryContext>::new(); | ||||
|                     if protocol_config.inbound.contains(ProtocolType::TCP) { | ||||
|                         let tcpv4_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv4_protocol_dialinfo(&tcpv4_context, ProtocolType::TCP) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed TCPv4 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         out.push(tcpv4_context); | ||||
|                     } | ||||
|                     Some(vec![udpv6_context]) | ||||
|  | ||||
|                     // WSv4 | ||||
|                     if protocol_config.inbound.contains(ProtocolType::WS) { | ||||
|                         let wsv4_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv4_protocol_dialinfo(&wsv4_context, ProtocolType::WS) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed WSv4 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         out.push(wsv4_context); | ||||
|                     } | ||||
|                     Some(out) | ||||
|                 } | ||||
|                 .boxed(), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         // Do TCPv4 + WSv4 in series because they may use the same connection 5-tuple | ||||
|         unord.push( | ||||
|             async { | ||||
|                 // TCPv4 | ||||
|                 let mut out = Vec::<DiscoveryContext>::new(); | ||||
|                 if protocol_config.inbound.contains(ProtocolType::TCP) { | ||||
|                     let tcpv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv4_protocol_dialinfo(&tcpv4_context, ProtocolType::TCP) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed TCPv4 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|                     } | ||||
|                     out.push(tcpv4_context); | ||||
|                 } | ||||
|  | ||||
|                 // WSv4 | ||||
|                 if protocol_config.inbound.contains(ProtocolType::WS) { | ||||
|                     let wsv4_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv4_protocol_dialinfo(&wsv4_context, ProtocolType::WS) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed WSv4 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|                     } | ||||
|                     out.push(wsv4_context); | ||||
|                 } | ||||
|                 Some(out) | ||||
|             } | ||||
|             .boxed(), | ||||
|         ); | ||||
|  | ||||
|         // Do TCPv6 + WSv6 in series because they may use the same connection 5-tuple | ||||
|         unord.push( | ||||
|             async { | ||||
|                 // TCPv6 | ||||
|                 let mut out = Vec::<DiscoveryContext>::new(); | ||||
|                 if protocol_config.inbound.contains(ProtocolType::TCP) { | ||||
|                     let tcpv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv6_protocol_dialinfo(&tcpv6_context, ProtocolType::TCP) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed TCPv6 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|         if protocol_config.family_global.contains(AddressType::IPV6) { | ||||
|             unord.push( | ||||
|                 async { | ||||
|                     // TCPv6 | ||||
|                     let mut out = Vec::<DiscoveryContext>::new(); | ||||
|                     if protocol_config.inbound.contains(ProtocolType::TCP) { | ||||
|                         let tcpv6_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv6_protocol_dialinfo(&tcpv6_context, ProtocolType::TCP) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed TCPv6 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         out.push(tcpv6_context); | ||||
|                     } | ||||
|                     out.push(tcpv6_context); | ||||
|                 } | ||||
|  | ||||
|                 // WSv6 | ||||
|                 if protocol_config.inbound.contains(ProtocolType::WS) { | ||||
|                     let wsv6_context = DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                     if let Err(e) = self | ||||
|                         .update_ipv6_protocol_dialinfo(&wsv6_context, ProtocolType::WS) | ||||
|                         .await | ||||
|                     { | ||||
|                         log_net!(debug "Failed WSv6 dialinfo discovery: {}", e); | ||||
|                         return None; | ||||
|                     // WSv6 | ||||
|                     if protocol_config.inbound.contains(ProtocolType::WS) { | ||||
|                         let wsv6_context = | ||||
|                             DiscoveryContext::new(self.routing_table(), self.clone()); | ||||
|                         if let Err(e) = self | ||||
|                             .update_ipv6_protocol_dialinfo(&wsv6_context, ProtocolType::WS) | ||||
|                             .await | ||||
|                         { | ||||
|                             log_net!(debug "Failed WSv6 dialinfo discovery: {}", e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         out.push(wsv6_context); | ||||
|                     } | ||||
|                     out.push(wsv6_context); | ||||
|                     Some(out) | ||||
|                 } | ||||
|                 Some(out) | ||||
|             } | ||||
|             .boxed(), | ||||
|         ); | ||||
|                 .boxed(), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         // Wait for all discovery futures to complete and collect contexts | ||||
|         let mut contexts = Vec::<DiscoveryContext>::new(); | ||||
|   | ||||
| @@ -85,8 +85,9 @@ pub async fn test_add_get_remove() { | ||||
|     assert_eq!( | ||||
|         table | ||||
|             .remove_connection(a2) | ||||
|             .map(|c| c.connection_descriptor()), | ||||
|         Ok(a1) | ||||
|             .map(|c| c.connection_descriptor()) | ||||
|             .unwrap(), | ||||
|         a1 | ||||
|     ); | ||||
|     assert_eq!(table.connection_count(), 0); | ||||
|     assert_err!(table.remove_connection(a2)); | ||||
| @@ -106,20 +107,23 @@ pub async fn test_add_get_remove() { | ||||
|     assert_eq!( | ||||
|         table | ||||
|             .remove_connection(a2) | ||||
|             .map(|c| c.connection_descriptor()), | ||||
|         Ok(a2) | ||||
|             .map(|c| c.connection_descriptor()) | ||||
|             .unwrap(), | ||||
|         a2 | ||||
|     ); | ||||
|     assert_eq!( | ||||
|         table | ||||
|             .remove_connection(a3) | ||||
|             .map(|c| c.connection_descriptor()), | ||||
|         Ok(a3) | ||||
|             .map(|c| c.connection_descriptor()) | ||||
|             .unwrap(), | ||||
|         a3 | ||||
|     ); | ||||
|     assert_eq!( | ||||
|         table | ||||
|             .remove_connection(a4) | ||||
|             .map(|c| c.connection_descriptor()), | ||||
|         Ok(a4) | ||||
|             .map(|c| c.connection_descriptor()) | ||||
|             .unwrap(), | ||||
|         a4 | ||||
|     ); | ||||
|     assert_eq!(table.connection_count(), 0); | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user