2021-11-22 16:28:30 +00:00
use crate ::* ;
2022-05-31 23:54:52 +00:00
#[ cfg(not(target_arch = " wasm32 " )) ]
mod native ;
#[ cfg(target_arch = " wasm32 " ) ]
mod wasm ;
2022-06-05 00:18:26 +00:00
mod connection_handle ;
2022-05-31 23:54:52 +00:00
mod connection_limits ;
mod connection_manager ;
mod connection_table ;
mod network_connection ;
2022-07-22 17:05:28 +00:00
mod tasks ;
2022-05-31 23:54:52 +00:00
pub mod tests ;
////////////////////////////////////////////////////////////////////////////////////////
pub use network_connection ::* ;
////////////////////////////////////////////////////////////////////////////////////////
2022-06-05 17:23:18 +00:00
use connection_handle ::* ;
2022-06-05 00:18:26 +00:00
use connection_limits ::* ;
2022-01-03 04:49:01 +00:00
use connection_manager ::* ;
2021-11-22 16:28:30 +00:00
use dht ::* ;
2022-07-22 17:05:28 +00:00
use futures_util ::stream ::{ FuturesUnordered , StreamExt } ;
2022-03-20 14:52:03 +00:00
use hashlink ::LruCache ;
2021-11-22 16:28:30 +00:00
use intf ::* ;
2022-05-31 23:54:52 +00:00
#[ cfg(not(target_arch = " wasm32 " )) ]
use native ::* ;
2021-11-22 16:28:30 +00:00
use receipt_manager ::* ;
use routing_table ::* ;
2022-04-17 17:28:39 +00:00
use rpc_processor ::* ;
2022-05-31 23:54:52 +00:00
#[ cfg(target_arch = " wasm32 " ) ]
use wasm ::* ;
2021-11-22 16:28:30 +00:00
use xx ::* ;
////////////////////////////////////////////////////////////////////////////////////////
2022-04-07 13:55:09 +00:00
// Tick period, in seconds, handed to the relay management task when it is constructed
pub const RELAY_MANAGEMENT_INTERVAL_SECS : u32 = 1 ;
2021-11-22 16:28:30 +00:00
// Largest message size; defined to be the same as the largest envelope size
pub const MAX_MESSAGE_SIZE : usize = MAX_ENVELOPE_SIZE ;
2022-03-19 22:19:40 +00:00
// Capacity of the per-address statistics LRU cache
pub const IPADDR_TABLE_SIZE : usize = 1024 ;
// Inactivity window in microseconds
// NOTE(review): presumably the age after which per-address stats are dropped; the use site is not in this section - confirm
pub const IPADDR_MAX_INACTIVE_DURATION_US : u64 = 300_000_000 u64 ; // 5 minutes
2022-04-23 01:30:09 +00:00
// NOTE(review): presumably the number of matching reports needed before a global address change is accepted - confirm against the use site
pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT : usize = 3 ;
2022-06-25 14:57:33 +00:00
// Four-byte magic value
// NOTE(review): presumably identifies boot/bootstrap messages on the wire - confirm against the use site
pub const BOOT_MAGIC : & [ u8 ; 4 ] = b " BOOT " ;
2021-11-22 16:28:30 +00:00
2022-07-22 17:05:28 +00:00
// NOTE(review): presumably the version of the bootstrap TXT record format understood by this code - confirm
pub const BOOTSTRAP_TXT_VERSION : u8 = 0 ;
// One bootstrap record: a version range plus the dial info details listed for a node
// NOTE(review): the version range is presumably the node's supported protocol versions - confirm
#[ derive(Clone, Debug) ]
pub struct BootstrapRecord {
// Lowest version in the range
min_version : u8 ,
// Highest version in the range
max_version : u8 ,
// Dial info details carried by this record
dial_info_details : Vec < DialInfoDetail > ,
}
// Bootstrap records in an ordered map keyed by DHTKey
// NOTE(review): the key is presumably the bootstrap node's id - confirm
pub type BootstrapRecordMap = BTreeMap < DHTKey , BootstrapRecord > ;
2021-12-22 03:20:55 +00:00
// Protocol and address-family selection for the network.
// NetworkManager::startup derives its inbound/outbound dial info filters from these sets.
#[ derive(Copy, Clone, Debug, Default) ]
pub struct ProtocolConfig {
2022-08-02 01:06:31 +00:00
// Protocol types used for the outbound dial info filters
pub outbound : ProtocolTypeSet ,
// Protocol types used for the inbound dial info filters
pub inbound : ProtocolTypeSet ,
// Address types applied to the global (public internet) filters
pub family_global : AddressTypeSet ,
// Address types applied to the local network filters
pub family_local : AddressTypeSet ,
2021-12-24 01:34:52 +00:00
}
2021-11-22 16:28:30 +00:00
// Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart
// Created as a unit by internal_startup and cleared as a unit by shutdown
#[ derive(Clone) ]
struct NetworkComponents {
// Low-level network; also the source of the protocol config and the network class
net : Network ,
2022-01-03 04:49:01 +00:00
// Connection manager; started first and shut down last
connection_manager : ConnectionManager ,
2021-11-22 16:28:30 +00:00
// RPC processor; used by handle_signal to return receipts
rpc_processor : RPCProcessor ,
// Receipt manager; records and handles the receipts generated by this manager
receipt_manager : ReceiptManager ,
}
2022-03-19 22:19:40 +00:00
// Statistics per address
#[ derive(Clone, Default) ]
pub struct PerAddressStats {
// Timestamp of the most recent activity
// NOTE(review): presumably microseconds, like the other *_ts values in this file - confirm
last_seen_ts : u64 ,
// Accounting state for the transfer statistics
// NOTE(review): presumably feeds the rolling transfers task - confirm
transfer_stats_accounting : TransferStatsAccounting ,
// Transfer statistics for both directions (down/up)
transfer_stats : TransferStatsDownUp ,
}
2022-03-20 14:52:03 +00:00
// Key type for the per-address statistics table: a thin wrapper around an IP address
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct PerAddressStatsKey(IpAddr);

