refactor and change protocol selection for new connections

This commit is contained in:
John Smith 2023-06-24 18:12:17 -04:00
parent 197b7fef6e
commit 355040a3e4
14 changed files with 835 additions and 614 deletions

View File

@ -0,0 +1,67 @@
use super::*;
impl NetworkManager {
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", skip(self), ret, err)]
pub(crate) async fn handle_boot_request(
&self,
descriptor: ConnectionDescriptor,
) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table();
// Get a bunch of nodes with the various
let bootstrap_nodes = routing_table.find_bootstrap_nodes_filtered(2);
// Serialize out peer info
let bootstrap_peerinfo: Vec<PeerInfo> = bootstrap_nodes
.iter()
.filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet))
.collect();
let mut json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec();
self.apply_network_key(&mut json_bytes);
// Reply with a chunk of signed routing table
match self
.net()
.send_data_to_existing_connection(descriptor, json_bytes)
.await?
{
None => {
// Bootstrap reply was sent
Ok(NetworkResult::value(()))
}
Some(_) => Ok(NetworkResult::no_connection_other(
"bootstrap reply could not be sent",
)),
}
}
// Direct bootstrap request
#[instrument(level = "trace", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> {
let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
// Send boot magic to requested peer address
let mut data = BOOT_MAGIC.to_vec();
// Apply network key
self.apply_network_key(&mut data);
let mut out_data: Vec<u8> = network_result_value_or_log!(self
.net()
.send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms)
.await? =>
{
return Ok(Vec::new());
});
// Apply network key
self.apply_network_key(&mut out_data);
let bootstrap_peerinfo: Vec<PeerInfo> =
deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?)
.wrap_err("failed to deserialize boot peerinfo")?;
Ok(bootstrap_peerinfo)
}
}

View File

