diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 9ea09d34..6f350a9f 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -139,6 +139,33 @@ impl ConnectionManager { debug!("finished connection manager shutdown"); } + // Internal routine to see if we should keep this connection + // from being LRU removed. Used on our initiated relay connections. + fn should_protect_connection(&self, conn: &NetworkConnection) -> bool { + let netman = self.network_manager(); + let routing_table = netman.routing_table(); + let remote_address = conn.connection_descriptor().remote_address().address(); + let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { + return false; + }; + let Some(rn) = routing_table.relay_node(routing_domain) else { + return false; + }; + let relay_nr = rn.filtered_clone( + NodeRefFilter::new() + .with_routing_domain(routing_domain) + .with_address_type(conn.connection_descriptor().address_type()) + .with_protocol_type(conn.connection_descriptor().protocol_type()), + ); + let dids = relay_nr.all_filtered_dial_info_details(); + for did in dids { + if did.dial_info.address() == remote_address { + return true; + } + } + false + } + // Internal routine to register new connection atomically. // Registers connection in the connection table for later access // and spawns a message processing loop for the connection @@ -163,8 +190,16 @@ impl ConnectionManager { None => bail!("not creating connection because we are stopping"), }; - let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); + let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id); let handle = conn.get_handle(); + + // See if this should be a protected connection + let protect = self.should_protect_connection(&conn); + if protect { + log_net!(debug "== PROTECTING connection: {} -> {}", id, conn.debug_print(get_aligned_timestamp())); + conn.protect(); + } + // Add to the connection table match self.arc.connection_table.add_connection(conn) { Ok(None) => { @@ -173,7 +208,7 @@ impl ConnectionManager { Ok(Some(conn)) => { // Connection added and a different one LRU'd out // Send it to be terminated - log_net!(debug "== LRU kill connection due to limit: {:?}", conn); + // log_net!(debug "== LRU kill connection due to limit: {:?}", conn); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); } Err(ConnectionTableAddError::AddressFilter(conn, e)) => { diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index d2d13bcb..233c447e 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -184,10 +184,20 @@ impl ConnectionTable { // then drop the least recently used connection let mut out_conn = None; if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] { - if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() { + while let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() { let lruk = *lruk; - log_net!(debug "connection lru out: {:?}", lru_conn); + + // Don't LRU protected connections + if lru_conn.is_protected() { + // Mark as recently used + log_net!(debug "== No LRU Out for PROTECTED connection: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp())); + inner.conn_by_id[protocol_index].get(&lruk); + continue; + } + + log_net!(debug "== LRU Connection Killed: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp())); out_conn = Some(Self::remove_connection_records(&mut *inner, lruk)); + break; } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index dc9756fa..d1b7bc74 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -665,7 +665,7 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), err)] pub async fn handle_signal( &self, - connection_descriptor: ConnectionDescriptor, + signal_connection_descriptor: ConnectionDescriptor, signal_info: SignalInfo, ) -> EyreResult> { match signal_info { @@ -689,8 +689,9 @@ impl NetworkManager { }; // Restrict reverse connection to same protocol as inbound signal - let peer_nr = peer_nr - .filtered_clone(NodeRefFilter::from(connection_descriptor.protocol_type())); + let peer_nr = peer_nr.filtered_clone(NodeRefFilter::from( + signal_connection_descriptor.protocol_type(), + )); // Make a reverse connection to the peer and send the receipt to it rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 76b4e13d..0b73bc6c 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -94,6 +94,7 @@ pub struct NetworkConnection { stats: Arc>, sender: flume::Sender<(Option, Vec)>, stop_source: Option, + protected: bool, } impl NetworkConnection { @@ -112,6 +113,7 @@ impl NetworkConnection { })), sender, stop_source: None, + protected: false, } } @@ -157,6 +159,7 @@ impl NetworkConnection { stats, sender, stop_source: Some(stop_source), + protected: false, } } @@ -172,6 +175,14 @@ impl NetworkConnection { ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone()) } + pub fn is_protected(&self) -> bool { + self.protected + } + + pub fn protect(&mut self) { + self.protected = true; + } + pub fn close(&mut self) { if let Some(stop_source) = self.stop_source.take() { // drop the stopper diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 0cc57375..beecf26b 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -18,6 +18,36 @@ impl NetworkManager { let this = self.clone(); Box::pin( async move { + + // First try to send data to the last socket we've seen this peer on + let data = if let Some(connection_descriptor) = destination_node_ref.last_connection() { + match this + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + { + None => { + // Update timestamp for this last connection since we just sent to it + destination_node_ref + .set_last_connection(connection_descriptor, get_aligned_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))); + } + Some(data) => { + // Couldn't send data to existing connection + // so pass the data back out + data + } + } + } else { + // No last connection + data + }; + + // No existing connection was found or usable, so we proceed to see how to make a new one + // Get the best way to contact this node let contact_method = this.get_node_contact_method(destination_node_ref.clone())?; diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 6a72bf20..fd59c82b 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -244,7 +244,7 @@ pub trait NodeRefBase: Sized { }) } - fn all_filtered_dial_info_details(&self) -> Vec { + fn all_filtered_dial_info_details(&self) -> Vec { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); diff --git a/veilid-flutter/lib/default_config.dart b/veilid-flutter/lib/default_config.dart index a10787f3..eac855bc 100644 --- a/veilid-flutter/lib/default_config.dart +++ b/veilid-flutter/lib/default_config.dart @@ -173,14 +173,14 @@ Future getDefaultVeilidConfig(String programName) async { ws: VeilidConfigWS( connect: true, listen: !kIsWeb, - maxConnections: 1024, + maxConnections: 32, listenAddress: '', path: 'ws', ), wss: VeilidConfigWSS( connect: true, listen: false, - maxConnections: 1024, + maxConnections: 32, listenAddress: '', path: 'ws', ),