2021-11-22 16:28:30 +00:00
mod bucket ;
mod bucket_entry ;
2021-12-14 14:48:33 +00:00
mod debug ;
2021-11-22 16:28:30 +00:00
mod find_nodes ;
mod node_ref ;
2022-09-04 18:17:28 +00:00
mod routing_domain_editor ;
2022-09-03 17:57:25 +00:00
mod routing_domains ;
2021-11-26 14:54:38 +00:00
mod stats_accounting ;
2022-06-25 14:57:33 +00:00
mod tasks ;
2021-11-22 16:28:30 +00:00
use crate ::dht ::* ;
use crate ::network_manager ::* ;
use crate ::rpc_processor ::* ;
use crate ::xx ::* ;
use crate ::* ;
2021-11-26 14:54:38 +00:00
use bucket ::* ;
pub use bucket_entry ::* ;
2021-12-14 14:48:33 +00:00
pub use debug ::* ;
2021-11-26 14:54:38 +00:00
pub use find_nodes ::* ;
2022-08-31 01:21:16 +00:00
use hashlink ::LruCache ;
2021-11-26 14:54:38 +00:00
pub use node_ref ::* ;
2022-09-04 18:17:28 +00:00
pub use routing_domain_editor ::* ;
2022-09-03 17:57:25 +00:00
pub use routing_domains ::* ;
2021-11-26 14:54:38 +00:00
pub use stats_accounting ::* ;
2021-11-22 16:28:30 +00:00
2022-08-31 01:21:16 +00:00
const RECENT_PEERS_TABLE_SIZE : usize = 64 ;
2021-11-22 16:28:30 +00:00
//////////////////////////////////////////////////////////////////////////
2022-08-31 01:21:16 +00:00
#[ derive(Debug, Clone, Copy) ]
pub struct RecentPeersEntry {
2022-09-06 20:49:43 +00:00
pub last_connection : ConnectionDescriptor ,
2022-08-31 01:21:16 +00:00
}
2022-09-03 17:57:25 +00:00
/// RoutingTable rwlock-internal data
2021-11-22 16:28:30 +00:00
struct RoutingTableInner {
2022-09-23 15:23:33 +00:00
/// Routing table buckets that hold entries
buckets : Vec < Bucket > ,
/// A fast counter for the number of entries in the table, total
bucket_entry_count : usize ,
/// The public internet routing domain
public_internet_routing_domain : PublicInternetRoutingDomainDetail ,
/// The dial info we use on the local network
local_network_routing_domain : LocalInternetRoutingDomainDetail ,
/// Interim accounting mechanism for this node's RPC latency to any other node
self_latency_stats_accounting : LatencyStatsAccounting ,
/// Interim accounting mechanism for the total bandwidth to/from this node
self_transfer_stats_accounting : TransferStatsAccounting ,
/// Statistics about the total bandwidth to/from this node
self_transfer_stats : TransferStatsDownUp ,
/// Peers we have recently communicated with
recent_peers : LruCache < DHTKey , RecentPeersEntry > ,
2021-11-22 16:28:30 +00:00
}
2022-03-24 14:14:50 +00:00
#[ derive(Clone, Debug, Default) ]
pub struct RoutingTableHealth {
2022-09-23 15:23:33 +00:00
/// Number of reliable (responsive) entries in the routing table
2022-03-24 14:14:50 +00:00
pub reliable_entry_count : usize ,
2022-09-23 15:23:33 +00:00
/// Number of unreliable (occasionally unresponsive) entries in the routing table
2022-03-24 14:14:50 +00:00
pub unreliable_entry_count : usize ,
2022-09-23 15:23:33 +00:00
/// Number of dead (always unresponsive) entries in the routing table
2022-03-24 14:14:50 +00:00
pub dead_entry_count : usize ,
}
2021-11-22 16:28:30 +00:00
struct RoutingTableUnlockedInner {
2022-09-23 15:23:33 +00:00
// Accessors
config : VeilidConfig ,
2022-09-08 01:52:08 +00:00
network_manager : NetworkManager ,
2022-09-23 15:23:33 +00:00
/// The current node's public DHT key
node_id : DHTKey ,
/// The current node's DHT key secret
node_id_secret : DHTKeySecret ,
/// Buckets to kick on our next kick task
kick_queue : Mutex < BTreeSet < usize > > ,
/// Background process for computing statistics
2022-07-10 21:36:50 +00:00
rolling_transfers_task : TickTask < EyreReport > ,
2022-09-23 15:23:33 +00:00
/// Backgroup process to purge dead routing table entries when necessary
2022-07-10 21:36:50 +00:00
kick_buckets_task : TickTask < EyreReport > ,
2021-11-22 16:28:30 +00:00
}
#[ derive(Clone) ]
pub struct RoutingTable {
2022-06-25 14:57:33 +00:00
inner : Arc < RwLock < RoutingTableInner > > ,
2021-11-22 16:28:30 +00:00
unlocked_inner : Arc < RoutingTableUnlockedInner > ,
}
impl RoutingTable {
2022-09-08 01:52:08 +00:00
fn new_inner ( ) -> RoutingTableInner {
2021-11-22 16:28:30 +00:00
RoutingTableInner {
buckets : Vec ::new ( ) ,
2022-09-03 17:57:25 +00:00
public_internet_routing_domain : PublicInternetRoutingDomainDetail ::default ( ) ,
local_network_routing_domain : LocalInternetRoutingDomainDetail ::default ( ) ,
2021-11-22 16:28:30 +00:00
bucket_entry_count : 0 ,
2022-03-19 22:19:40 +00:00
self_latency_stats_accounting : LatencyStatsAccounting ::new ( ) ,
self_transfer_stats_accounting : TransferStatsAccounting ::new ( ) ,
self_transfer_stats : TransferStatsDownUp ::default ( ) ,
2022-08-31 01:21:16 +00:00
recent_peers : LruCache ::new ( RECENT_PEERS_TABLE_SIZE ) ,
2021-11-22 16:28:30 +00:00
}
}
2022-09-08 01:52:08 +00:00
fn new_unlocked_inner (
2022-09-23 15:23:33 +00:00
config : VeilidConfig ,
2022-09-08 01:52:08 +00:00
network_manager : NetworkManager ,
) -> RoutingTableUnlockedInner {
2022-09-23 15:23:33 +00:00
let c = config . get ( ) ;
2021-11-22 16:28:30 +00:00
RoutingTableUnlockedInner {
2022-09-23 15:23:33 +00:00
config : config . clone ( ) ,
2022-09-08 01:52:08 +00:00
network_manager ,
2022-09-23 15:23:33 +00:00
node_id : c . network . node_id ,
node_id_secret : c . network . node_id_secret ,
kick_queue : Mutex ::new ( BTreeSet ::default ( ) ) ,
2021-11-26 14:54:38 +00:00
rolling_transfers_task : TickTask ::new ( ROLLING_TRANSFERS_INTERVAL_SECS ) ,
2022-06-25 14:57:33 +00:00
kick_buckets_task : TickTask ::new ( 1 ) ,
2021-11-22 16:28:30 +00:00
}
}
pub fn new ( network_manager : NetworkManager ) -> Self {
let config = network_manager . config ( ) ;
let this = Self {
2022-09-08 01:52:08 +00:00
inner : Arc ::new ( RwLock ::new ( Self ::new_inner ( ) ) ) ,
unlocked_inner : Arc ::new ( Self ::new_unlocked_inner ( config , network_manager ) ) ,
2021-11-22 16:28:30 +00:00
} ;
// Set rolling transfers tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. rolling_transfers_task
2022-06-13 00:58:02 +00:00
. set_routine ( move | s , l , t | {
2022-10-04 17:09:03 +00:00
Box ::pin (
this2
. clone ( )
. rolling_transfers_task_routine ( s , l , t )
. instrument ( trace_span! (
parent : None ,
" RoutingTable rolling transfers task routine "
) ) ,
)
2021-11-22 16:28:30 +00:00
} ) ;
}
2022-07-22 17:05:28 +00:00
2022-06-25 14:57:33 +00:00
// Set kick buckets tick task
{
let this2 = this . clone ( ) ;
this . unlocked_inner
. kick_buckets_task
. set_routine ( move | s , l , t | {
2022-10-04 17:09:03 +00:00
Box ::pin (
this2
. clone ( )
. kick_buckets_task_routine ( s , l , t )
. instrument ( trace_span! ( parent : None , " kick buckets task routine " ) ) ,
)
2022-06-25 14:57:33 +00:00
} ) ;
}
2021-11-22 16:28:30 +00:00
this
}
pub fn network_manager ( & self ) -> NetworkManager {
2022-09-08 01:52:08 +00:00
self . unlocked_inner . network_manager . clone ( )
2021-11-22 16:28:30 +00:00
}
pub fn rpc_processor ( & self ) -> RPCProcessor {
self . network_manager ( ) . rpc_processor ( )
}
pub fn node_id ( & self ) -> DHTKey {
2022-09-23 15:23:33 +00:00
self . unlocked_inner . node_id
2021-11-22 16:28:30 +00:00
}
pub fn node_id_secret ( & self ) -> DHTKeySecret {
2022-09-23 15:23:33 +00:00
self . unlocked_inner . node_id_secret
2021-11-22 16:28:30 +00:00
}
2022-09-04 19:40:35 +00:00
fn routing_domain_for_address_inner (
2022-09-04 18:17:28 +00:00
inner : & RoutingTableInner ,
address : Address ,
) -> Option < RoutingDomain > {
2022-09-03 17:57:25 +00:00
for rd in RoutingDomain ::all ( ) {
let can_contain =
2022-09-04 18:17:28 +00:00
Self ::with_routing_domain ( inner , rd , | rdd | rdd . can_contain_address ( address ) ) ;
2022-09-03 17:57:25 +00:00
if can_contain {
return Some ( rd ) ;
}
}
None
}
2022-09-04 18:17:28 +00:00
pub fn routing_domain_for_address ( & self , address : Address ) -> Option < RoutingDomain > {
let inner = self . inner . read ( ) ;
Self ::routing_domain_for_address_inner ( & * inner , address )
}
2022-04-23 01:30:09 +00:00
fn with_routing_domain < F , R > ( inner : & RoutingTableInner , domain : RoutingDomain , f : F ) -> R
where
2022-09-03 17:57:25 +00:00
F : FnOnce ( & dyn RoutingDomainDetail ) -> R ,
2022-04-23 01:30:09 +00:00
{
match domain {
RoutingDomain ::PublicInternet = > f ( & inner . public_internet_routing_domain ) ,
RoutingDomain ::LocalNetwork = > f ( & inner . local_network_routing_domain ) ,
}
2021-12-24 23:02:53 +00:00
}
2022-04-23 01:30:09 +00:00
fn with_routing_domain_mut < F , R > (
inner : & mut RoutingTableInner ,
domain : RoutingDomain ,
f : F ,
) -> R
where
2022-09-03 17:57:25 +00:00
F : FnOnce ( & mut dyn RoutingDomainDetail ) -> R ,
2022-04-23 01:30:09 +00:00
{
match domain {
RoutingDomain ::PublicInternet = > f ( & mut inner . public_internet_routing_domain ) ,
RoutingDomain ::LocalNetwork = > f ( & mut inner . local_network_routing_domain ) ,
}
2021-11-22 16:28:30 +00:00
}
2022-08-31 01:21:16 +00:00
pub fn relay_node ( & self , domain : RoutingDomain ) -> Option < NodeRef > {
let inner = self . inner . read ( ) ;
2022-09-03 17:57:25 +00:00
Self ::with_routing_domain ( & * inner , domain , | rd | rd . relay_node ( ) )
2022-08-31 01:21:16 +00:00
}
2022-04-23 01:30:09 +00:00
pub fn has_dial_info ( & self , domain : RoutingDomain ) -> bool {
2022-06-25 14:57:33 +00:00
let inner = self . inner . read ( ) ;
2022-09-03 17:57:25 +00:00
Self ::with_routing_domain ( & * inner , domain , | rd | ! rd . dial_info_details ( ) . is_empty ( ) )
2021-12-24 01:34:52 +00:00
}
2022-04-16 15:18:54 +00:00
2022-04-23 01:30:09 +00:00
pub fn dial_info_details ( & self , domain : RoutingDomain ) -> Vec < DialInfoDetail > {
2022-06-25 14:57:33 +00:00
let inner = self . inner . read ( ) ;
2022-09-03 17:57:25 +00:00
Self ::with_routing_domain ( & * inner , domain , | rd | rd . dial_info_details ( ) . clone ( ) )
2021-11-22 16:28:30 +00:00
}
2022-04-23 01:30:09 +00:00
pub fn first_filtered_dial_info_detail (
2022-04-16 15:18:54 +00:00
& self ,
2022-09-03 17:57:25 +00:00
routing_domain_set : RoutingDomainSet ,
2022-04-16 15:18:54 +00:00
filter : & DialInfoFilter ,
) -> Option < DialInfoDetail > {
2022-06-25 14:57:33 +00:00
let inner = self . inner . read ( ) ;
2022-09-03 17:57:25 +00:00
for routing_domain in routing_domain_set {
let did = Self ::with_routing_domain ( & * inner , routing_domain , | rd | {
for did in rd . dial_info_details ( ) {
2022-04-25 15:29:02 +00:00
if did . matches_filter ( filter ) {
return Some ( did . clone ( ) ) ;
}
2022-04-23 01:30:09 +00:00
}
2022-04-25 15:29:02 +00:00
None
2022-09-03 17:57:25 +00:00
} ) ;
if did . is_some ( ) {
return did ;
2022-04-25 15:29:02 +00:00
}
2022-09-03 17:57:25 +00:00
}
None
2022-04-16 15:18:54 +00:00
}
2022-04-23 01:30:09 +00:00
pub fn all_filtered_dial_info_details (
2022-04-16 15:18:54 +00:00
& self ,
2022-09-03 17:57:25 +00:00
routing_domain_set : RoutingDomainSet ,
2022-04-16 15:18:54 +00:00
filter : & DialInfoFilter ,
) -> Vec < DialInfoDetail > {
2022-06-25 14:57:33 +00:00
let inner = self . inner . read ( ) ;
2022-04-24 02:08:02 +00:00
let mut ret = Vec ::new ( ) ;
2022-09-03 17:57:25 +00:00
for routing_domain in routing_domain_set {
Self ::with_routing_domain ( & * inner , routing_domain , | rd | {
for did in rd . dial_info_details ( ) {
2022-04-24 02:08:02 +00:00
if did . matches_filter ( filter ) {
ret . push ( did . clone ( ) ) ;
}
}
} ) ;
}
ret . remove_duplicates ( ) ;
ret
2022-04-16 15:18:54 +00:00
}
2022-08-28 17:13:09 +00:00
pub fn ensure_dial_info_is_valid ( & self , domain : RoutingDomain , dial_info : & DialInfo ) -> bool {
2022-09-03 17:57:25 +00:00
let address = dial_info . socket_address ( ) . address ( ) ;
let inner = self . inner . read ( ) ;
let can_contain_address =
Self ::with_routing_domain ( & * inner , domain , | rd | rd . can_contain_address ( address ) ) ;
2022-04-16 15:18:54 +00:00
2022-09-03 17:57:25 +00:00
if ! can_contain_address {
log_rtab! ( debug " can not add dial info to this routing domain " ) ;
2022-08-28 17:13:09 +00:00
return false ;
2022-04-17 17:28:39 +00:00
}
if ! dial_info . is_valid ( ) {
2022-08-28 17:13:09 +00:00
log_rtab! ( debug
2022-04-26 13:16:48 +00:00
" shouldn't be registering invalid addresses: {:?} " ,
dial_info
2022-07-10 21:36:50 +00:00
) ;
2022-08-28 17:13:09 +00:00
return false ;
}
true
}
2022-09-04 18:17:28 +00:00
pub fn node_info_is_valid_in_routing_domain (
& self ,
routing_domain : RoutingDomain ,
node_info : & NodeInfo ,
) -> bool {
// Should not be passing around nodeinfo with an invalid network class
if matches! ( node_info . network_class , NetworkClass ::Invalid ) {
return false ;
}
// Ensure all of the dial info works in this routing domain
2022-09-04 19:40:35 +00:00
for did in & node_info . dial_info_detail_list {
2022-09-04 18:17:28 +00:00
if ! self . ensure_dial_info_is_valid ( routing_domain , & did . dial_info ) {
return false ;
}
}
// Ensure the relay is also valid in this routing domain if it is provided
2022-09-04 19:40:35 +00:00
if let Some ( relay_peer_info ) = node_info . relay_peer_info . as_ref ( ) {
2022-09-04 18:17:28 +00:00
let relay_ni = & relay_peer_info . signed_node_info . node_info ;
if ! self . node_info_is_valid_in_routing_domain ( routing_domain , relay_ni ) {
return false ;
}
}
true
}
#[ instrument(level = " debug " , skip(self)) ]
pub fn edit_routing_domain ( & self , domain : RoutingDomain ) -> RoutingDomainEditor {
RoutingDomainEditor ::new ( self . clone ( ) , domain )
}
2022-09-03 17:57:25 +00:00
fn reset_all_seen_our_node_info ( inner : & mut RoutingTableInner , routing_domain : RoutingDomain ) {
2022-07-06 01:21:58 +00:00
let cur_ts = intf ::get_timestamp ( ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Dead , | _ , v | {
2022-09-03 17:57:25 +00:00
v . with_mut ( | e | {
e . set_seen_our_node_info ( routing_domain , false ) ;
} ) ;
2022-07-06 01:21:58 +00:00
Option ::< ( ) > ::None
} ) ;
}
2022-08-31 01:21:16 +00:00
fn reset_all_updated_since_last_network_change ( inner : & mut RoutingTableInner ) {
2022-08-27 02:52:08 +00:00
let cur_ts = intf ::get_timestamp ( ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Dead , | _ , v | {
v . with_mut ( | e | e . set_updated_since_last_network_change ( false ) ) ;
Option ::< ( ) > ::None
} ) ;
}
2022-09-03 17:57:25 +00:00
pub fn get_own_peer_info ( & self , routing_domain : RoutingDomain ) -> PeerInfo {
PeerInfo ::new (
NodeId ::new ( self . node_id ( ) ) ,
self . get_own_signed_node_info ( routing_domain ) ,
)
}
pub fn get_own_signed_node_info ( & self , routing_domain : RoutingDomain ) -> SignedNodeInfo {
let node_id = NodeId ::new ( self . node_id ( ) ) ;
let secret = self . node_id_secret ( ) ;
SignedNodeInfo ::with_secret ( self . get_own_node_info ( routing_domain ) , node_id , & secret )
. unwrap ( )
}
pub fn get_own_node_info ( & self , routing_domain : RoutingDomain ) -> NodeInfo {
let netman = self . network_manager ( ) ;
let relay_node = self . relay_node ( routing_domain ) ;
let pc = netman . get_protocol_config ( ) ;
NodeInfo {
network_class : netman
. get_network_class ( routing_domain )
. unwrap_or ( NetworkClass ::Invalid ) ,
outbound_protocols : pc . outbound ,
address_types : pc . family_global ,
min_version : MIN_VERSION ,
max_version : MAX_VERSION ,
dial_info_detail_list : self . dial_info_details ( routing_domain ) ,
2022-09-04 18:17:28 +00:00
relay_peer_info : relay_node
. and_then ( | rn | rn . make_peer_info ( routing_domain ) . map ( Box ::new ) ) ,
2022-07-06 01:21:58 +00:00
}
2021-11-22 16:28:30 +00:00
}
2022-09-03 17:57:25 +00:00
pub fn has_valid_own_node_info ( & self , routing_domain : RoutingDomain ) -> bool {
let netman = self . network_manager ( ) ;
let nc = netman
. get_network_class ( routing_domain )
. unwrap_or ( NetworkClass ::Invalid ) ;
! matches! ( nc , NetworkClass ::Invalid )
}
2021-11-22 16:28:30 +00:00
fn bucket_depth ( index : usize ) -> usize {
match index {
0 = > 256 ,
1 = > 128 ,
2 = > 64 ,
3 = > 32 ,
4 = > 16 ,
5 = > 8 ,
6 = > 4 ,
7 = > 4 ,
8 = > 4 ,
9 = > 4 ,
_ = > 4 ,
}
}
2022-07-10 21:36:50 +00:00
pub async fn init ( & self ) -> EyreResult < ( ) > {
2022-06-25 14:57:33 +00:00
let mut inner = self . inner . write ( ) ;
2021-11-22 16:28:30 +00:00
// Size the buckets (one per bit)
inner . buckets . reserve ( DHT_KEY_LENGTH * 8 ) ;
for _ in 0 .. DHT_KEY_LENGTH * 8 {
let bucket = Bucket ::new ( self . clone ( ) ) ;
inner . buckets . push ( bucket ) ;
}
Ok ( ( ) )
}
pub async fn terminate ( & self ) {
2022-06-15 18:05:04 +00:00
debug! ( " starting routing table terminate " ) ;
2022-05-25 15:12:19 +00:00
// Cancel all tasks being ticked
2022-06-15 18:05:04 +00:00
debug! ( " stopping rolling transfers task " ) ;
2022-06-13 00:58:02 +00:00
if let Err ( e ) = self . unlocked_inner . rolling_transfers_task . stop ( ) . await {
error! ( " rolling_transfers_task not stopped: {} " , e ) ;
2022-05-25 15:12:19 +00:00
}
2022-07-22 17:05:28 +00:00
debug! ( " stopping kick buckets task " ) ;
if let Err ( e ) = self . unlocked_inner . kick_buckets_task . stop ( ) . await {
error! ( " kick_buckets_task not stopped: {} " , e ) ;
2022-05-25 15:12:19 +00:00
}
2022-09-08 01:52:08 +00:00
* self . inner . write ( ) = Self ::new_inner ( ) ;
2022-06-15 18:05:04 +00:00
debug! ( " finished routing table terminate " ) ;
2021-11-22 16:28:30 +00:00
}
2022-09-03 17:57:25 +00:00
pub fn configure_local_network_routing_domain ( & self , local_networks : Vec < ( IpAddr , IpAddr ) > ) {
2022-09-07 14:33:14 +00:00
log_net! ( debug " configure_local_network_routing_domain: {:#?} " , local_networks ) ;
2022-09-03 17:57:25 +00:00
let mut inner = self . inner . write ( ) ;
let changed = inner
. local_network_routing_domain
. set_local_networks ( local_networks ) ;
// If the local network topology has changed, nuke the existing local node info and let new local discovery happen
if changed {
let cur_ts = intf ::get_timestamp ( ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Dead , | _rti , e | {
e . with_mut ( | e | {
e . clear_signed_node_info ( RoutingDomain ::LocalNetwork ) ;
2022-09-04 18:17:28 +00:00
e . set_seen_our_node_info ( RoutingDomain ::LocalNetwork , false ) ;
e . set_updated_since_last_network_change ( false ) ;
2022-09-03 17:57:25 +00:00
} ) ;
Option ::< ( ) > ::None
} ) ;
}
}
2022-03-09 03:32:12 +00:00
// Attempt to empty the routing table
// should only be performed when there are no node_refs (detached)
2022-08-06 00:34:00 +00:00
pub fn purge_buckets ( & self ) {
2022-06-25 14:57:33 +00:00
let mut inner = self . inner . write ( ) ;
2022-03-09 03:32:12 +00:00
log_rtab! (
2022-08-06 00:34:00 +00:00
" Starting routing table buckets purge. Table currently has {} nodes " ,
2022-03-09 03:32:12 +00:00
inner . bucket_entry_count
) ;
for bucket in & mut inner . buckets {
bucket . kick ( 0 ) ;
}
2022-05-24 21:13:52 +00:00
log_rtab! ( debug
2022-08-06 00:34:00 +00:00
" Routing table buckets purge complete. Routing table now has {} nodes " ,
inner . bucket_entry_count
) ;
}
// Attempt to remove last_connections from entries
pub fn purge_last_connections ( & self ) {
let mut inner = self . inner . write ( ) ;
log_rtab! (
" Starting routing table last_connections purge. Table currently has {} nodes " ,
inner . bucket_entry_count
) ;
for bucket in & mut inner . buckets {
for entry in bucket . entries ( ) {
entry . 1. with_mut ( | e | {
2022-08-09 00:42:27 +00:00
e . clear_last_connections ( ) ;
2022-08-06 00:34:00 +00:00
} ) ;
}
}
log_rtab! ( debug
" Routing table last_connections purge complete. Routing table now has {} nodes " ,
2022-03-09 03:32:12 +00:00
inner . bucket_entry_count
) ;
}
2021-11-22 16:28:30 +00:00
// Attempt to settle buckets and remove entries down to the desired number
// which may not be possible due extant NodeRefs
fn kick_bucket ( inner : & mut RoutingTableInner , idx : usize ) {
let bucket = & mut inner . buckets [ idx ] ;
let bucket_depth = Self ::bucket_depth ( idx ) ;
if let Some ( dead_node_ids ) = bucket . kick ( bucket_depth ) {
// Remove counts
inner . bucket_entry_count - = dead_node_ids . len ( ) ;
2022-05-24 21:13:52 +00:00
log_rtab! ( debug " Routing table now has {} nodes " , inner . bucket_entry_count ) ;
2021-11-22 16:28:30 +00:00
// Now purge the routing table inner vectors
//let filter = |k: &DHTKey| dead_node_ids.contains(k);
//inner.closest_reliable_nodes.retain(filter);
//inner.fastest_reliable_nodes.retain(filter);
//inner.closest_nodes.retain(filter);
//inner.fastest_nodes.retain(filter);
}
}
2022-09-23 15:23:33 +00:00
fn find_bucket_index ( & self , node_id : DHTKey ) -> usize {
distance ( & node_id , & self . unlocked_inner . node_id )
2021-11-22 16:28:30 +00:00
. first_nonzero_bit ( )
. unwrap ( )
}
2022-09-03 17:57:25 +00:00
pub fn get_entry_count (
& self ,
routing_domain_set : RoutingDomainSet ,
min_state : BucketEntryState ,
) -> usize {
2022-07-22 17:05:28 +00:00
let inner = self . inner . read ( ) ;
2022-09-03 17:57:25 +00:00
Self ::get_entry_count_inner ( & * inner , routing_domain_set , min_state )
2022-07-22 17:05:28 +00:00
}
2022-09-03 17:57:25 +00:00
fn get_entry_count_inner (
inner : & RoutingTableInner ,
routing_domain_set : RoutingDomainSet ,
min_state : BucketEntryState ,
) -> usize {
2022-05-24 21:13:52 +00:00
let mut count = 0 usize ;
let cur_ts = intf ::get_timestamp ( ) ;
2022-09-03 17:57:25 +00:00
Self ::with_entries ( inner , cur_ts , min_state , | _ , e | {
if e . with ( | e | e . best_routing_domain ( routing_domain_set ) )
. is_some ( )
{
count + = 1 ;
}
2022-05-24 21:13:52 +00:00
Option ::< ( ) > ::None
} ) ;
count
}
2022-06-25 19:28:27 +00:00
fn with_entries < T , F : FnMut ( DHTKey , Arc < BucketEntry > ) -> Option < T > > (
2022-06-25 14:57:33 +00:00
inner : & RoutingTableInner ,
2022-05-24 21:13:52 +00:00
cur_ts : u64 ,
min_state : BucketEntryState ,
mut f : F ,
) -> Option < T > {
2022-06-25 14:57:33 +00:00
for bucket in & inner . buckets {
for entry in bucket . entries ( ) {
if entry . 1. with ( | e | e . state ( cur_ts ) > = min_state ) {
if let Some ( out ) = f ( * entry . 0 , entry . 1. clone ( ) ) {
2022-05-24 21:13:52 +00:00
return Some ( out ) ;
}
}
}
}
None
}
2022-08-31 01:21:16 +00:00
pub fn get_nodes_needing_updates (
& self ,
routing_domain : RoutingDomain ,
cur_ts : u64 ,
all : bool ,
) -> Vec < NodeRef > {
2022-07-22 17:05:28 +00:00
let inner = self . inner . read ( ) ;
let mut node_refs = Vec ::< NodeRef > ::with_capacity ( inner . bucket_entry_count ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Unreliable , | k , v | {
// Only update nodes that haven't seen our node info yet
2022-08-31 01:21:16 +00:00
if all | | ! v . with ( | e | e . has_seen_our_node_info ( routing_domain ) ) {
2022-09-04 18:17:28 +00:00
node_refs . push ( NodeRef ::new (
self . clone ( ) ,
k ,
v ,
Some ( NodeRefFilter ::new ( ) . with_routing_domain ( routing_domain ) ) ,
) ) ;
2022-07-22 17:05:28 +00:00
}
Option ::< ( ) > ::None
} ) ;
node_refs
}
2022-09-03 17:57:25 +00:00
pub fn get_nodes_needing_ping (
& self ,
routing_domain : RoutingDomain ,
cur_ts : u64 ,
) -> Vec < NodeRef > {
2022-07-22 17:05:28 +00:00
let inner = self . inner . read ( ) ;
2022-08-31 01:21:16 +00:00
// Collect relay nodes
2022-09-03 17:57:25 +00:00
let opt_relay_id = Self ::with_routing_domain ( & * inner , routing_domain , | rd | {
rd . relay_node ( ) . map ( | rn | rn . node_id ( ) )
} ) ;
2022-08-31 01:21:16 +00:00
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
2022-07-22 17:05:28 +00:00
let mut node_refs = Vec ::< NodeRef > ::with_capacity ( inner . bucket_entry_count ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Unreliable , | k , v | {
2022-09-03 17:57:25 +00:00
if v . with ( | e | {
2022-09-04 18:17:28 +00:00
e . has_node_info ( routing_domain . into ( ) )
2022-09-04 19:40:35 +00:00
& & e . needs_ping ( cur_ts , opt_relay_id = = Some ( k ) )
2022-09-03 17:57:25 +00:00
} ) {
2022-09-04 18:17:28 +00:00
node_refs . push ( NodeRef ::new (
self . clone ( ) ,
k ,
v ,
Some ( NodeRefFilter ::new ( ) . with_routing_domain ( routing_domain ) ) ,
) ) ;
2022-07-22 17:05:28 +00:00
}
Option ::< ( ) > ::None
} ) ;
node_refs
}
pub fn get_all_nodes ( & self , cur_ts : u64 ) -> Vec < NodeRef > {
let inner = self . inner . read ( ) ;
let mut node_refs = Vec ::< NodeRef > ::with_capacity ( inner . bucket_entry_count ) ;
Self ::with_entries ( & * inner , cur_ts , BucketEntryState ::Unreliable , | k , v | {
node_refs . push ( NodeRef ::new ( self . clone ( ) , k , v , None ) ) ;
Option ::< ( ) > ::None
} ) ;
node_refs
}
2022-06-25 14:57:33 +00:00
fn queue_bucket_kick ( & self , node_id : DHTKey ) {
2022-09-23 15:23:33 +00:00
let idx = self . find_bucket_index ( node_id ) ;
self . unlocked_inner . kick_queue . lock ( ) . insert ( idx ) ;
2021-11-22 16:28:30 +00:00
}
2022-05-03 20:43:15 +00:00
// Create a node reference, possibly creating a bucket entry
// the 'update_func' closure is called on the node, and, if created,
// in a locked fashion as to ensure the bucket entry state is always valid
2022-07-20 13:39:38 +00:00
pub fn create_node_ref < F > ( & self , node_id : DHTKey , update_func : F ) -> Option < NodeRef >
2022-05-03 20:43:15 +00:00
where
2022-06-25 14:57:33 +00:00
F : FnOnce ( & mut BucketEntryInner ) ,
2022-05-03 20:43:15 +00:00
{
2021-11-22 16:28:30 +00:00
// Ensure someone isn't trying register this node itself
if node_id = = self . node_id ( ) {
2022-07-20 13:39:38 +00:00
log_rtab! ( debug " can't register own node " ) ;
return None ;
2021-11-22 16:28:30 +00:00
}
2022-05-03 20:43:15 +00:00
// Lock this entire operation
2022-06-25 14:57:33 +00:00
let mut inner = self . inner . write ( ) ;
2022-05-03 20:43:15 +00:00
// Look up existing entry
2022-09-23 15:23:33 +00:00
let idx = self . find_bucket_index ( node_id ) ;
2022-05-03 20:43:15 +00:00
let noderef = {
2022-06-25 14:57:33 +00:00
let bucket = & inner . buckets [ idx ] ;
let entry = bucket . entry ( & node_id ) ;
2022-05-03 20:43:15 +00:00
entry . map ( | e | NodeRef ::new ( self . clone ( ) , node_id , e , None ) )
} ;
// If one doesn't exist, insert into bucket, possibly evicting a bucket member
let noderef = match noderef {
2021-11-22 16:28:30 +00:00
None = > {
// Make new entry
2022-05-03 20:43:15 +00:00
inner . bucket_entry_count + = 1 ;
2022-05-24 21:13:52 +00:00
let cnt = inner . bucket_entry_count ;
2022-05-03 20:43:15 +00:00
let bucket = & mut inner . buckets [ idx ] ;
let nr = bucket . add_entry ( node_id ) ;
// Update the entry
2022-06-25 14:57:33 +00:00
let entry = bucket . entry ( & node_id ) . unwrap ( ) ;
entry . with_mut ( update_func ) ;
2021-11-22 16:28:30 +00:00
// Kick the bucket
2022-09-23 15:23:33 +00:00
self . unlocked_inner . kick_queue . lock ( ) . insert ( idx ) ;
2022-09-03 17:57:25 +00:00
log_rtab! ( debug " Routing table now has {} nodes, {} live " , cnt , Self ::get_entry_count_inner ( & mut * inner , RoutingDomainSet ::all ( ) , BucketEntryState ::Unreliable ) ) ;
2021-11-22 16:28:30 +00:00
nr
}
2022-05-03 20:43:15 +00:00
Some ( nr ) = > {
// Update the entry
let bucket = & mut inner . buckets [ idx ] ;
2022-06-25 14:57:33 +00:00
let entry = bucket . entry ( & node_id ) . unwrap ( ) ;
entry . with_mut ( | e | {
update_func ( e ) ;
} ) ;
2022-05-03 20:43:15 +00:00
nr
}
2021-11-22 16:28:30 +00:00
} ;
2022-07-20 13:39:38 +00:00
Some ( noderef )
2021-11-22 16:28:30 +00:00
}
pub fn lookup_node_ref ( & self , node_id : DHTKey ) -> Option < NodeRef > {
2022-09-23 15:23:33 +00:00
if node_id = = self . unlocked_inner . node_id {
2022-08-04 17:25:05 +00:00
log_rtab! ( debug " can't look up own node id in routing table " ) ;
return None ;
}
2022-09-23 15:23:33 +00:00
let idx = self . find_bucket_index ( node_id ) ;
let inner = self . inner . read ( ) ;
2022-06-25 14:57:33 +00:00
let bucket = & inner . buckets [ idx ] ;
2021-11-26 14:54:38 +00:00
bucket
2022-06-25 14:57:33 +00:00
. entry ( & node_id )
2022-04-19 15:23:44 +00:00
. map ( | e | NodeRef ::new ( self . clone ( ) , node_id , e , None ) )
2021-11-22 16:28:30 +00:00
}
// Shortcut function to add a node to our routing table if it doesn't exist
2022-09-04 18:17:28 +00:00
// and add the dial info we have for it. Returns a noderef filtered to
// the routing domain in which this node was registered for convenience.
2022-05-11 01:49:42 +00:00
pub fn register_node_with_signed_node_info (
2021-11-22 16:28:30 +00:00
& self ,
2022-09-04 18:17:28 +00:00
routing_domain : RoutingDomain ,
2021-11-22 16:28:30 +00:00
node_id : DHTKey ,
2022-05-11 01:49:42 +00:00
signed_node_info : SignedNodeInfo ,
2022-09-04 18:17:28 +00:00
allow_invalid : bool ,
2022-07-20 13:39:38 +00:00
) -> Option < NodeRef > {
2022-09-14 18:36:29 +00:00
//log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid );
2022-09-09 20:27:13 +00:00
2022-05-11 13:37:54 +00:00
// validate signed node info is not something malicious
if node_id = = self . node_id ( ) {
2022-07-20 13:39:38 +00:00
log_rtab! ( debug " can't register own node id in routing table " ) ;
return None ;
2022-05-11 13:37:54 +00:00
}
if let Some ( rpi ) = & signed_node_info . node_info . relay_peer_info {
if rpi . node_id . key = = node_id {
2022-07-20 13:39:38 +00:00
log_rtab! ( debug " node can not be its own relay " ) ;
return None ;
2022-05-11 13:37:54 +00:00
}
}
2022-09-04 18:17:28 +00:00
if ! allow_invalid {
// verify signature
if ! signed_node_info . has_valid_signature ( ) {
log_rtab! ( debug " signed node info for {} has invalid signature " , node_id ) ;
return None ;
}
// verify signed node info is valid in this routing domain
if ! self
. node_info_is_valid_in_routing_domain ( routing_domain , & signed_node_info . node_info )
{
log_rtab! ( debug " signed node info for {} not valid in the {:?} routing domain " , node_id , routing_domain ) ;
return None ;
}
}
2022-07-20 13:39:38 +00:00
self . create_node_ref ( node_id , | e | {
2022-09-04 18:17:28 +00:00
e . update_signed_node_info ( routing_domain , signed_node_info ) ;
2022-09-03 17:57:25 +00:00
} )
. map ( | mut nr | {
2022-09-04 18:17:28 +00:00
nr . set_filter ( Some (
NodeRefFilter ::new ( ) . with_routing_domain ( routing_domain ) ,
) ) ;
2022-09-03 17:57:25 +00:00
nr
2022-07-20 13:39:38 +00:00
} )
2021-11-22 16:28:30 +00:00
}
// Shortcut function to add a node to our routing table if it doesn't exist
// and add the last peer address we have for it, since that's pretty common
pub fn register_node_with_existing_connection (
& self ,
node_id : DHTKey ,
descriptor : ConnectionDescriptor ,
timestamp : u64 ,
2022-07-20 13:39:38 +00:00
) -> Option < NodeRef > {
2022-09-06 20:49:43 +00:00
let out = self . create_node_ref ( node_id , | e | {
2022-08-05 22:07:32 +00:00
// this node is live because it literally just connected to us
e . touch_last_seen ( timestamp ) ;
2022-09-06 20:49:43 +00:00
} ) ;
if let Some ( nr ) = & out {
// set the most recent node address for connection finding and udp replies
nr . set_last_connection ( descriptor , timestamp ) ;
}
out
2021-11-22 16:28:30 +00:00
}
// Ticks about once per second
// to run tick tasks which may run at slower tick rates as configured
2022-07-10 21:36:50 +00:00
pub async fn tick ( & self ) -> EyreResult < ( ) > {
2021-11-22 16:28:30 +00:00
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self . unlocked_inner . rolling_transfers_task . tick ( ) . await ? ;
2022-06-25 14:57:33 +00:00
// Kick buckets task
2022-09-23 15:23:33 +00:00
let kick_bucket_queue_count = self . unlocked_inner . kick_queue . lock ( ) . len ( ) ;
2022-06-25 14:57:33 +00:00
if kick_bucket_queue_count > 0 {
self . unlocked_inner . kick_buckets_task . tick ( ) . await ? ;
}
2022-04-07 13:55:09 +00:00
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2021-11-26 14:54:38 +00:00
2022-03-24 14:14:50 +00:00
//////////////////////////////////////////////////////////////////////
// Routing Table Health Metrics
pub fn get_routing_table_health ( & self ) -> RoutingTableHealth {
let mut health = RoutingTableHealth ::default ( ) ;
let cur_ts = intf ::get_timestamp ( ) ;
2022-06-25 14:57:33 +00:00
let inner = self . inner . read ( ) ;
2022-03-24 14:14:50 +00:00
for bucket in & inner . buckets {
2022-06-25 14:57:33 +00:00
for ( _ , v ) in bucket . entries ( ) {
match v . with ( | e | e . state ( cur_ts ) ) {
2022-03-24 14:14:50 +00:00
BucketEntryState ::Reliable = > {
health . reliable_entry_count + = 1 ;
}
BucketEntryState ::Unreliable = > {
health . unreliable_entry_count + = 1 ;
}
BucketEntryState ::Dead = > {
health . dead_entry_count + = 1 ;
}
}
}
}
health
}
2022-08-31 01:21:16 +00:00
pub fn get_recent_peers ( & self ) -> Vec < ( DHTKey , RecentPeersEntry ) > {
2022-09-07 15:30:43 +00:00
let mut recent_peers = Vec ::new ( ) ;
let mut dead_peers = Vec ::new ( ) ;
let mut out = Vec ::new ( ) ;
// collect all recent peers
{
let inner = self . inner . read ( ) ;
for ( k , _v ) in & inner . recent_peers {
recent_peers . push ( * k ) ;
}
}
// look up each node and make sure the connection is still live
// (uses same logic as send_data, ensuring last_connection works for UDP)
for e in & recent_peers {
let mut dead = true ;
if let Some ( nr ) = self . lookup_node_ref ( * e ) {
if let Some ( last_connection ) = nr . last_connection ( ) {
out . push ( ( * e , RecentPeersEntry { last_connection } ) ) ;
dead = false ;
}
}
if dead {
dead_peers . push ( e ) ;
}
}
// purge dead recent peers
{
let mut inner = self . inner . write ( ) ;
for d in dead_peers {
inner . recent_peers . remove ( d ) ;
}
}
out
2022-08-31 01:21:16 +00:00
}
2022-09-06 20:49:43 +00:00
pub fn touch_recent_peer ( & self , node_id : DHTKey , last_connection : ConnectionDescriptor ) {
let mut inner = self . inner . write ( ) ;
2022-08-31 01:21:16 +00:00
inner
. recent_peers
. insert ( node_id , RecentPeersEntry { last_connection } ) ;
}
2021-11-22 16:28:30 +00:00
}