@ -5,6 +5,8 @@ mod native;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
mod wasm; mod wasm;
mod direct_boot;
mod send_data;
mod connection_handle; mod connection_handle;
mod connection_limits; mod connection_limits;
mod connection_manager; mod connection_manager;
@ -12,6 +14,7 @@ mod connection_table;
mod network_connection; mod network_connection;
mod tasks; mod tasks;
mod types; mod types;
mod stats;
pub mod tests; pub mod tests;
@ -20,6 +23,9 @@ pub mod tests;
pub use connection_manager::*; pub use connection_manager::*;
pub use network_connection::*; pub use network_connection::*;
pub use types::*; pub use types::*;
pub use send_data::*;
pub use direct_boot::*;
pub use stats::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
use connection_handle::*; use connection_handle::*;
@ -42,6 +48,7 @@ use wasm::*;
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes
pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024;
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8; pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8;
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
@ -67,38 +74,6 @@ struct NetworkComponents {
receipt_manager: ReceiptManager, receipt_manager: ReceiptManager,
} }
// Statistics per address
#[derive(Clone, Default)]
pub struct PerAddressStats {
last_seen_ts: Timestamp,
transfer_stats_accounting: TransferStatsAccounting,
transfer_stats: TransferStatsDownUp,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct PerAddressStatsKey(IpAddr);
impl Default for PerAddressStatsKey {
fn default() -> Self {
Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}
}
// Statistics about the low-level network
#[derive(Clone)]
pub struct NetworkManagerStats {
self_stats: PerAddressStats,
per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,
}
impl Default for NetworkManagerStats {
fn default() -> Self {
Self {
self_stats: PerAddressStats::default(),
per_address_stats: LruCache::new(IPADDR_TABLE_SIZE),
}
}
}
#[derive(Debug)] #[derive(Debug)]
struct ClientWhitelistEntry { struct ClientWhitelistEntry {
@ -130,6 +105,12 @@ pub(crate) enum NodeContactMethod {
/// Must use outbound relay to reach the node /// Must use outbound relay to reach the node
OutboundRelay(NodeRef), OutboundRelay(NodeRef),
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct NodeContactMethodCacheKey {
own_node_info_ts: Option<Timestamp>,
target_node_info_ts: Timestamp,
target_node_ref_filter: Option<NodeRefFilter>,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct PublicAddressCheckCacheKey(ProtocolType, AddressType); struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
@ -138,6 +119,7 @@ struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
struct NetworkManagerInner { struct NetworkManagerInner {
stats: NetworkManagerStats, stats: NetworkManagerStats,
client_whitelist: LruCache<TypedKey, ClientWhitelistEntry>, client_whitelist: LruCache<TypedKey, ClientWhitelistEntry>,
node_contact_method_cache: LruCache<NodeContactMethodCacheKey, NodeContactMethod>,
public_address_check_cache: public_address_check_cache:
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>, BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
public_address_inconsistencies_table: public_address_inconsistencies_table:
@ -175,6 +157,7 @@ impl NetworkManager {
NetworkManagerInner { NetworkManagerInner {
stats: NetworkManagerStats::default(), stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(), client_whitelist: LruCache::new_unbounded(),
node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE),
public_address_check_cache: BTreeMap::new(), public_address_check_cache: BTreeMap::new(),
public_address_inconsistencies_table: BTreeMap::new(), public_address_inconsistencies_table: BTreeMap::new(),
} }
@ -889,463 +872,6 @@ impl NetworkManager {
Ok(()) Ok(())
} }
/// Send a reverse connection signal and wait for the return receipt over it
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
pub async fn do_reverse_connect(
&self,
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
.config
.get()
.network
.reverse_connection_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get target routing domain
let Some(routing_domain) = target_nr.best_routing_domain() else {
return Ok(NetworkResult::no_connection_other("No routing domain for target"));
};
// Get our peer info
let Some(peer_info) = self
.routing_table()
.get_own_peer_info(routing_domain) else {
return Ok(NetworkResult::no_connection_other("Own peer info not available"));
};
// Issue the signal
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"reverse connect receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!("reverse connect receipt cancelled from {}", target_nr)))
}
};
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if !target_nr.same_entry(&inbound_nr) {
bail!("unexpected noderef mismatch on reverse connect");
}
// And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() {
match self
.net()
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over reverse connection",
)),
}
} else {
bail!("no reverse connection available")
}
}
/// Send a hole punch signal and do a negotiating ping and wait for the return receipt
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
pub async fn do_hole_punch(
&self,
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr
.filter_ref()
.map(|nrf| nrf.dial_info_filter.protocol_type_set
== ProtocolTypeSet::only(ProtocolType::UDP))
.unwrap_or_default());
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
.config
.get()
.network
.hole_punch_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get target routing domain
let Some(routing_domain) = target_nr.best_routing_domain() else {
return Ok(NetworkResult::no_connection_other("No routing domain for target"));
};
// Get our peer info
let Some(peer_info) = self
.routing_table()
.get_own_peer_info(routing_domain) else {
return Ok(NetworkResult::no_connection_other("Own peer info not available"));
};
// Get the udp direct dialinfo for the hole punch
let hole_punch_did = target_nr
.first_filtered_dial_info_detail()
.ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?;
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
// Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole
// punch should come through and create a real 'last connection' for us if this succeeds
network_result_try!(
self.net()
.send_data_to_dial_info(hole_punch_did.dial_info, Vec::new())
.await?
);
// Issue the signal
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
SignalInfo::HolePunch { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"hole punch receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!("hole punch receipt cancelled from {}", target_nr)))
}
};
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if !target_nr.same_entry(&inbound_nr) {
bail!(
"unexpected noderef mismatch on hole punch {}, expected {}",
inbound_nr,
target_nr
);
}
// And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() {
match self
.net()
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over hole punch",
)),
}
} else {
bail!("no hole punch available")
}
}
/// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access
/// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not
/// allowed to use NodeRefs due to recursive locking
#[instrument(level = "trace", skip(self), ret)]
pub(crate) fn get_node_contact_method(
&self,
target_node_ref: NodeRef,
) -> EyreResult<NodeContactMethod> {
let routing_table = self.routing_table();
// Figure out the best routing domain to get the contact method over
let routing_domain = match target_node_ref.best_routing_domain() {
Some(rd) => rd,
None => {
log_net!("no routing domain for node {:?}", target_node_ref);
return Ok(NodeContactMethod::Unreachable);
}
};
// Node A is our own node
// Use whatever node info we've calculated so far
let peer_a = routing_table.get_best_effort_own_peer_info(routing_domain);
// Node B is the target node
let peer_b = match target_node_ref.make_peer_info(routing_domain) {
Some(ni) => ni,
None => {
log_net!("no node info for node {:?}", target_node_ref);
return Ok(NodeContactMethod::Unreachable);
}
};
// Dial info filter comes from the target node ref
let dial_info_filter = target_node_ref.dial_info_filter();
let mut sequencing = target_node_ref.sequencing();
// If the node has had lost questions or failures to send, prefer sequencing
// to improve reliability. The node may be experiencing UDP fragmentation drops
// or other firewalling issues and may perform better with TCP.
let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2;
if unreliable && sequencing < Sequencing::PreferOrdered {
log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan());
sequencing = Sequencing::PreferOrdered;
}
// Get the best contact method with these parameters from the routing domain
let cm = routing_table.get_contact_method(
routing_domain,
&peer_a,
&peer_b,
dial_info_filter,
sequencing,
);
// Translate the raw contact method to a referenced contact method
Ok(match cm {
ContactMethod::Unreachable => NodeContactMethod::Unreachable,
ContactMethod::Existing => NodeContactMethod::Existing,
ContactMethod::Direct(di) => NodeContactMethod::Direct(di),
ContactMethod::SignalReverse(relay_key, target_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref)
}
ContactMethod::SignalHolePunch(relay_key, target_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
}
// if any other protocol were possible here we could update this and do_hole_punch
// but tcp hole punch is very very unreliable it seems
let udp_target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::new().with_protocol_type(ProtocolType::UDP));
NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref)
}
ContactMethod::InboundRelay(relay_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::InboundRelay(relay_nr)
}
ContactMethod::OutboundRelay(relay_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::OutboundRelay(relay_nr)
}
})
}
/// Send raw data to a node
///
/// We may not have dial info for a node, but have an existing connection for it
/// because an inbound connection happened first, and no FindNodeQ has happened to that
/// node yet to discover its dial info. The existing connection should be tried first
/// in this case.
///
/// Sending to a node requires determining a NetworkClass compatible mechanism
pub fn send_data(
&self,
node_ref: NodeRef,
data: Vec<u8>,
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataKind>>> {
let this = self.clone();
Box::pin(
async move {
// info!("{}", format!("send_data to: {:?}", node_ref).red());
// First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = node_ref.last_connection() {
// info!(
// "{}",
// format!("last_connection to: {:?}", connection_descriptor).red()
// );
match this
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
// info!(
// "{}",
// format!("sent to existing connection: {:?}", connection_descriptor)
// .red()
// );
// Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
)));
}
Some(d) => d,
}
} else {
data
};
// info!("{}", "no existing connection".red());
// If we don't have last_connection, try to reach out to the peer via its dial info
let contact_method = this.get_node_contact_method(node_ref.clone())?;
log_net!(
"send_data via {:?} to dialinfo {:?}",
contact_method,
node_ref
);
match contact_method {
NodeContactMethod::OutboundRelay(relay_nr)
| NodeContactMethod::InboundRelay(relay_nr) => {
network_result_try!(this.send_data(relay_nr, data).await?);
Ok(NetworkResult::value(SendDataKind::Indirect))
}
NodeContactMethod::Direct(dial_info) => {
let connection_descriptor = network_result_try!(
this.net().send_data_to_dial_info(dial_info, data).await?
);
// If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => {
let connection_descriptor = network_result_try!(
this.do_reverse_connect(relay_nr, target_node_ref, data)
.await?
);
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
let connection_descriptor = network_result_try!(
this.do_hole_punch(relay_nr, target_node_ref, data).await?
);
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
NodeContactMethod::Existing => Ok(NetworkResult::no_connection_other(
"should have found an existing connection",
)),
NodeContactMethod::Unreachable => Ok(NetworkResult::no_connection_other(
"Can't send to this node",
)),
}
}
.instrument(trace_span!("send_data")),
)
}
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", skip(self), ret, err)]
async fn handle_boot_request(
&self,
descriptor: ConnectionDescriptor,
) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table();
// Get a bunch of nodes with the various
let bootstrap_nodes = routing_table.find_bootstrap_nodes_filtered(2);
// Serialize out peer info
let bootstrap_peerinfo: Vec<PeerInfo> = bootstrap_nodes
.iter()
.filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet))
.collect();
let mut json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec();
self.apply_network_key(&mut json_bytes);
// Reply with a chunk of signed routing table
match self
.net()
.send_data_to_existing_connection(descriptor, json_bytes)
.await?
{
None => {
// Bootstrap reply was sent
Ok(NetworkResult::value(()))
}
Some(_) => Ok(NetworkResult::no_connection_other(
"bootstrap reply could not be sent",
)),
}
}
// Direct bootstrap request
#[instrument(level = "trace", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> {
let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
// Send boot magic to requested peer address
let mut data = BOOT_MAGIC.to_vec();
// Apply network key
self.apply_network_key(&mut data);
let mut out_data: Vec<u8> = network_result_value_or_log!(self
.net()
.send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms)
.await? =>
{
return Ok(Vec::new());
});
// Apply network key
self.apply_network_key(&mut out_data);
let bootstrap_peerinfo: Vec<PeerInfo> =
deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?)
.wrap_err("failed to deserialize boot peerinfo")?;
Ok(bootstrap_peerinfo)
}
// Network isolation encryption // Network isolation encryption
fn apply_network_key(&self, data: &mut [u8]) { fn apply_network_key(&self, data: &mut [u8]) {
if let Some(network_key) = self.unlocked_inner.network_key { if let Some(network_key) = self.unlocked_inner.network_key {
@ -1576,108 +1102,6 @@ impl NetworkManager {
Ok(true) Ok(true)
} }
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_up(bytes);
inner
.stats
.per_address_stats
.entry(PerAddressStatsKey(addr), |_k,_v| {
// do nothing on LRU evict
})
.or_insert(PerAddressStats::default())
.transfer_stats_accounting
.add_up(bytes);
}
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_down(bytes);
inner
.stats
.per_address_stats
.entry(PerAddressStatsKey(addr), |_k,_v| {
// do nothing on LRU evict
})
.or_insert(PerAddressStats::default())
.transfer_stats_accounting
.add_down(bytes);
}
// Get stats
pub fn get_stats(&self) -> NetworkManagerStats {
let inner = self.inner.lock();
inner.stats.clone()
}
pub fn get_veilid_state(&self) -> VeilidStateNetwork {
let has_state = self
.unlocked_inner
.components
.read()
.as_ref()
.map(|c| c.net.is_started())
.unwrap_or(false);
if !has_state {
return VeilidStateNetwork {
started: false,
bps_down: 0.into(),
bps_up: 0.into(),
peers: Vec::new(),
};
}
let routing_table = self.routing_table();
let (bps_down, bps_up) = {
let inner = self.inner.lock();
(
inner.stats.self_stats.transfer_stats.down.average,
inner.stats.self_stats.transfer_stats.up.average,
)
};
VeilidStateNetwork {
started: true,
bps_down,
bps_up,
peers: {
let mut out = Vec::new();
for (k, v) in routing_table.get_recent_peers() {
if let Ok(Some(nr)) = routing_table.lookup_node_ref(k) {
let peer_stats = nr.peer_stats();
let peer = PeerTableData {
node_ids: nr.node_ids().iter().copied().collect(),
peer_address: v.last_connection.remote().to_string(),
peer_stats,
};
out.push(peer);
}
}
out
},
}
}
fn send_network_update(&self) {
let update_cb = self.unlocked_inner.update_callback.read().clone();
if update_cb.is_none() {
return;
}
let state = self.get_veilid_state();
(update_cb.unwrap())(VeilidUpdate::Network(state));
}
// Determine if a local IP address has changed // Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info // this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers // Wait until we have received confirmation from N different peers

View File

@ -0,0 +1,553 @@
use super::*;
impl NetworkManager {
/// Send raw data to a node
///
/// We may not have dial info for a node, but have an existing connection for it
/// because an inbound connection happened first, and no FindNodeQ has happened to that
/// node yet to discover its dial info. The existing connection should be tried first
/// in this case, if it matches the node ref's filters and no more permissive connection
/// could be established.
///
/// Sending to a node requires determining a NetworkClass compatible mechanism
pub fn send_data(
&self,
target_node_ref: NodeRef,
data: Vec<u8>,
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataKind>>> {
let this = self.clone();
Box::pin(
async move {
// Get the best way to contact this node
let contact_method = this.get_node_contact_method(target_node_ref.clone())?;
// If we need to relay, do it
let (contact_method, node_ref, relayed) = match contact_method {
NodeContactMethod::OutboundRelay(relay_nr)
| NodeContactMethod::InboundRelay(relay_nr) => {
let cm = this.get_node_contact_method(relay_nr.clone())?;
(cm, relay_nr, true)
}
cm => (cm, target_node_ref.clone(), false),
};
// Try the contact method
let sdk = match contact_method {
NodeContactMethod::OutboundRelay(relay_nr)
| NodeContactMethod::InboundRelay(relay_nr) => {
// Relay loop or multiple relays
bail!(
"Relay loop or multiple relays detected: {} -> {} -> {}",
target_node_ref,
node_ref,
relay_nr
);
}
NodeContactMethod::Direct(dial_info) => {
network_result_try!(
this.send_data_ncm_direct(node_ref, dial_info, data).await?
)
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => {
network_result_try!(
this.send_data_ncm_signal_reverse(relay_nr, target_node_ref, data)
.await?
)
}
NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
network_result_try!(
this.send_data_ncm_signal_hole_punch(relay_nr, target_node_ref, data)
.await?
)
}
NodeContactMethod::Existing => {
network_result_try!(
this.send_data_ncm_existing(target_node_ref, data).await?
)
}
NodeContactMethod::Unreachable => {
return Ok(NetworkResult::no_connection_other(
"Can't send to this node",
));
}
};
if relayed {
return Ok(NetworkResult::value(SendDataKind::Indirect));
}
Ok(NetworkResult::value(sdk))
}
.instrument(trace_span!("send_data")),
)
}
/// Send data using NodeContactMethod::Existing
async fn send_data_ncm_existing(
&self,
target_node_ref: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> {
// First try to send data to the last socket we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else {
return Ok(NetworkResult::no_connection_other(
"should have found an existing connection",
));
};
if self
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
.is_some()
{
return Ok(NetworkResult::no_connection_other(
"failed to send to existing connection",
));
}
// Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
)))
}
/// Send data using NodeContactMethod::SignalReverse
async fn send_data_ncm_signal_reverse(
&self,
relay_nr: NodeRef,
target_node_ref: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> {
// First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() {
match self
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
// Update timestamp for this last connection since we just sent to it
target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
)));
}
Some(data) => {
// Couldn't send data to existing connection
// so pass the data back out
data
}
}
} else {
// No last connection
data
};
let connection_descriptor = network_result_try!(
self.do_reverse_connect(relay_nr, target_node_ref, data)
.await?
);
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
/// Send data using NodeContactMethod::SignalHolePunch
async fn send_data_ncm_signal_hole_punch(
&self,
relay_nr: NodeRef,
target_node_ref: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> {
// First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() {
match self
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
// Update timestamp for this last connection since we just sent to it
target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
)));
}
Some(data) => {
// Couldn't send data to existing connection
// so pass the data back out
data
}
}
} else {
// No last connection
data
};
let connection_descriptor =
network_result_try!(self.do_hole_punch(relay_nr, target_node_ref, data).await?);
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
/// Send data using NodeContactMethod::Direct
async fn send_data_ncm_direct(
&self,
node_ref: NodeRef,
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> {
// Since we have the best dial info already, we can find a connection to use by protocol type
let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter()));
// First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = node_ref.last_connection() {
match self
.net()
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
// Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor,
)));
}
Some(d) => d,
}
} else {
data
};
// New direct connection was necessary for this dial info
let connection_descriptor =
network_result_try!(self.net().send_data_to_dial_info(dial_info, data).await?);
// If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
/// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access
/// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not
/// allowed to use NodeRefs due to recursive locking
pub(crate) fn get_node_contact_method(
&self,
target_node_ref: NodeRef,
) -> EyreResult<NodeContactMethod> {
let routing_table = self.routing_table();
// Figure out the best routing domain to get the contact method over
let routing_domain = match target_node_ref.best_routing_domain() {
Some(rd) => rd,
None => {
log_net!("no routing domain for node {:?}", target_node_ref);
return Ok(NodeContactMethod::Unreachable);
}
};
// Get cache key
let ncm_key = NodeContactMethodCacheKey {
own_node_info_ts: routing_table.get_own_node_info_ts(routing_domain),
target_node_info_ts: target_node_ref.node_info_ts(routing_domain),
target_node_ref_filter: target_node_ref.filter_ref().cloned(),
};
if let Some(ncm) = self.inner.lock().node_contact_method_cache.get(&ncm_key) {
return Ok(ncm.clone());
}
// Node A is our own node
// Use whatever node info we've calculated so far
let peer_a = routing_table.get_best_effort_own_peer_info(routing_domain);
// Node B is the target node
let peer_b = match target_node_ref.make_peer_info(routing_domain) {
Some(ni) => ni,
None => {
log_net!("no node info for node {:?}", target_node_ref);
return Ok(NodeContactMethod::Unreachable);
}
};
// Dial info filter comes from the target node ref
let dial_info_filter = target_node_ref.dial_info_filter();
let sequencing = target_node_ref.sequencing();
// If the node has had lost questions or failures to send, prefer sequencing
// to improve reliability. The node may be experiencing UDP fragmentation drops
// or other firewalling issues and may perform better with TCP.
// let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2;
// if unreliable && sequencing < Sequencing::PreferOrdered {
// log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan());
// sequencing = Sequencing::PreferOrdered;
// }
// Get the best contact method with these parameters from the routing domain
let cm = routing_table.get_contact_method(
routing_domain,
&peer_a,
&peer_b,
dial_info_filter,
sequencing,
);
// Translate the raw contact method to a referenced contact method
let ncm = match cm {
ContactMethod::Unreachable => NodeContactMethod::Unreachable,
ContactMethod::Existing => NodeContactMethod::Existing,
ContactMethod::Direct(di) => NodeContactMethod::Direct(di),
ContactMethod::SignalReverse(relay_key, target_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
}
NodeContactMethod::SignalReverse(relay_nr, target_node_ref)
}
ContactMethod::SignalHolePunch(relay_key, target_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
}
// if any other protocol were possible here we could update this and do_hole_punch
// but tcp hole punch is very very unreliable it seems
let udp_target_node_ref = target_node_ref
.filtered_clone(NodeRefFilter::new().with_protocol_type(ProtocolType::UDP));
NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref)
}
ContactMethod::InboundRelay(relay_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::InboundRelay(relay_nr)
}
ContactMethod::OutboundRelay(relay_key) => {
let relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?;
NodeContactMethod::OutboundRelay(relay_nr)
}
};
// Cache this
self.inner
.lock()
.node_contact_method_cache
.insert(ncm_key, ncm.clone(), |_, _| {});
Ok(ncm)
}
/// Send a reverse connection signal and wait for the return receipt over it
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
async fn do_reverse_connect(
&self,
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
.config
.get()
.network
.reverse_connection_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get target routing domain
let Some(routing_domain) = target_nr.best_routing_domain() else {
return Ok(NetworkResult::no_connection_other("No routing domain for target"));
};
// Get our peer info
let Some(peer_info) = self
.routing_table()
.get_own_peer_info(routing_domain) else {
return Ok(NetworkResult::no_connection_other("Own peer info not available"));
};
// Issue the signal
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"reverse connect receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"reverse connect receipt cancelled from {}",
target_nr
)))
}
};
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if !target_nr.same_entry(&inbound_nr) {
bail!("unexpected noderef mismatch on reverse connect");
}
// And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() {
match self
.net()
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over reverse connection",
)),
}
} else {
bail!("no reverse connection available")
}
}
/// Send a hole punch signal and do a negotiating ping and wait for the return receipt
/// Then send the data across the new connection
/// Only usable for PublicInternet routing domain
#[instrument(level = "trace", skip(self, data), err)]
async fn do_hole_punch(
&self,
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr
.filter_ref()
.map(|nrf| nrf.dial_info_filter.protocol_type_set
== ProtocolTypeSet::only(ProtocolType::UDP))
.unwrap_or_default());
// Build a return receipt for the signal
let receipt_timeout = ms_to_us(
self.unlocked_inner
.config
.get()
.network
.hole_punch_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get target routing domain
let Some(routing_domain) = target_nr.best_routing_domain() else {
return Ok(NetworkResult::no_connection_other("No routing domain for target"));
};
// Get our peer info
let Some(peer_info) = self
.routing_table()
.get_own_peer_info(routing_domain) else {
return Ok(NetworkResult::no_connection_other("Own peer info not available"));
};
// Get the udp direct dialinfo for the hole punch
let hole_punch_did = target_nr
.first_filtered_dial_info_detail()
.ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?;
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
// Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole
// punch should come through and create a real 'last connection' for us if this succeeds
network_result_try!(
self.net()
.send_data_to_dial_info(hole_punch_did.dial_info, Vec::new())
.await?
);
// Issue the signal
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::relay(relay_nr, target_nr.clone()),
SignalInfo::HolePunch { receipt, peer_info },
)
.await
.wrap_err("failed to send signal")?);
// Wait for the return receipt
let inbound_nr = match eventual_value.await.take_value().unwrap() {
ReceiptEvent::ReturnedPrivate { private_route: _ }
| ReceiptEvent::ReturnedOutOfBand
| ReceiptEvent::ReturnedSafety => {
return Ok(NetworkResult::invalid_message(
"hole punch receipt should be returned in-band",
));
}
ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef,
ReceiptEvent::Expired => {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
return Ok(NetworkResult::no_connection_other(format!(
"hole punch receipt cancelled from {}",
target_nr
)))
}
};
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if !target_nr.same_entry(&inbound_nr) {
bail!(
"unexpected noderef mismatch on hole punch {}, expected {}",
inbound_nr,
target_nr
);
}
// And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() {
match self
.net()
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over hole punch",
)),
}
} else {
bail!("no hole punch available")
}
}
}

