keepalive work

This commit is contained in:
John Smith 2022-08-18 16:21:13 -04:00
parent 6226845e9f
commit ee0e729a92
3 changed files with 98 additions and 21 deletions

View File

@ -336,17 +336,35 @@ impl NetworkManager {
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id);
let mut mapped_port_info = routing_table.get_mapped_port_info();
for nr in node_refs { for nr in node_refs {
let rpc = rpc.clone(); let rpc = rpc.clone();
if Some(nr.node_id()) == relay_node_id { if Some(nr.node_id()) == relay_node_id {
// Relay nodes get pinged over all protocols we have inbound dialinfo for // Relay nodes get pinged over all protocols we have inbound dialinfo for
// This is so we can preserve the inbound NAT mappings at our router // This is so we can preserve the inbound NAT mappings at our router
for did in &dids { for did in &dids {
// Do we need to do this ping?
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
let needs_ping = if let Some((llpt, port)) =
mapped_port_info.protocol_to_port.get(&(pt, at))
{
mapped_port_info
.low_level_protocol_ports
.remove(&(*llpt, at, *port))
} else {
false
};
if needs_ping {
let rpc = rpc.clone(); let rpc = rpc.clone();
let dif = did.dial_info.make_filter(true); let dif = did.dial_info.make_filter(true);
let nr_filtered = nr.filtered_clone(dif); let nr_filtered = nr.filtered_clone(dif);
log_net!("--> Keepalive ping to {:?}", nr_filtered);
unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed());
} }
}
} else { } else {
// Just do a single ping with the best protocol for all the other nodes // Just do a single ping with the best protocol for all the other nodes
unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); unord.push(async move { rpc.rpc_call_status(nr).await }.boxed());

View File

@ -4,6 +4,14 @@ use crate::dht::*;
use crate::xx::*; use crate::xx::*;
use crate::*; use crate::*;
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
#[derive(Clone, Debug)]
pub struct MappedPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts,
pub protocol_to_port: ProtocolToPortMapping,
}
impl RoutingTable { impl RoutingTable {
// Makes a filter that finds nodes with a matching inbound dialinfo // Makes a filter that finds nodes with a matching inbound dialinfo
pub fn make_inbound_dial_info_entry_filter( pub fn make_inbound_dial_info_entry_filter(
@ -383,12 +391,46 @@ impl RoutingTable {
out out
} }
// Build a map of protocols to low level ports
// This way we can get the set of protocols required to keep our NAT mapping alive for keepalive pings
// Only one protocol per low level protocol/port combination is required
// For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required
// and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too.
pub fn get_mapped_port_info(&self) -> MappedPortInfo {
let mut low_level_protocol_ports =
BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new();
let mut protocol_to_port =
BTreeMap::<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>::new();
let our_dids = self.all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet),
&DialInfoFilter::all(),
);
for did in our_dids {
low_level_protocol_ports.insert((
did.dial_info.protocol_type().low_level_protocol_type(),
did.dial_info.address_type(),
did.dial_info.socket_address().port(),
));
protocol_to_port.insert(
(did.dial_info.protocol_type(), did.dial_info.address_type()),
(
did.dial_info.protocol_type().low_level_protocol_type(),
did.dial_info.socket_address().port(),
),
);
}
MappedPortInfo {
low_level_protocol_ports,
protocol_to_port,
}
}
fn make_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { fn make_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool {
// Get all our outbound protocol/address types // Get all our outbound protocol/address types
let protocol_config = self.network_manager().get_protocol_config();
let outbound_dif = self let outbound_dif = self
.network_manager() .network_manager()
.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); .get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_mapped_port_info();
move |e: &BucketEntryInner| { move |e: &BucketEntryInner| {
// Ensure this node is not on our local network // Ensure this node is not on our local network
@ -400,29 +442,24 @@ impl RoutingTable {
return false; return false;
} }
// Disqualify nodes that don't have all our outbound protocol types // Disqualify nodes that don't cover all our inbound ports for tcp and udp
// as we need to be able to use the relay for keepalives for all nat mappings
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
let can_serve_as_relay = e let can_serve_as_relay = e
.node_info() .node_info()
.map(|n| { .map(|n| {
let dids = let dids =
n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif)); n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif));
for pt in protocol_config.outbound {
for at in protocol_config.family_global {
let mut found = false;
for did in &dids { for did in &dids {
if did.dial_info.protocol_type() == pt let pt = did.dial_info.protocol_type();
&& did.dial_info.address_type() == at let at = did.dial_info.address_type();
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at))
{ {
found = true; low_level_protocol_ports.remove(&(*llpt, at, *port));
break;
} }
} }
if !found { low_level_protocol_ports.is_empty()
return false;
}
}
}
true
}) })
.unwrap_or(false); .unwrap_or(false);
if !can_serve_as_relay { if !can_serve_as_relay {

View File

@ -539,6 +539,22 @@ impl LocalNodeInfo {
} }
} }
#[allow(clippy::derive_hash_xor_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
// Keep member order appropriate for sorting < preference
// Must match DialInfo order
pub enum LowLevelProtocolType {
UDP,
TCP,
}
impl LowLevelProtocolType {
pub fn is_connection_oriented(&self) -> bool {
matches!(self, LowLevelProtocolType::TCP)
}
}
pub type LowLevelProtocolTypeSet = EnumSet<LowLevelProtocolType>;
#[allow(clippy::derive_hash_xor_eq)] #[allow(clippy::derive_hash_xor_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
@ -557,6 +573,12 @@ impl ProtocolType {
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS
) )
} }
pub fn low_level_protocol_type(&self) -> LowLevelProtocolType {
match self {
ProtocolType::UDP => LowLevelProtocolType::UDP,
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => LowLevelProtocolType::TCP,
}
}
} }
pub type ProtocolTypeSet = EnumSet<ProtocolType>; pub type ProtocolTypeSet = EnumSet<ProtocolType>;