refactor get_contact_method

This commit is contained in:
John Smith
2022-10-12 22:53:40 -04:00
parent a06c2fb5a3
commit 2d526674a5
8 changed files with 424 additions and 222 deletions

View File

@@ -323,6 +323,110 @@ impl RoutingTable {
true
}
/// Look up the best way for two nodes to reach each other over a specific routing domain
#[instrument(level = "trace", skip(self), ret)]
pub fn get_contact_method(
&self,
routing_domain: RoutingDomain,
node_a_id: &DHTKey,
node_a: &NodeInfo,
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
) -> ContactMethod {
let inner = &*self.inner.read();
Self::with_routing_domain(inner, routing_domain, |rdd| {
rdd.get_contact_method(
inner,
node_a_id,
node_a,
node_b_id,
node_b,
dial_info_filter,
reliable,
)
})
}
// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access
#[instrument(level = "trace", skip(self), ret)]
pub(crate) fn get_node_contact_method(
&self,
target_node_ref: NodeRef,
) -> EyreResult<NodeContactMethod> {
// Lock the routing table for read to ensure the table doesn't change
let inner = &*self.inner.read();
// Figure out the best routing domain to get the contact method over
let routing_domain = match target_node_ref.best_routing_domain() {
Some(rd) => rd,
None => {
log_net!("no routing domain for node {:?}", target_node_ref);
return Ok(NodeContactMethod::Unreachable);
}
};
// Node A is our own node
let node_a = self.get_own_node_info(routing_domain);
let node_a_id = self.node_id();
// Node B is the target node
let node_b = target_node_ref.operate(|_rti, e| e.node_info(routing_domain).unwrap());
let node_b_id = target_node_ref.node_id();
// Dial info filter comes from the target node ref
let dial_info_filter = target_node_ref.dial_info_filter();
let reliable = target_node_ref.reliable();
let cm = self.get_contact_method(
routing_domain,
&node_a_id,
&node_a,
&node_b_id,
node_b,
dial_info_filter,
reliable,
);
// Translate the raw contact method to a referenced contact method
Ok(match cm {
ContactMethod::Unreachable => NodeContactMethod::Unreachable,
ContactMethod::Existing => NodeContactMethod::Existing,
ContactMethod::Direct(di) => NodeContactMethod::Direct(di),
ContactMethod::SignalReverse(relay_key, target_key) => {
let relay_nr = self
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if target_node_ref.node_id() != target_key {
bail!("target noderef didn't match target key");
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref)
}
ContactMethod::SignalHolePunch(relay_key, target_key) => {
let relay_nr = self
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if target_node_ref.node_id() != target_key {
bail!("target noderef didn't match target key");
}
NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref)
}
ContactMethod::InboundRelay(relay_key) => {
let relay_nr = self
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::InboundRelay(relay_nr)
}
ContactMethod::OutboundRelay(relay_key) => {
let relay_nr = self
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::OutboundRelay(relay_nr)
}
})
}
#[instrument(level = "debug", skip(self))]
pub fn edit_routing_domain(&self, domain: RoutingDomain) -> RoutingDomainEditor {
RoutingDomainEditor::new(self.clone(), domain)
@@ -487,8 +591,8 @@ impl RoutingTable {
}
}
// Attempt to empty the routing table
// should only be performed when there are no node_refs (detached)
/// Attempt to empty the routing table
/// should only be performed when there are no node_refs (detached)
pub fn purge_buckets(&self) {
let mut inner = self.inner.write();
let inner = &mut *inner;
@@ -505,7 +609,7 @@ impl RoutingTable {
);
}
// Attempt to remove last_connections from entries
/// Attempt to remove last_connections from entries
pub fn purge_last_connections(&self) {
let mut inner = self.inner.write();
let inner = &mut *inner;
@@ -526,8 +630,8 @@ impl RoutingTable {
);
}
// Attempt to settle buckets and remove entries down to the desired number
// which may not be possible due extant NodeRefs
/// Attempt to settle buckets and remove entries down to the desired number
/// which may not be possible due extant NodeRefs
fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) {
let bucket = &mut inner.buckets[idx];
let bucket_depth = Self::bucket_depth(idx);
@@ -702,9 +806,9 @@ impl RoutingTable {
self.unlocked_inner.kick_queue.lock().insert(idx);
}
// Create a node reference, possibly creating a bucket entry
// the 'update_func' closure is called on the node, and, if created,
// in a locked fashion as to ensure the bucket entry state is always valid
/// Create a node reference, possibly creating a bucket entry
/// the 'update_func' closure is called on the node, and, if created,
/// in a locked fashion as to ensure the bucket entry state is always valid
pub fn create_node_ref<F>(&self, node_id: DHTKey, update_func: F) -> Option<NodeRef>
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner),
@@ -759,6 +863,7 @@ impl RoutingTable {
Some(noderef)
}
/// Resolve an existing routing table entry and return a reference to it
pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option<NodeRef> {
if node_id == self.unlocked_inner.node_id {
log_rtab!(debug "can't look up own node id in routing table");
@@ -772,9 +877,26 @@ impl RoutingTable {
.map(|e| NodeRef::new(self.clone(), node_id, e, None))
}
// Shortcut function to add a node to our routing table if it doesn't exist
// and add the dial info we have for it. Returns a noderef filtered to
// the routing domain in which this node was registered for convenience.
/// Resolve an existing routing table entry and return a filtered reference to it
pub fn lookup_and_filter_noderef(
&self,
node_id: DHTKey,
routing_domain_set: RoutingDomainSet,
dial_info_filter: DialInfoFilter,
) -> Option<NodeRef> {
let nr = self.lookup_node_ref(node_id)?;
Some(
nr.filtered_clone(
NodeRefFilter::new()
.with_dial_info_filter(dial_info_filter)
.with_routing_domain_set(routing_domain_set),
),
)
}
/// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the dial info we have for it. Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience.
pub fn register_node_with_signed_node_info(
&self,
routing_domain: RoutingDomain,
@@ -821,8 +943,8 @@ impl RoutingTable {
})
}
// Shortcut function to add a node to our routing table if it doesn't exist
// and add the last peer address we have for it, since that's pretty common
/// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common
pub fn register_node_with_existing_connection(
&self,
node_id: DHTKey,
@@ -840,8 +962,8 @@ impl RoutingTable {
out
}
// Ticks about once per second
// to run tick tasks which may run at slower tick rates as configured
/// Ticks about once per second
/// to run tick tasks which may run at slower tick rates as configured
pub async fn tick(&self) -> EyreResult<()> {
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self.unlocked_inner.rolling_transfers_task.tick().await?;

View File

@@ -6,7 +6,7 @@ use alloc::fmt;
// We should ping them with some frequency and 30 seconds is typical timeout
const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct NodeRefFilter {
pub routing_domain_set: RoutingDomainSet,
pub dial_info_filter: DialInfoFilter,
@@ -131,6 +131,9 @@ impl NodeRef {
pub fn set_reliable(&mut self) {
self.reliable = true;
}
pub fn reliable(&self) -> bool {
self.reliable
}
pub fn merge_filter(&mut self, filter: NodeRefFilter) {
if let Some(self_filter) = self.filter.take() {

View File

@@ -274,6 +274,10 @@ impl RouteSpecStore {
bail!("Not allocating route longer than max route hop count");
}
// Lock routing table for reading, make sure things don't change
// because we want to iterate the table without changes being made to it
let rti = routing_table.inner.read();
// Get list of all nodes, and sort them for selection
let cur_ts = intf::get_timestamp();
let dial_info_sort = if reliable {
@@ -409,19 +413,25 @@ impl RouteSpecStore {
if directions.contains(Direction::Outbound) {
let our_node_info =
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let mut previous_node_info = &our_node_info;
let our_node_id = routing_table.node_id();
let mut previous_node = &(our_node_id, our_node_info);
let mut reachable = true;
for n in permutation {
let current_node_info = &nodes.get(*n).as_ref().unwrap().1;
let cm = NetworkManager::get_node_contact_method(
previous_node_info,
current_node_info,
let current_node = nodes.get(*n).unwrap();
let cm = routing_table.get_contact_method(
RoutingDomain::PublicInternet,
&previous_node.0,
&previous_node.1,
&current_node.0,
&current_node.1,
DialInfoFilter::all(),
reliable,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
break;
}
previous_node_info = current_node_info;
previous_node = current_node;
}
if !reachable {
return false;
@@ -430,19 +440,25 @@ impl RouteSpecStore {
if directions.contains(Direction::Inbound) {
let our_node_info =
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let mut next_node_info = &our_node_info;
let our_node_id = routing_table.node_id();
let mut next_node = &(our_node_id, our_node_info);
let mut reachable = true;
for n in permutation.iter().rev() {
let current_node_info = &nodes.get(*n).as_ref().unwrap().1;
let cm = NetworkManager::get_node_contact_method(
current_node_info,
next_node_info,
let current_node = nodes.get(*n).unwrap();
let cm = routing_table.get_contact_method(
RoutingDomain::PublicInternet,
&next_node.0,
&next_node.1,
&current_node.0,
&current_node.1,
DialInfoFilter::all(),
reliable,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
break;
}
next_node_info = current_node_info;
next_node = current_node;
}
if !reachable {
return false;

View File

@@ -1,5 +1,24 @@
use super::*;
/// Mechanism required to contact another node
#[derive(Clone, Debug)]
pub(crate) enum ContactMethod {
/// Node is not reachable by any means
Unreachable,
/// Connection should have already existed
Existing,
/// Contact the node directly
Direct(DialInfo),
/// Request via signal the node connect back directly (relay, target)
SignalReverse(DHTKey, DHTKey),
/// Request via signal the node negotiate a hole punch (relay, target_node)
SignalHolePunch(DHTKey, DHTKey),
/// Must use an inbound relay to reach the node
InboundRelay(DHTKey),
/// Must use outbound relay to reach the node
OutboundRelay(DHTKey),
}
#[derive(Debug)]
pub struct RoutingDomainDetailCommon {
routing_domain: RoutingDomain,
@@ -147,8 +166,21 @@ pub trait RoutingDomainDetail {
fn common(&self) -> &RoutingDomainDetailCommon;
fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon;
// Per-domain accessors
/// Can this routing domain contain a particular address
fn can_contain_address(&self, address: Address) -> bool;
/// Get the contact method required for node A to reach node B in this routing domain
/// Routing table must be locked for reading to use this function
fn get_contact_method(
&self,
rti: &RoutingTableInner,
node_a_id: &DHTKey,
node_a: &NodeInfo,
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
) -> ContactMethod;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -168,6 +200,30 @@ impl Default for PublicInternetRoutingDomainDetail {
}
}
fn first_filtered_dial_info_detail(
from_node: &NodeInfo,
to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter,
reliable: bool,
) -> Option<DialInfoDetail> {
let direct_dial_info_filter = dial_info_filter.clone().filtered(
&DialInfoFilter::all()
.with_address_type_set(from_node.address_types)
.with_protocol_type_set(from_node.outbound_protocols),
);
// Get first filtered dialinfo
let sort = if reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
};
let direct_filter = |did: &DialInfoDetail| did.matches_filter(&direct_dial_info_filter);
// Get the best match dial info for node B if we have it
to_node.first_filtered_dial_info_detail(sort, direct_filter)
}
impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
fn common(&self) -> &RoutingDomainDetailCommon {
&self.common
@@ -178,6 +234,128 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
fn can_contain_address(&self, address: Address) -> bool {
address.is_global()
}
fn get_contact_method(
&self,
_rti: &RoutingTableInner,
node_a_id: &DHTKey,
node_a: &NodeInfo,
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
) -> ContactMethod {
// Get the best match dial info for node B if we have it
if let Some(target_did) =
first_filtered_dial_info_detail(node_a, node_b, &dial_info_filter, reliable)
{
// Do we need to signal before going inbound?
if !target_did.class.requires_signal() {
// Go direct without signaling
return ContactMethod::Direct(target_did.dial_info);
}
// Get the target's inbound relay, it must have one or it is not reachable
if let Some(inbound_relay) = node_b.relay_peer_info {
// Note that relay_peer_info could be node_a, in which case a connection already exists
// and we shouldn't have even gotten here
if inbound_relay.node_id.key == *node_a_id {
return ContactMethod::Existing;
}
// Can node A reach the inbound relay directly?
if first_filtered_dial_info_detail(
node_a,
&inbound_relay.signed_node_info.node_info,
&dial_info_filter,
reliable,
)
.is_some()
{
// Can node A receive anything inbound ever?
if matches!(node_a.network_class, NetworkClass::InboundCapable) {
///////// Reverse connection
// Get the best match dial info for an reverse inbound connection from node B to node A
if let Some(reverse_did) = first_filtered_dial_info_detail(
node_b,
node_a,
&dial_info_filter,
reliable,
) {
// Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_did.dial_info.to_ip_addr()
!= target_did.dial_info.to_ip_addr()
{
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return ContactMethod::SignalReverse(
inbound_relay.node_id.key,
*node_b_id,
);
}
}
}
///////// UDP hole-punch
// Does node B have a direct udp dialinfo node A can reach?
let udp_dial_info_filter = dial_info_filter
.clone()
.filtered(&DialInfoFilter::all().with_protocol_type(ProtocolType::UDP));
if let Some(target_udp_did) = first_filtered_dial_info_detail(
node_a,
node_b,
&udp_dial_info_filter,
reliable,
) {
// Does node A have a direct udp dialinfo that node B can reach?
if let Some(reverse_udp_did) = first_filtered_dial_info_detail(
node_b,
node_a,
&udp_dial_info_filter,
reliable,
) {
// Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_udp_did.dial_info.to_ip_addr()
!= target_udp_did.dial_info.to_ip_addr()
{
// The target and ourselves have a udp dialinfo that they can reach
return ContactMethod::SignalHolePunch(
inbound_relay.node_id.key,
*node_b_id,
);
}
}
}
// Otherwise we have to inbound relay
}
return ContactMethod::InboundRelay(inbound_relay.node_id.key);
}
}
}
// If the node B has no direct dial info, it needs to have an inbound relay
else if let Some(inbound_relay) = node_b.relay_peer_info {
// Can we reach the full relay?
if first_filtered_dial_info_detail(
node_a,
&inbound_relay.signed_node_info.node_info,
&dial_info_filter,
reliable,
)
.is_some()
{
return ContactMethod::InboundRelay(inbound_relay.node_id.key);
}
}
// If node A can't reach the node by other means, it may need to use its own relay
if let Some(outbound_relay) = node_a.relay_peer_info {
return ContactMethod::OutboundRelay(outbound_relay.node_id.key);
}
ContactMethod::Unreachable
}
}
/// Local Network routing domain internals
@@ -225,4 +403,42 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
}
false
}
fn get_contact_method(
&self,
_rti: &RoutingTableInner,
_node_a_id: &DHTKey,
node_a: &NodeInfo,
_node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
) -> ContactMethod {
// Scope the filter down to protocols node A can do outbound
let dial_info_filter = dial_info_filter.filtered(
&DialInfoFilter::all()
.with_address_type_set(node_a.address_types)
.with_protocol_type_set(node_a.outbound_protocols),
);
// If the filter is dead then we won't be able to connect
if dial_info_filter.is_dead() {
return ContactMethod::Unreachable;
}
// Get first filtered dialinfo
let sort = if reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
};
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
let opt_target_did = node_b.first_filtered_dial_info_detail(sort, filter);
if let Some(target_did) = opt_target_did {
return ContactMethod::Direct(target_did.dial_info);
}
ContactMethod::Unreachable
}
}