veilid/veilid-core/src/network_manager.rs

1387 lines
52 KiB
Rust
Raw Normal View History

2021-11-22 16:28:30 +00:00
use crate::*;
use connection_manager::*;
2021-11-22 16:28:30 +00:00
use dht::*;
2022-03-20 14:52:03 +00:00
use hashlink::LruCache;
2021-11-22 16:28:30 +00:00
use intf::*;
use receipt_manager::*;
use routing_table::*;
2022-04-17 17:28:39 +00:00
use rpc_processor::*;
2021-11-22 16:28:30 +00:00
use xx::*;
////////////////////////////////////////////////////////////////////////////////////////
2022-04-07 13:55:09 +00:00
pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
2021-11-22 16:28:30 +00:00
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
2022-03-19 22:19:40 +00:00
pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
2022-04-23 01:30:09 +00:00
pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
2021-11-22 16:28:30 +00:00
2021-12-22 03:20:55 +00:00
#[derive(Copy, Clone, Debug, Default)]
pub struct ProtocolConfig {
2022-04-16 15:18:54 +00:00
pub outbound: ProtocolSet,
pub inbound: ProtocolSet,
2021-12-24 01:34:52 +00:00
}
2021-11-22 16:28:30 +00:00
// Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart
#[derive(Clone)]
struct NetworkComponents {
net: Network,
connection_manager: ConnectionManager,
2021-11-22 16:28:30 +00:00
rpc_processor: RPCProcessor,
receipt_manager: ReceiptManager,
}
2022-03-19 22:19:40 +00:00
// Statistics per address
#[derive(Clone, Default)]
pub struct PerAddressStats {
last_seen_ts: u64,
transfer_stats_accounting: TransferStatsAccounting,
transfer_stats: TransferStatsDownUp,
}
2022-03-20 14:52:03 +00:00
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct PerAddressStatsKey(IpAddr);
impl Default for PerAddressStatsKey {
fn default() -> Self {
Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}
}
2022-03-19 22:19:40 +00:00
// Statistics about the low-level network
2022-03-20 14:52:03 +00:00
#[derive(Clone)]
2022-03-19 22:19:40 +00:00
pub struct NetworkManagerStats {
self_stats: PerAddressStats,
2022-03-20 14:52:03 +00:00
per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,
2022-03-19 22:19:40 +00:00
}
2022-03-20 14:52:03 +00:00
impl Default for NetworkManagerStats {
fn default() -> Self {
Self {
self_stats: PerAddressStats::default(),
per_address_stats: LruCache::new(IPADDR_TABLE_SIZE),
}
}
}
struct ClientWhitelistEntry {
last_seen: u64,
}
2022-04-16 15:18:54 +00:00
// Mechanism required to contact another node
2022-04-17 17:28:39 +00:00
enum ContactMethod {
2022-04-21 00:49:16 +00:00
Unreachable, // Node is not reachable by any means
Direct(DialInfo), // Contact the node directly
SignalReverse(NodeRef, NodeRef), // Request via signal the node connect back directly
SignalHolePunch(NodeRef, NodeRef), // Request via signal the node negotiate a hole punch
InboundRelay(NodeRef), // Must use an inbound relay to reach the node
OutboundRelay(NodeRef), // Must use outbound relay to reach the node
}
#[derive(Copy, Clone, Debug)]
pub enum SendDataKind {
2022-04-23 01:30:09 +00:00
LocalDirect,
GlobalDirect,
GlobalIndirect,
2022-04-16 15:18:54 +00:00
}
2021-11-22 16:28:30 +00:00
// The mutable state of the network manager
2022-03-19 22:19:40 +00:00
struct NetworkManagerInner {
2021-11-22 16:28:30 +00:00
routing_table: Option<RoutingTable>,
components: Option<NetworkComponents>,
2022-05-16 15:52:48 +00:00
update_callback: Option<UpdateCallback>,
2022-03-19 22:19:40 +00:00
stats: NetworkManagerStats,
client_whitelist: LruCache<key::DHTKey, ClientWhitelistEntry>,
2022-04-07 13:55:09 +00:00
relay_node: Option<NodeRef>,
public_address_check_cache: LruCache<key::DHTKey, SocketAddress>,
2022-03-19 22:19:40 +00:00
}
struct NetworkManagerUnlockedInner {
// Background processes
rolling_transfers_task: TickTask,
2022-04-07 13:55:09 +00:00
relay_management_task: TickTask,
2021-11-22 16:28:30 +00:00
}
#[derive(Clone)]
pub struct NetworkManager {
config: VeilidConfig,
table_store: TableStore,
crypto: Crypto,
inner: Arc<Mutex<NetworkManagerInner>>,
2022-03-19 22:19:40 +00:00
unlocked_inner: Arc<NetworkManagerUnlockedInner>,
2021-11-22 16:28:30 +00:00
}
impl NetworkManager {
fn new_inner() -> NetworkManagerInner {
NetworkManagerInner {
2021-11-22 16:28:30 +00:00
routing_table: None,
components: None,
2022-05-16 15:52:48 +00:00
update_callback: None,
2022-03-19 22:19:40 +00:00
stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(),
2022-04-07 13:55:09 +00:00
relay_node: None,
public_address_check_cache: LruCache::new(8),
2022-03-19 22:19:40 +00:00
}
}
fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner {
//let c = config.get();
NetworkManagerUnlockedInner {
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
2022-04-07 13:55:09 +00:00
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
}
2021-11-22 16:28:30 +00:00
}
pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self {
2022-03-19 22:19:40 +00:00
let this = Self {
config: config.clone(),
table_store,
crypto,
2021-11-22 16:28:30 +00:00
inner: Arc::new(Mutex::new(Self::new_inner())),
2022-03-19 22:19:40 +00:00
unlocked_inner: Arc::new(Self::new_unlocked_inner(config)),
};
// Set rolling transfers tick task
{
let this2 = this.clone();
this.unlocked_inner
.rolling_transfers_task
.set_routine(move |l, t| {
Box::pin(this2.clone().rolling_transfers_task_routine(l, t))
});
2021-11-22 16:28:30 +00:00
}
2022-04-07 13:55:09 +00:00
// Set relay management tick task
{
let this2 = this.clone();
this.unlocked_inner
.relay_management_task
.set_routine(move |l, t| {
Box::pin(this2.clone().relay_management_task_routine(l, t))
});
}
2022-03-19 22:19:40 +00:00
this
2021-11-22 16:28:30 +00:00
}
pub fn config(&self) -> VeilidConfig {
self.config.clone()
}
pub fn table_store(&self) -> TableStore {
self.table_store.clone()
}
pub fn crypto(&self) -> Crypto {
self.crypto.clone()
}
pub fn routing_table(&self) -> RoutingTable {
self.inner.lock().routing_table.as_ref().unwrap().clone()
}
pub fn net(&self) -> Network {
self.inner.lock().components.as_ref().unwrap().net.clone()
}
pub fn rpc_processor(&self) -> RPCProcessor {
self.inner
.lock()
.components
.as_ref()
.unwrap()
.rpc_processor
.clone()
}
pub fn receipt_manager(&self) -> ReceiptManager {
self.inner
.lock()
.components
.as_ref()
.unwrap()
.receipt_manager
.clone()
}
pub fn connection_manager(&self) -> ConnectionManager {
2021-11-22 16:28:30 +00:00
self.inner
.lock()
.components
.as_ref()
.unwrap()
.connection_manager
2021-11-22 16:28:30 +00:00
.clone()
}
2022-04-07 13:55:09 +00:00
pub fn relay_node(&self) -> Option<NodeRef> {
self.inner.lock().relay_node.clone()
}
2022-05-16 15:52:48 +00:00
pub async fn init(&self, update_callback: UpdateCallback) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
let routing_table = RoutingTable::new(self.clone());
routing_table.init().await?;
self.inner.lock().routing_table = Some(routing_table.clone());
2022-05-16 15:52:48 +00:00
self.inner.lock().update_callback = Some(update_callback);
2021-11-22 16:28:30 +00:00
Ok(())
}
pub async fn terminate(&self) {
2022-02-07 02:18:42 +00:00
let routing_table = {
let mut inner = self.inner.lock();
inner.routing_table.take()
};
if let Some(routing_table) = routing_table {
2021-11-22 16:28:30 +00:00
routing_table.terminate().await;
}
2022-05-16 15:52:48 +00:00
self.inner.lock().update_callback = None;
2021-11-22 16:28:30 +00:00
}
pub async fn internal_startup(&self) -> Result<(), String> {
trace!("NetworkManager::internal_startup begin");
if self.inner.lock().components.is_some() {
debug!("NetworkManager::internal_startup already started");
return Ok(());
}
// Create network components
let net = Network::new(self.clone());
let connection_manager = ConnectionManager::new(self.clone());
2021-11-22 16:28:30 +00:00
let rpc_processor = RPCProcessor::new(self.clone());
let receipt_manager = ReceiptManager::new(self.clone());
self.inner.lock().components = Some(NetworkComponents {
net: net.clone(),
connection_manager: connection_manager.clone(),
2021-11-22 16:28:30 +00:00
rpc_processor: rpc_processor.clone(),
receipt_manager: receipt_manager.clone(),
});
// Start network components
rpc_processor.startup().await?;
receipt_manager.startup().await?;
net.startup().await?;
connection_manager.startup().await;
2021-11-22 16:28:30 +00:00
trace!("NetworkManager::internal_startup end");
Ok(())
}
pub async fn startup(&self) -> Result<(), String> {
if let Err(e) = self.internal_startup().await {
self.shutdown().await;
return Err(e);
}
2022-05-16 15:52:48 +00:00
self.send_network_update();
2021-11-22 16:28:30 +00:00
Ok(())
}
pub async fn shutdown(&self) {
trace!("NetworkManager::shutdown begin");
// Shutdown network components if they started up
let components = self.inner.lock().components.clone();
2021-11-22 16:28:30 +00:00
if let Some(components) = components {
components.connection_manager.shutdown().await;
2021-11-22 16:28:30 +00:00
components.net.shutdown().await;
components.receipt_manager.shutdown().await;
components.rpc_processor.shutdown().await;
}
// reset the state
2022-05-16 15:52:48 +00:00
{
let mut inner = self.inner.lock();
inner.components = None;
}
// send update
self.send_network_update();
2021-11-22 16:28:30 +00:00
trace!("NetworkManager::shutdown end");
}
pub fn update_client_whitelist(&self, client: key::DHTKey) {
let mut inner = self.inner.lock();
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen = intf::get_timestamp()
}
hashlink::lru_cache::Entry::Vacant(entry) => {
entry.insert(ClientWhitelistEntry {
last_seen: intf::get_timestamp(),
});
}
}
}
pub fn check_client_whitelist(&self, client: key::DHTKey) -> bool {
let mut inner = self.inner.lock();
match inner.client_whitelist.entry(client) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
entry.get_mut().last_seen = intf::get_timestamp();
true
}
hashlink::lru_cache::Entry::Vacant(_) => false,
}
}
pub fn purge_client_whitelist(&self) {
let timeout_ms = self.config.get().network.client_whitelist_timeout_ms;
let mut inner = self.inner.lock();
let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64);
// Remove clients from the whitelist that haven't been since since our whitelist timeout
while inner
.client_whitelist
.peek_lru()
.map(|v| v.1.last_seen < cutoff_timestamp)
.unwrap_or_default()
{
inner.client_whitelist.remove_lru();
}
}
2021-11-22 16:28:30 +00:00
pub async fn tick(&self) -> Result<(), String> {
let (routing_table, net, receipt_manager) = {
2021-11-22 16:28:30 +00:00
let inner = self.inner.lock();
let components = inner.components.as_ref().unwrap();
(
inner.routing_table.as_ref().unwrap().clone(),
2021-11-22 16:28:30 +00:00
components.net.clone(),
components.receipt_manager.clone(),
)
};
// If the network needs to be reset, do it
// if things can't restart, then we fail out of the attachment manager
if net.needs_restart() {
net.shutdown().await;
2022-05-16 15:52:48 +00:00
self.send_network_update();
2021-11-22 16:28:30 +00:00
net.startup().await?;
2022-05-16 15:52:48 +00:00
self.send_network_update();
2021-11-22 16:28:30 +00:00
}
2022-05-18 14:17:04 +00:00
// Run the rolling transfers task
self.unlocked_inner.rolling_transfers_task.tick().await?;
// Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?;
// Run the routing table tick
routing_table.tick().await?;
2021-11-22 16:28:30 +00:00
// Run the low level network tick
net.tick().await?;
// Run the receipt manager tick
receipt_manager.tick().await?;
// Purge the client whitelist
self.purge_client_whitelist();
2021-11-22 16:28:30 +00:00
Ok(())
}
// Return what network class we are in
2021-12-22 03:20:55 +00:00
pub fn get_network_class(&self) -> Option<NetworkClass> {
2021-11-22 16:28:30 +00:00
if let Some(components) = &self.inner.lock().components {
components.net.get_network_class()
} else {
2021-12-22 03:20:55 +00:00
None
}
}
// Get our node's capabilities
2022-04-08 14:17:09 +00:00
pub fn generate_node_status(&self) -> NodeStatus {
2022-04-25 00:16:13 +00:00
let peer_info = self.routing_table().get_own_peer_info();
2022-05-11 01:49:42 +00:00
let will_route = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added
let will_tunnel = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point
let will_signal = peer_info.signed_node_info.node_info.can_signal();
let will_relay = peer_info.signed_node_info.node_info.can_inbound_relay();
let will_validate_dial_info = peer_info
.signed_node_info
.node_info
.can_validate_dial_info();
2022-04-08 14:17:09 +00:00
NodeStatus {
will_route,
will_tunnel,
will_signal,
will_relay,
will_validate_dial_info,
}
}
2021-12-22 03:20:55 +00:00
// Return what protocols we have enabled
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
if let Some(components) = &self.inner.lock().components {
components.net.get_protocol_config()
} else {
None
2021-11-22 16:28:30 +00:00
}
}
// Generates an out-of-band receipt
pub fn generate_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
expected_returns: u32,
extra_data: D,
callback: impl ReceiptCallback,
) -> Result<Vec<u8>, String> {
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
// Generate receipt and serialized form to return
let nonce = Crypto::get_random_nonce();
let receipt = Receipt::try_new(0, nonce, routing_table.node_id(), extra_data)?;
let out = receipt
.to_signed_data(&routing_table.node_id_secret())
.map_err(|_| "failed to generate signed receipt".to_owned())?;
// Record the receipt for later
let exp_ts = intf::get_timestamp() + expiration_us;
receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback);
Ok(out)
}
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
&self,
expiration_us: u64,
extra_data: D,
) -> Result<(Vec<u8>, EventualValueCloneFuture<ReceiptEvent>), String> {
let receipt_manager = self.receipt_manager();
let routing_table = self.routing_table();
// Generate receipt and serialized form to return
let nonce = Crypto::get_random_nonce();
let receipt = Receipt::try_new(0, nonce, routing_table.node_id(), extra_data)?;
let out = receipt
.to_signed_data(&routing_table.node_id_secret())
.map_err(|_| "failed to generate signed receipt".to_owned())?;
// Record the receipt for later
let exp_ts = intf::get_timestamp() + expiration_us;
2021-11-27 17:44:21 +00:00
let eventual = SingleShotEventual::new(ReceiptEvent::Cancelled);
2021-11-22 16:28:30 +00:00
let instance = eventual.instance();
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
Ok((out, instance))
}
// Process a received out-of-band receipt
2022-04-17 23:10:10 +00:00
pub async fn handle_out_of_band_receipt<R: AsRef<[u8]>>(
2022-04-17 17:28:39 +00:00
&self,
receipt_data: R,
descriptor: ConnectionDescriptor,
) -> Result<(), String> {
let routing_table = self.routing_table();
let receipt_manager = self.receipt_manager();
let ts = intf::get_timestamp();
let receipt = Receipt::from_signed_data(receipt_data.as_ref())
.map_err(|_| "failed to parse signed receipt".to_owned())?;
// Cache the receipt information in the routing table
let source_noderef = routing_table
.register_node_with_existing_connection(receipt.get_sender_id(), descriptor, ts)
.map_err(|e| format!("node id registration from receipt failed: {}", e))?;
receipt_manager
.handle_receipt(source_noderef, receipt)
.await
}
// Process a received in-band receipt
2022-04-17 23:10:10 +00:00
pub async fn handle_in_band_receipt<R: AsRef<[u8]>>(
2022-04-17 17:28:39 +00:00
&self,
receipt_data: R,
inbound_nr: NodeRef,
) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
let receipt_manager = self.receipt_manager();
2022-04-17 17:28:39 +00:00
2021-11-22 16:28:30 +00:00
let receipt = Receipt::from_signed_data(receipt_data.as_ref())
.map_err(|_| "failed to parse signed receipt".to_owned())?;
2022-04-17 17:28:39 +00:00
receipt_manager.handle_receipt(inbound_nr, receipt).await
2021-11-22 16:28:30 +00:00
}
2022-04-17 23:10:10 +00:00
// Process a received signal
pub async fn handle_signal(&self, signal_info: SignalInfo) -> Result<(), String> {
match signal_info {
SignalInfo::ReverseConnect { receipt, peer_info } => {
let routing_table = self.routing_table();
let rpc = self.rpc_processor();
// Add the peer info to our routing table
2022-05-11 01:49:42 +00:00
let peer_nr = routing_table.register_node_with_signed_node_info(
peer_info.node_id.key,
peer_info.signed_node_info,
)?;
2022-04-17 23:10:10 +00:00
// Make a reverse connection to the peer and send the receipt to it
rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt)
.await
.map_err(map_to_string)?;
}
SignalInfo::HolePunch { receipt, peer_info } => {
let routing_table = self.routing_table();
// Add the peer info to our routing table
2022-05-11 01:49:42 +00:00
let mut peer_nr = routing_table.register_node_with_signed_node_info(
peer_info.node_id.key,
peer_info.signed_node_info,
)?;
2022-04-17 23:10:10 +00:00
// Get the udp direct dialinfo for the hole punch
2022-04-21 00:49:16 +00:00
peer_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
2022-04-25 00:16:13 +00:00
let hole_punch_dial_info_detail = peer_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
2022-04-21 00:49:16 +00:00
.ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?;
2022-04-17 23:10:10 +00:00
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
self.net()
2022-04-25 00:16:13 +00:00
.send_data_to_dial_info(
hole_punch_dial_info_detail.dial_info.clone(),
Vec::new(),
)
2022-04-17 23:10:10 +00:00
.await?;
// XXX: do we need a delay here? or another hole punch packet?
// Return the receipt over the direct channel since we want to use exactly the same dial info
2022-04-25 00:16:13 +00:00
self.send_direct_receipt(hole_punch_dial_info_detail.dial_info, receipt, false)
2022-04-17 23:10:10 +00:00
.await
.map_err(map_to_string)?;
}
}
Ok(())
}
2021-11-22 16:28:30 +00:00
// Builds an envelope for sending over the network
fn build_envelope<B: AsRef<[u8]>>(
&self,
dest_node_id: key::DHTKey,
version: u8,
body: B,
) -> Result<Vec<u8>, String> {
// DH to get encryption key
let routing_table = self.routing_table();
let node_id = routing_table.node_id();
let node_id_secret = routing_table.node_id_secret();
// Get timestamp, nonce
let ts = intf::get_timestamp();
let nonce = Crypto::get_random_nonce();
// Encode envelope
let envelope = Envelope::new(version, ts, nonce, node_id, dest_node_id);
envelope
.to_encrypted_data(self.crypto.clone(), body.as_ref(), &node_id_secret)
.map_err(|_| "envelope failed to encode".to_owned())
}
// Called by the RPC handler when we want to issue an RPC request or response
2022-04-16 15:18:54 +00:00
// node_ref is the direct destination to which the envelope will be sent
// If 'node_id' is specified, it can be different than node_ref.node_id()
// which will cause the envelope to be relayed
2021-11-22 16:28:30 +00:00
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
2022-04-23 01:30:09 +00:00
envelope_node_id: Option<DHTKey>,
2021-11-22 16:28:30 +00:00
body: B,
2022-04-23 01:30:09 +00:00
) -> Result<SendDataKind, String> {
let via_node_id = node_ref.node_id();
let envelope_node_id = envelope_node_id.unwrap_or(via_node_id);
if envelope_node_id != via_node_id {
log_net!(
"sending envelope to {:?} via {:?}",
envelope_node_id,
node_ref
);
2022-04-16 15:18:54 +00:00
} else {
log_net!("sending envelope to {:?}", node_ref);
}
2021-11-22 16:28:30 +00:00
// Get node's min/max version and see if we can send to it
// and if so, get the max version we can use
let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version())
{
2021-11-27 17:44:21 +00:00
#[allow(clippy::absurd_extreme_comparisons)]
2021-11-22 16:28:30 +00:00
if node_min > MAX_VERSION || node_max < MIN_VERSION {
return Err(format!(
"can't talk to this node {} because version is unsupported: ({},{})",
2022-04-23 01:30:09 +00:00
via_node_id, node_min, node_max
2022-01-05 17:01:02 +00:00
))
.map_err(logthru_rpc!(warn));
2021-11-22 16:28:30 +00:00
}
cmp::min(node_max, MAX_VERSION)
} else {
MAX_VERSION
};
// Build the envelope to send
2022-01-05 17:01:02 +00:00
let out = self
2022-04-23 01:30:09 +00:00
.build_envelope(envelope_node_id, version, body)
2022-01-05 17:01:02 +00:00
.map_err(logthru_rpc!(error))?;
2021-11-22 16:28:30 +00:00
2022-04-16 15:18:54 +00:00
// Send the envelope via whatever means necessary
2022-04-23 01:30:09 +00:00
let send_data_kind = self.send_data(node_ref.clone(), out).await?;
// If we asked to relay from the start, then this is always indirect
if envelope_node_id != via_node_id {
return Ok(SendDataKind::GlobalIndirect);
}
2022-04-25 15:29:02 +00:00
Ok(send_data_kind)
2021-11-22 16:28:30 +00:00
}
// Called by the RPC handler when we want to issue an direct receipt
pub async fn send_direct_receipt<B: AsRef<[u8]>>(
&self,
2022-01-04 19:25:32 +00:00
dial_info: DialInfo,
2021-11-22 16:28:30 +00:00
rcpt_data: B,
alternate_port: bool,
) -> Result<(), String> {
// Validate receipt before we send it, otherwise this may be arbitrary data!
let _ = Receipt::from_signed_data(rcpt_data.as_ref())
.map_err(|_| "failed to validate direct receipt".to_owned())?;
// Send receipt directly
if alternate_port {
self.net()
.send_data_unbound_to_dial_info(dial_info, rcpt_data.as_ref().to_vec())
2021-11-22 16:28:30 +00:00
.await
} else {
self.net()
.send_data_to_dial_info(dial_info, rcpt_data.as_ref().to_vec())
2021-11-22 16:28:30 +00:00
.await
}
}
2022-04-16 15:18:54 +00:00
// Figure out how to reach a node
2022-04-25 15:29:02 +00:00
fn get_contact_method(&self, mut target_node_ref: NodeRef) -> Result<ContactMethod, String> {
2022-04-25 00:16:13 +00:00
let routing_table = self.routing_table();
2022-04-17 17:28:39 +00:00
// Get our network class and protocol config
let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
let our_protocol_config = self.get_protocol_config().unwrap();
2022-04-21 00:49:16 +00:00
// Scope noderef down to protocols we can do outbound
2022-04-25 00:16:13 +00:00
if !target_node_ref.filter_protocols(our_protocol_config.outbound) {
2022-04-21 00:49:16 +00:00
return Ok(ContactMethod::Unreachable);
2022-04-16 15:18:54 +00:00
}
2022-04-25 00:16:13 +00:00
// Get the best matching local direct dial info if we have it
2022-04-25 15:29:02 +00:00
let opt_target_local_did =
2022-04-25 00:16:13 +00:00
target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork));
2022-04-25 15:29:02 +00:00
if let Some(target_local_did) = opt_target_local_did {
return Ok(ContactMethod::Direct(target_local_did.dial_info));
2022-04-21 00:49:16 +00:00
}
2022-04-16 15:18:54 +00:00
2022-04-25 00:16:13 +00:00
// Get the best match internet dial info if we have it
2022-04-25 15:29:02 +00:00
let opt_target_public_did =
2022-04-25 00:16:13 +00:00
target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet));
2022-04-25 15:29:02 +00:00
if let Some(target_public_did) = opt_target_public_did {
2022-04-16 15:18:54 +00:00
// Do we need to signal before going inbound?
2022-04-25 15:29:02 +00:00
if !target_public_did.class.requires_signal() {
// Go direct without signaling
return Ok(ContactMethod::Direct(target_public_did.dial_info));
}
2022-04-25 00:16:13 +00:00
2022-04-25 15:29:02 +00:00
// Get the target's inbound relay, it must have one or it is not reachable
if let Some(inbound_relay_nr) = target_node_ref.relay() {
// Can we reach the inbound relay?
if inbound_relay_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.is_some()
{
// Can we receive anything inbound ever?
if matches!(our_network_class, NetworkClass::InboundCapable) {
// Get the best match dial info for an reverse inbound connection
2022-05-11 01:49:42 +00:00
let reverse_dif = DialInfoFilter::global().with_protocol_set(
target_node_ref.outbound_protocols().unwrap_or_default(),
);
2022-04-25 15:29:02 +00:00
if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail(
Some(RoutingDomain::PublicInternet),
&reverse_dif,
) {
// Can we receive a direct reverse connection?
if !reverse_did.class.requires_signal() {
return Ok(ContactMethod::SignalReverse(
inbound_relay_nr,
target_node_ref,
));
2022-04-17 17:28:39 +00:00
}
}
2022-04-25 15:29:02 +00:00
// Does we and the target have outbound protocols to hole-punch?
if our_protocol_config.outbound.contains(ProtocolType::UDP)
&& target_node_ref
.outbound_protocols()
2022-05-11 01:49:42 +00:00
.unwrap_or_default()
2022-04-25 15:29:02 +00:00
.contains(ProtocolType::UDP)
{
// Do the target and self nodes have a direct udp dialinfo
let udp_dif =
DialInfoFilter::global().with_protocol_type(ProtocolType::UDP);
let mut udp_target_nr = target_node_ref.clone();
udp_target_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
let target_has_udp_dialinfo = target_node_ref
.first_filtered_dial_info_detail(Some(
RoutingDomain::PublicInternet,
))
.is_some();
let self_has_udp_dialinfo = routing_table
.first_filtered_dial_info_detail(
Some(RoutingDomain::PublicInternet),
&udp_dif,
)
.is_some();
if target_has_udp_dialinfo && self_has_udp_dialinfo {
return Ok(ContactMethod::SignalHolePunch(
inbound_relay_nr,
udp_target_nr,
));
}
}
// Otherwise we have to inbound relay
2022-04-17 17:28:39 +00:00
}
2022-04-25 15:29:02 +00:00
return Ok(ContactMethod::InboundRelay(inbound_relay_nr));
2022-04-16 15:18:54 +00:00
}
}
2022-04-25 15:29:02 +00:00
}
// If the other node is not inbound capable at all, it is using a full relay
else if let Some(target_inbound_relay_nr) = target_node_ref.relay() {
// Can we reach the full relay?
if target_inbound_relay_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.is_some()
{
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
2022-04-17 17:28:39 +00:00
}
}
2022-04-25 15:29:02 +00:00
2022-04-17 17:28:39 +00:00
// If we can't reach the node by other means, try our outbound relay if we have one
if let Some(relay_node) = self.relay_node() {
return Ok(ContactMethod::OutboundRelay(relay_node));
2022-04-16 15:18:54 +00:00
}
2022-04-17 17:28:39 +00:00
// Otherwise, we can't reach this node
2022-05-03 20:43:15 +00:00
debug!(
"unable to reach node {:?}: {}",
target_node_ref,
target_node_ref.operate(|e| format!("{:#?}", e))
);
2022-04-17 17:28:39 +00:00
Ok(ContactMethod::Unreachable)
2022-04-16 15:18:54 +00:00
}
// Send a reverse connection signal and wait for the return receipt over it
// Then send the data across the new connection
pub async fn do_reverse_connect(
&self,
2022-04-17 17:28:39 +00:00
relay_nr: NodeRef,
target_nr: NodeRef,
2022-04-16 15:18:54 +00:00
data: Vec<u8>,
) -> Result<(), String> {
// Build a return receipt for the signal
2022-04-17 17:28:39 +00:00
let receipt_timeout =
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
let (receipt, eventual_value) = self
.generate_single_shot_receipt(receipt_timeout, [])
.map_err(map_to_string)?;
// Get our peer info
let peer_info = self.routing_table().get_own_peer_info();
2022-04-16 15:18:54 +00:00
// Issue the signal
let rpc = self.rpc_processor();
2022-04-17 17:28:39 +00:00
rpc.rpc_call_signal(
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
None,
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
.map_err(map_to_string)?;
2022-04-16 15:18:54 +00:00
// Wait for the return receipt
2022-04-17 17:28:39 +00:00
let inbound_nr = match eventual_value.await {
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
2022-04-16 15:18:54 +00:00
ReceiptEvent::Expired => {
2022-04-17 17:28:39 +00:00
return Err(format!(
"reverse connect receipt expired from {:?}",
target_nr
));
2022-04-16 15:18:54 +00:00
}
ReceiptEvent::Cancelled => {
2022-04-17 17:28:39 +00:00
return Err(format!(
"reverse connect receipt cancelled from {:?}",
target_nr
));
2022-04-16 15:18:54 +00:00
}
};
2022-04-17 17:28:39 +00:00
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if target_nr != inbound_nr {
error!("unexpected noderef mismatch on reverse connect");
}
2022-04-16 15:18:54 +00:00
// And now use the existing connection to send over
2022-04-17 23:10:10 +00:00
if let Some(descriptor) = inbound_nr.last_connection().await {
2022-04-16 15:18:54 +00:00
match self
.net()
.send_data_to_existing_connection(descriptor, data)
.await
.map_err(logthru_net!())?
{
2022-04-17 17:28:39 +00:00
None => Ok(()),
Some(_) => Err("unable to send over reverse connection".to_owned()),
2022-04-16 15:18:54 +00:00
}
2022-04-17 17:28:39 +00:00
} else {
Err("no reverse connection available".to_owned())
2022-04-16 15:18:54 +00:00
}
}
// Send a hole punch signal and do a negotiating ping and wait for the return receipt
// Then send the data across the new connection
2022-04-17 17:28:39 +00:00
pub async fn do_hole_punch(
&self,
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> Result<(), String> {
2022-04-21 00:49:16 +00:00
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(relay_nr
.filter_ref()
.map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP))
.unwrap_or_default());
assert!(target_nr
.filter_ref()
.map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP))
.unwrap_or_default());
2022-04-17 17:28:39 +00:00
// Build a return receipt for the signal
let receipt_timeout =
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
let (receipt, eventual_value) = self
.generate_single_shot_receipt(receipt_timeout, [])
.map_err(map_to_string)?;
// Get our peer info
let peer_info = self.routing_table().get_own_peer_info();
// Get the udp direct dialinfo for the hole punch
2022-04-25 00:16:13 +00:00
let hole_punch_did = target_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
2022-04-21 00:49:16 +00:00
.ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?;
2022-04-17 17:28:39 +00:00
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
self.net()
2022-04-25 00:16:13 +00:00
.send_data_to_dial_info(hole_punch_did.dial_info, Vec::new())
2022-04-17 17:28:39 +00:00
.await?;
// Issue the signal
let rpc = self.rpc_processor();
rpc.rpc_call_signal(
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
None,
SignalInfo::HolePunch { receipt, peer_info },
)
.await
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
.map_err(map_to_string)?;
// Wait for the return receipt
let inbound_nr = match eventual_value.await {
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
ReceiptEvent::Expired => {
2022-04-25 15:29:02 +00:00
return Err(format!("hole punch receipt expired from {:?}", target_nr));
2022-04-17 17:28:39 +00:00
}
ReceiptEvent::Cancelled => {
2022-04-25 15:29:02 +00:00
return Err(format!("hole punch receipt cancelled from {:?}", target_nr));
2022-04-17 17:28:39 +00:00
}
};
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if target_nr != inbound_nr {
2022-04-25 15:29:02 +00:00
error!("unexpected noderef mismatch on hole punch");
2022-04-17 17:28:39 +00:00
}
// And now use the existing connection to send over
2022-04-17 23:10:10 +00:00
if let Some(descriptor) = inbound_nr.last_connection().await {
2022-04-17 17:28:39 +00:00
match self
.net()
.send_data_to_existing_connection(descriptor, data)
2022-04-16 15:18:54 +00:00
.await
2022-04-17 17:28:39 +00:00
.map_err(logthru_net!())?
{
None => Ok(()),
2022-04-25 15:29:02 +00:00
Some(_) => Err("unable to send over hole punch".to_owned()),
2022-04-17 17:28:39 +00:00
}
2022-04-16 15:18:54 +00:00
} else {
2022-04-25 15:29:02 +00:00
Err("no hole punch available".to_owned())
2022-04-16 15:18:54 +00:00
}
}
// Send raw data to a node
2022-04-17 17:28:39 +00:00
//
2022-04-16 15:18:54 +00:00
// We may not have dial info for a node, but have an existing connection for it
// because an inbound connection happened first, and no FindNodeQ has happened to that
// node yet to discover its dial info. The existing connection should be tried first
// in this case.
//
// Sending to a node requires determining a NetworkClass compatible mechanism
//
2022-04-17 17:28:39 +00:00
pub fn send_data(
&self,
node_ref: NodeRef,
data: Vec<u8>,
2022-04-23 01:30:09 +00:00
) -> SystemPinBoxFuture<Result<SendDataKind, String>> {
2022-04-16 15:18:54 +00:00
let this = self.clone();
Box::pin(async move {
// First try to send data to the last socket we've seen this peer on
2022-04-17 23:10:10 +00:00
let data = if let Some(descriptor) = node_ref.last_connection().await {
2022-04-16 15:18:54 +00:00
match this
.net()
.send_data_to_existing_connection(descriptor, data)
.await
.map_err(logthru_net!())?
{
None => {
2022-04-23 01:30:09 +00:00
return Ok(if descriptor.matches_peer_scope(PeerScope::Local) {
SendDataKind::LocalDirect
} else {
SendDataKind::GlobalDirect
});
2022-04-16 15:18:54 +00:00
}
Some(d) => d,
}
} else {
data
};
// If we don't have last_connection, try to reach out to the peer via its dial info
2022-04-21 00:49:16 +00:00
match this.get_contact_method(node_ref).map_err(logthru_net!())? {
2022-04-17 17:28:39 +00:00
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
2022-04-23 01:30:09 +00:00
this.send_data(relay_nr, data)
2022-04-21 00:49:16 +00:00
.await
2022-04-23 01:30:09 +00:00
.map(|_| SendDataKind::GlobalIndirect)
2022-04-17 17:28:39 +00:00
}
2022-04-25 15:29:02 +00:00
ContactMethod::Direct(dial_info) => {
let send_data_kind = if dial_info.is_local() {
SendDataKind::LocalDirect
} else {
SendDataKind::GlobalDirect
};
this.net()
.send_data_to_dial_info(dial_info, data)
.await
.map(|_| send_data_kind)
}
2022-04-23 01:30:09 +00:00
ContactMethod::SignalReverse(relay_nr, target_node_ref) => this
.do_reverse_connect(relay_nr, target_node_ref, data)
.await
.map(|_| SendDataKind::GlobalDirect),
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => this
.do_hole_punch(relay_nr, target_node_ref, data)
.await
.map(|_| SendDataKind::GlobalDirect),
ContactMethod::Unreachable => Err("Can't send to this node".to_owned()),
2022-04-16 15:18:54 +00:00
}
2022-04-17 17:28:39 +00:00
.map_err(logthru_net!())
2022-04-16 15:18:54 +00:00
})
}
2021-11-22 16:28:30 +00:00
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
pub async fn on_recv_envelope(
&self,
data: &[u8],
2022-01-04 14:53:30 +00:00
descriptor: ConnectionDescriptor,
2021-12-17 02:57:28 +00:00
) -> Result<bool, String> {
2022-01-05 17:01:02 +00:00
log_net!(
"envelope of {} bytes received from {:?}",
data.len(),
descriptor
);
2022-03-20 14:52:03 +00:00
// Network accounting
self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64);
2021-11-22 16:28:30 +00:00
// Is this an out-of-band receipt instead of an envelope?
if data[0..4] == *RECEIPT_MAGIC {
2022-04-17 23:10:10 +00:00
self.handle_out_of_band_receipt(data, descriptor).await?;
2021-11-22 16:28:30 +00:00
return Ok(true);
}
// Decode envelope header (may fail signature validation)
2021-12-17 02:57:28 +00:00
let envelope =
Envelope::from_signed_data(data).map_err(|_| "envelope failed to decode".to_owned())?;
2021-11-22 16:28:30 +00:00
// Get routing table and rpc processor
let (routing_table, rpc) = {
2021-11-22 16:28:30 +00:00
let inner = self.inner.lock();
(
inner.routing_table.as_ref().unwrap().clone(),
inner.components.as_ref().unwrap().rpc_processor.clone(),
)
};
// Get timestamp range
let (tsbehind, tsahead) = {
let c = self.config.get();
(
2022-01-27 14:53:01 +00:00
c.network.rpc.max_timestamp_behind_ms.map(ms_to_us),
c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us),
2021-11-22 16:28:30 +00:00
)
};
// Validate timestamp isn't too old
let ts = intf::get_timestamp();
let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind {
if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) {
2021-12-17 02:57:28 +00:00
return Err(format!(
2021-11-22 16:28:30 +00:00
"envelope time was too far in the past: {}ms ",
timestamp_to_secs(ts - ets) * 1000f64
2021-12-17 02:57:28 +00:00
));
2021-11-22 16:28:30 +00:00
}
}
if let Some(tsahead) = tsahead {
if tsahead > 0 && (ts < ets && ets - ts > tsahead) {
2021-12-17 02:57:28 +00:00
return Err(format!(
2021-11-22 16:28:30 +00:00
"envelope time was too far in the future: {}ms",
timestamp_to_secs(ets - ts) * 1000f64
2021-12-17 02:57:28 +00:00
));
2021-11-22 16:28:30 +00:00
}
}
// Peek at header and see if we need to relay this
// If the recipient id is not our node id, then it needs relaying
let sender_id = envelope.get_sender_id();
let recipient_id = envelope.get_recipient_id();
if recipient_id != routing_table.node_id() {
// See if the source node is allowed to resolve nodes
// This is a costly operation, so only outbound-relay permitted
// nodes are allowed to do this, for example PWA users
let relay_nr = if self.check_client_whitelist(sender_id) {
2022-04-16 15:18:54 +00:00
// Full relay allowed, do a full resolve_node
rpc.resolve_node(recipient_id).await.map_err(|e| {
format!(
"failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}",
e
)
}).map_err(logthru_net!())?
} else {
// If this is not a node in the client whitelist, only allow inbound relay
// which only performs a lightweight lookup before passing the packet back out
// See if we have the node in our routing table
// We should, because relays are chosen by nodes that have established connectivity and
// should be mutually in each others routing tables. The node needing the relay will be
// pinging this node regularly to keep itself in the routing table
2022-04-16 15:18:54 +00:00
routing_table.lookup_node_ref(recipient_id).ok_or_else(|| {
format!(
"Inbound relay asked for recipient not in routing table: {}",
recipient_id
2022-04-16 15:18:54 +00:00
)
})?
};
2022-04-16 15:18:54 +00:00
// Relay the packet to the desired destination
self.send_data(relay_nr, data.to_vec())
.await
.map_err(|e| format!("failed to forward envelope: {}", e))?;
// Inform caller that we dealt with the envelope, but did not process it locally
return Ok(false);
}
// DH to get decryption key (cached)
let node_id_secret = routing_table.node_id_secret();
// Decrypt the envelope body
// xxx: punish nodes that send messages that fail to decrypt eventually
let body = envelope
.decrypt_body(self.crypto(), data, &node_id_secret)
.map_err(|_| "failed to decrypt envelope body".to_owned())?;
2021-11-22 16:28:30 +00:00
// Cache the envelope information in the routing table
let source_noderef = routing_table
2022-01-04 14:53:30 +00:00
.register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts)
2021-12-17 02:57:28 +00:00
.map_err(|e| format!("node id registration failed: {}", e))?;
2021-11-22 16:28:30 +00:00
source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version()));
// xxx: deal with spoofing and flooding here?
// Pass message to RPC system
rpc.enqueue_message(envelope, body, source_noderef)
2021-12-17 02:57:28 +00:00
.map_err(|e| format!("enqueing rpc message failed: {}", e))?;
2021-11-22 16:28:30 +00:00
// Inform caller that we dealt with the envelope locally
Ok(true)
}
2022-03-19 22:19:40 +00:00
2022-04-07 13:55:09 +00:00
// Keep relays assigned and accessible
2022-04-16 15:18:54 +00:00
async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
2022-04-07 13:55:09 +00:00
log_net!("--- network manager relay_management task");
2022-04-25 00:16:13 +00:00
// Get our node's current node info and network class and do the right thing
let routing_table = self.routing_table();
2022-05-11 01:49:42 +00:00
let node_info = routing_table.get_own_node_info();
2022-04-07 13:55:09 +00:00
let network_class = self.get_network_class();
// Do we know our network class yet?
if let Some(network_class) = network_class {
// If we already have a relay, see if it is dead, or if we don't need it any more
{
let mut inner = self.inner.lock();
if let Some(relay_node) = inner.relay_node.clone() {
let state = relay_node.operate(|e| e.state(cur_ts));
2022-04-25 00:16:13 +00:00
if matches!(state, BucketEntryState::Dead) || !node_info.requires_relay() {
2022-04-07 13:55:09 +00:00
// Relay node is dead or no longer needed
inner.relay_node = None;
}
}
}
2022-04-25 00:16:13 +00:00
// Do we need a relay?
if node_info.requires_relay() {
// Do we need an outbound relay?
if network_class.outbound_wants_relay() {
// The outbound relay is the host of the PWA
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
let mut inner = self.inner.lock();
// Register new outbound relay
2022-05-11 01:49:42 +00:00
let nr = routing_table.register_node_with_signed_node_info(
2022-04-25 00:16:13 +00:00
outbound_relay_peerinfo.node_id.key,
2022-05-11 01:49:42 +00:00
outbound_relay_peerinfo.signed_node_info,
2022-04-25 00:16:13 +00:00
)?;
inner.relay_node = Some(nr);
}
// Otherwise we must need an inbound relay
} else {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = routing_table.find_inbound_relay(cur_ts) {
let mut inner = self.inner.lock();
inner.relay_node = Some(nr);
}
2022-04-07 13:55:09 +00:00
}
}
}
Ok(())
}
2022-03-19 22:19:40 +00:00
// Compute transfer statistics for the low level network
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
log_net!("--- network manager rolling_transfers task");
2022-05-16 15:52:48 +00:00
{
let inner = &mut *self.inner.lock();
// Roll the low level network transfer stats for our address
inner
.stats
.self_stats
.transfer_stats_accounting
.roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats);
// Roll all per-address transfers
let mut dead_addrs: HashSet<PerAddressStatsKey> = HashSet::new();
for (addr, stats) in &mut inner.stats.per_address_stats {
stats.transfer_stats_accounting.roll_transfers(
last_ts,
cur_ts,
&mut stats.transfer_stats,
);
// While we're here, lets see if this address has timed out
if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US {
// it's dead, put it in the dead list
dead_addrs.insert(*addr);
}
}
2022-03-19 22:19:40 +00:00
2022-05-16 15:52:48 +00:00
// Remove the dead addresses from our tables
for da in &dead_addrs {
inner.stats.per_address_stats.remove(da);
2022-03-19 22:19:40 +00:00
}
}
2022-05-16 15:52:48 +00:00
// Send update
self.send_network_update();
2022-03-19 22:19:40 +00:00
Ok(())
}
// Callbacks from low level network for statistics gathering
2022-03-20 14:52:03 +00:00
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) {
2022-03-19 22:19:40 +00:00
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_up(bytes);
inner
.stats
.per_address_stats
2022-03-20 14:52:03 +00:00
.entry(PerAddressStatsKey(addr))
.or_insert(PerAddressStats::default())
2022-03-19 22:19:40 +00:00
.transfer_stats_accounting
.add_up(bytes);
}
2022-03-20 14:52:03 +00:00
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: u64) {
2022-03-19 22:19:40 +00:00
let inner = &mut *self.inner.lock();
inner
.stats
.self_stats
.transfer_stats_accounting
.add_down(bytes);
inner
.stats
.per_address_stats
2022-03-20 14:52:03 +00:00
.entry(PerAddressStatsKey(addr))
.or_insert(PerAddressStats::default())
2022-03-19 22:19:40 +00:00
.transfer_stats_accounting
.add_down(bytes);
}
2022-04-23 01:30:09 +00:00
2022-05-16 15:52:48 +00:00
// Get stats
pub fn get_stats(&self) -> NetworkManagerStats {
let inner = self.inner.lock();
inner.stats.clone()
}
fn get_veilid_state_inner(inner: &NetworkManagerInner) -> VeilidStateNetwork {
2022-05-17 20:55:53 +00:00
if inner.components.is_some() && inner.components.as_ref().unwrap().net.is_started() {
2022-05-16 15:52:48 +00:00
VeilidStateNetwork {
started: true,
bps_down: inner.stats.self_stats.transfer_stats.down.average,
bps_up: inner.stats.self_stats.transfer_stats.up.average,
}
} else {
VeilidStateNetwork {
started: false,
bps_down: 0,
bps_up: 0,
}
}
}
pub fn get_veilid_state(&self) -> VeilidStateNetwork {
let inner = self.inner.lock();
Self::get_veilid_state_inner(&*inner)
}
fn send_network_update(&self) {
let (update_cb, state) = {
let inner = self.inner.lock();
let update_cb = inner.update_callback.clone();
if update_cb.is_none() {
return;
}
let state = Self::get_veilid_state_inner(&*inner);
(update_cb.unwrap(), state)
};
update_cb(VeilidUpdate::Network(state));
}
2022-04-23 01:30:09 +00:00
// Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers
pub async fn report_local_socket_address(
&self,
_socket_address: SocketAddress,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub async fn report_global_socket_address(
&self,
socket_address: SocketAddress,
reporting_peer: NodeRef,
) {
let (net, routing_table) = {
let mut inner = self.inner.lock();
2022-04-23 01:30:09 +00:00
// Store the reported address
inner
.public_address_check_cache
.insert(reporting_peer.node_id(), socket_address);
2022-04-25 00:16:13 +00:00
let net = inner.components.as_ref().unwrap().net.clone();
let routing_table = inner.routing_table.as_ref().unwrap().clone();
(net, routing_table)
};
2022-04-25 00:16:13 +00:00
let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid);
2022-04-23 01:30:09 +00:00
// Determine if our external address has likely changed
let needs_public_address_detection =
if matches!(network_class, NetworkClass::InboundCapable) {
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet),
&DialInfoFilter::all(),
)
.iter()
.map(|did| did.dial_info.socket_address())
.collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
let inner = self.inner.lock();
let mut inconsistencies = 0;
let mut changed = false;
2022-05-01 15:01:29 +00:00
// Iteration goes from most recent to least recent node/address pair
for (_, a) in &inner.public_address_check_cache {
if !current_addresses.contains(a) {
inconsistencies += 1;
if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
changed = true;
break;
}
}
}
changed
} else {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let inner = self.inner.lock();
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
2022-05-01 15:01:29 +00:00
// Iteration goes from most recent to least recent node/address pair
for (_, a) in &inner.public_address_check_cache {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true;
break;
}
}
} else {
current_address = Some(*a);
}
}
consistent
};
if needs_public_address_detection {
// Reset the address check cache now so we can start detecting fresh
let mut inner = self.inner.lock();
inner.public_address_check_cache.clear();
2022-04-23 01:30:09 +00:00
// Reset the network class and dial info so we can re-detect it
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
2022-04-25 00:16:13 +00:00
net.reset_network_class();
2022-04-23 01:30:09 +00:00
}
}
2021-11-22 16:28:30 +00:00
}