View File

@ -0,0 +1,137 @@
use super::*;
// Statistics per address
#[derive(Clone, Default)]
pub struct PerAddressStats {
pub last_seen_ts: Timestamp,
pub transfer_stats_accounting: TransferStatsAccounting,
pub transfer_stats: TransferStatsDownUp,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct PerAddressStatsKey(IpAddr);
impl Default for PerAddressStatsKey {
fn default() -> Self {
Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}
}
// Statistics about the low-level network
#[derive(Clone)]
pub struct NetworkManagerStats {
pub self_stats: PerAddressStats,
pub per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,
}
impl Default for NetworkManagerStats {
fn default() -> Self {
Self {
self_stats: PerAddressStats::default(),
per_address_stats: LruCache::new(IPADDR_TABLE_SIZE),
}
}
}
impl NetworkManager {
// Callbacks from low level network for statistics gathering
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_up(bytes);
inner
.stats
.per_address_stats
.entry(PerAddressStatsKey(addr), |_k, _v| {
// do nothing on LRU evict
})
.or_insert(PerAddressStats::default())
.transfer_stats_accounting
.add_up(bytes);
}
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_down(bytes);
inner
.stats
.per_address_stats
.entry(PerAddressStatsKey(addr), |_k, _v| {
// do nothing on LRU evict
})
.or_insert(PerAddressStats::default())
.transfer_stats_accounting
.add_down(bytes);
}
// Get stats
pub fn get_stats(&self) -> NetworkManagerStats {
let inner = self.inner.lock();
inner.stats.clone()
}
pub fn get_veilid_state(&self) -> VeilidStateNetwork {
let has_state = self
.unlocked_inner
.components
.read()
.as_ref()
.map(|c| c.net.is_started())
.unwrap_or(false);
if !has_state {
return VeilidStateNetwork {
started: false,
bps_down: 0.into(),
bps_up: 0.into(),
peers: Vec::new(),
};
}
let routing_table = self.routing_table();
let (bps_down, bps_up) = {
let inner = self.inner.lock();
(
inner.stats.self_stats.transfer_stats.down.average,
inner.stats.self_stats.transfer_stats.up.average,
)
};
VeilidStateNetwork {
started: true,
bps_down,
bps_up,
peers: {
let mut out = Vec::new();
for (k, v) in routing_table.get_recent_peers() {
if let Ok(Some(nr)) = routing_table.lookup_node_ref(k) {
let peer_stats = nr.peer_stats();
let peer = PeerTableData {
node_ids: nr.node_ids().iter().copied().collect(),
peer_address: v.last_connection.remote().to_string(),
peer_stats,
};
out.push(peer);
}
}
out
},
}
}
pub(super) fn send_network_update(&self) {
let update_cb = self.unlocked_inner.update_callback.read().clone();
if update_cb.is_none() {
return;
}
let state = self.get_veilid_state();
(update_cb.unwrap())(VeilidUpdate::Network(state));
}
}

