From ee0e729a92f1de6a6f5df2fec694f5b593f4979c Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 18 Aug 2022 16:21:13 -0400 Subject: [PATCH] keepalive work --- veilid-core/src/network_manager/tasks.rs | 26 ++++++-- veilid-core/src/routing_table/find_nodes.rs | 71 ++++++++++++++++----- veilid-core/src/veilid_api/mod.rs | 22 +++++++ 3 files changed, 98 insertions(+), 21 deletions(-) diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index b609e87c..8ad0d0e1 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -336,16 +336,34 @@ impl NetworkManager { let mut unord = FuturesUnordered::new(); let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); + let mut mapped_port_info = routing_table.get_mapped_port_info(); + for nr in node_refs { let rpc = rpc.clone(); if Some(nr.node_id()) == relay_node_id { // Relay nodes get pinged over all protocols we have inbound dialinfo for // This is so we can preserve the inbound NAT mappings at our router for did in &dids { - let rpc = rpc.clone(); - let dif = did.dial_info.make_filter(true); - let nr_filtered = nr.filtered_clone(dif); - unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); + // Do we need to do this ping? + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + + let needs_ping = if let Some((llpt, port)) = + mapped_port_info.protocol_to_port.get(&(pt, at)) + { + mapped_port_info + .low_level_protocol_ports + .remove(&(*llpt, at, *port)) + } else { + false + }; + if needs_ping { + let rpc = rpc.clone(); + let dif = did.dial_info.make_filter(true); + let nr_filtered = nr.filtered_clone(dif); + log_net!("--> Keepalive ping to {:?}", nr_filtered); + unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); + } } } else { // Just do a single ping with the best protocol for all the other nodes diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 7e879c91..54ce6c95 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -4,6 +4,14 @@ use crate::dht::*; use crate::xx::*; use crate::*; +pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; +pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; +#[derive(Clone, Debug)] +pub struct MappedPortInfo { + pub low_level_protocol_ports: LowLevelProtocolPorts, + pub protocol_to_port: ProtocolToPortMapping, +} + impl RoutingTable { // Makes a filter that finds nodes with a matching inbound dialinfo pub fn make_inbound_dial_info_entry_filter( @@ -383,12 +391,46 @@ impl RoutingTable { out } + // Build a map of protocols to low level ports + // This way we can get the set of protocols required to keep our NAT mapping alive for keepalive pings + // Only one protocol per low level protocol/port combination is required + // For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required + // and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too. + pub fn get_mapped_port_info(&self) -> MappedPortInfo { + let mut low_level_protocol_ports = + BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new(); + let mut protocol_to_port = + BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new(); + let our_dids = self.all_filtered_dial_info_details( + Some(RoutingDomain::PublicInternet), + &DialInfoFilter::all(), + ); + for did in our_dids { + low_level_protocol_ports.insert(( + did.dial_info.protocol_type().low_level_protocol_type(), + did.dial_info.address_type(), + did.dial_info.socket_address().port(), + )); + protocol_to_port.insert( + (did.dial_info.protocol_type(), did.dial_info.address_type()), + ( + did.dial_info.protocol_type().low_level_protocol_type(), + did.dial_info.socket_address().port(), + ), + ); + } + MappedPortInfo { + low_level_protocol_ports, + protocol_to_port, + } + } + fn make_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { // Get all our outbound protocol/address types - let protocol_config = self.network_manager().get_protocol_config(); let outbound_dif = self .network_manager() .get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + let mapped_port_info = self.get_mapped_port_info(); move |e: &BucketEntryInner| { // Ensure this node is not on our local network @@ -400,29 +442,24 @@ impl RoutingTable { return false; } - // Disqualify nodes that don't have all our outbound protocol types + // Disqualify nodes that don't cover all our inbound ports for tcp and udp + // as we need to be able to use the relay for keepalives for all nat mappings + let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); + let can_serve_as_relay = e .node_info() .map(|n| { let dids = n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif)); - for pt in protocol_config.outbound { - for at in protocol_config.family_global { - let mut found = false; - for did in &dids { - if did.dial_info.protocol_type() == pt - && did.dial_info.address_type() == at - { - found = true; - break; - } - } - if !found { - return false; - } + for did in &dids { + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) + { + low_level_protocol_ports.remove(&(*llpt, at, *port)); } } - true + low_level_protocol_ports.is_empty() }) .unwrap_or(false); if !can_serve_as_relay { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index bc163f9a..64386654 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -539,6 +539,22 @@ impl LocalNodeInfo { } } +#[allow(clippy::derive_hash_xor_eq)] +#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] +// Keep member order appropriate for sorting < preference +// Must match DialInfo order +pub enum LowLevelProtocolType { + UDP, + TCP, +} + +impl LowLevelProtocolType { + pub fn is_connection_oriented(&self) -> bool { + matches!(self, LowLevelProtocolType::TCP) + } +} +pub type LowLevelProtocolTypeSet = EnumSet; + #[allow(clippy::derive_hash_xor_eq)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] // Keep member order appropriate for sorting < preference @@ -557,6 +573,12 @@ impl ProtocolType { ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS ) } + pub fn low_level_protocol_type(&self) -> LowLevelProtocolType { + match self { + ProtocolType::UDP => LowLevelProtocolType::UDP, + ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => LowLevelProtocolType::TCP, + } + } } pub type ProtocolTypeSet = EnumSet;