peer table cleanup

async cleanup
This commit is contained in:
John Smith 2022-09-07 11:30:43 -04:00
parent c36db533f2
commit 19db64cdfa
5 changed files with 44 additions and 16 deletions

View File

@ -122,10 +122,7 @@ impl ConnectionManager {
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
pub async fn get_connection( pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
&self,
descriptor: ConnectionDescriptor,
) -> Option<ConnectionHandle> {
let mut inner = self.arc.inner.lock(); let mut inner = self.arc.inner.lock();
let inner = match &mut *inner { let inner = match &mut *inner {
Some(v) => v, Some(v) => v,
@ -363,6 +360,7 @@ impl ConnectionManager {
None => None, None => None,
} }
}; };
if let Some(mut conn) = conn { if let Some(mut conn) = conn {
conn.close(); conn.close();
conn.await; conn.await;

View File

@ -1184,7 +1184,7 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection().await { if let Some(descriptor) = inbound_nr.last_connection() {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_connection(descriptor, data)
@ -1283,7 +1283,7 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection().await { if let Some(descriptor) = inbound_nr.last_connection() {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_connection(descriptor, data)
@ -1316,7 +1316,7 @@ impl NetworkManager {
let this = self.clone(); let this = self.clone();
Box::pin(async move { Box::pin(async move {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = node_ref.last_connection().await { let data = if let Some(connection_descriptor) = node_ref.last_connection() {
match this match this
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_connection(connection_descriptor, data)

View File

@ -521,7 +521,7 @@ impl Network {
// Handle connection-oriented protocols // Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists // Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor).await { if let Some(conn) = self.connection_manager().get_connection(descriptor) {
// connection exists, send over it // connection exists, send over it
match conn.send_async(data).await { match conn.send_async(data).await {
ConnectionHandleSendResult::Sent => { ConnectionHandleSendResult::Sent => {

View File

@ -834,12 +834,42 @@ impl RoutingTable {
} }
pub fn get_recent_peers(&self) -> Vec<(DHTKey, RecentPeersEntry)> { pub fn get_recent_peers(&self) -> Vec<(DHTKey, RecentPeersEntry)> {
let mut recent_peers = Vec::new();
let mut dead_peers = Vec::new();
let mut out = Vec::new();
// collect all recent peers
{
let inner = self.inner.read(); let inner = self.inner.read();
inner for (k, _v) in &inner.recent_peers {
.recent_peers recent_peers.push(*k);
.iter() }
.map(|(k, v)| (k.clone(), v.clone())) }
.collect()
// look up each node and make sure the connection is still live
// (uses same logic as send_data, ensuring last_connection works for UDP)
for e in &recent_peers {
let mut dead = true;
if let Some(nr) = self.lookup_node_ref(*e) {
if let Some(last_connection) = nr.last_connection() {
out.push((*e, RecentPeersEntry { last_connection }));
dead = false;
}
}
if dead {
dead_peers.push(e);
}
}
// purge dead recent peers
{
let mut inner = self.inner.write();
for d in dead_peers {
inner.recent_peers.remove(d);
}
}
out
} }
pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) { pub fn touch_recent_peer(&self, node_id: DHTKey, last_connection: ConnectionDescriptor) {

View File

@ -299,7 +299,7 @@ impl NodeRef {
out out
} }
pub async fn last_connection(&self) -> Option<ConnectionDescriptor> { pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connection and the last time we saw anything with this connection // Get the last connection and the last time we saw anything with this connection
let (last_connection, last_seen) = let (last_connection, last_seen) =
self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?; self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?;
@ -308,7 +308,7 @@ impl NodeRef {
if last_connection.protocol_type().is_connection_oriented() { if last_connection.protocol_type().is_connection_oriented() {
// Look the connection up in the connection manager and see if it's still there // Look the connection up in the connection manager and see if it's still there
let connection_manager = self.routing_table.network_manager().connection_manager(); let connection_manager = self.routing_table.network_manager().connection_manager();
connection_manager.get_connection(last_connection).await?; connection_manager.get_connection(last_connection)?;
} else { } else {
// If this is not connection oriented, then we check our last seen time // If this is not connection oriented, then we check our last seen time
// to see if this mapping has expired (beyond our timeout) // to see if this mapping has expired (beyond our timeout)