View File

@ -7,6 +7,7 @@ use super::*;
Eq, Eq,
PartialOrd, PartialOrd,
Ord, Ord,
Hash,
Serialize, Serialize,
Deserialize, Deserialize,
RkyvArchive, RkyvArchive,
@ -61,7 +62,7 @@ impl DialInfoFilter {
pub fn is_dead(&self) -> bool { pub fn is_dead(&self) -> bool {
self.protocol_type_set.is_empty() || self.address_type_set.is_empty() self.protocol_type_set.is_empty() || self.address_type_set.is_empty()
} }
pub fn with_sequencing(mut self, sequencing: Sequencing) -> (bool, DialInfoFilter) { pub fn with_sequencing(self, sequencing: Sequencing) -> (bool, DialInfoFilter) {
// Get first filtered dialinfo // Get first filtered dialinfo
match sequencing { match sequencing {
Sequencing::NoPreference => (false, self), Sequencing::NoPreference => (false, self),

View File

@ -317,7 +317,6 @@ impl BucketEntryInner {
rti, rti,
true, true,
NodeRefFilter::from(routing_domain), NodeRefFilter::from(routing_domain),
false,
); );
!last_connections.is_empty() !last_connections.is_empty()
} }
@ -372,7 +371,6 @@ impl BucketEntryInner {
rti, rti,
true, true,
NodeRefFilter::from(routing_domain_set), NodeRefFilter::from(routing_domain_set),
false
); );
for lc in last_connections { for lc in last_connections {
if let Some(rd) = if let Some(rd) =
@ -415,7 +413,6 @@ impl BucketEntryInner {
rti: &RoutingTableInner, rti: &RoutingTableInner,
only_live: bool, only_live: bool,
filter: NodeRefFilter, filter: NodeRefFilter,
ordered: bool,
) -> Vec<(ConnectionDescriptor, Timestamp)> { ) -> Vec<(ConnectionDescriptor, Timestamp)> {
let connection_manager = let connection_manager =
rti.unlocked_inner.network_manager.connection_manager(); rti.unlocked_inner.network_manager.connection_manager();
@ -461,14 +458,8 @@ impl BucketEntryInner {
} }
}) })
.collect(); .collect();
// Sort with ordering preference first and then sort with newest timestamps // Sort with newest timestamps
out.sort_by(|a, b| { out.sort_by(|a, b| {
if ordered {
let s = ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type());
if s != core::cmp::Ordering::Equal {
return s;
}
}
b.1.cmp(&a.1) b.1.cmp(&a.1)
}); });
out out