// The default key is the unspecified IPv4 address (0.0.0.0)
impl Default for PerAddressStatsKey {
    fn default() -> Self {
        Self(IpAddr::from(Ipv4Addr::UNSPECIFIED))
    }
}
2022-03-19 22:19:40 +00:00
// Statistics about the low-level network
2022-03-20 14:52:03 +00:00
#[ derive(Clone) ]
2022-03-19 22:19:40 +00:00
pub struct NetworkManagerStats {
// Statistics for this node itself
self_stats : PerAddressStats ,
2022-03-20 14:52:03 +00:00
// Statistics per remote address, in an LRU cache created with IPADDR_TABLE_SIZE capacity
per_address_stats : LruCache < PerAddressStatsKey , PerAddressStats > ,
2022-03-19 22:19:40 +00:00
}
2022-03-20 14:52:03 +00:00
// Empty statistics: zeroed self stats and an empty per-address table sized by IPADDR_TABLE_SIZE
impl Default for NetworkManagerStats {
    fn default() -> Self {
        let self_stats = PerAddressStats::default();
        let per_address_stats = LruCache::new(IPADDR_TABLE_SIZE);
        Self {
            self_stats,
            per_address_stats,
        }
    }
}
2022-04-03 16:58:06 +00:00
2022-06-15 19:03:13 +00:00
// Value stored in the client whitelist for each whitelisted client
#[ derive(Debug) ]
2022-04-03 16:58:06 +00:00
struct ClientWhitelistEntry {
2022-05-24 21:13:52 +00:00
// Timestamp from intf::get_timestamp() of the last update/check for this client;
// purge_client_whitelist compares it against a cutoff to expire entries
last_seen_ts : u64 ,
2022-04-03 16:58:06 +00:00
}
2022-04-16 15:18:54 +00:00
// Mechanism required to contact another node
// NOTE(review): for the two Signal* variants the pair of NodeRefs is presumably
// (relay to signal through, target node) - confirm against the code that builds them
2022-05-24 21:13:52 +00:00
#[ derive(Clone, Debug) ]
2022-04-17 17:28:39 +00:00
enum ContactMethod {
2022-04-21 00:49:16 +00:00
Unreachable , // Node is not reachable by any means
Direct ( DialInfo ) , // Contact the node directly
SignalReverse ( NodeRef , NodeRef ) , // Request via signal the node connect back directly
SignalHolePunch ( NodeRef , NodeRef ) , // Request via signal the node negotiate a hole punch
InboundRelay ( NodeRef ) , // Must use an inbound relay to reach the node
OutboundRelay ( NodeRef ) , // Must use outbound relay to reach the node
}
// Classification of how a piece of data was sent
// NOTE(review): variant meanings below are inferred from the names only - confirm against the send path
#[ derive(Copy, Clone, Debug) ]
pub enum SendDataKind {
2022-04-23 01:30:09 +00:00
// presumably: sent directly over the local network
LocalDirect ,
// presumably: sent directly over the public internet
GlobalDirect ,
// presumably: sent over the public internet through an intermediary (e.g. a relay)
GlobalIndirect ,
2022-04-16 15:18:54 +00:00
}
2021-11-22 16:28:30 +00:00
// The mutable state of the network manager
// Lives behind the Arc<Mutex<..>> held in NetworkManager::inner
2022-03-19 22:19:40 +00:00
struct NetworkManagerInner {
2021-11-22 16:28:30 +00:00
// Set by init(), taken by terminate()
routing_table : Option < RoutingTable > ,
// Set by internal_startup(), cleared by shutdown()
components : Option < NetworkComponents > ,
2022-05-16 15:52:48 +00:00
// Set by init(), cleared by terminate()
update_callback : Option < UpdateCallback > ,
2022-03-19 22:19:40 +00:00
// Low-level network statistics
stats : NetworkManagerStats ,
2022-05-31 23:54:52 +00:00
// Whitelisted clients keyed by DHTKey; unbounded, entries expire via purge_client_whitelist()
client_whitelist : LruCache < DHTKey , ClientWhitelistEntry > ,
2022-04-07 13:55:09 +00:00
// Current relay node, if any; cleared by shutdown()
relay_node : Option < NodeRef > ,
2022-05-31 23:54:52 +00:00
// Small LRU (capacity 8) of socket addresses keyed by DHTKey
// NOTE(review): presumably our public address as reported by each peer - confirm
public_address_check_cache : LruCache < DHTKey , SocketAddress > ,
2022-08-02 01:06:31 +00:00
// Copy of the network's protocol config; set by startup(), cleared by shutdown()
protocol_config : Option < ProtocolConfig > ,
// Dial info filters derived from the protocol config in startup(), cleared by shutdown()
public_inbound_dial_info_filter : Option < DialInfoFilter > ,
local_inbound_dial_info_filter : Option < DialInfoFilter > ,
public_outbound_dial_info_filter : Option < DialInfoFilter > ,
local_outbound_dial_info_filter : Option < DialInfoFilter > ,
2022-03-19 22:19:40 +00:00
}
// State shared through an Arc without the inner mutex: the background tick tasks
// and the single-future used for node info updates
struct NetworkManagerUnlockedInner {
// Background processes
2022-07-10 21:36:50 +00:00
// Ticked on every NetworkManager::tick()
rolling_transfers_task : TickTask < EyreReport > ,
// Ticked only when global peer scope is enabled
relay_management_task : TickTask < EyreReport > ,
2022-07-22 17:05:28 +00:00
// Ticked when the routing table reports zero entries at the Unreliable state threshold
bootstrap_task : TickTask < EyreReport > ,
// Ticked while the entry count is below the configured minimum peer count
peer_minimum_refresh_task : TickTask < EyreReport > ,
// Ticked whenever global peer scope is enabled
ping_validator_task : TickTask < EyreReport > ,
// Joined during shutdown()
node_info_update_single_future : MustJoinSingleFuture < ( ) > ,
2021-11-22 16:28:30 +00:00
}
// Handle to the network manager. Cloning produces another handle to the same
// shared state, since both inner and unlocked_inner are behind Arcs.
#[ derive(Clone) ]
pub struct NetworkManager {
// Configuration handle passed to new()
config : VeilidConfig ,
// Table store handle passed to new()
table_store : TableStore ,
// Crypto handle passed to new()
crypto : Crypto ,
// Mutable state, guarded by a mutex
inner : Arc < Mutex < NetworkManagerInner > > ,
2022-03-19 22:19:40 +00:00
// Background tasks, shared without taking the inner mutex
unlocked_inner : Arc < NetworkManagerUnlockedInner > ,
2021-11-22 16:28:30 +00:00
}
impl NetworkManager {
fn new_inner ( ) -> NetworkManagerInner {
2021-11-26 14:54:38 +00:00
NetworkManagerInner {
2021-11-22 16:28:30 +00:00
routing_table : None ,
components : None ,
2022-05-16 15:52:48 +00:00
update_callback : None ,
2022-03-19 22:19:40 +00:00
stats : NetworkManagerStats ::default ( ) ,
2022-04-03 16:58:06 +00:00
client_whitelist : LruCache ::new_unbounded ( ) ,
2022-04-07 13:55:09 +00:00
relay_node : None ,
2022-04-26 13:16:48 +00:00
public_address_check_cache : LruCache ::new ( 8 ) ,
2022-08-02 01:06:31 +00:00
protocol_config : None ,
public_inbound_dial_info_filter : None ,
local_inbound_dial_info_filter : None ,
public_outbound_dial_info_filter : None ,
local_outbound_dial_info_filter : None ,
2022-03-19 22:19:40 +00:00
}
}
2022-07-22 17:05:28 +00:00
fn new_unlocked_inner ( config : VeilidConfig ) -> NetworkManagerUnlockedInner {
let c = config . get ( ) ;
2022-03-19 22:19:40 +00:00
NetworkManagerUnlockedInner {
rolling_transfers_task : TickTask ::new ( ROLLING_TRANSFERS_INTERVAL_SECS ) ,
2022-04-07 13:55:09 +00:00
relay_management_task : TickTask ::new ( RELAY_MANAGEMENT_INTERVAL_SECS ) ,
2022-07-22 17:05:28 +00:00
bootstrap_task : TickTask ::new ( 1 ) ,
peer_minimum_refresh_task : TickTask ::new_ms ( c . network . dht . min_peer_refresh_time_ms ) ,
ping_validator_task : TickTask ::new ( 1 ) ,
node_info_update_single_future : MustJoinSingleFuture ::new ( ) ,
2021-11-26 14:54:38 +00:00
}
2021-11-22 16:28:30 +00:00
}
pub fn new ( config : VeilidConfig , table_store : TableStore , crypto : Crypto ) -> Self {
2022-03-19 22:19:40 +00:00
let this = Self {
config : config . clone ( ) ,
2021-11-26 14:54:38 +00:00
table_store ,
crypto ,
2021-11-22 16:28:30 +00:00
inner : Arc ::new ( Mutex ::new ( Self ::new_inner ( ) ) ) ,
2022-03-19 22:19:40 +00:00
unlocked_inner : Arc ::new ( Self ::new_unlocked_inner ( config ) ) ,
} ;
// Set rolling transfers tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. rolling_transfers_task
2022-06-13 00:58:02 +00:00
. set_routine ( move | s , l , t | {
Box ::pin ( this2 . clone ( ) . rolling_transfers_task_routine ( s , l , t ) )
2022-03-19 22:19:40 +00:00
} ) ;
2021-11-22 16:28:30 +00:00
}
2022-04-07 13:55:09 +00:00
// Set relay management tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. relay_management_task
2022-06-13 00:58:02 +00:00
. set_routine ( move | s , l , t | {
Box ::pin ( this2 . clone ( ) . relay_management_task_routine ( s , l , t ) )
2022-04-07 13:55:09 +00:00
} ) ;
}
2022-07-22 17:05:28 +00:00
// Set bootstrap tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. bootstrap_task
. set_routine ( move | s , _l , _t | Box ::pin ( this2 . clone ( ) . bootstrap_task_routine ( s ) ) ) ;
}
// Set peer minimum refresh tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. peer_minimum_refresh_task
. set_routine ( move | s , _l , _t | {
Box ::pin ( this2 . clone ( ) . peer_minimum_refresh_task_routine ( s ) )
} ) ;
}
// Set ping validator tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. ping_validator_task
. set_routine ( move | s , l , t | {
Box ::pin ( this2 . clone ( ) . ping_validator_task_routine ( s , l , t ) )
} ) ;
}
2022-03-19 22:19:40 +00:00
this
2021-11-22 16:28:30 +00:00
}
// Returns a clone of the configuration handle this manager was created with
pub fn config(&self) -> VeilidConfig {
    let config = &self.config;
    config.clone()
}
// Returns a clone of the table store handle this manager was created with
pub fn table_store(&self) -> TableStore {
    let table_store = &self.table_store;
    table_store.clone()
}
// Returns a clone of the crypto handle this manager was created with
pub fn crypto(&self) -> Crypto {
    let crypto = &self.crypto;
    crypto.clone()
}
// Returns a clone of the routing table handle.
// Panics if the routing table is not set (before init() or after terminate()).
pub fn routing_table(&self) -> RoutingTable {
    let inner = self.inner.lock();
    let routing_table = inner.routing_table.as_ref().unwrap();
    routing_table.clone()
}
// Returns a clone of the low-level network handle.
// Panics if the network components are not set (before startup or after shutdown()).
pub fn net(&self) -> Network {
    let inner = self.inner.lock();
    let components = inner.components.as_ref().unwrap();
    components.net.clone()
}
// Returns a clone of the RPC processor handle.
// Panics if the network components are not set (before startup or after shutdown()).
pub fn rpc_processor(&self) -> RPCProcessor {
    let inner = self.inner.lock();
    let components = inner.components.as_ref().unwrap();
    components.rpc_processor.clone()
}
// Returns a clone of the receipt manager handle.
// Panics if the network components are not set (before startup or after shutdown()).
pub fn receipt_manager(&self) -> ReceiptManager {
    let inner = self.inner.lock();
    let components = inner.components.as_ref().unwrap();
    components.receipt_manager.clone()
}
2022-01-03 04:49:01 +00:00
// Returns a clone of the connection manager handle.
// Panics if the network components are not set (before startup or after shutdown()).
pub fn connection_manager(&self) -> ConnectionManager {
    let inner = self.inner.lock();
    let components = inner.components.as_ref().unwrap();
    components.connection_manager.clone()
}
2022-04-07 13:55:09 +00:00
// Returns the current relay node, or None when no relay is set
pub fn relay_node(&self) -> Option<NodeRef> {
    let inner = self.inner.lock();
    inner.relay_node.clone()
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " debug " , skip_all, err) ]
2022-07-10 21:36:50 +00:00
pub async fn init ( & self , update_callback : UpdateCallback ) -> EyreResult < ( ) > {
2021-11-22 16:28:30 +00:00
let routing_table = RoutingTable ::new ( self . clone ( ) ) ;
routing_table . init ( ) . await ? ;
self . inner . lock ( ) . routing_table = Some ( routing_table . clone ( ) ) ;
2022-05-16 15:52:48 +00:00
self . inner . lock ( ) . update_callback = Some ( update_callback ) ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " debug " , skip_all) ]
2021-11-22 16:28:30 +00:00
pub async fn terminate ( & self ) {
2022-02-07 02:18:42 +00:00
let routing_table = {
let mut inner = self . inner . lock ( ) ;
inner . routing_table . take ( )
} ;
if let Some ( routing_table ) = routing_table {
2021-11-22 16:28:30 +00:00
routing_table . terminate ( ) . await ;
}
2022-05-16 15:52:48 +00:00
self . inner . lock ( ) . update_callback = None ;
2021-11-22 16:28:30 +00:00
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " debug " , skip_all, err) ]
2022-07-10 21:36:50 +00:00
pub async fn internal_startup ( & self ) -> EyreResult < ( ) > {
2021-11-22 16:28:30 +00:00
trace! ( " NetworkManager::internal_startup begin " ) ;
if self . inner . lock ( ) . components . is_some ( ) {
debug! ( " NetworkManager::internal_startup already started " ) ;
return Ok ( ( ) ) ;
}
// Create network components
2022-01-03 04:49:01 +00:00
let connection_manager = ConnectionManager ::new ( self . clone ( ) ) ;
2022-07-06 18:11:44 +00:00
let net = Network ::new (
self . clone ( ) ,
self . routing_table ( ) ,
connection_manager . clone ( ) ,
) ;
2021-11-22 16:28:30 +00:00
let rpc_processor = RPCProcessor ::new ( self . clone ( ) ) ;
let receipt_manager = ReceiptManager ::new ( self . clone ( ) ) ;
self . inner . lock ( ) . components = Some ( NetworkComponents {
net : net . clone ( ) ,
2022-01-03 04:49:01 +00:00
connection_manager : connection_manager . clone ( ) ,
2021-11-22 16:28:30 +00:00
rpc_processor : rpc_processor . clone ( ) ,
receipt_manager : receipt_manager . clone ( ) ,
} ) ;
// Start network components
2022-06-13 00:58:02 +00:00
connection_manager . startup ( ) . await ;
net . startup ( ) . await ? ;
2021-11-22 16:28:30 +00:00
rpc_processor . startup ( ) . await ? ;
receipt_manager . startup ( ) . await ? ;
trace! ( " NetworkManager::internal_startup end " ) ;
Ok ( ( ) )
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " debug " , skip_all, err) ]
2022-07-10 21:36:50 +00:00
pub async fn startup ( & self ) -> EyreResult < ( ) > {
2021-11-22 16:28:30 +00:00
if let Err ( e ) = self . internal_startup ( ) . await {
self . shutdown ( ) . await ;
return Err ( e ) ;
}
2022-05-16 15:52:48 +00:00
2022-08-02 01:06:31 +00:00
// Store copy of protocol config and dial info filters
{
let mut inner = self . inner . lock ( ) ;
let pc = inner
. components
. as_ref ( )
. unwrap ( )
. net
. get_protocol_config ( )
. unwrap ( ) ;
inner . public_inbound_dial_info_filter = Some (
DialInfoFilter ::global ( )
. with_protocol_type_set ( pc . inbound )
. with_address_type_set ( pc . family_global ) ,
) ;
inner . local_inbound_dial_info_filter = Some (
DialInfoFilter ::local ( )
. with_protocol_type_set ( pc . inbound )
. with_address_type_set ( pc . family_local ) ,
) ;
inner . public_outbound_dial_info_filter = Some (
DialInfoFilter ::global ( )
. with_protocol_type_set ( pc . outbound )
. with_address_type_set ( pc . family_global ) ,
) ;
inner . local_outbound_dial_info_filter = Some (
DialInfoFilter ::local ( )
. with_protocol_type_set ( pc . outbound )
. with_address_type_set ( pc . family_local ) ,
) ;
inner . protocol_config = Some ( pc ) ;
}
2022-07-22 17:05:28 +00:00
// Inform routing table entries that our dial info has changed
self . send_node_info_updates ( true ) . await ;
// Inform api clients that things have changed
2022-05-16 15:52:48 +00:00
self . send_network_update ( ) ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " debug " , skip_all) ]
2021-11-22 16:28:30 +00:00
pub async fn shutdown ( & self ) {
2022-06-15 18:05:04 +00:00
debug! ( " starting network manager shutdown " ) ;
2021-11-22 16:28:30 +00:00
2022-05-25 15:12:19 +00:00
// Cancel all tasks
2022-06-15 18:05:04 +00:00
debug! ( " stopping rolling transfers task " ) ;
2022-06-13 00:58:02 +00:00
if let Err ( e ) = self . unlocked_inner . rolling_transfers_task . stop ( ) . await {
warn! ( " rolling_transfers_task not stopped: {} " , e ) ;
2022-05-25 15:12:19 +00:00
}
2022-07-22 17:05:28 +00:00
debug! ( " stopping relay management task " ) ;
2022-06-13 00:58:02 +00:00
if let Err ( e ) = self . unlocked_inner . relay_management_task . stop ( ) . await {
warn! ( " relay_management_task not stopped: {} " , e ) ;
2022-05-25 15:12:19 +00:00
}
2022-07-22 17:05:28 +00:00
debug! ( " stopping bootstrap task " ) ;
if let Err ( e ) = self . unlocked_inner . bootstrap_task . stop ( ) . await {
error! ( " bootstrap_task not stopped: {} " , e ) ;
}
debug! ( " stopping peer minimum refresh task " ) ;
if let Err ( e ) = self . unlocked_inner . peer_minimum_refresh_task . stop ( ) . await {
error! ( " peer_minimum_refresh_task not stopped: {} " , e ) ;
}
debug! ( " stopping ping_validator task " ) ;
if let Err ( e ) = self . unlocked_inner . ping_validator_task . stop ( ) . await {
error! ( " ping_validator_task not stopped: {} " , e ) ;
}
debug! ( " stopping node info update singlefuture " ) ;
if self
. unlocked_inner
. node_info_update_single_future
. join ( )
. await
. is_err ( )
{
error! ( " node_info_update_single_future not stopped " ) ;
}
2022-05-25 15:12:19 +00:00
2021-11-22 16:28:30 +00:00
// Shutdown network components if they started up
2022-06-15 18:05:04 +00:00
debug! ( " shutting down network components " ) ;
2022-01-03 04:49:01 +00:00
let components = self . inner . lock ( ) . components . clone ( ) ;
2021-11-22 16:28:30 +00:00
if let Some ( components ) = components {
2022-06-13 00:58:02 +00:00
components . net . shutdown ( ) . await ;
2022-06-15 18:05:04 +00:00
components . rpc_processor . shutdown ( ) . await ;
components . receipt_manager . shutdown ( ) . await ;
2022-06-29 14:13:49 +00:00
components . connection_manager . shutdown ( ) . await ;
2021-11-22 16:28:30 +00:00
}
// reset the state
2022-06-15 18:05:04 +00:00
debug! ( " resetting network manager state " ) ;
2022-05-16 15:52:48 +00:00
{
let mut inner = self . inner . lock ( ) ;
inner . components = None ;
2022-05-28 15:44:09 +00:00
inner . relay_node = None ;
2022-08-02 01:06:31 +00:00
inner . public_inbound_dial_info_filter = None ;
inner . local_inbound_dial_info_filter = None ;
inner . public_outbound_dial_info_filter = None ;
inner . local_outbound_dial_info_filter = None ;
inner . protocol_config = None ;
2022-05-16 15:52:48 +00:00
}
// send update
2022-06-15 18:05:04 +00:00
debug! ( " sending network state update " ) ;
2022-05-16 15:52:48 +00:00
self . send_network_update ( ) ;
2021-11-22 16:28:30 +00:00
2022-06-15 18:05:04 +00:00
debug! ( " finished network manager shutdown " ) ;
2021-11-22 16:28:30 +00:00
}
2022-05-31 23:54:52 +00:00
pub fn update_client_whitelist ( & self , client : DHTKey ) {
2022-04-03 16:58:06 +00:00
let mut inner = self . inner . lock ( ) ;
match inner . client_whitelist . entry ( client ) {
hashlink ::lru_cache ::Entry ::Occupied ( mut entry ) = > {
2022-05-24 21:13:52 +00:00
entry . get_mut ( ) . last_seen_ts = intf ::get_timestamp ( )
2022-04-03 16:58:06 +00:00
}
hashlink ::lru_cache ::Entry ::Vacant ( entry ) = > {
entry . insert ( ClientWhitelistEntry {
2022-05-24 21:13:52 +00:00
last_seen_ts : intf ::get_timestamp ( ) ,
2022-04-03 16:58:06 +00:00
} ) ;
}
}
}
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self), ret) ]
2022-05-31 23:54:52 +00:00
pub fn check_client_whitelist ( & self , client : DHTKey ) -> bool {
2022-04-03 16:58:06 +00:00
let mut inner = self . inner . lock ( ) ;
match inner . client_whitelist . entry ( client ) {
hashlink ::lru_cache ::Entry ::Occupied ( mut entry ) = > {
2022-05-24 21:13:52 +00:00
entry . get_mut ( ) . last_seen_ts = intf ::get_timestamp ( ) ;
2022-04-03 16:58:06 +00:00
true
}
hashlink ::lru_cache ::Entry ::Vacant ( _ ) = > false ,
}
}
pub fn purge_client_whitelist ( & self ) {
let timeout_ms = self . config . get ( ) . network . client_whitelist_timeout_ms ;
let mut inner = self . inner . lock ( ) ;
let cutoff_timestamp = intf ::get_timestamp ( ) - ( ( timeout_ms as u64 ) * 1000 u64 ) ;
// Remove clients from the whitelist that haven't been since since our whitelist timeout
while inner
. client_whitelist
. peek_lru ( )
2022-05-24 21:13:52 +00:00
. map ( | v | v . 1. last_seen_ts < cutoff_timestamp )
2022-04-03 16:58:06 +00:00
. unwrap_or_default ( )
{
2022-06-15 19:03:13 +00:00
let ( k , v ) = inner . client_whitelist . remove_lru ( ) . unwrap ( ) ;
trace! ( key = ? k , value = ? v , " purge_client_whitelist: remove_lru " )
2022-04-03 16:58:06 +00:00
}
}
2022-07-22 17:05:28 +00:00
pub fn needs_restart ( & self ) -> bool {
let net = self . net ( ) ;
net . needs_restart ( )
2022-06-10 21:07:10 +00:00
}
2022-07-10 21:36:50 +00:00
pub async fn tick ( & self ) -> EyreResult < ( ) > {
2022-08-04 14:35:19 +00:00
let ( routing_table , net , receipt_manager , protocol_config ) = {
2021-11-22 16:28:30 +00:00
let inner = self . inner . lock ( ) ;
let components = inner . components . as_ref ( ) . unwrap ( ) ;
2022-08-04 14:35:19 +00:00
let protocol_config = inner . protocol_config . as_ref ( ) . unwrap ( ) ;
2021-11-22 16:28:30 +00:00
(
2021-12-09 21:00:47 +00:00
inner . routing_table . as_ref ( ) . unwrap ( ) . clone ( ) ,
2021-11-22 16:28:30 +00:00
components . net . clone ( ) ,
components . receipt_manager . clone ( ) ,
2022-08-04 14:35:19 +00:00
protocol_config . clone ( ) ,
2021-11-22 16:28:30 +00:00
)
} ;
2022-05-18 14:17:04 +00:00
// Run the rolling transfers task
self . unlocked_inner . rolling_transfers_task . tick ( ) . await ? ;
2022-08-04 14:35:19 +00:00
// Process global peer scope ticks
// These only make sense when connected to the actual internet and not just the local network
// Must have at least one outbound protocol enabled, and one global peer scope address family enabled
let global_peer_scope_enabled =
! protocol_config . outbound . is_empty ( ) & & ! protocol_config . family_global . is_empty ( ) ;
if global_peer_scope_enabled {
// Run the relay management task
self . unlocked_inner . relay_management_task . tick ( ) . await ? ;
// If routing table has no live entries, then add the bootstrap nodes to it
let live_entry_count = routing_table . get_entry_count ( BucketEntryState ::Unreliable ) ;
if live_entry_count = = 0 {
self . unlocked_inner . bootstrap_task . tick ( ) . await ? ;
}
2022-05-18 14:17:04 +00:00
2022-08-04 14:35:19 +00:00
// If we still don't have enough peers, find nodes until we do
let min_peer_count = {
let c = self . config . get ( ) ;
c . network . dht . min_peer_count as usize
} ;
if live_entry_count < min_peer_count {
self . unlocked_inner . peer_minimum_refresh_task . tick ( ) . await ? ;
}
2022-07-22 17:05:28 +00:00
2022-08-04 14:35:19 +00:00
// Ping validate some nodes to groom the table
self . unlocked_inner . ping_validator_task . tick ( ) . await ? ;
2022-07-22 17:05:28 +00:00
}
2021-12-09 21:00:47 +00:00
// Run the routing table tick
routing_table . tick ( ) . await ? ;
2021-11-22 16:28:30 +00:00
// Run the low level network tick
net . tick ( ) . await ? ;
// Run the receipt manager tick
receipt_manager . tick ( ) . await ? ;
2022-04-03 16:58:06 +00:00
// Purge the client whitelist
self . purge_client_whitelist ( ) ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
// Return what network class we are in
2021-12-22 03:20:55 +00:00
// Returns the network class reported by the low-level network, or None when the
// network components are not set
pub fn get_network_class(&self) -> Option<NetworkClass> {
    self.inner
        .lock()
        .components
        .as_ref()
        .and_then(|components| components.net.get_network_class())
}
2022-04-03 16:58:06 +00:00
// Get our node's capabilities
2022-04-08 14:17:09 +00:00
pub fn generate_node_status ( & self ) -> NodeStatus {
2022-04-25 00:16:13 +00:00
let peer_info = self . routing_table ( ) . get_own_peer_info ( ) ;
2022-04-03 16:58:06 +00:00
2022-05-11 01:49:42 +00:00
let will_route = peer_info . signed_node_info . node_info . can_inbound_relay ( ) ; // xxx: eventually this may have more criteria added
let will_tunnel = peer_info . signed_node_info . node_info . can_inbound_relay ( ) ; // xxx: we may want to restrict by battery life and network bandwidth at some point
let will_signal = peer_info . signed_node_info . node_info . can_signal ( ) ;
let will_relay = peer_info . signed_node_info . node_info . can_inbound_relay ( ) ;
let will_validate_dial_info = peer_info
. signed_node_info
. node_info
. can_validate_dial_info ( ) ;
2022-04-03 16:58:06 +00:00
2022-04-08 14:17:09 +00:00
NodeStatus {
2022-04-03 16:58:06 +00:00
will_route ,
will_tunnel ,
will_signal ,
will_relay ,
will_validate_dial_info ,
}
}
2021-12-22 03:20:55 +00:00
// Return what protocols we have enabled
2022-08-02 01:06:31 +00:00
// Returns a copy of the protocol config cached by startup().
// Panics if it is not set (before startup() or after shutdown()).
pub fn get_protocol_config(&self) -> ProtocolConfig {
    let cached = self.inner.lock().protocol_config;
    cached.unwrap()
}
// Return a dial info filter for what we can receive
pub fn get_inbound_dial_info_filter ( & self , routing_domain : RoutingDomain ) -> DialInfoFilter {
let inner = self . inner . lock ( ) ;
match routing_domain {
RoutingDomain ::PublicInternet = > inner
. public_inbound_dial_info_filter
. as_ref ( )
. unwrap ( )
. clone ( ) ,
RoutingDomain ::LocalNetwork = > inner
. local_inbound_dial_info_filter
. as_ref ( )
. unwrap ( )
. clone ( ) ,
}
}
// Return a dial info filter for what we can send out
pub fn get_outbound_dial_info_filter ( & self , routing_domain : RoutingDomain ) -> DialInfoFilter {
let inner = self . inner . lock ( ) ;
match routing_domain {
RoutingDomain ::PublicInternet = > inner
. public_outbound_dial_info_filter
. as_ref ( )
. unwrap ( )
. clone ( ) ,
RoutingDomain ::LocalNetwork = > inner
. local_outbound_dial_info_filter
. as_ref ( )
. unwrap ( )
. clone ( ) ,
2021-11-22 16:28:30 +00:00
}
}
2022-05-28 14:07:57 +00:00
// Generates a multi-shot/normal receipt
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, extra_data, callback), err) ]
2022-05-28 20:11:50 +00:00
pub fn generate_receipt < D : AsRef < [ u8 ] > > (
2021-11-22 16:28:30 +00:00
& self ,
expiration_us : u64 ,
expected_returns : u32 ,
2022-05-28 20:11:50 +00:00
extra_data : D ,
2021-11-22 16:28:30 +00:00
callback : impl ReceiptCallback ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < Vec < u8 > > {
2021-11-22 16:28:30 +00:00
let receipt_manager = self . receipt_manager ( ) ;
2022-05-28 20:11:50 +00:00
let routing_table = self . routing_table ( ) ;
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
// Generate receipt and serialized form to return
let nonce = Crypto ::get_random_nonce ( ) ;
let receipt = Receipt ::try_new ( 0 , nonce , routing_table . node_id ( ) , extra_data ) ? ;
let out = receipt
. to_signed_data ( & routing_table . node_id_secret ( ) )
2022-07-10 21:36:50 +00:00
. wrap_err ( " failed to generate signed receipt " ) ? ;
2021-11-22 16:28:30 +00:00
// Record the receipt for later
let exp_ts = intf ::get_timestamp ( ) + expiration_us ;
2022-05-28 20:11:50 +00:00
receipt_manager . record_receipt ( receipt , exp_ts , expected_returns , callback ) ;
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
Ok ( out )
2021-11-22 16:28:30 +00:00
}
2022-05-28 14:07:57 +00:00
// Generates a single-shot/normal receipt
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, extra_data), err) ]
2022-05-28 20:11:50 +00:00
pub fn generate_single_shot_receipt < D : AsRef < [ u8 ] > > (
2021-11-22 16:28:30 +00:00
& self ,
expiration_us : u64 ,
2022-05-28 20:11:50 +00:00
extra_data : D ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < ( Vec < u8 > , EventualValueFuture < ReceiptEvent > ) > {
2021-11-22 16:28:30 +00:00
let receipt_manager = self . receipt_manager ( ) ;
2022-05-28 20:11:50 +00:00
let routing_table = self . routing_table ( ) ;
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
// Generate receipt and serialized form to return
let nonce = Crypto ::get_random_nonce ( ) ;
let receipt = Receipt ::try_new ( 0 , nonce , routing_table . node_id ( ) , extra_data ) ? ;
let out = receipt
. to_signed_data ( & routing_table . node_id_secret ( ) )
2022-07-10 21:36:50 +00:00
. wrap_err ( " failed to generate signed receipt " ) ? ;
2021-11-22 16:28:30 +00:00
// Record the receipt for later
let exp_ts = intf ::get_timestamp ( ) + expiration_us ;
2022-05-24 21:13:52 +00:00
let eventual = SingleShotEventual ::new ( Some ( ReceiptEvent ::Cancelled ) ) ;
2021-11-22 16:28:30 +00:00
let instance = eventual . instance ( ) ;
2022-05-28 20:11:50 +00:00
receipt_manager . record_single_shot_receipt ( receipt , exp_ts , eventual ) ;
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
Ok ( ( out , instance ) )
2021-11-22 16:28:30 +00:00
}
// Process a received out-of-band receipt
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self, receipt_data), ret) ]
2022-04-17 23:10:10 +00:00
pub async fn handle_out_of_band_receipt < R : AsRef < [ u8 ] > > (
2022-04-17 17:28:39 +00:00
& self ,
receipt_data : R ,
2022-07-20 13:39:38 +00:00
) -> NetworkResult < ( ) > {
2022-04-17 17:28:39 +00:00
let receipt_manager = self . receipt_manager ( ) ;
2022-07-20 13:39:38 +00:00
let receipt = match Receipt ::from_signed_data ( receipt_data . as_ref ( ) ) {
Err ( e ) = > {
return NetworkResult ::invalid_message ( e . to_string ( ) ) ;
}
Ok ( v ) = > v ,
} ;
2022-04-17 17:28:39 +00:00
2022-05-28 20:11:50 +00:00
receipt_manager . handle_receipt ( receipt , None ) . await
2022-04-17 17:28:39 +00:00
}
// Process a received in-band receipt
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self, receipt_data), ret) ]
2022-05-28 20:11:50 +00:00
pub async fn handle_in_band_receipt < R : AsRef < [ u8 ] > > (
2022-04-17 17:28:39 +00:00
& self ,
2022-05-28 20:11:50 +00:00
receipt_data : R ,
2022-04-17 17:28:39 +00:00
inbound_nr : NodeRef ,
2022-07-20 13:39:38 +00:00
) -> NetworkResult < ( ) > {
2021-11-22 16:28:30 +00:00
let receipt_manager = self . receipt_manager ( ) ;
2022-04-17 17:28:39 +00:00
2022-07-20 13:39:38 +00:00
let receipt = match Receipt ::from_signed_data ( receipt_data . as_ref ( ) ) {
Err ( e ) = > {
return NetworkResult ::invalid_message ( e . to_string ( ) ) ;
}
Ok ( v ) = > v ,
} ;
2022-05-28 20:11:50 +00:00
2022-05-28 14:07:57 +00:00
receipt_manager
2022-05-28 20:11:50 +00:00
. handle_receipt ( receipt , Some ( inbound_nr ) )
2022-05-28 14:07:57 +00:00
. await
2021-11-22 16:28:30 +00:00
}
2022-04-17 23:10:10 +00:00
// Process a received signal
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self), err) ]
2022-07-20 13:39:38 +00:00
pub async fn handle_signal (
& self ,
_sender_id : DHTKey ,
signal_info : SignalInfo ,
) -> EyreResult < NetworkResult < ( ) > > {
2022-04-17 23:10:10 +00:00
match signal_info {
2022-05-28 20:11:50 +00:00
SignalInfo ::ReverseConnect { receipt , peer_info } = > {
2022-04-17 23:10:10 +00:00
let routing_table = self . routing_table ( ) ;
let rpc = self . rpc_processor ( ) ;
// Add the peer info to our routing table
2022-07-20 13:39:38 +00:00
let peer_nr = match routing_table . register_node_with_signed_node_info (
2022-05-11 01:49:42 +00:00
peer_info . node_id . key ,
peer_info . signed_node_info ,
2022-07-20 13:39:38 +00:00
) {
None = > {
return Ok ( NetworkResult ::invalid_message (
" unable to register reverse connect peerinfo " ,
) )
}
Some ( nr ) = > nr ,
} ;
2022-04-17 23:10:10 +00:00
// Make a reverse connection to the peer and send the receipt to it
2022-05-28 20:11:50 +00:00
rpc . rpc_call_return_receipt ( Destination ::Direct ( peer_nr ) , None , receipt )
2022-04-17 23:10:10 +00:00
. await
2022-07-20 13:39:38 +00:00
. wrap_err ( " rpc failure " )
2022-04-17 23:10:10 +00:00
}
2022-05-28 20:11:50 +00:00
SignalInfo ::HolePunch { receipt , peer_info } = > {
2022-04-17 23:10:10 +00:00
let routing_table = self . routing_table ( ) ;
2022-05-28 14:07:57 +00:00
let rpc = self . rpc_processor ( ) ;
2022-04-17 23:10:10 +00:00
// Add the peer info to our routing table
2022-07-20 13:39:38 +00:00
let mut peer_nr = match routing_table . register_node_with_signed_node_info (
2022-05-11 01:49:42 +00:00
peer_info . node_id . key ,
peer_info . signed_node_info ,
2022-07-20 13:39:38 +00:00
) {
None = > {
return Ok ( NetworkResult ::invalid_message (
//sender_id,
" unable to register hole punch connect peerinfo " ,
) ) ;
}
Some ( nr ) = > nr ,
} ;
2022-04-17 23:10:10 +00:00
// Get the udp direct dialinfo for the hole punch
2022-08-02 01:06:31 +00:00
let outbound_dif = self
. get_outbound_dial_info_filter ( RoutingDomain ::PublicInternet )
. with_protocol_type ( ProtocolType ::UDP ) ;
peer_nr . set_filter ( Some ( outbound_dif ) ) ;
2022-04-25 00:16:13 +00:00
let hole_punch_dial_info_detail = peer_nr
. first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) )
2022-07-10 21:36:50 +00:00
. ok_or_else ( | | eyre! ( " No hole punch capable dialinfo found for node " ) ) ? ;
2022-04-17 23:10:10 +00:00
2022-05-28 14:07:57 +00:00
// Now that we picked a specific dialinfo, further restrict the noderef to the specific address type
2022-08-02 01:06:31 +00:00
let filter = peer_nr . take_filter ( ) . unwrap ( ) ;
let filter =
filter . with_address_type ( hole_punch_dial_info_detail . dial_info . address_type ( ) ) ;
2022-05-28 14:07:57 +00:00
peer_nr . set_filter ( Some ( filter ) ) ;
2022-04-17 23:10:10 +00:00
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
2022-07-20 13:39:38 +00:00
network_result_try! (
self . net ( )
. send_data_to_dial_info (
hole_punch_dial_info_detail . dial_info . clone ( ) ,
Vec ::new ( ) ,
)
. await ?
) ;
2022-04-17 23:10:10 +00:00
// XXX: do we need a delay here? or another hole punch packet?
2022-05-28 14:07:57 +00:00
// Return the receipt, sending it via the noderef we just restricted to the hole punch dial info's protocol and address type
2022-05-28 20:11:50 +00:00
rpc . rpc_call_return_receipt ( Destination ::Direct ( peer_nr ) , None , receipt )
2022-04-17 23:10:10 +00:00
. await
2022-07-20 13:39:38 +00:00
. wrap_err ( " rpc failure " )
2022-04-17 23:10:10 +00:00
}
}
}
2021-11-22 16:28:30 +00:00
// Builds an envelope for sending over the network
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, body), err) ]
2021-11-22 16:28:30 +00:00
fn build_envelope < B : AsRef < [ u8 ] > > (
& self ,
2022-05-31 23:54:52 +00:00
dest_node_id : DHTKey ,
2021-11-22 16:28:30 +00:00
version : u8 ,
body : B ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < Vec < u8 > > {
2021-11-22 16:28:30 +00:00
// DH to get encryption key
let routing_table = self . routing_table ( ) ;
let node_id = routing_table . node_id ( ) ;
let node_id_secret = routing_table . node_id_secret ( ) ;
// Get timestamp, nonce
let ts = intf ::get_timestamp ( ) ;
let nonce = Crypto ::get_random_nonce ( ) ;
// Encode envelope
let envelope = Envelope ::new ( version , ts , nonce , node_id , dest_node_id ) ;
envelope
. to_encrypted_data ( self . crypto . clone ( ) , body . as_ref ( ) , & node_id_secret )
2022-07-10 21:36:50 +00:00
. wrap_err ( " envelope failed to encode " )
2021-11-22 16:28:30 +00:00
}
// Called by the RPC handler when we want to issue an RPC request or response
2022-04-16 15:18:54 +00:00
// node_ref is the direct destination to which the envelope will be sent
// If 'node_id' is specified, it can be different than node_ref.node_id()
// which will cause the envelope to be relayed
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, body), ret, err) ]
2021-11-22 16:28:30 +00:00
pub async fn send_envelope < B : AsRef < [ u8 ] > > (
& self ,
node_ref : NodeRef ,
2022-04-23 01:30:09 +00:00
envelope_node_id : Option < DHTKey > ,
2021-11-22 16:28:30 +00:00
body : B ,
2022-07-20 13:39:38 +00:00
) -> EyreResult < NetworkResult < SendDataKind > > {
2022-04-23 01:30:09 +00:00
let via_node_id = node_ref . node_id ( ) ;
let envelope_node_id = envelope_node_id . unwrap_or ( via_node_id ) ;
if envelope_node_id ! = via_node_id {
log_net! (
" sending envelope to {:?} via {:?} " ,
envelope_node_id ,
node_ref
) ;
2022-04-16 15:18:54 +00:00
} else {
log_net! ( " sending envelope to {:?} " , node_ref ) ;
}
2021-11-22 16:28:30 +00:00
// Get node's min/max version and see if we can send to it
// and if so, get the max version we can use
let version = if let Some ( ( node_min , node_max ) ) = node_ref . operate ( | e | e . min_max_version ( ) )
{
2021-11-27 17:44:21 +00:00
#[ allow(clippy::absurd_extreme_comparisons) ]
2021-11-22 16:28:30 +00:00
if node_min > MAX_VERSION | | node_max < MIN_VERSION {
2022-07-10 21:36:50 +00:00
bail! (
2021-11-22 16:28:30 +00:00
" can't talk to this node {} because version is unsupported: ({},{}) " ,
2022-07-10 21:36:50 +00:00
via_node_id ,
node_min ,
node_max
) ;
2021-11-22 16:28:30 +00:00
}
cmp ::min ( node_max , MAX_VERSION )
} else {
MAX_VERSION
} ;
// Build the envelope to send
2022-07-20 13:39:38 +00:00
let out = self . build_envelope ( envelope_node_id , version , body ) ? ;
2021-11-22 16:28:30 +00:00
2022-04-16 15:18:54 +00:00
// Send the envelope via whatever means necessary
2022-07-20 13:39:38 +00:00
let send_data_kind = network_result_try! ( self . send_data ( node_ref . clone ( ) , out ) . await ? ) ;
2022-04-23 01:30:09 +00:00
// If we asked to relay from the start, then this is always indirect
2022-07-20 13:39:38 +00:00
Ok ( NetworkResult ::value ( if envelope_node_id ! = via_node_id {
SendDataKind ::GlobalIndirect
} else {
send_data_kind
} ) )
2021-11-22 16:28:30 +00:00
}
// Called by the RPC handler when we want to issue an direct receipt
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, rcpt_data), err) ]
2022-05-28 20:11:50 +00:00
pub async fn send_out_of_band_receipt (
2021-11-22 16:28:30 +00:00
& self ,
2022-01-04 19:25:32 +00:00
dial_info : DialInfo ,
2022-05-28 20:11:50 +00:00
rcpt_data : Vec < u8 > ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < ( ) > {
2022-05-28 20:11:50 +00:00
// Do we need to validate the outgoing receipt? Probably not
// because it is supposed to be opaque and the
// recipient/originator does the validation
// Also, in the case of an old 'version', returning the receipt
// should not be subject to our ability to decode it
2021-11-22 16:28:30 +00:00
// Send receipt directly
2022-07-20 13:39:38 +00:00
network_result_value_or_log! ( debug self
2022-07-14 20:57:34 +00:00
. net ( )
2022-05-28 14:07:57 +00:00
. send_data_unbound_to_dial_info ( dial_info , rcpt_data )
2022-07-20 13:39:38 +00:00
. await ? = > {
return Ok ( ( ) ) ;
2022-07-14 20:57:34 +00:00
}
2022-07-20 13:39:38 +00:00
) ;
2022-07-14 20:57:34 +00:00
Ok ( ( ) )
2021-11-22 16:28:30 +00:00
}
2022-07-10 21:36:50 +00:00
#[ instrument(level = " trace " , skip(self), ret) ]
2022-08-02 01:06:31 +00:00
fn get_contact_method_public ( & self , target_node_ref : NodeRef ) -> ContactMethod {
2022-04-21 00:49:16 +00:00
// Scope noderef down to protocols we can do outbound
2022-08-02 01:06:31 +00:00
let public_outbound_dif = self . get_outbound_dial_info_filter ( RoutingDomain ::PublicInternet ) ;
let target_node_ref = target_node_ref . filtered_clone ( public_outbound_dif . clone ( ) ) ;
2022-04-16 15:18:54 +00:00
2022-04-25 00:16:13 +00:00
// Get the best match internet dial info if we have it
2022-04-25 15:29:02 +00:00
let opt_target_public_did =
2022-04-25 00:16:13 +00:00
target_node_ref . first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) ) ;
2022-04-25 15:29:02 +00:00
if let Some ( target_public_did ) = opt_target_public_did {
2022-04-16 15:18:54 +00:00
// Do we need to signal before going inbound?
2022-04-25 15:29:02 +00:00
if ! target_public_did . class . requires_signal ( ) {
// Go direct without signaling
2022-07-10 21:36:50 +00:00
return ContactMethod ::Direct ( target_public_did . dial_info ) ;
2022-04-25 15:29:02 +00:00
}
2022-04-25 00:16:13 +00:00
2022-04-25 15:29:02 +00:00
// Get the target's inbound relay, it must have one or it is not reachable
2022-07-06 01:21:58 +00:00
// Note that .relay() never returns our own node. We can't relay to ourselves.
2022-04-25 15:29:02 +00:00
if let Some ( inbound_relay_nr ) = target_node_ref . relay ( ) {
2022-08-02 01:06:31 +00:00
// Scope down to protocols we can do outbound
let inbound_relay_nr = inbound_relay_nr . filtered_clone ( public_outbound_dif . clone ( ) ) ;
2022-04-25 15:29:02 +00:00
// Can we reach the inbound relay?
if inbound_relay_nr
. first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) )
. is_some ( )
{
// Can we receive anything inbound ever?
2022-08-02 01:06:31 +00:00
let our_network_class =
self . get_network_class ( ) . unwrap_or ( NetworkClass ::Invalid ) ;
2022-04-25 15:29:02 +00:00
if matches! ( our_network_class , NetworkClass ::InboundCapable ) {
2022-08-02 01:06:31 +00:00
let routing_table = self . routing_table ( ) ;
///////// Reverse connection
2022-04-25 15:29:02 +00:00
// Get the best match dial info for an reverse inbound connection
2022-08-02 01:06:31 +00:00
let reverse_dif = self
. get_inbound_dial_info_filter ( RoutingDomain ::PublicInternet )
. filtered ( target_node_ref . node_info_outbound_filter ( ) ) ;
2022-04-25 15:29:02 +00:00
if let Some ( reverse_did ) = routing_table . first_filtered_dial_info_detail (
Some ( RoutingDomain ::PublicInternet ) ,
& reverse_dif ,
) {
// Can we receive a direct reverse connection?
if ! reverse_did . class . requires_signal ( ) {
2022-07-10 21:36:50 +00:00
return ContactMethod ::SignalReverse (
2022-04-25 15:29:02 +00:00
inbound_relay_nr ,
target_node_ref ,
2022-07-10 21:36:50 +00:00
) ;
2022-04-17 17:28:39 +00:00
}
}
2022-08-02 01:06:31 +00:00
///////// UDP hole-punch
// Does the target have a direct udp dialinfo we can reach?
let udp_target_nr = target_node_ref . filtered_clone (
DialInfoFilter ::global ( ) . with_protocol_type ( ProtocolType ::UDP ) ,
) ;
let target_has_udp_dialinfo = udp_target_nr
. first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) )
. is_some ( ) ;
// Does the self node have a direct udp dialinfo the target can reach?
let inbound_udp_dif = self
. get_inbound_dial_info_filter ( RoutingDomain ::PublicInternet )
. filtered ( target_node_ref . node_info_outbound_filter ( ) )
. filtered (
DialInfoFilter ::global ( ) . with_protocol_type ( ProtocolType ::UDP ) ,
) ;
let self_has_udp_dialinfo = routing_table
. first_filtered_dial_info_detail (
Some ( RoutingDomain ::PublicInternet ) ,
& inbound_udp_dif ,
)
. is_some ( ) ;
// Does the target and ourselves have a udp dialinfo that they can reach?
if target_has_udp_dialinfo & & self_has_udp_dialinfo {
return ContactMethod ::SignalHolePunch ( inbound_relay_nr , udp_target_nr ) ;
2022-04-25 15:29:02 +00:00
}
// Otherwise we have to inbound relay
2022-04-17 17:28:39 +00:00
}
2022-04-25 15:29:02 +00:00
2022-07-10 21:36:50 +00:00
return ContactMethod ::InboundRelay ( inbound_relay_nr ) ;
2022-04-16 15:18:54 +00:00
}
}
2022-04-25 15:29:02 +00:00
}
2022-08-02 01:06:31 +00:00
// If the other node is not inbound capable at all, it needs to have an inbound relay
2022-04-25 15:29:02 +00:00
else if let Some ( target_inbound_relay_nr ) = target_node_ref . relay ( ) {
// Can we reach the full relay?
if target_inbound_relay_nr
. first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) )
. is_some ( )
{
2022-07-10 21:36:50 +00:00
return ContactMethod ::InboundRelay ( target_inbound_relay_nr ) ;
2022-04-17 17:28:39 +00:00
}
}
2022-04-25 15:29:02 +00:00
2022-08-02 01:06:31 +00:00
ContactMethod ::Unreachable
}
/// Works out how `target_node_ref` can be contacted in the local network routing domain.
///
/// Returns `ContactMethod::Direct` with the first matching local dial info, or
/// `ContactMethod::Unreachable` when there is none.
#[instrument(level = " trace ", skip(self), ret)]
fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod {
    // Scope noderef down to protocols we can do outbound
    let local_outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::LocalNetwork);
    let target_node_ref = target_node_ref.filtered_clone(local_outbound_dif);

    // Skip the dial info lookup when the noderef reports its filter as dead
    // NOTE(review): presumably a dead filter can match nothing - confirm against NodeRef
    if target_node_ref.is_filter_dead() {
        return ContactMethod::Unreachable;
    }

    // Get the best matching local direct dial info if we have it
    match target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork)) {
        Some(target_local_did) => ContactMethod::Direct(target_local_did.dial_info),
        None => ContactMethod::Unreachable,
    }
}
// Figure out how to reach a node
#[ instrument(level = " trace " , skip(self), ret) ]
fn get_contact_method ( & self , target_node_ref : NodeRef ) -> ContactMethod {
// Try local first
let out = self . get_contact_method_local ( target_node_ref . clone ( ) ) ;
if ! matches! ( out , ContactMethod ::Unreachable ) {
return out ;
}
// Try public next
let out = self . get_contact_method_public ( target_node_ref . clone ( ) ) ;
if ! matches! ( out , ContactMethod ::Unreachable ) {
return out ;
}
2022-04-17 17:28:39 +00:00
// If we can't reach the node by other means, try our outbound relay if we have one
if let Some ( relay_node ) = self . relay_node ( ) {
2022-07-10 21:36:50 +00:00
return ContactMethod ::OutboundRelay ( relay_node ) ;
2022-04-16 15:18:54 +00:00
}
2022-08-02 01:06:31 +00:00
2022-04-17 17:28:39 +00:00
// Otherwise, we can't reach this node
2022-05-24 21:13:52 +00:00
debug! ( " unable to reach node {:?} " , target_node_ref ) ;
2022-07-10 21:36:50 +00:00
ContactMethod ::Unreachable
2022-04-16 15:18:54 +00:00
}
// Send a reverse connection signal and wait for the return receipt over it
// Then send the data across the new connection
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, data), err) ]
2022-04-16 15:18:54 +00:00
pub async fn do_reverse_connect (
& self ,
2022-04-17 17:28:39 +00:00
relay_nr : NodeRef ,
target_nr : NodeRef ,
2022-04-16 15:18:54 +00:00
data : Vec < u8 > ,
2022-07-20 13:39:38 +00:00
) -> EyreResult < NetworkResult < ( ) > > {
2022-04-16 15:18:54 +00:00
// Build a return receipt for the signal
2022-04-17 17:28:39 +00:00
let receipt_timeout =
ms_to_us ( self . config . get ( ) . network . reverse_connection_receipt_time_ms ) ;
2022-07-10 21:36:50 +00:00
let ( receipt , eventual_value ) = self . generate_single_shot_receipt ( receipt_timeout , [ ] ) ? ;
2022-04-17 17:28:39 +00:00
// Get our peer info
let peer_info = self . routing_table ( ) . get_own_peer_info ( ) ;
2022-04-16 15:18:54 +00:00
// Issue the signal
let rpc = self . rpc_processor ( ) ;
2022-07-20 13:39:38 +00:00
network_result_try! ( rpc
. rpc_call_signal (
Destination ::Relay ( relay_nr . clone ( ) , target_nr . node_id ( ) ) ,
None ,
SignalInfo ::ReverseConnect { receipt , peer_info } ,
)
. await
. wrap_err ( " failed to send signal " ) ? ) ;
2022-04-16 15:18:54 +00:00
// Wait for the return receipt
2022-05-24 21:13:52 +00:00
let inbound_nr = match eventual_value . await . take_value ( ) . unwrap ( ) {
2022-05-28 20:11:50 +00:00
ReceiptEvent ::ReturnedOutOfBand = > {
2022-07-10 21:36:50 +00:00
bail! ( " reverse connect receipt should be returned in-band " ) ;
2022-05-28 14:07:57 +00:00
}
2022-05-28 20:11:50 +00:00
ReceiptEvent ::ReturnedInBand { inbound_noderef } = > inbound_noderef ,
2022-04-16 15:18:54 +00:00
ReceiptEvent ::Expired = > {
2022-07-20 13:39:38 +00:00
//bail!("reverse connect receipt expired from {:?}", target_nr);
return Ok ( NetworkResult ::timeout ( ) ) ;
2022-04-16 15:18:54 +00:00
}
ReceiptEvent ::Cancelled = > {
2022-07-10 21:36:50 +00:00
bail! ( " reverse connect receipt cancelled from {:?} " , target_nr ) ;
2022-04-16 15:18:54 +00:00
}
} ;
2022-04-17 17:28:39 +00:00
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if target_nr ! = inbound_nr {
2022-07-10 21:36:50 +00:00
bail! ( " unexpected noderef mismatch on reverse connect " ) ;
2022-04-17 17:28:39 +00:00
}
2022-04-16 15:18:54 +00:00
// And now use the existing connection to send over
2022-04-17 23:10:10 +00:00
if let Some ( descriptor ) = inbound_nr . last_connection ( ) . await {
2022-04-16 15:18:54 +00:00
match self
. net ( )
. send_data_to_existing_connection ( descriptor , data )
2022-06-15 18:05:04 +00:00
. await ?
2022-04-16 15:18:54 +00:00
{
2022-07-20 13:39:38 +00:00
None = > Ok ( NetworkResult ::value ( ( ) ) ) ,
Some ( _ ) = > Ok ( NetworkResult ::no_connection_other (
" unable to send over reverse connection " ,
) ) ,
2022-04-16 15:18:54 +00:00
}
2022-04-17 17:28:39 +00:00
} else {
2022-07-10 21:36:50 +00:00
bail! ( " no reverse connection available " )
2022-04-16 15:18:54 +00:00
}
}
// Send a hole punch signal and do a negotiating ping and wait for the return receipt
// Then send the data across the new connection
2022-06-10 21:07:10 +00:00
#[ instrument(level = " trace " , skip(self, data), err) ]
2022-04-17 17:28:39 +00:00
pub async fn do_hole_punch (
& self ,
relay_nr : NodeRef ,
target_nr : NodeRef ,
data : Vec < u8 > ,
2022-07-20 13:39:38 +00:00
) -> EyreResult < NetworkResult < ( ) > > {
2022-04-21 00:49:16 +00:00
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert! ( target_nr
. filter_ref ( )
2022-08-02 01:06:31 +00:00
. map ( | dif | dif . protocol_set = = ProtocolTypeSet ::only ( ProtocolType ::UDP ) )
2022-04-21 00:49:16 +00:00
. unwrap_or_default ( ) ) ;
2022-04-17 17:28:39 +00:00
// Build a return receipt for the signal
2022-07-06 18:12:28 +00:00
let receipt_timeout = ms_to_us ( self . config . get ( ) . network . hole_punch_receipt_time_ms ) ;
2022-07-10 21:36:50 +00:00
let ( receipt , eventual_value ) = self . generate_single_shot_receipt ( receipt_timeout , [ ] ) ? ;
2022-04-17 17:28:39 +00:00
// Get our peer info
let peer_info = self . routing_table ( ) . get_own_peer_info ( ) ;
// Get the udp direct dialinfo for the hole punch
2022-04-25 00:16:13 +00:00
let hole_punch_did = target_nr
. first_filtered_dial_info_detail ( Some ( RoutingDomain ::PublicInternet ) )
2022-07-10 21:36:50 +00:00
. ok_or_else ( | | eyre! ( " No hole punch capable dialinfo found for node " ) ) ? ;
2022-04-17 17:28:39 +00:00
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
2022-07-20 13:39:38 +00:00
network_result_try! (
self . net ( )
. send_data_to_dial_info ( hole_punch_did . dial_info , Vec ::new ( ) )
. await ?
) ;
2022-04-17 17:28:39 +00:00
// Issue the signal
let rpc = self . rpc_processor ( ) ;
2022-07-20 13:39:38 +00:00
network_result_try! ( rpc
. rpc_call_signal (
Destination ::Relay ( relay_nr . clone ( ) , target_nr . node_id ( ) ) ,
None ,
SignalInfo ::HolePunch { receipt , peer_info } ,
)
. await
. wrap_err ( " failed to send signal " ) ? ) ;
2022-04-17 17:28:39 +00:00
// Wait for the return receipt
2022-05-24 21:13:52 +00:00
let inbound_nr = match eventual_value . await . take_value ( ) . unwrap ( ) {
2022-05-28 20:11:50 +00:00
ReceiptEvent ::ReturnedOutOfBand = > {
2022-07-10 21:36:50 +00:00
bail! ( " hole punch receipt should be returned in-band " ) ;
2022-05-28 14:07:57 +00:00
}
2022-05-28 20:11:50 +00:00
ReceiptEvent ::ReturnedInBand { inbound_noderef } = > inbound_noderef ,
2022-04-17 17:28:39 +00:00
ReceiptEvent ::Expired = > {
2022-07-10 21:36:50 +00:00
bail! ( " hole punch receipt expired from {} " , target_nr ) ;
2022-04-17 17:28:39 +00:00
}
ReceiptEvent ::Cancelled = > {
2022-07-10 21:36:50 +00:00
bail! ( " hole punch receipt cancelled from {} " , target_nr ) ;
2022-04-17 17:28:39 +00:00
}
} ;
// We expect the inbound noderef to be the same as the target noderef
// if they aren't the same, we should error on this and figure out what then hell is up
if target_nr ! = inbound_nr {
2022-07-10 21:36:50 +00:00
bail! (
2022-05-28 14:07:57 +00:00
" unexpected noderef mismatch on hole punch {}, expected {} " ,
2022-07-10 21:36:50 +00:00
inbound_nr ,
target_nr
) ;
2022-04-17 17:28:39 +00:00
}
// And now use the existing connection to send over
2022-04-17 23:10:10 +00:00
if let Some ( descriptor ) = inbound_nr . last_connection ( ) . await {
2022-04-17 17:28:39 +00:00
match self
. net ( )
. send_data_to_existing_connection ( descriptor , data )
2022-06-15 18:05:04 +00:00
. await ?
2022-04-17 17:28:39 +00:00
{
2022-07-20 13:39:38 +00:00
None = > Ok ( NetworkResult ::value ( ( ) ) ) ,
Some ( _ ) = > Ok ( NetworkResult ::no_connection_other (
" unable to send over hole punch " ,
) ) ,
2022-04-17 17:28:39 +00:00
}
2022-04-16 15:18:54 +00:00
} else {
2022-07-10 21:36:50 +00:00
bail! ( " no hole punch available " )
2022-04-16 15:18:54 +00:00
}
}
// Send raw data to a node
2022-04-17 17:28:39 +00:00
//
2022-04-16 15:18:54 +00:00
// We may not have dial info for a node, but have an existing connection for it
// because an inbound connection happened first, and no FindNodeQ has happened to that
// node yet to discover its dial info. The existing connection should be tried first
// in this case.
//
// Sending to a node requires determining a NetworkClass compatible mechanism
//
2022-04-17 17:28:39 +00:00
pub fn send_data (
& self ,
node_ref : NodeRef ,
data : Vec < u8 > ,
2022-07-20 13:39:38 +00:00
) -> SendPinBoxFuture < EyreResult < NetworkResult < SendDataKind > > > {
2022-04-16 15:18:54 +00:00
let this = self . clone ( ) ;
Box ::pin ( async move {
// First try to send data to the last socket we've seen this peer on
2022-04-17 23:10:10 +00:00
let data = if let Some ( descriptor ) = node_ref . last_connection ( ) . await {
2022-04-16 15:18:54 +00:00
match this
. net ( )
. send_data_to_existing_connection ( descriptor , data )
2022-06-15 18:05:04 +00:00
. await ?
2022-04-16 15:18:54 +00:00
{
None = > {
2022-08-02 01:06:31 +00:00
return Ok (
if descriptor . matches_peer_scope ( PeerScopeSet ::only ( PeerScope ::Local ) ) {
NetworkResult ::value ( SendDataKind ::LocalDirect )
} else {
NetworkResult ::value ( SendDataKind ::GlobalDirect )
} ,
) ;
2022-04-16 15:18:54 +00:00
}
Some ( d ) = > d ,
}
} else {
data
} ;
// If we don't have last_connection, try to reach out to the peer via its dial info
2022-07-20 13:39:38 +00:00
let contact_method = this . get_contact_method ( node_ref . clone ( ) ) ;
log_net! (
" send_data via {:?} to dialinfo {:?} " ,
contact_method ,
node_ref
) ;
match contact_method {
2022-04-17 17:28:39 +00:00
ContactMethod ::OutboundRelay ( relay_nr ) | ContactMethod ::InboundRelay ( relay_nr ) = > {
2022-07-20 13:39:38 +00:00
network_result_try! ( this . send_data ( relay_nr , data ) . await ? ) ;
Ok ( NetworkResult ::value ( SendDataKind ::GlobalIndirect ) )
2022-04-17 17:28:39 +00:00
}
2022-04-25 15:29:02 +00:00
ContactMethod ::Direct ( dial_info ) = > {
let send_data_kind = if dial_info . is_local ( ) {
SendDataKind ::LocalDirect
} else {
SendDataKind ::GlobalDirect
} ;
2022-07-20 13:39:38 +00:00
network_result_try! ( this . net ( ) . send_data_to_dial_info ( dial_info , data ) . await ? ) ;
Ok ( NetworkResult ::value ( send_data_kind ) )
2022-04-25 15:29:02 +00:00
}
2022-07-20 13:39:38 +00:00
ContactMethod ::SignalReverse ( relay_nr , target_node_ref ) = > {
network_result_try! (
this . do_reverse_connect ( relay_nr , target_node_ref , data )
. await ?
) ;
Ok ( NetworkResult ::value ( SendDataKind ::GlobalDirect ) )
}
ContactMethod ::SignalHolePunch ( relay_nr , target_node_ref ) = > {
network_result_try! ( this . do_hole_punch ( relay_nr , target_node_ref , data ) . await ? ) ;
Ok ( NetworkResult ::value ( SendDataKind ::GlobalDirect ) )
}
ContactMethod ::Unreachable = > Ok ( NetworkResult ::no_connection_other (
" Can't send to this node " ,
) ) ,
2022-04-16 15:18:54 +00:00
}
} )
}
2022-06-25 14:57:33 +00:00
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self), ret, err) ]
async fn handle_boot_request (
& self ,
descriptor : ConnectionDescriptor ,
) -> EyreResult < NetworkResult < ( ) > > {
2022-06-25 14:57:33 +00:00
let routing_table = self . routing_table ( ) ;
// Get a bunch of nodes with the various
let bootstrap_nodes = routing_table . find_bootstrap_nodes_filtered ( 2 ) ;
// Serialize out peer info
let bootstrap_peerinfo : Vec < PeerInfo > = bootstrap_nodes
. iter ( )
. filter_map ( | b | b . peer_info ( ) )
. collect ( ) ;
let json_bytes = serialize_json ( bootstrap_peerinfo ) . as_bytes ( ) . to_vec ( ) ;
// Reply with a chunk of signed routing table
match self
. net ( )
. send_data_to_existing_connection ( descriptor , json_bytes )
. await ?
{
None = > {
// Bootstrap reply was sent
2022-07-20 13:39:38 +00:00
Ok ( NetworkResult ::value ( ( ) ) )
2022-06-25 14:57:33 +00:00
}
2022-07-20 13:39:38 +00:00
Some ( _ ) = > Ok ( NetworkResult ::no_connection_other (
" bootstrap reply could not be sent " ,
) ) ,
2022-06-25 14:57:33 +00:00
}
}
2022-06-25 19:28:27 +00:00
// Direct bootstrap request
2022-07-10 21:36:50 +00:00
#[ instrument(level = " trace " , err, skip(self)) ]
pub async fn boot_request ( & self , dial_info : DialInfo ) -> EyreResult < Vec < PeerInfo > > {
2022-06-25 19:28:27 +00:00
let timeout_ms = {
let c = self . config . get ( ) ;
c . network . rpc . timeout_ms
} ;
// Send boot magic to requested peer address
let data = BOOT_MAGIC . to_vec ( ) ;
2022-07-13 13:51:56 +00:00
let out_data : Vec < u8 > = match self
2022-06-25 19:28:27 +00:00
. net ( )
. send_recv_data_unbound_to_dial_info ( dial_info , data , timeout_ms )
2022-07-13 13:51:56 +00:00
. await ?
{
2022-07-14 20:57:34 +00:00
NetworkResult ::Value ( v ) = > v ,
_ = > return Ok ( Vec ::new ( ) ) ,
2022-07-13 13:51:56 +00:00
} ;
2022-06-25 19:28:27 +00:00
let bootstrap_peerinfo : Vec < PeerInfo > =
2022-07-10 21:36:50 +00:00
deserialize_json ( std ::str ::from_utf8 ( & out_data ) . wrap_err ( " bad utf8 in boot peerinfo " ) ? )
. wrap_err ( " failed to deserialize boot peerinfo " ) ? ;
2022-06-25 19:28:27 +00:00
Ok ( bootstrap_peerinfo )
}
2021-11-22 16:28:30 +00:00
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
2022-06-05 00:18:26 +00:00
async fn on_recv_envelope (
2021-11-22 16:28:30 +00:00
& self ,
data : & [ u8 ] ,
2022-01-04 14:53:30 +00:00
descriptor : ConnectionDescriptor ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < bool > {
2022-07-20 13:39:38 +00:00
let root = span! (
parent : None ,
Level ::TRACE ,
" on_recv_envelope " ,
" data.len " = data . len ( ) ,
" descriptor " = ? descriptor
) ;
let _root_enter = root . enter ( ) ;
2022-01-05 17:01:02 +00:00
log_net! (
" envelope of {} bytes received from {:?} " ,
data . len ( ) ,
descriptor
) ;
2022-03-20 14:52:03 +00:00
// Network accounting
2022-06-05 17:23:18 +00:00
self . stats_packet_rcvd ( descriptor . remote_address ( ) . to_ip_addr ( ) , data . len ( ) as u64 ) ;
2022-03-20 14:52:03 +00:00
2022-07-06 18:12:28 +00:00
// If this is a zero length packet, just drop it, because these are used for hole punching
// and possibly other low-level network connectivity tasks and will never require
// more processing or forwarding
if data . len ( ) = = 0 {
return Ok ( true ) ;
}
2022-05-28 14:07:57 +00:00
// Ensure we can read the magic number
if data . len ( ) < 4 {
2022-07-20 13:39:38 +00:00
log_net! ( debug " short packet " . green ( ) ) ;
return Ok ( false ) ;
2022-05-28 14:07:57 +00:00
}
2022-06-25 14:57:33 +00:00
// Is this a direct bootstrap request instead of an envelope?
if data [ 0 .. 4 ] = = * BOOT_MAGIC {
2022-07-20 13:39:38 +00:00
network_result_value_or_log! ( debug self . handle_boot_request ( descriptor ) . await ? = > { } ) ;
2022-06-25 14:57:33 +00:00
return Ok ( true ) ;
}
2021-11-22 16:28:30 +00:00
// Is this an out-of-band receipt instead of an envelope?
if data [ 0 .. 4 ] = = * RECEIPT_MAGIC {
2022-07-20 13:39:38 +00:00
network_result_value_or_log! ( debug self . handle_out_of_band_receipt ( data ) . await = > { } ) ;
2021-11-22 16:28:30 +00:00
return Ok ( true ) ;
}
2022-04-03 16:58:06 +00:00
// Decode envelope header (may fail signature validation)
2022-07-10 21:36:50 +00:00
let envelope = Envelope ::from_signed_data ( data ) . wrap_err ( " envelope failed to decode " ) ? ;
2021-11-22 16:28:30 +00:00
// Get routing table and rpc processor
2022-04-03 16:58:06 +00:00
let ( routing_table , rpc ) = {
2021-11-22 16:28:30 +00:00
let inner = self . inner . lock ( ) ;
(
inner . routing_table . as_ref ( ) . unwrap ( ) . clone ( ) ,
inner . components . as_ref ( ) . unwrap ( ) . rpc_processor . clone ( ) ,
)
} ;
// Get timestamp range
let ( tsbehind , tsahead ) = {
let c = self . config . get ( ) ;
(
2022-01-27 14:53:01 +00:00
c . network . rpc . max_timestamp_behind_ms . map ( ms_to_us ) ,
c . network . rpc . max_timestamp_ahead_ms . map ( ms_to_us ) ,
2021-11-22 16:28:30 +00:00
)
} ;
// Validate timestamp isn't too old
let ts = intf ::get_timestamp ( ) ;
let ets = envelope . get_timestamp ( ) ;
if let Some ( tsbehind ) = tsbehind {
if tsbehind > 0 & & ( ts > ets & & ts - ets > tsbehind ) {
2022-07-10 21:36:50 +00:00
bail! (
2021-11-22 16:28:30 +00:00
" envelope time was too far in the past: {}ms " ,
timestamp_to_secs ( ts - ets ) * 1000 f64
2022-07-10 21:36:50 +00:00
) ;
2021-11-22 16:28:30 +00:00
}
}
if let Some ( tsahead ) = tsahead {
if tsahead > 0 & & ( ts < ets & & ets - ts > tsahead ) {
2022-07-10 21:36:50 +00:00
bail! (
2021-11-22 16:28:30 +00:00
" envelope time was too far in the future: {}ms " ,
timestamp_to_secs ( ets - ts ) * 1000 f64
2022-07-10 21:36:50 +00:00
) ;
2021-11-22 16:28:30 +00:00
}
}
2022-04-03 16:58:06 +00:00
// Peek at header and see if we need to relay this
// If the recipient id is not our node id, then it needs relaying
let sender_id = envelope . get_sender_id ( ) ;
let recipient_id = envelope . get_recipient_id ( ) ;
if recipient_id ! = routing_table . node_id ( ) {
// See if the source node is allowed to resolve nodes
// This is a costly operation, so only outbound-relay permitted
// nodes are allowed to do this, for example PWA users
2022-07-20 13:39:38 +00:00
let some_relay_nr = if self . check_client_whitelist ( sender_id ) {
2022-04-16 15:18:54 +00:00
// Full relay allowed, do a full resolve_node
2022-07-10 21:36:50 +00:00
rpc . resolve_node ( recipient_id ) . await . wrap_err (
" failed to resolve recipient node for relay, dropping outbound relayed packet " ,
) ?
2022-04-03 16:58:06 +00:00
} else {
// If this is not a node in the client whitelist, only allow inbound relay
// which only performs a lightweight lookup before passing the packet back out
// See if we have the node in our routing table
// We should, because relays are chosen by nodes that have established connectivity and
// should be mutually in each others routing tables. The node needing the relay will be
// pinging this node regularly to keep itself in the routing table
2022-07-20 13:39:38 +00:00
routing_table . lookup_node_ref ( recipient_id )
2022-04-03 16:58:06 +00:00
} ;
2022-07-20 13:39:38 +00:00
if let Some ( relay_nr ) = some_relay_nr {
// Relay the packet to the desired destination
log_net! ( " relaying {} bytes to {} " , data . len ( ) , relay_nr ) ;
network_result_value_or_log! ( debug self . send_data ( relay_nr , data . to_vec ( ) )
. await
. wrap_err ( " failed to forward envelope " ) ? = > {
return Ok ( false ) ;
}
) ;
}
2022-04-03 16:58:06 +00:00
// Inform caller that we dealt with the envelope, but did not process it locally
return Ok ( false ) ;
}
// DH to get decryption key (cached)
let node_id_secret = routing_table . node_id_secret ( ) ;
// Decrypt the envelope body
// xxx: punish nodes that send messages that fail to decrypt eventually
let body = envelope
. decrypt_body ( self . crypto ( ) , data , & node_id_secret )
2022-07-10 21:36:50 +00:00
. wrap_err ( " failed to decrypt envelope body " ) ? ;
2022-04-03 16:58:06 +00:00
2021-11-22 16:28:30 +00:00
// Cache the envelope information in the routing table
2022-07-20 13:39:38 +00:00
let source_noderef = match routing_table . register_node_with_existing_connection (
2022-07-10 21:36:50 +00:00
envelope . get_sender_id ( ) ,
descriptor ,
ts ,
2022-07-20 13:39:38 +00:00
) {
None = > {
// If the node couldn't be registered just skip this envelope,
// the error will have already been logged
return Ok ( false ) ;
}
Some ( v ) = > v ,
} ;
2022-06-25 14:57:33 +00:00
source_noderef . operate_mut ( | e | e . set_min_max_version ( envelope . get_min_max_version ( ) ) ) ;
2021-11-22 16:28:30 +00:00
// xxx: deal with spoofing and flooding here?
// Pass message to RPC system
2022-07-10 21:36:50 +00:00
rpc . enqueue_message ( envelope , body , source_noderef ) ? ;
2021-11-22 16:28:30 +00:00
// Inform caller that we dealt with the envelope locally
Ok ( true )
}
2022-03-19 22:19:40 +00:00
// Callbacks from low level network for statistics gathering
2022-03-20 14:52:03 +00:00
pub fn stats_packet_sent ( & self , addr : IpAddr , bytes : u64 ) {
2022-03-19 22:19:40 +00:00
let inner = & mut * self . inner . lock ( ) ;
inner
. stats
. self_stats
. transfer_stats_accounting
. add_up ( bytes ) ;
inner
. stats
. per_address_stats
2022-03-20 14:52:03 +00:00
. entry ( PerAddressStatsKey ( addr ) )
. or_insert ( PerAddressStats ::default ( ) )
2022-03-19 22:19:40 +00:00
. transfer_stats_accounting
. add_up ( bytes ) ;
}
2022-03-20 14:52:03 +00:00
pub fn stats_packet_rcvd ( & self , addr : IpAddr , bytes : u64 ) {
2022-03-19 22:19:40 +00:00
let inner = & mut * self . inner . lock ( ) ;
inner
. stats
. self_stats
. transfer_stats_accounting
. add_down ( bytes ) ;
inner
. stats
. per_address_stats
2022-03-20 14:52:03 +00:00
. entry ( PerAddressStatsKey ( addr ) )
. or_insert ( PerAddressStats ::default ( ) )
2022-03-19 22:19:40 +00:00
. transfer_stats_accounting
. add_down ( bytes ) ;
}
2022-04-23 01:30:09 +00:00
2022-05-16 15:52:48 +00:00
/// Return a clone of the current network manager statistics, taken under the inner lock.
pub fn get_stats(&self) -> NetworkManagerStats {
    self.inner.lock().stats.clone()
}
fn get_veilid_state_inner ( inner : & NetworkManagerInner ) -> VeilidStateNetwork {
2022-05-17 20:55:53 +00:00
if inner . components . is_some ( ) & & inner . components . as_ref ( ) . unwrap ( ) . net . is_started ( ) {
2022-05-16 15:52:48 +00:00
VeilidStateNetwork {
started : true ,
bps_down : inner . stats . self_stats . transfer_stats . down . average ,
bps_up : inner . stats . self_stats . transfer_stats . up . average ,
}
} else {
VeilidStateNetwork {
started : false ,
bps_down : 0 ,
bps_up : 0 ,
}
}
}
/// Lock the inner state and return the current network state derived from it.
pub fn get_veilid_state(&self) -> VeilidStateNetwork {
    let guard = self.inner.lock();
    Self::get_veilid_state_inner(&guard)
}
/// Deliver the current network state to the registered update callback, if any.
///
/// The callback and the state are both captured while the inner lock is held, but
/// the callback is invoked only after that lock has been released.
fn send_network_update(&self) {
    let (update_cb, state) = {
        let inner = self.inner.lock();
        // Without a registered callback there is nobody to notify
        let update_cb = match inner.update_callback.clone() {
            Some(cb) => cb,
            None => return,
        };
        let state = Self::get_veilid_state_inner(&*inner);
        (update_cb, state)
    };
    update_cb(VeilidUpdate::Network(state));
}
2022-04-23 01:30:09 +00:00
/// Determine if a local IP address has changed.
///
/// A change means we should restart the low level network and recreate all of our
/// dial info, waiting until we have received confirmation from N different peers.
///
/// Currently a no-op: both arguments are ignored and nothing is recorded.
pub async fn report_local_socket_address (
& self ,
_socket_address : SocketAddress ,
_reporting_peer : NodeRef ,
) {
// XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub async fn report_global_socket_address (
& self ,
socket_address : SocketAddress ,
reporting_peer : NodeRef ,
) {
2022-04-26 13:16:48 +00:00
let ( net , routing_table ) = {
let mut inner = self . inner . lock ( ) ;
2022-04-23 01:30:09 +00:00
2022-04-26 13:16:48 +00:00
// Store the reported address
inner
. public_address_check_cache
. insert ( reporting_peer . node_id ( ) , socket_address ) ;
2022-04-25 00:16:13 +00:00
2022-04-26 13:16:48 +00:00
let net = inner . components . as_ref ( ) . unwrap ( ) . net . clone ( ) ;
let routing_table = inner . routing_table . as_ref ( ) . unwrap ( ) . clone ( ) ;
( net , routing_table )
} ;
2022-04-25 00:16:13 +00:00
let network_class = net . get_network_class ( ) . unwrap_or ( NetworkClass ::Invalid ) ;
2022-04-23 01:30:09 +00:00
2022-04-26 13:16:48 +00:00
// Determine if our external address has likely changed
let needs_public_address_detection =
if matches! ( network_class , NetworkClass ::InboundCapable ) {
// Get current external ip/port from registered global dialinfo
let current_addresses : BTreeSet < SocketAddress > = routing_table
. all_filtered_dial_info_details (
Some ( RoutingDomain ::PublicInternet ) ,
& DialInfoFilter ::all ( ) ,
)
. iter ( )
. map ( | did | did . dial_info . socket_address ( ) )
. collect ( ) ;
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
let inner = self . inner . lock ( ) ;
let mut inconsistencies = 0 ;
let mut changed = false ;
2022-05-01 15:01:29 +00:00
// Iteration goes from most recent to least recent node/address pair
for ( _ , a ) in & inner . public_address_check_cache {
2022-04-26 13:16:48 +00:00
if ! current_addresses . contains ( a ) {
inconsistencies + = 1 ;
if inconsistencies > = GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
changed = true ;
break ;
}
}
}
changed
} else {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let inner = self . inner . lock ( ) ;
let mut consistencies = 0 ;
let mut consistent = false ;
let mut current_address = Option ::< SocketAddress > ::None ;
2022-05-01 15:01:29 +00:00
// Iteration goes from most recent to least recent node/address pair
for ( _ , a ) in & inner . public_address_check_cache {
2022-04-26 13:16:48 +00:00
if let Some ( current_address ) = current_address {
if current_address = = * a {
consistencies + = 1 ;
if consistencies > = GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true ;
break ;
}
}
} else {
current_address = Some ( * a ) ;
}
}
consistent
} ;
if needs_public_address_detection {
// Reset the address check cache now so we can start detecting fresh
let mut inner = self . inner . lock ( ) ;
inner . public_address_check_cache . clear ( ) ;
2022-04-23 01:30:09 +00:00
2022-04-26 13:16:48 +00:00
// Reset the network class and dial info so we can re-detect it
routing_table . clear_dial_info_details ( RoutingDomain ::PublicInternet ) ;
2022-04-25 00:16:13 +00:00
net . reset_network_class ( ) ;
2022-04-23 01:30:09 +00:00
}
}
2022-07-22 17:05:28 +00:00
/// Inform routing table entries that our dial info has changed.
///
/// The work is handed to `node_info_update_single_future.single_spawn` and awaited;
/// the value `single_spawn` produces is deliberately discarded. Inside the spawned
/// future:
/// * nothing is sent while our network class is missing or `Invalid`;
/// * otherwise a node info update RPC is sent to every node returned by
///   `get_nodes_needing_updates(cur_ts, all)`, and each node whose RPC succeeded is
///   marked via `set_seen_our_node_info`. A failed RPC is only traced.
///
/// `all` is passed through unchanged to `get_nodes_needing_updates`.
pub async fn send_node_info_updates(&self, all: bool) {
    let this = self.clone();

    // Run in background only once.
    // The field is reached through `&self` directly; no temporary clone is needed for that.
    let _ = self
        .unlocked_inner
        .node_info_update_single_future
        .single_spawn(async move {
            // Only update if we actually have a valid network class
            if matches!(
                this.get_network_class().unwrap_or(NetworkClass::Invalid),
                NetworkClass::Invalid
            ) {
                trace!(
                    "not sending node info update because our network class is not yet valid"
                );
                return;
            }

            // Get the list of refs to all nodes to update
            let cur_ts = intf::get_timestamp();
            let node_refs = this.routing_table().get_nodes_needing_updates(cur_ts, all);

            // Send the updates
            log_net!(debug "Sending node info updates to {} nodes", node_refs.len());
            let mut unord = FuturesUnordered::new();
            for nr in node_refs {
                let rpc = this.rpc_processor();
                unord.push(async move {
                    // Update the node
                    if let Err(e) = rpc
                        .rpc_call_node_info_update(Destination::Direct(nr.clone()), None)
                        .await
                    {
                        // Not fatal, but we should be able to see if this is happening
                        trace!("failed to send node info update to {:?}: {}", nr, e);
                        return;
                    }

                    // Mark the node as updated
                    nr.set_seen_our_node_info();
                });
            }

            // Wait for futures to complete
            while unord.next().await.is_some() {}

            // NOTE(review): the start message uses `log_net!` while this one uses
            // `log_rtab!`; confirm the differing log facility is intentional.
            log_rtab!(debug "Finished sending node updates");
        })
        .await;
}
2021-11-22 16:28:30 +00:00
}