fix keepalives

This commit is contained in:
John Smith 2023-07-14 14:21:00 -04:00
parent 742b8e09a5
commit 41b9a22595
10 changed files with 160 additions and 105 deletions

View File

@ -814,8 +814,8 @@ impl Network {
}
// commit routing table edits
editor_public_internet.commit().await;
editor_local_network.commit().await;
editor_public_internet.commit();
editor_local_network.commit();
info!("network started");
self.inner.lock().network_started = true;
@ -868,12 +868,12 @@ impl Network {
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
editor.clear_dial_info_details();
editor.set_network_class(None);
editor.commit().await;
editor.commit();
let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork);
editor.clear_dial_info_details();
editor.set_network_class(None);
editor.commit().await;
editor.commit();
// Reset state including network class
*self.inner.lock() = Self::new_inner();

View File

@ -176,7 +176,7 @@ impl DiscoveryContext {
.routing_table
.find_fast_public_nodes_filtered(node_count, filters);
if peers.is_empty() {
log_net!(
log_net!(debug
"no external address detection peers of type {:?}:{:?}",
protocol_type,
address_type
@ -195,7 +195,7 @@ impl DiscoveryContext {
return Some((sa, peer));
}
}
log_net!("no peers responded with an external address");
log_net!(debug "no peers responded with an external address");
None
}
@ -943,8 +943,8 @@ impl Network {
punish();
}
} else {
// Send updates to everyone
editor.commit().await;
// Commit updates
editor.commit();
}
Ok(())

View File

@ -23,10 +23,6 @@ const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
/// - Interval is the number of seconds between each ping
const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
/// remains valid, as well as to make sure we remain in any relay node's routing table
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
/// How many times do we try to ping a never-reached node before we call it dead
const NEVER_REACHED_PING_COUNT: u32 = 3;
@ -636,16 +632,10 @@ impl BucketEntryInner {
}
// Check if this node needs a ping right now to validate it is still reachable
pub(super) fn needs_ping(&self, cur_ts: Timestamp, needs_keepalive: bool) -> bool {
pub(super) fn needs_ping(&self, cur_ts: Timestamp) -> bool {
// See which ping pattern we are to use
let state = self.state(cur_ts);
// If this entry needs a keepalive (like a relay node),
// then we should ping it regularly to keep our association alive
if needs_keepalive {
return self.needs_constant_ping(cur_ts, TimestampDuration::new(KEEPALIVE_PING_INTERVAL_SECS as u64 * 1000000u64));
}
// If we don't have node status for this node, then we should ping it to get some node status
for routing_domain in RoutingDomainSet::all() {
if self.has_node_info(routing_domain.into()) {

View File

@ -463,6 +463,10 @@ impl RoutingTable {
self.inner.read().relay_node(domain)
}
pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option<Timestamp> {
self.inner.read().relay_node_last_keepalive(domain)
}
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
self.inner.read().has_dial_info(domain)
}

View File

@ -6,6 +6,9 @@ enum RoutingDomainChange {
SetRelayNode {
relay_node: NodeRef,
},
SetRelayNodeKeepalive {
ts: Option<Timestamp>,
},
AddDialInfoDetail {
dial_info_detail: DialInfoDetail,
},
@ -36,24 +39,33 @@ impl RoutingDomainEditor {
}
#[instrument(level = "debug", skip(self))]
pub fn clear_dial_info_details(&mut self) {
pub fn clear_dial_info_details(&mut self) -> &mut Self {
self.changes.push(RoutingDomainChange::ClearDialInfoDetails);
self
}
#[instrument(level = "debug", skip(self))]
pub fn clear_relay_node(&mut self) {
pub fn clear_relay_node(&mut self) -> &mut Self {
self.changes.push(RoutingDomainChange::ClearRelayNode);
self
}
#[instrument(level = "debug", skip(self))]
pub fn set_relay_node(&mut self, relay_node: NodeRef) {
pub fn set_relay_node(&mut self, relay_node: NodeRef) -> &mut Self {
self.changes
.push(RoutingDomainChange::SetRelayNode { relay_node })
.push(RoutingDomainChange::SetRelayNode { relay_node });
self
}
#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
pub fn set_relay_node_keepalive(&mut self, ts: Option<Timestamp>) -> &mut Self {
self.changes
.push(RoutingDomainChange::SetRelayNodeKeepalive { ts });
self
}
#[instrument(level = "debug", skip(self))]
pub fn register_dial_info(
&mut self,
dial_info: DialInfo,
class: DialInfoClass,
) -> EyreResult<()> {
) -> EyreResult<&mut Self> {
if !self
.routing_table
.ensure_dial_info_is_valid(self.routing_domain, &dial_info)
@ -72,7 +84,7 @@ impl RoutingDomainEditor {
},
});
Ok(())
Ok(self)
}
#[instrument(level = "debug", skip(self))]
pub fn setup_network(
@ -81,23 +93,25 @@ impl RoutingDomainEditor {
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
capabilities: Vec<Capability>,
) {
) -> &mut Self {
self.changes.push(RoutingDomainChange::SetupNetwork {
outbound_protocols,
inbound_protocols,
address_types,
capabilities,
})
});
self
}
#[instrument(level = "debug", skip(self))]
pub fn set_network_class(&mut self, network_class: Option<NetworkClass>) {
pub fn set_network_class(&mut self, network_class: Option<NetworkClass>) -> &mut Self {
self.changes
.push(RoutingDomainChange::SetNetworkClass { network_class })
.push(RoutingDomainChange::SetNetworkClass { network_class });
self
}
#[instrument(level = "debug", skip(self))]
pub async fn commit(self) {
pub fn commit(&mut self) {
// No locking if we have nothing to do
if self.changes.is_empty() {
return;
@ -109,7 +123,7 @@ impl RoutingDomainEditor {
let mut inner = self.routing_table.inner.write();
inner.with_routing_domain_mut(self.routing_domain, |detail| {
for change in self.changes {
for change in self.changes.drain(..) {
match change {
RoutingDomainChange::ClearDialInfoDetails => {
debug!("[{:?}] cleared dial info details", self.routing_domain);
@ -123,7 +137,12 @@ impl RoutingDomainEditor {
}
RoutingDomainChange::SetRelayNode { relay_node } => {
debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node);
detail.common_mut().set_relay_node(Some(relay_node));
detail.common_mut().set_relay_node(Some(relay_node.clone()));
changed = true;
}
RoutingDomainChange::SetRelayNodeKeepalive { ts } => {
debug!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts);
detail.common_mut().set_relay_node_last_keepalive(ts);
changed = true;
}
RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => {
@ -155,7 +174,7 @@ impl RoutingDomainEditor {
let this_changed = old_outbound_protocols != outbound_protocols
|| old_inbound_protocols != inbound_protocols
|| old_address_types != address_types
|| old_capabilities != capabilities;
|| old_capabilities != *capabilities;
debug!(
"[{:?}] setup network: {:?} {:?} {:?} {:?}",
@ -170,7 +189,7 @@ impl RoutingDomainEditor {
outbound_protocols,
inbound_protocols,
address_types,
capabilities,
capabilities.clone(),
);
if this_changed {
changed = true;

View File

@ -27,6 +27,7 @@ pub struct RoutingDomainDetailCommon {
inbound_protocols: ProtocolTypeSet,
address_types: AddressTypeSet,
relay_node: Option<NodeRef>,
relay_node_last_keepalive: Option<Timestamp>,
capabilities: Vec<Capability>,
dial_info_details: Vec<DialInfoDetail>,
// caches
@ -42,6 +43,7 @@ impl RoutingDomainDetailCommon {
inbound_protocols: Default::default(),
address_types: Default::default(),
relay_node: Default::default(),
relay_node_last_keepalive: Default::default(),
capabilities: Default::default(),
dial_info_details: Default::default(),
cached_peer_info: Mutex::new(Default::default()),
@ -85,12 +87,19 @@ impl RoutingDomainDetailCommon {
pub fn relay_node(&self) -> Option<NodeRef> {
self.relay_node.clone()
}
pub fn relay_node_last_keepalive(&self) -> Option<Timestamp> {
self.relay_node_last_keepalive
}
pub(super) fn set_relay_node(&mut self, opt_relay_node: Option<NodeRef>) {
self.relay_node = opt_relay_node.map(|nr| {
nr.filtered_clone(NodeRefFilter::new().with_routing_domain(self.routing_domain))
});
self.relay_node_last_keepalive = None;
self.clear_cache();
}
pub(super) fn set_relay_node_last_keepalive(&mut self, ts: Option<Timestamp>) {
self.relay_node_last_keepalive = ts;
}
pub fn dial_info_details(&self) -> &Vec<DialInfoDetail> {
&self.dial_info_details
}

View File

@ -98,6 +98,10 @@ impl RoutingTableInner {
self.with_routing_domain(domain, |rd| rd.common().relay_node())
}
pub fn relay_node_last_keepalive(&self, domain: RoutingDomain) -> Option<Timestamp> {
self.with_routing_domain(domain, |rd| rd.common().relay_node_last_keepalive())
}
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
self.with_routing_domain(domain, |rd| !rd.common().dial_info_details().is_empty())
}
@ -547,8 +551,6 @@ impl RoutingTableInner {
routing_domain: RoutingDomain,
cur_ts: Timestamp,
) -> Vec<NodeRef> {
// Collect relay nodes
let opt_relay = self.with_routing_domain(routing_domain, |rd| rd.common().relay_node());
let own_node_info_ts = self.get_own_node_info_ts(routing_domain);
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
@ -559,13 +561,8 @@ impl RoutingTableInner {
if !e.exists_in_routing_domain(rti, routing_domain) {
return false;
}
// If we need a ping via the normal timing mechanism, then do it
// or if this node is our own relay, then we keep it alive
let is_our_relay = opt_relay
.as_ref()
.map(|nr| nr.same_bucket_entry(&entry))
.unwrap_or(false);
if e.needs_ping(cur_ts, is_our_relay) {
// If we need a ping then do it
if e.needs_ping(cur_ts) {
return true;
}
// If we need a ping because this node hasn't seen our latest node info, then do it

View File

@ -1,10 +1,90 @@
use super::*;
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
/// remains valid, as well as to make sure we remain in any relay node's routing table
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopFutureExt;
impl RoutingTable {
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
#[instrument(level = "trace", skip(self), err)]
fn relay_keepalive_public_internet(
&self,
cur_ts: Timestamp,
relay_nr: NodeRef,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>,
>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
// Get our publicinternet dial info
let dids = self.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
);
let opt_relay_keepalive_ts = self.relay_node_last_keepalive(RoutingDomain::PublicInternet);
let relay_needs_keepalive = opt_relay_keepalive_ts
.map(|kts| {
cur_ts.saturating_sub(kts).as_u64()
>= (KEEPALIVE_PING_INTERVAL_SECS as u64 * 1_000_000u64)
})
.unwrap_or(true);
if !relay_needs_keepalive {
return Ok(());
}
// Say we're doing this keepalive now
self.edit_routing_domain(RoutingDomain::PublicInternet)
.set_relay_node_keepalive(Some(cur_ts))
.commit();
// Look up any NAT mappings we may need to try to preserve with keepalives
let mut mapped_port_info = self.get_low_level_port_info();
// Relay nodes get pinged over all protocols we have inbound dialinfo for
// This is so we can preserve the inbound NAT mappings at our router
for did in &dids {
// Do we need to do this ping?
// Check if we have already pinged over this low-level-protocol/address-type/port combo
// We want to ensure we do the bare minimum required here
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
let needs_ping_for_protocol =
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) {
mapped_port_info
.low_level_protocol_ports
.remove(&(*llpt, at, *port))
} else {
false
};
if !needs_ping_for_protocol {
continue;
}
let rpc = rpc.clone();
let dif = did.dial_info.make_filter();
let relay_nr_filtered =
relay_nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
#[cfg(feature = "network-result-extra")]
log_rtab!(debug "--> Keepalive ping to {:?}", relay_nr_filtered);
unord.push(
async move {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered))
.await
}
.instrument(Span::current())
.boxed(),
);
}
Ok(())
}
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
#[instrument(level = "trace", skip(self), err)]
@ -20,63 +100,16 @@ impl RoutingTable {
// Get all nodes needing pings in the PublicInternet routing domain
let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
// Look up any NAT mappings we may need to try to preserve with keepalives
let mut mapped_port_info = self.get_low_level_port_info();
// Get the PublicInternet relay if we are using one
let opt_relay_nr = self.relay_node(RoutingDomain::PublicInternet);
// Get our publicinternet dial info
let dids = self.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
);
// For all nodes needing pings, figure out how many and over what protocols
for nr in node_refs {
// If this is our relay, let's check for NAT keepalives
let mut did_pings = false;
if let Some(relay_nr) = &opt_relay_nr {
if nr.same_entry(relay_nr) {
// Relay nodes get pinged over all protocols we have inbound dialinfo for
// This is so we can preserve the inbound NAT mappings at our router
for did in &dids {
// Do we need to do this ping?
// Check if we have already pinged over this low-level-protocol/address-type/port combo
// We want to ensure we do the bare minimum required here
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
let needs_ping = if let Some((llpt, port)) =
mapped_port_info.protocol_to_port.get(&(pt, at))
{
mapped_port_info
.low_level_protocol_ports
.remove(&(*llpt, at, *port))
} else {
false
};
if needs_ping {
let rpc = rpc.clone();
let dif = did.dial_info.make_filter();
let nr_filtered =
nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
log_net!("--> Keepalive ping to {:?}", nr_filtered);
unord.push(
async move {
rpc.rpc_call_status(Destination::direct(nr_filtered)).await
if let Some(relay_nr) = opt_relay_nr {
self.relay_keepalive_public_internet(cur_ts, relay_nr, unord)?;
}
.instrument(Span::current())
.boxed(),
);
did_pings = true;
}
}
}
}
// Just do a single ping with the best protocol for all the other nodes,
// ensuring that we at least ping a relay with -something- even if we didnt have
// any mapped ports to preserve
if !did_pings {
// Just do a single ping with the best protocol for all the other nodes to check for liveness
for nr in node_refs {
let rpc = rpc.clone();
unord.push(
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
@ -84,7 +117,6 @@ impl RoutingTable {
.boxed(),
);
}
}
Ok(())
}

View File

@ -69,7 +69,7 @@ impl RoutingTable {
false,
) {
Ok(nr) => {
log_rtab!("Outbound relay node selected: {}", nr);
info!("Outbound relay node selected: {}", nr);
editor.set_relay_node(nr);
got_outbound_relay = true;
}
@ -77,6 +77,8 @@ impl RoutingTable {
log_rtab!(error "failed to register node with peer info: {}", e);
}
}
} else {
info!("Outbound relay desired but not available");
}
}
if !got_outbound_relay {
@ -89,7 +91,7 @@ impl RoutingTable {
}
// Commit the changes
editor.commit().await;
editor.commit();
Ok(())
}

View File

@ -1,3 +1,5 @@
#![allow(non_snake_case)]
use super::*;
// Routing domain here is listed in order of preference, keep in order