From 41b9a2259510a923ee1f776765b22f9ba1d21797 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 14 Jul 2023 14:21:00 -0400 Subject: [PATCH] fix keepalives --- veilid-core/src/network_manager/native/mod.rs | 8 +- .../native/network_class_discovery.rs | 8 +- veilid-core/src/routing_table/bucket_entry.rs | 12 +- veilid-core/src/routing_table/mod.rs | 4 + .../routing_table/routing_domain_editor.rs | 51 ++++-- .../src/routing_table/routing_domains.rs | 9 ++ .../src/routing_table/routing_table_inner.rs | 15 +- .../src/routing_table/tasks/ping_validator.rs | 150 +++++++++++------- .../routing_table/tasks/relay_management.rs | 6 +- .../src/routing_table/types/routing_domain.rs | 2 + 10 files changed, 160 insertions(+), 105 deletions(-) diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index a4cf7854..c469ed13 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -814,8 +814,8 @@ impl Network { } // commit routing table edits - editor_public_internet.commit().await; - editor_local_network.commit().await; + editor_public_internet.commit(); + editor_local_network.commit(); info!("network started"); self.inner.lock().network_started = true; @@ -868,12 +868,12 @@ impl Network { let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); editor.clear_dial_info_details(); editor.set_network_class(None); - editor.commit().await; + editor.commit(); let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork); editor.clear_dial_info_details(); editor.set_network_class(None); - editor.commit().await; + editor.commit(); // Reset state including network class *self.inner.lock() = Self::new_inner(); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 3ddccde1..6fdf283a 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -176,7 +176,7 @@ impl DiscoveryContext { .routing_table .find_fast_public_nodes_filtered(node_count, filters); if peers.is_empty() { - log_net!( + log_net!(debug "no external address detection peers of type {:?}:{:?}", protocol_type, address_type @@ -195,7 +195,7 @@ impl DiscoveryContext { return Some((sa, peer)); } } - log_net!("no peers responded with an external address"); + log_net!(debug "no peers responded with an external address"); None } @@ -943,8 +943,8 @@ impl Network { punish(); } } else { - // Send updates to everyone - editor.commit().await; + // Commit updates + editor.commit(); } Ok(()) diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index f0f6df94..8f3d0155 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -23,10 +23,6 @@ const UNRELIABLE_PING_SPAN_SECS: u32 = 60; /// - Interval is the number of seconds between each ping const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; -/// Keepalive pings are done occasionally to ensure holepunched public dialinfo -/// remains valid, as well as to make sure we remain in any relay node's routing table -const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; - /// How many times do we try to ping a never-reached node before we call it dead const NEVER_REACHED_PING_COUNT: u32 = 3; @@ -636,16 +632,10 @@ impl BucketEntryInner { } // Check if this node needs a ping right now to validate it is still reachable - pub(super) fn needs_ping(&self, cur_ts: Timestamp, needs_keepalive: bool) -> bool { + pub(super) fn needs_ping(&self, cur_ts: Timestamp) -> bool { // See which ping pattern we are to use let state = self.state(cur_ts); - // If this entry needs a keepalive (like a relay node), - // then we should ping it regularly to keep our association alive - if needs_keepalive { - return self.needs_constant_ping(cur_ts, TimestampDuration::new(KEEPALIVE_PING_INTERVAL_SECS as u64 * 1000000u64)); - } - // If we don't have node status for this node, then we should ping it to get some node status for routing_domain in RoutingDomainSet::all() { if self.has_node_info(routing_domain.into()) { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index ba39eafb..dc541449 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -463,6 +463,10 @@ impl RoutingTable { self.inner.read().relay_node(domain) } + pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option { + self.inner.read().relay_node_last_keepalive(domain) + } + pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { self.inner.read().has_dial_info(domain) } diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index 01f862ff..898e1b82 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -6,6 +6,9 @@ enum RoutingDomainChange { SetRelayNode { relay_node: NodeRef, }, + SetRelayNodeKeepalive { + ts: Option, + }, AddDialInfoDetail { dial_info_detail: DialInfoDetail, }, @@ -36,24 +39,33 @@ impl RoutingDomainEditor { } #[instrument(level = "debug", skip(self))] - pub fn clear_dial_info_details(&mut self) { + pub fn clear_dial_info_details(&mut self) -> &mut Self { self.changes.push(RoutingDomainChange::ClearDialInfoDetails); + self } #[instrument(level = "debug", skip(self))] - pub fn clear_relay_node(&mut self) { + pub fn clear_relay_node(&mut self) -> &mut Self { self.changes.push(RoutingDomainChange::ClearRelayNode); + self } #[instrument(level = "debug", skip(self))] - pub fn set_relay_node(&mut self, relay_node: NodeRef) { + pub fn set_relay_node(&mut self, relay_node: NodeRef) -> &mut Self { self.changes - .push(RoutingDomainChange::SetRelayNode { relay_node }) + .push(RoutingDomainChange::SetRelayNode { relay_node }); + self } - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] + pub fn set_relay_node_keepalive(&mut self, ts: Option) -> &mut Self { + self.changes + .push(RoutingDomainChange::SetRelayNodeKeepalive { ts }); + self + } + #[instrument(level = "debug", skip(self))] pub fn register_dial_info( &mut self, dial_info: DialInfo, class: DialInfoClass, - ) -> EyreResult<()> { + ) -> EyreResult<&mut Self> { if !self .routing_table .ensure_dial_info_is_valid(self.routing_domain, &dial_info) @@ -72,7 +84,7 @@ impl RoutingDomainEditor { }, }); - Ok(()) + Ok(self) } #[instrument(level = "debug", skip(self))] pub fn setup_network( @@ -81,23 +93,25 @@ impl RoutingDomainEditor { inbound_protocols: ProtocolTypeSet, address_types: AddressTypeSet, capabilities: Vec, - ) { + ) -> &mut Self { self.changes.push(RoutingDomainChange::SetupNetwork { outbound_protocols, inbound_protocols, address_types, capabilities, - }) + }); + self } #[instrument(level = "debug", skip(self))] - pub fn set_network_class(&mut self, network_class: Option) { + pub fn set_network_class(&mut self, network_class: Option) -> &mut Self { self.changes - .push(RoutingDomainChange::SetNetworkClass { network_class }) + .push(RoutingDomainChange::SetNetworkClass { network_class }); + self } #[instrument(level = "debug", skip(self))] - pub async fn commit(self) { + pub fn commit(&mut self) { // No locking if we have nothing to do if self.changes.is_empty() { return; @@ -109,7 +123,7 @@ impl RoutingDomainEditor { let mut inner = self.routing_table.inner.write(); inner.with_routing_domain_mut(self.routing_domain, |detail| { - for change in self.changes { + for change in self.changes.drain(..) { match change { RoutingDomainChange::ClearDialInfoDetails => { debug!("[{:?}] cleared dial info details", self.routing_domain); @@ -123,7 +137,12 @@ impl RoutingDomainEditor { } RoutingDomainChange::SetRelayNode { relay_node } => { debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node); - detail.common_mut().set_relay_node(Some(relay_node)); + detail.common_mut().set_relay_node(Some(relay_node.clone())); + changed = true; + } + RoutingDomainChange::SetRelayNodeKeepalive { ts } => { + debug!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts); + detail.common_mut().set_relay_node_last_keepalive(ts); changed = true; } RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => { @@ -155,7 +174,7 @@ impl RoutingDomainEditor { let this_changed = old_outbound_protocols != outbound_protocols || old_inbound_protocols != inbound_protocols || old_address_types != address_types - || old_capabilities != capabilities; + || old_capabilities != *capabilities; debug!( "[{:?}] setup network: {:?} {:?} {:?} {:?}", @@ -170,7 +189,7 @@ impl RoutingDomainEditor { outbound_protocols, inbound_protocols, address_types, - capabilities, + capabilities.clone(), ); if this_changed { changed = true; diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index 04a9a2cf..6c3273dc 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -27,6 +27,7 @@ pub struct RoutingDomainDetailCommon { inbound_protocols: ProtocolTypeSet, address_types: AddressTypeSet, relay_node: Option, + relay_node_last_keepalive: Option, capabilities: Vec, dial_info_details: Vec, // caches @@ -42,6 +43,7 @@ impl RoutingDomainDetailCommon { inbound_protocols: Default::default(), address_types: Default::default(), relay_node: Default::default(), + relay_node_last_keepalive: Default::default(), capabilities: Default::default(), dial_info_details: Default::default(), cached_peer_info: Mutex::new(Default::default()), @@ -85,12 +87,19 @@ impl RoutingDomainDetailCommon { pub fn relay_node(&self) -> Option { self.relay_node.clone() } + pub fn relay_node_last_keepalive(&self) -> Option { + self.relay_node_last_keepalive + } pub(super) fn set_relay_node(&mut self, opt_relay_node: Option) { self.relay_node = opt_relay_node.map(|nr| { nr.filtered_clone(NodeRefFilter::new().with_routing_domain(self.routing_domain)) }); + self.relay_node_last_keepalive = None; self.clear_cache(); } + pub(super) fn set_relay_node_last_keepalive(&mut self, ts: Option) { + self.relay_node_last_keepalive = ts; + } pub fn dial_info_details(&self) -> &Vec { &self.dial_info_details } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 92586e6d..c60d9e67 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -98,6 +98,10 @@ impl RoutingTableInner { self.with_routing_domain(domain, |rd| rd.common().relay_node()) } + pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option { + self.with_routing_domain(domain, |rd| rd.common().relay_node_last_keepalive()) + } + pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { self.with_routing_domain(domain, |rd| !rd.common().dial_info_details().is_empty()) } @@ -547,8 +551,6 @@ impl RoutingTableInner { routing_domain: RoutingDomain, cur_ts: Timestamp, ) -> Vec { - // Collect relay nodes - let opt_relay = self.with_routing_domain(routing_domain, |rd| rd.common().relay_node()); let own_node_info_ts = self.get_own_node_info_ts(routing_domain); // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow @@ -559,13 +561,8 @@ impl RoutingTableInner { if !e.exists_in_routing_domain(rti, routing_domain) { return false; } - // If we need a ping via the normal timing mechanism, then do it - // or if this node is our own relay, then we keep it alive - let is_our_relay = opt_relay - .as_ref() - .map(|nr| nr.same_bucket_entry(&entry)) - .unwrap_or(false); - if e.needs_ping(cur_ts, is_our_relay) { + // If we need a ping then do it + if e.needs_ping(cur_ts) { return true; } // If we need a ping because this node hasn't seen our latest node info, then do it diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index b539d205..54f84dae 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -1,10 +1,90 @@ use super::*; +/// Keepalive pings are done occasionally to ensure holepunched public dialinfo +/// remains valid, as well as to make sure we remain in any relay node's routing table +const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; + use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::FutureExt; use stop_token::future::FutureExt as StopFutureExt; impl RoutingTable { + // Ping each node in the routing table if they need to be pinged + // to determine their reliability + #[instrument(level = "trace", skip(self), err)] + fn relay_keepalive_public_internet( + &self, + cur_ts: Timestamp, + relay_nr: NodeRef, + unord: &mut FuturesUnordered< + SendPinBoxFuture>>, RPCError>>, + >, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + // Get our publicinternet dial info + let dids = self.all_filtered_dial_info_details( + RoutingDomain::PublicInternet.into(), + &DialInfoFilter::all(), + ); + + let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet); + let relay_needs_keepalive = opt_relay_keepalive_ts + .map(|kts| { + cur_ts.saturating_sub(kts).as_u64() + >= (KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64) + }) + .unwrap_or(true); + + if !relay_needs_keepalive { + return Ok(()); + } + // Say we're doing this keepalive now + self.edit_routing_domain(RoutingDomain::PublicInternet) + .set_relay_node_keepalive(Some(cur_ts)) + .commit(); + + // Look up any NAT mappings we may need to try to preserve with keepalives + let mut mapped_port_info = self.get_low_level_port_info(); + + // Relay nodes get pinged over all protocols we have inbound dialinfo for + // This is so we can preserve the inbound NAT mappings at our router + for did in &dids { + // Do we need to do this ping? + // Check if we have already pinged over this low-level-protocol/address-type/port combo + // We want to ensure we do the bare minimum required here + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + let needs_ping_for_protocol = + if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) { + mapped_port_info + .low_level_protocol_ports + .remove(&(*llpt, at, *port)) + } else { + false + }; + if !needs_ping_for_protocol { + continue; + } + + let rpc = rpc.clone(); + let dif = did.dial_info.make_filter(); + let relay_nr_filtered = + relay_nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); + + #[cfg(feature = "network-result-extra")] + log_rtab!(debug "--> Keepalive ping to {:?}", relay_nr_filtered); + + unord.push( + async move { + rpc.rpc_call_status(Destination::direct(relay_nr_filtered)) + .await + } + .instrument(Span::current()) + .boxed(), + ); + } + Ok(()) + } // Ping each node in the routing table if they need to be pinged // to determine their reliability #[instrument(level = "trace", skip(self), err)] @@ -20,70 +100,22 @@ impl RoutingTable { // Get all nodes needing pings in the PublicInternet routing domain let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); - // Look up any NAT mappings we may need to try to preserve with keepalives - let mut mapped_port_info = self.get_low_level_port_info(); - // Get the PublicInternet relay if we are using one let opt_relay_nr = self.relay_node(RoutingDomain::PublicInternet); - // Get our publicinternet dial info - let dids = self.all_filtered_dial_info_details( - RoutingDomain::PublicInternet.into(), - &DialInfoFilter::all(), - ); + // If this is our relay, let's check for NAT keepalives + if let Some(relay_nr) = opt_relay_nr { + self.relay_keepalive_public_internet(cur_ts, relay_nr, unord)?; + } - // For all nodes needing pings, figure out how many and over what protocols + // Just do a single ping with the best protocol for all the other nodes to check for liveness for nr in node_refs { - // If this is our relay, let's check for NAT keepalives - let mut did_pings = false; - if let Some(relay_nr) = &opt_relay_nr { - if nr.same_entry(relay_nr) { - // Relay nodes get pinged over all protocols we have inbound dialinfo for - // This is so we can preserve the inbound NAT mappings at our router - for did in &dids { - // Do we need to do this ping? - // Check if we have already pinged over this low-level-protocol/address-type/port combo - // We want to ensure we do the bare minimum required here - let pt = did.dial_info.protocol_type(); - let at = did.dial_info.address_type(); - let needs_ping = if let Some((llpt, port)) = - mapped_port_info.protocol_to_port.get(&(pt, at)) - { - mapped_port_info - .low_level_protocol_ports - .remove(&(*llpt, at, *port)) - } else { - false - }; - if needs_ping { - let rpc = rpc.clone(); - let dif = did.dial_info.make_filter(); - let nr_filtered = - nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); - log_net!("--> Keepalive ping to {:?}", nr_filtered); - unord.push( - async move { - rpc.rpc_call_status(Destination::direct(nr_filtered)).await - } - .instrument(Span::current()) - .boxed(), - ); - did_pings = true; - } - } - } - } - // Just do a single ping with the best protocol for all the other nodes, - // ensuring that we at least ping a relay with -something- even if we didnt have - // any mapped ports to preserve - if !did_pings { - let rpc = rpc.clone(); - unord.push( - async move { rpc.rpc_call_status(Destination::direct(nr)).await } - .instrument(Span::current()) - .boxed(), - ); - } + let rpc = rpc.clone(); + unord.push( + async move { rpc.rpc_call_status(Destination::direct(nr)).await } + .instrument(Span::current()) + .boxed(), + ); } Ok(()) diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 95612f8a..71515132 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -69,7 +69,7 @@ impl RoutingTable { false, ) { Ok(nr) => { - log_rtab!("Outbound relay node selected: {}", nr); + info!("Outbound relay node selected: {}", nr); editor.set_relay_node(nr); got_outbound_relay = true; } @@ -77,6 +77,8 @@ impl RoutingTable { log_rtab!(error "failed to register node with peer info: {}", e); } } + } else { + info!("Outbound relay desired but not available"); } } if !got_outbound_relay { @@ -89,7 +91,7 @@ impl RoutingTable { } // Commit the changes - editor.commit().await; + editor.commit(); Ok(()) } diff --git a/veilid-core/src/routing_table/types/routing_domain.rs b/veilid-core/src/routing_table/types/routing_domain.rs index e1982a08..dcd143a9 100644 --- a/veilid-core/src/routing_table/types/routing_domain.rs +++ b/veilid-core/src/routing_table/types/routing_domain.rs @@ -1,3 +1,5 @@ +#![allow(non_snake_case)] + use super::*; // Routing domain here is listed in order of preference, keep in order