relay work

This commit is contained in:
Christien Rioux 2023-07-19 20:55:37 -04:00
parent 2fc979235e
commit dfc2a09847
3 changed files with 49 additions and 12 deletions

View File

@ -1046,11 +1046,17 @@ impl NetworkManager {
}; };
if let Some(relay_nr) = some_relay_nr { if let Some(relay_nr) = some_relay_nr {
// Ensure the protocol is forwarded exactly as is // Ensure the protocol used to forward is of the same sequencing requirement
// Address type is allowed to change if connectivity is better // Address type is allowed to change if connectivity is better
let relay_nr = relay_nr.filtered_clone( let relay_nr = if connection_descriptor.protocol_type().is_ordered() {
NodeRefFilter::new().with_protocol_type(connection_descriptor.protocol_type()), // XXX: this is a little redundant
); let (_, nrf) = NodeRefFilter::new().with_sequencing(Sequencing::EnsureOrdered);
let mut relay_nr = relay_nr.filtered_clone(nrf);
relay_nr.set_sequencing(Sequencing::EnsureOrdered);
relay_nr
} else {
relay_nr
};
// Relay the packet to the desired destination // Relay the packet to the desired destination
log_net!("relaying {} bytes to {}", data.len(), relay_nr); log_net!("relaying {} bytes to {}", data.len(), relay_nr);

View File

@ -103,7 +103,7 @@ impl NetworkManager {
target_node_ref: NodeRef, target_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataKind>> {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last connection we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else { let Some(connection_descriptor) = target_node_ref.last_connection() else {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
format!("should have found an existing connection: {}", target_node_ref) format!("should have found an existing connection: {}", target_node_ref)
@ -149,7 +149,7 @@ impl NetworkManager {
.is_some() .is_some()
{ {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
format!("failed to send to existing connection: {:?}", connection_descriptor) format!("failed to send to unreachable node over existing connection: {:?}", connection_descriptor)
)); ));
} }

View File

@ -43,12 +43,24 @@ impl RoutingTable {
.set_relay_node_keepalive(Some(cur_ts)) .set_relay_node_keepalive(Some(cur_ts))
.commit(); .commit();
// We need to keep-alive at one connection per ordering for relays
// but also one per NAT mapping that we need to keep open for our inbound dial info
let mut got_unordered = false;
let mut got_ordered = false;
// Look up any NAT mappings we may need to try to preserve with keepalives // Look up any NAT mappings we may need to try to preserve with keepalives
let mut mapped_port_info = self.get_low_level_port_info(); let mut mapped_port_info = self.get_low_level_port_info();
// Relay nodes get pinged over all protocols we have inbound dialinfo for // Relay nodes get pinged over all protocols we have inbound dialinfo for
// This is so we can preserve the inbound NAT mappings at our router // This is so we can preserve the inbound NAT mappings at our router
let mut relay_noderefs = vec![];
for did in &dids { for did in &dids {
// Can skip the ones that are direct, those are not mapped or natted
// because we can have both direct and natted dialinfo on the same
// node, for example ipv4 can be natted, while ipv6 is direct
if did.class == DialInfoClass::Direct {
continue;
}
// Do we need to do this ping? // Do we need to do this ping?
// Check if we have already pinged over this low-level-protocol/address-type/port combo // Check if we have already pinged over this low-level-protocol/address-type/port combo
// We want to ensure we do the bare minimum required here // We want to ensure we do the bare minimum required here
@ -62,16 +74,35 @@ impl RoutingTable {
} else { } else {
false false
}; };
if !needs_ping_for_protocol { if needs_ping_for_protocol {
continue; if pt.is_ordered() {
got_ordered = true;
} else {
got_unordered = true;
}
let dif = did.dial_info.make_filter();
let relay_nr_filtered =
relay_nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
relay_noderefs.push(relay_nr_filtered);
} }
}
// Add noderef filters for ordered or unordered sequencing if we havent already seen those
if !got_ordered {
let (_, nrf) = NodeRefFilter::new().with_sequencing(Sequencing::EnsureOrdered);
let mut relay_nr_filtered = relay_nr.filtered_clone(nrf);
relay_nr_filtered.set_sequencing(Sequencing::EnsureOrdered);
relay_noderefs.push(relay_nr_filtered);
}
if !got_unordered {
relay_noderefs.push(relay_nr);
}
for relay_nr_filtered in relay_noderefs {
let rpc = rpc.clone(); let rpc = rpc.clone();
let dif = did.dial_info.make_filter();
let relay_nr_filtered =
relay_nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
//#[cfg(feature = "network-result-extra")] #[cfg(feature = "network-result-extra")]
log_rtab!(debug "--> Keepalive ping to {:?}", relay_nr_filtered);
#[cfg(not(feature = "network-result-extra"))]
log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered); log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered);
unord.push( unord.push(