View File

@ -277,15 +277,22 @@ pub trait NodeRefBase: Sized {
out out
} }
/// Get the most recent 'last connection' to this node
/// Filtered first and then sorted by ordering preference and then by most recent
fn last_connection(&self) -> Option<ConnectionDescriptor> { fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connections and the last time we saw anything with this connection
// Filtered first and then sorted by sequencing and then by most recent
self.operate(|rti, e| { self.operate(|rti, e| {
// apply sequencing to filter and get sort // apply sequencing to filter and get sort
let sequencing = self.common().sequencing; let sequencing = self.common().sequencing;
let filter = self.common().filter.clone().unwrap_or_default(); let filter = self.common().filter.clone().unwrap_or_default();
let (ordered, filter) = filter.with_sequencing(sequencing); let (ordered, filter) = filter.with_sequencing(sequencing);
let last_connections = e.last_connections(rti, true, filter, ordered); let mut last_connections = e.last_connections(rti, true, filter);
if ordered {
last_connections.sort_by(|a, b| {
ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type())
});
}
last_connections.first().map(|x| x.0) last_connections.first().map(|x| x.0)
}) })
} }

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeRefFilter { pub struct NodeRefFilter {
pub routing_domain_set: RoutingDomainSet, pub routing_domain_set: RoutingDomainSet,
pub dial_info_filter: DialInfoFilter, pub dial_info_filter: DialInfoFilter,

View File

@ -529,6 +529,7 @@ impl RPCProcessor {
async fn wait_for_reply( async fn wait_for_reply(
&self, &self,
waitable_reply: WaitableReply, waitable_reply: WaitableReply,
debug_string: String,
) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> { ) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> {
let out = self let out = self
.unlocked_inner .unlocked_inner
@ -536,7 +537,20 @@ impl RPCProcessor {
.wait_for_op(waitable_reply.handle, waitable_reply.timeout_us) .wait_for_op(waitable_reply.handle, waitable_reply.timeout_us)
.await; .await;
match &out { match &out {
Err(_) | Ok(TimeoutOr::Timeout) => { Err(e) => {
let msg = format!("RPC Lost ({}): {}", debug_string, e);
log_rpc!(debug "{}", msg.bright_magenta());
self.record_question_lost(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
);
}
Ok(TimeoutOr::Timeout) => {
let msg = format!("RPC Lost ({}): Timeout", debug_string);
log_rpc!(debug "{}", msg.bright_cyan());
self.record_question_lost( self.record_question_lost(
waitable_reply.send_ts, waitable_reply.send_ts,
waitable_reply.node_ref.clone(), waitable_reply.node_ref.clone(),
@ -878,8 +892,6 @@ impl RPCProcessor {
) { ) {
// Record for node if this was not sent via a route // Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() { if safety_route.is_none() && remote_private_route.is_none() {
log_rpc!(debug "RPC Question Lost: {:?}", node_ref);
node_ref.stats_question_lost(); node_ref.stats_question_lost();
// Also clear the last_connections for the entry so we make a new connection next time // Also clear the last_connections for the entry so we make a new connection next time

View File

@ -9,6 +9,8 @@ impl RPCProcessor {
dest: Destination, dest: Destination,
message: Vec<u8>, message: Vec<u8>,
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> { ) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest);
let app_call_q = RPCOperationAppCallQ::new(message)?; let app_call_q = RPCOperationAppCallQ::new(message)?;
let question = RPCQuestion::new( let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?), network_result_try!(self.get_destination_respond_to(&dest)?),
@ -19,7 +21,7 @@ impl RPCProcessor {
let waitable_reply = network_result_try!(self.question(dest, question, None).await?); let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v, TimeoutOr::Value(v) => v,
}; };

View File

@ -32,11 +32,13 @@ impl RPCProcessor {
find_node_q_detail, find_node_q_detail,
); );
let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest);
// Send the find_node request // Send the find_node request
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v, TimeoutOr::Value(v) => v,
}; };

View File

@ -39,6 +39,18 @@ impl RPCProcessor {
return Err(RPCError::internal("No node id for crypto kind")); return Err(RPCError::internal("No node id for crypto kind"));
}; };
let debug_string = format!(
"GetValue(key={} subkey={} last_descriptor={}) => {}",
key,
subkey,
if last_descriptor.is_some() {
"Some"
} else {
"None"
},
dest
);
// Send the getvalue question // Send the getvalue question
let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none()); let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none());
let question = RPCQuestion::new( let question = RPCQuestion::new(
@ -58,7 +70,7 @@ impl RPCProcessor {
); );
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v, TimeoutOr::Value(v) => v,
}; };

View File

@ -39,6 +39,16 @@ impl RPCProcessor {
return Err(RPCError::internal("No node id for crypto kind")); return Err(RPCError::internal("No node id for crypto kind"));
}; };
let debug_string = format!(
"SetValue(key={} subkey={} value_data(writer)={} value_data(len)={} send_descriptor={}) => {}",
key,
subkey,
value.value_data().writer(),
value.value_data().data().len(),
send_descriptor,
dest
);
// Send the setvalue question // Send the setvalue question
let set_value_q = RPCOperationSetValueQ::new( let set_value_q = RPCOperationSetValueQ::new(
key, key,
@ -59,13 +69,14 @@ impl RPCProcessor {
subkey, subkey,
vcrypto: vcrypto.clone(), vcrypto: vcrypto.clone(),
}); });
let waitable_reply = network_result_try!( let waitable_reply = network_result_try!(
self.question(dest, question, Some(question_context)) self.question(dest, question, Some(question_context))
.await? .await?
); );
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v, TimeoutOr::Value(v) => v,
}; };

View File

@ -101,6 +101,8 @@ impl RPCProcessor {
RPCQuestionDetail::StatusQ(status_q), RPCQuestionDetail::StatusQ(status_q),
); );
let debug_string = format!("Status => {}", dest);
// Send the info request // Send the info request
let waitable_reply = let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?); network_result_try!(self.question(dest.clone(), question, None).await?);
@ -109,7 +111,7 @@ impl RPCProcessor {
let send_data_kind = waitable_reply.send_data_kind; let send_data_kind = waitable_reply.send_data_kind;
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v, TimeoutOr::Value(v) => v,
}; };