From 19db64cdfa06dcf965f8d2296f459d8600ec3454 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 7 Sep 2022 11:30:43 -0400 Subject: [PATCH] peer table cleanup async cleanup --- .../src/network_manager/connection_manager.rs | 6 +-- veilid-core/src/network_manager/mod.rs | 6 +-- veilid-core/src/network_manager/native/mod.rs | 2 +- veilid-core/src/routing_table/mod.rs | 42 ++++++++++++++++--- veilid-core/src/routing_table/node_ref.rs | 4 +- 5 files changed, 44 insertions(+), 16 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index a704caf0..f181dfe9 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -122,10 +122,7 @@ impl ConnectionManager { } // Returns a network connection if one already is established - pub async fn get_connection( - &self, - descriptor: ConnectionDescriptor, - ) -> Option { + pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { let mut inner = self.arc.inner.lock(); let inner = match &mut *inner { Some(v) => v, @@ -363,6 +360,7 @@ impl ConnectionManager { None => None, } }; + if let Some(mut conn) = conn { conn.close(); conn.await; diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index ad53e620..4c79931e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1184,7 +1184,7 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection().await { + if let Some(descriptor) = inbound_nr.last_connection() { match self .net() .send_data_to_existing_connection(descriptor, data) @@ -1283,7 +1283,7 @@ impl NetworkManager { } // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection().await { + if let Some(descriptor) = inbound_nr.last_connection() { match self .net() .send_data_to_existing_connection(descriptor, data) @@ -1316,7 +1316,7 @@ impl NetworkManager { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = node_ref.last_connection().await { + let data = if let Some(connection_descriptor) = node_ref.last_connection() { match this .net() .send_data_to_existing_connection(connection_descriptor, data) diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 5567e3eb..a95a5648 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -521,7 +521,7 @@ impl Network { // Handle connection-oriented protocols // Try to send to the exact existing connection if one exists - if let Some(conn) = self.connection_manager().get_connection(descriptor).await { + if let Some(conn) = self.connection_manager().get_connection(descriptor) { // connection exists, send over it match conn.send_async(data).await { ConnectionHandleSendResult::Sent => { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 53e643a5..ad3fa95f 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -834,12 +834,42 @@ impl RoutingTable { } pub fn get_recent_peers(&self) -> Vec<(DHTKey, RecentPeersEntry)> { - let inner = self.inner.read(); - inner - .recent_peers - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect() + let mut recent_peers = Vec::new(); + let mut dead_peers = Vec::new(); + let mut out = Vec::new(); + + // collect all recent peers + { + let inner = self.inner.read(); + for (k, _v) in &inner.recent_peers { + recent_peers.push(*k); + } + } + + // look up each node and make sure the connection is still live + // (uses same logic as send_data, ensuring last_connection works for UDP) + for e in &recent_peers { + let mut dead = true; + if let Some(nr) = self.lookup_node_ref(*e) { + if let Some(last_connection) = nr.last_connection() { + out.push((*e, RecentPeersEntry { last_connection })); + dead = false; + } + } + if dead { + dead_peers.push(e); + } + } + + // purge dead recent peers + { + let mut inner = self.inner.write(); + for d in dead_peers { + inner.recent_peers.remove(d); + } + } + + out } pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) { diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 6d50658b..26db4a8b 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -299,7 +299,7 @@ impl NodeRef { out } - pub async fn last_connection(&self) -> Option { + pub fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection let (last_connection, last_seen) = self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?; @@ -308,7 +308,7 @@ impl NodeRef { if last_connection.protocol_type().is_connection_oriented() { // Look the connection up in the connection manager and see if it's still there let connection_manager = self.routing_table.network_manager().connection_manager(); - connection_manager.get_connection(last_connection).await?; + connection_manager.get_connection(last_connection)?; } else { // If this is not connection oriented, then we check our last seen time // to see if this mapping has expired (beyond our timeout)