From 0adcc70bc9d1c03f995eeb7f384d1a193c627424 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 25 Jun 2022 10:57:33 -0400 Subject: [PATCH] refactor --- veilid-core/src/network_manager/mod.rs | 37 +- .../native/network_class_discovery.rs | 10 +- veilid-core/src/routing_table/bucket.rs | 83 +- veilid-core/src/routing_table/bucket_entry.rs | 92 +- veilid-core/src/routing_table/debug.rs | 14 +- veilid-core/src/routing_table/find_nodes.rs | 403 +++++++-- veilid-core/src/routing_table/mod.rs | 804 +++++------------- veilid-core/src/routing_table/node_ref.rs | 49 +- veilid-core/src/routing_table/tasks.rs | 378 ++++++++ veilid-core/src/rpc_processor/mod.rs | 24 +- 10 files changed, 1075 insertions(+), 819 deletions(-) create mode 100644 veilid-core/src/routing_table/tasks.rs diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 2267d75d..3c7837c8 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -40,6 +40,7 @@ pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; +pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; #[derive(Copy, Clone, Debug, Default)] pub struct ProtocolConfig { @@ -1062,6 +1063,34 @@ impl NetworkManager { }) } + // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) + async fn handle_boot_request(&self, descriptor: ConnectionDescriptor) -> Result<(), String> { + let routing_table = self.routing_table(); + + // Get a bunch of nodes with the various + let bootstrap_nodes = routing_table.find_bootstrap_nodes_filtered(2); + + // Serialize out peer info + let bootstrap_peerinfo: Vec = bootstrap_nodes + .iter() + .filter_map(|b| b.peer_info()) + .collect(); + let json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec(); + + // Reply with a chunk of signed routing table + match self + .net() + .send_data_to_existing_connection(descriptor, json_bytes) + .await? + { + None => { + // Bootstrap reply was sent + Ok(()) + } + Some(_) => Err("bootstrap reply could not be sent".to_owned()), + } + } + // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler @@ -1085,6 +1114,12 @@ impl NetworkManager { return Err("short packet".to_owned()); } + // Is this a direct bootstrap request instead of an envelope? + if data[0..4] == *BOOT_MAGIC { + self.handle_boot_request(descriptor).await?; + return Ok(true); + } + // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { self.handle_out_of_band_receipt(data).await?; @@ -1192,7 +1227,7 @@ impl NetworkManager { let source_noderef = routing_table .register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts) .map_err(|e| format!("node id registration failed: {}", e))?; - source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version())); + source_noderef.operate_mut(|e| e.set_min_max_version(envelope.get_min_max_version())); // xxx: deal with spoofing and flooding here? diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 8509be06..9308e81f 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -104,7 +104,15 @@ impl DiscoveryContext { let filter = DialInfoFilter::global() .with_protocol_type(protocol_type) .with_address_type(address_type); - let peers = self.routing_table.find_fast_public_nodes_filtered(&filter); + let node_count = { + let config = self.routing_table.network_manager().config(); + let c = config.get(); + c.network.dht.max_find_node_count as usize + }; + + let peers = self + .routing_table + .find_fast_public_nodes_filtered(node_count, &filter); if peers.is_empty() { log_net!("no peers of type '{:?}'", filter); return None; diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 69581526..fbdbaf12 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -1,14 +1,12 @@ use super::*; +use core::sync::atomic::Ordering; -#[derive(Clone)] pub struct Bucket { routing_table: RoutingTable, - entries: BTreeMap, + entries: BTreeMap>, newest_entry: Option, } -pub(super) type EntriesIterMut<'a> = - alloc::collections::btree_map::IterMut<'a, DHTKey, BucketEntry>; -pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>; +pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, Arc>; fn state_ordering(state: BucketEntryState) -> usize { match state { @@ -31,14 +29,14 @@ impl Bucket { log_rtab!("Node added: {}", node_id.encode()); // Add new entry - self.entries.insert(node_id, BucketEntry::new()); + self.entries.insert(node_id, Arc::new(BucketEntry::new())); // This is now the newest bucket entry self.newest_entry = Some(node_id); // Get a node ref to return - let entry_ref = self.entries.get_mut(&node_id).unwrap(); - NodeRef::new(self.routing_table.clone(), node_id, entry_ref, None) + let entry = self.entries.get(&node_id).unwrap().clone(); + NodeRef::new(self.routing_table.clone(), node_id, entry, None) } pub(super) fn remove_entry(&mut self, node_id: &DHTKey) { @@ -50,25 +48,21 @@ impl Bucket { // newest_entry is updated by kick_bucket() } - pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { + pub(super) fn roll_transfers(&self, last_ts: u64, cur_ts: u64) { // Called every ROLLING_TRANSFERS_INTERVAL_SECS - for entry in &mut self.entries { - entry.1.roll_transfers(last_ts, cur_ts); + for (_k, v) in &self.entries { + v.with_mut(|e| e.roll_transfers(last_ts, cur_ts)); } } - pub(super) fn entry_mut(&mut self, key: &DHTKey) -> Option<&mut BucketEntry> { - self.entries.get_mut(key) + pub(super) fn entry(&self, key: &DHTKey) -> Option> { + self.entries.get(key).cloned() } pub(super) fn entries(&self) -> EntriesIter { self.entries.iter() } - pub(super) fn entries_mut(&mut self) -> EntriesIterMut { - self.entries.iter_mut() - } - pub(super) fn kick(&mut self, bucket_depth: usize) -> Option> { // Get number of entries to attempt to purge from bucket let bucket_len = self.entries.len(); @@ -83,27 +77,34 @@ impl Bucket { let mut extra_entries = bucket_len - bucket_depth; // Get the sorted list of entries by their kick order - let mut sorted_entries: Vec<(&_, &_)> = self.entries.iter().collect(); + let mut sorted_entries: Vec<(DHTKey, Arc)> = self + .entries + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let cur_ts = get_timestamp(); - sorted_entries.sort_by( - |a: &(&DHTKey, &BucketEntry), b: &(&DHTKey, &BucketEntry)| -> core::cmp::Ordering { - let ea = a.1; - let eb = b.1; - let astate = state_ordering(ea.state(cur_ts)); - let bstate = state_ordering(eb.state(cur_ts)); - // first kick dead nodes, then unreliable nodes - if astate < bstate { - return core::cmp::Ordering::Less; - } - if astate > bstate { - return core::cmp::Ordering::Greater; - } - // then kick by time added, most recent nodes are kicked first - let ata = ea.peer_stats().time_added; - let bta = eb.peer_stats().time_added; - bta.cmp(&ata) - }, - ); + sorted_entries.sort_by(|a, b| -> core::cmp::Ordering { + if a.0 == b.0 { + return core::cmp::Ordering::Equal; + } + a.1.with(|ea| { + b.1.with(|eb| { + let astate = state_ordering(ea.state(cur_ts)); + let bstate = state_ordering(eb.state(cur_ts)); + // first kick dead nodes, then unreliable nodes + if astate < bstate { + return core::cmp::Ordering::Less; + } + if astate > bstate { + return core::cmp::Ordering::Greater; + } + // then kick by time added, most recent nodes are kicked first + let ata = ea.peer_stats().time_added; + let bta = eb.peer_stats().time_added; + bta.cmp(&ata) + }) + }) + }); self.newest_entry = None; for entry in sorted_entries { @@ -111,23 +112,23 @@ impl Bucket { if extra_entries == 0 { // The first 'live' entry we find is our newest entry if self.newest_entry.is_none() { - self.newest_entry = Some(*entry.0); + self.newest_entry = Some(entry.0); } break; } extra_entries -= 1; // if this entry has references we can't drop it yet - if entry.1.ref_count > 0 { + if entry.1.ref_count.load(Ordering::Acquire) > 0 { // The first 'live' entry we fine is our newest entry if self.newest_entry.is_none() { - self.newest_entry = Some(*entry.0); + self.newest_entry = Some(entry.0); } continue; } // if no references, lets evict it - dead_node_ids.insert(*entry.0); + dead_node_ids.insert(entry.0); } // Now purge the dead node ids diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 4e11bc78..6c1b6e97 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1,4 +1,5 @@ use super::*; +use core::sync::atomic::{AtomicU32, Ordering}; // Reliable pings are done with increased spacing between pings // - Start secs is the number of seconds between the first two pings @@ -34,9 +35,8 @@ pub enum BucketEntryState { Reliable, } -#[derive(Debug, Clone)] -pub struct BucketEntry { - pub(super) ref_count: u32, +#[derive(Debug)] +pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, seen_our_node_info: bool, last_connection: Option<(ConnectionDescriptor, u64)>, @@ -51,32 +51,7 @@ pub struct BucketEntry { node_ref_tracks: HashMap, } -impl BucketEntry { - pub(super) fn new() -> Self { - let now = get_timestamp(); - Self { - ref_count: 0, - min_max_version: None, - seen_our_node_info: false, - last_connection: None, - opt_signed_node_info: None, - opt_local_node_info: None, - peer_stats: PeerStats { - time_added: now, - rpc_stats: RPCStats::default(), - latency: None, - transfer: TransferStatsDownUp::default(), - status: None, - }, - latency_stats_accounting: LatencyStatsAccounting::new(), - transfer_stats_accounting: TransferStatsAccounting::new(), - #[cfg(feature = "tracking")] - next_track_id: 0, - #[cfg(feature = "tracking")] - node_ref_tracks: HashMap::new(), - } - } - +impl BucketEntryInner { #[cfg(feature = "tracking")] pub fn track(&mut self) -> usize { let track_id = self.next_track_id; @@ -363,7 +338,7 @@ impl BucketEntry { self.peer_stats.rpc_stats.last_seen_ts = Some(ts); } - pub(super) fn state_debug_info(&self, cur_ts: u64) -> String { + pub(super) fn _state_debug_info(&self, cur_ts: u64) -> String { let first_consecutive_seen_ts = if let Some(first_consecutive_seen_ts) = self.peer_stats.rpc_stats.first_consecutive_seen_ts { @@ -384,7 +359,7 @@ impl BucketEntry { }; format!( - "state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}", + "state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}", self.state(cur_ts), first_consecutive_seen_ts, last_seen_ts_str @@ -435,9 +410,60 @@ impl BucketEntry { } } +#[derive(Debug)] +pub struct BucketEntry { + pub(super) ref_count: AtomicU32, + inner: RwLock, +} + +impl BucketEntry { + pub(super) fn new() -> Self { + let now = get_timestamp(); + Self { + ref_count: AtomicU32::new(0), + inner: RwLock::new(BucketEntryInner { + min_max_version: None, + seen_our_node_info: false, + last_connection: None, + opt_signed_node_info: None, + opt_local_node_info: None, + peer_stats: PeerStats { + time_added: now, + rpc_stats: RPCStats::default(), + latency: None, + transfer: TransferStatsDownUp::default(), + status: None, + }, + latency_stats_accounting: LatencyStatsAccounting::new(), + transfer_stats_accounting: TransferStatsAccounting::new(), + #[cfg(feature = "tracking")] + next_track_id: 0, + #[cfg(feature = "tracking")] + node_ref_tracks: HashMap::new(), + }), + } + } + + pub fn with(&self, f: F) -> R + where + F: FnOnce(&BucketEntryInner) -> R, + { + let inner = self.inner.read(); + f(&*inner) + } + + pub fn with_mut(&self, f: F) -> R + where + F: FnOnce(&mut BucketEntryInner) -> R, + { + let mut inner = self.inner.write(); + f(&mut *inner) + } +} + impl Drop for BucketEntry { fn drop(&mut self) { - if self.ref_count != 0 { + if self.ref_count.load(Ordering::Relaxed) != 0 { #[cfg(feature = "tracking")] { println!("NodeRef Tracking"); @@ -449,7 +475,7 @@ impl Drop for BucketEntry { panic!( "bucket entry dropped with non-zero refcount: {:#?}", - self.node_info() + self.inner.read().node_info() ) } } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index d90dd8ec..4731c5db 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -3,7 +3,7 @@ use super::*; impl RoutingTable { pub fn debug_info_nodeinfo(&self) -> String { let mut out = String::new(); - let inner = self.inner.lock(); + let inner = self.inner.read(); out += "Routing Table Info:\n"; out += &format!(" Node Id: {}\n", inner.node_id.encode()); @@ -88,7 +88,7 @@ impl RoutingTable { } pub fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String { - let inner = self.inner.lock(); + let inner = self.inner.read(); let cur_ts = get_timestamp(); let mut out = String::new(); @@ -98,17 +98,17 @@ impl RoutingTable { let mut cnt = 0; out += &format!("Entries: {}\n", inner.bucket_entry_count); while b < blen { - let filtered_entries: Vec<(&DHTKey, &BucketEntry)> = inner.buckets[b] + let filtered_entries: Vec<(&DHTKey, &Arc)> = inner.buckets[b] .entries() .filter(|e| { - let state = e.1.state(cur_ts); + let state = e.1.with(|e| e.state(cur_ts)); state >= min_state }) .collect(); if !filtered_entries.is_empty() { out += &format!(" Bucket #{}:\n", b); for e in filtered_entries { - let state = e.1.state(cur_ts); + let state = e.1.with(|e| e.state(cur_ts)); out += &format!( " {} [{}]\n", e.0.encode(), @@ -147,7 +147,7 @@ impl RoutingTable { } pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { - let inner = self.inner.lock(); + let inner = self.inner.read(); let cur_ts = get_timestamp(); let mut out = String::new(); @@ -162,7 +162,7 @@ impl RoutingTable { while c < COLS { let mut cnt = 0; for e in inner.buckets[b].entries() { - if e.1.state(cur_ts) >= min_state { + if e.1.with(|e| e.state(cur_ts) >= min_state) { cnt += 1; } } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 1e44ab9b..57b9abed 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -5,30 +5,30 @@ use crate::intf::*; use crate::xx::*; use crate::*; -pub type FilterType = Box)) -> bool>; - impl RoutingTable { // Retrieve the fastest nodes in the routing table with a particular kind of protocol and address type // Returns noderefs are are scoped to that address type only pub fn find_fast_public_nodes_filtered( &self, + node_count: usize, dial_info_filter: &DialInfoFilter, ) -> Vec { let dial_info_filter1 = dial_info_filter.clone(); - self.find_fastest_nodes( - // filter - Some(Box::new( - move |params: &(&DHTKey, Option<&mut BucketEntry>)| { - let entry = params.1.as_ref().unwrap(); + self.find_fastest_nodes( + // count + node_count, + // filter + Some(move |_k: DHTKey, v: Option>| { + let entry = v.unwrap(); + entry.with(|e| { // skip nodes on our local network here - if entry.local_node_info().is_some() { + if e.local_node_info().is_some() { return false; } // does it have matching public dial info? - entry - .node_info() + e.node_info() .map(|n| { n.first_filtered_dial_info_detail(|did| { did.matches_filter(&dial_info_filter1) @@ -36,20 +36,83 @@ impl RoutingTable { .is_some() }) .unwrap_or(false) - }, - )), + }) + }), // transform - |e| { + |k: DHTKey, v: Option>| { NodeRef::new( self.clone(), - *e.0, - e.1.as_mut().unwrap(), + k, + v.unwrap().clone(), Some(dial_info_filter.clone()), ) }, ) } + // Retrieve up to N of each type of protocol capable nodes + pub fn find_bootstrap_nodes_filtered(&self, max_per_type: usize) -> Vec { + let protocol_types = vec![ + ProtocolType::UDP, + ProtocolType::TCP, + ProtocolType::WS, + ProtocolType::WSS, + ]; + let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize]; + let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize]; + + self.find_fastest_nodes( + // count + protocol_types.len() * 2 * max_per_type, + // filter + Some(move |_k: DHTKey, v: Option>| { + let entry = v.unwrap(); + entry.with(|e| { + // skip nodes on our local network here + if e.local_node_info().is_some() { + return false; + } + + // does it have some dial info we need? + let filter = |n: NodeInfo| { + let mut keep = false; + for did in n.dial_info_detail_list { + if did.dial_info.is_global() { + if matches!(did.dial_info.address_type(), AddressType::IPV4) { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v4[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v4[n] += 1; + keep = true; + } + } + } else if matches!(did.dial_info.address_type(), AddressType::IPV6) + { + for (n, protocol_type) in protocol_types.iter().enumerate() { + if nodes_proto_v6[n] < max_per_type + && did.dial_info.protocol_type() == *protocol_type + { + nodes_proto_v6[n] += 1; + keep = true; + } + } + } + } + } + keep + }; + + e.node_info().map(filter).unwrap_or(false) + }) + }), + // transform + |k: DHTKey, v: Option>| { + NodeRef::new(self.clone(), k, v.unwrap().clone(), None) + }, + ) + } + // Get our own node's peer info (public node info) so we can share it with other nodes pub fn get_own_peer_info(&self) -> PeerInfo { PeerInfo::new(NodeId::new(self.node_id()), self.get_own_signed_node_info()) @@ -75,22 +138,23 @@ impl RoutingTable { } pub fn filter_has_valid_signed_node_info( - kv: &(&DHTKey, Option<&mut BucketEntry>), + v: Option>, own_peer_info_is_valid: bool, ) -> bool { - match &kv.1 { + match v { None => own_peer_info_is_valid, - Some(b) => b.has_valid_signed_node_info(), + Some(entry) => entry.with(|e| e.has_valid_signed_node_info()), } } pub fn transform_to_peer_info( - kv: &mut (&DHTKey, Option<&mut BucketEntry>), + k: DHTKey, + v: Option>, own_peer_info: &PeerInfo, ) -> PeerInfo { - match &kv.1 { + match v { None => own_peer_info.clone(), - Some(entry) => entry.peer_info(*kv.0).unwrap(), + Some(entry) => entry.with(|e| e.peer_info(k).unwrap()), } } @@ -98,43 +162,38 @@ impl RoutingTable { &self, node_count: usize, cur_ts: u64, - filter: F, + mut filter: F, compare: C, - transform: T, + mut transform: T, ) -> Vec where - F: Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool, - C: Fn( - &(&DHTKey, Option<&mut BucketEntry>), - &(&DHTKey, Option<&mut BucketEntry>), + F: FnMut(DHTKey, Option>) -> bool, + C: FnMut( + &(DHTKey, Option>), + &(DHTKey, Option>), ) -> core::cmp::Ordering, - T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O, + T: FnMut(DHTKey, Option>) -> O, { - let mut inner = self.inner.lock(); + let inner = self.inner.read(); + let self_node_id = inner.node_id; // collect all the nodes for sorting let mut nodes = - Vec::<(&DHTKey, Option<&mut BucketEntry>)>::with_capacity(inner.bucket_entry_count + 1); + Vec::<(DHTKey, Option>)>::with_capacity(inner.bucket_entry_count + 1); + // add our own node (only one of there with the None entry) - let self_node_id = inner.node_id; - let selfkv = (&self_node_id, None); - if filter(&selfkv) { - nodes.push(selfkv); + if filter(self_node_id, None) { + nodes.push((self_node_id, None)); } + // add all nodes from buckets - // Can't use with_entries() here due to lifetime issues - for b in &mut inner.buckets { - for (k, v) in b.entries_mut() { - // Don't bother with dead nodes - if v.state(cur_ts) >= BucketEntryState::Unreliable { - // Apply filter - let kv = (k, Some(v)); - if filter(&kv) { - nodes.push(kv); - } - } + Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + // Apply filter + if filter(k, Some(v.clone())) { + nodes.push((k, Some(v.clone()))); } - } + Option::<()>::None + }); // sort by preference for returning nodes nodes.sort_by(compare); @@ -142,33 +201,40 @@ impl RoutingTable { // return transformed vector for filtered+sorted nodes let cnt = usize::min(node_count, nodes.len()); let mut out = Vec::::with_capacity(cnt); - for mut node in nodes { - let val = transform(&mut node); + for node in nodes { + let val = transform(node.0, node.1); out.push(val); } out } - pub fn find_fastest_nodes(&self, filter: Option, transform: T) -> Vec + pub fn find_fastest_nodes( + &self, + node_count: usize, + mut filter: Option, + transform: T, + ) -> Vec where - T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O, + F: FnMut(DHTKey, Option>) -> bool, + T: FnMut(DHTKey, Option>) -> O, { let cur_ts = get_timestamp(); - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; let out = self.find_peers_with_sort_and_filter( node_count, cur_ts, // filter - |kv| { - if kv.1.is_none() { + |k, v| { + if let Some(entry) = &v { + // always filter out dead nodes + if entry.with(|e| e.state(cur_ts) == BucketEntryState::Dead) { + false + } else { + filter.as_mut().map(|f| f(k, v)).unwrap_or(true) + } + } else { // always filter out self peer, as it is irrelevant to the 'fastest nodes' search false - } else { - filter.as_ref().map(|f| f(kv)).unwrap_or(true) } }, // sort @@ -187,50 +253,53 @@ impl RoutingTable { // reliable nodes come first let ae = a_entry.as_ref().unwrap(); let be = b_entry.as_ref().unwrap(); + ae.with(|ae| { + be.with(|be| { + let ra = ae.check_reliable(cur_ts); + let rb = be.check_reliable(cur_ts); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } - let ra = ae.check_reliable(cur_ts); - let rb = be.check_reliable(cur_ts); - if ra != rb { - if ra { - return core::cmp::Ordering::Less; - } else { - return core::cmp::Ordering::Greater; - } - } - - // latency is the next metric, closer nodes first - let a_latency = match ae.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Greater; - } - Some(l) => l, - }; - let b_latency = match be.peer_stats().latency.as_ref() { - None => { - // treat unknown latency as slow - return core::cmp::Ordering::Less; - } - Some(l) => l, - }; - // Sort by average latency - a_latency.average.cmp(&b_latency.average) + // latency is the next metric, closer nodes first + let a_latency = match ae.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Greater; + } + Some(l) => l, + }; + let b_latency = match be.peer_stats().latency.as_ref() { + None => { + // treat unknown latency as slow + return core::cmp::Ordering::Less; + } + Some(l) => l, + }; + // Sort by average latency + a_latency.average.cmp(&b_latency.average) + }) + }) }, // transform, transform, ); - log_rtab!(">> find_fastest_nodes: node count = {}", out.len()); out } - pub fn find_closest_nodes( + pub fn find_closest_nodes( &self, node_id: DHTKey, - filter: Option, - transform: T, + mut filter: Option, + mut transform: T, ) -> Vec where - T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O, + T: FnMut(DHTKey, Option>) -> O, + F: FnMut(DHTKey, Option>) -> bool, { let cur_ts = get_timestamp(); let node_count = { @@ -241,16 +310,21 @@ impl RoutingTable { node_count, cur_ts, // filter - |kv| filter.as_ref().map(|f| f(kv)).unwrap_or(true), + |k, v| filter.as_mut().map(|f| f(k, v)).unwrap_or(true), // sort |(a_key, a_entry), (b_key, b_entry)| { // same nodes are always the same if a_key == b_key { return core::cmp::Ordering::Equal; } + // reliable nodes come first, pessimistically treating our own node as unreliable - let ra = a_entry.as_ref().map_or(false, |x| x.check_reliable(cur_ts)); - let rb = b_entry.as_ref().map_or(false, |x| x.check_reliable(cur_ts)); + let ra = a_entry + .as_ref() + .map_or(false, |x| x.with(|x| x.check_reliable(cur_ts))); + let rb = b_entry + .as_ref() + .map_or(false, |x| x.with(|x| x.check_reliable(cur_ts))); if ra != rb { if ra { return core::cmp::Ordering::Less; @@ -265,9 +339,150 @@ impl RoutingTable { da.cmp(&db) }, // transform, - transform, + &mut transform, ); log_rtab!(">> find_closest_nodes: node count = {}", out.len()); out } + + #[instrument(level = "trace", skip(self), ret)] + pub fn find_inbound_relay(&self, cur_ts: u64) -> Option { + let inner = self.inner.read(); + let inner = &*inner; + let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; + + // Iterate all known nodes for candidates + Self::with_entries_unlocked(inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + // Ensure this node is not on our local network + if v.with(|e| { + e.local_node_info() + .map(|l| l.has_dial_info()) + .unwrap_or(false) + }) { + return Option::<()>::None; + } + + // Ensure we have the node's status + if let Some(node_status) = v.with(|e| e.peer_stats().status.clone()) { + // Ensure the node will relay + if node_status.will_relay { + // Compare against previous candidate + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + // Less is faster + let better = v.with(|e| { + best_inbound_relay.1.with(|best| { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) + == std::cmp::Ordering::Less + }) + }); + if better { + *best_inbound_relay = (k, v); + } + } else { + // Always store the first candidate + best_inbound_relay = Some((k, v)); + } + } + } + Option::<()>::None + }); + // Return the best inbound relay noderef + best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), k, e, None)) + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result, String> { + let node_id = self.node_id(); + + // register nodes we'd found + let mut out = Vec::::with_capacity(fna.peers.len()); + for p in fna.peers { + // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table + if p.node_id.key == node_id { + continue; + } + + // register the node if it's new + let nr = self + .register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone()) + .map_err(map_to_string) + .map_err(logthru_rtab!( + "couldn't register node {} at {:?}", + p.node_id.key, + &p.signed_node_info + ))?; + out.push(nr); + } + Ok(out) + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_node( + &self, + node_ref: NodeRef, + node_id: DHTKey, + ) -> Result, String> { + let rpc_processor = self.rpc_processor(); + + let res = rpc_processor + .clone() + .rpc_call_find_node( + Destination::Direct(node_ref.clone()), + node_id, + None, + rpc_processor.make_respond_to_sender(node_ref.clone()), + ) + .await + .map_err(map_to_string) + .map_err(logthru_rtab!())?; + + // register nodes we'd found + self.register_find_node_answer(res) + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_self(&self, node_ref: NodeRef) -> Result, String> { + let node_id = self.node_id(); + self.find_node(node_ref, node_id).await + } + + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn find_target(&self, node_ref: NodeRef) -> Result, String> { + let node_id = node_ref.node_id(); + self.find_node(node_ref, node_id).await + } + + #[instrument(level = "trace", skip(self))] + pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { + // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves + // and then contact those nodes to inform -them- that we exist + + // Ask bootstrap server for nodes closest to our own node + let closest_nodes = match self.find_self(node_ref.clone()).await { + Err(e) => { + log_rtab!(error + "reverse_find_node: find_self failed for {:?}: {}", + &node_ref, e + ); + return; + } + Ok(v) => v, + }; + + // Ask each node near us to find us as well + if wide { + for closest_nr in closest_nodes { + match self.find_self(closest_nr.clone()).await { + Err(e) => { + log_rtab!(error + "reverse_find_node: closest node find_self failed for {:?}: {}", + &closest_nr, e + ); + return; + } + Ok(v) => v, + }; + } + } + } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 6b68380e..0a73bf2a 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -4,6 +4,7 @@ mod debug; mod find_nodes; mod node_ref; mod stats_accounting; +mod tasks; use crate::dht::*; use crate::intf::*; @@ -45,17 +46,19 @@ pub struct RoutingDomainDetail { struct RoutingTableInner { network_manager: NetworkManager, - node_id: DHTKey, - node_id_secret: DHTKeySecret, - buckets: Vec, - public_internet_routing_domain: RoutingDomainDetail, - local_network_routing_domain: RoutingDomainDetail, - bucket_entry_count: usize, + node_id: DHTKey, // The current node's public DHT key + node_id_secret: DHTKeySecret, // The current node's DHT key secret - // Transfer stats for this node - self_latency_stats_accounting: LatencyStatsAccounting, - self_transfer_stats_accounting: TransferStatsAccounting, - self_transfer_stats: TransferStatsDownUp, + buckets: Vec, // Routing table buckets that hold entries + kick_queue: BTreeSet, // Buckets to kick on our next kick task + bucket_entry_count: usize, // A fast counter for the number of entries in the table, total + + public_internet_routing_domain: RoutingDomainDetail, // The dial info we use on the public internet + local_network_routing_domain: RoutingDomainDetail, // The dial info we use on the local network + + self_latency_stats_accounting: LatencyStatsAccounting, // Interim accounting mechanism for this node's RPC latency to any other node + self_transfer_stats_accounting: TransferStatsAccounting, // Interim accounting mechanism for the total bandwidth to/from this node + self_transfer_stats: TransferStatsDownUp, // Statistics about the total bandwidth to/from this node } #[derive(Clone, Debug, Default)] @@ -72,12 +75,13 @@ struct RoutingTableUnlockedInner { peer_minimum_refresh_task: TickTask, ping_validator_task: TickTask, node_info_update_single_future: MustJoinSingleFuture<()>, + kick_buckets_task: TickTask, } #[derive(Clone)] pub struct RoutingTable { config: VeilidConfig, - inner: Arc>, + inner: Arc>, unlocked_inner: Arc, } @@ -88,6 +92,7 @@ impl RoutingTable { node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), + kick_queue: BTreeSet::default(), public_internet_routing_domain: RoutingDomainDetail::default(), local_network_routing_domain: RoutingDomainDetail::default(), bucket_entry_count: 0, @@ -104,13 +109,14 @@ impl RoutingTable { peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), ping_validator_task: TickTask::new(1), node_info_update_single_future: MustJoinSingleFuture::new(), + kick_buckets_task: TickTask::new(1), } } pub fn new(network_manager: NetworkManager) -> Self { let config = network_manager.config(); let this = Self { config: config.clone(), - inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), + inner: Arc::new(RwLock::new(Self::new_inner(network_manager))), unlocked_inner: Arc::new(Self::new_unlocked_inner(config)), }; // Set rolling transfers tick task @@ -147,22 +153,31 @@ impl RoutingTable { Box::pin(this2.clone().ping_validator_task_routine(s, l, t)) }); } + // Set kick buckets tick task + { + let this2 = this.clone(); + this.unlocked_inner + .kick_buckets_task + .set_routine(move |s, l, t| { + Box::pin(this2.clone().kick_buckets_task_routine(s, l, t)) + }); + } this } pub fn network_manager(&self) -> NetworkManager { - self.inner.lock().network_manager.clone() + self.inner.read().network_manager.clone() } pub fn rpc_processor(&self) -> RPCProcessor { self.network_manager().rpc_processor() } pub fn node_id(&self) -> DHTKey { - self.inner.lock().node_id + self.inner.read().node_id } pub fn node_id_secret(&self) -> DHTKeySecret { - self.inner.lock().node_id_secret + self.inner.read().node_id_secret } fn with_routing_domain(inner: &RoutingTableInner, domain: RoutingDomain, f: F) -> R @@ -190,12 +205,12 @@ impl RoutingTable { } pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { - let inner = self.inner.lock(); + let inner = self.inner.read(); Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details.is_empty()) } pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec { - let inner = self.inner.lock(); + let inner = self.inner.read(); Self::with_routing_domain(&*inner, domain, |rd| rd.dial_info_details.clone()) } @@ -204,7 +219,7 @@ impl RoutingTable { domain: Option, filter: &DialInfoFilter, ) -> Option { - let inner = self.inner.lock(); + let inner = self.inner.read(); // Prefer local network first if it isn't filtered out if domain == None || domain == Some(RoutingDomain::LocalNetwork) { Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { @@ -239,7 +254,7 @@ impl RoutingTable { domain: Option, filter: &DialInfoFilter, ) -> Vec { - let inner = self.inner.lock(); + let inner = self.inner.read(); let mut ret = Vec::new(); if domain == None || domain == Some(RoutingDomain::LocalNetwork) { @@ -295,7 +310,7 @@ impl RoutingTable { .map_err(logthru_rtab!(error)); } - let mut inner = self.inner.lock(); + let mut inner = self.inner.write(); Self::with_routing_domain_mut(&mut *inner, domain, |rd| { rd.dial_info_details.push(DialInfoDetail { dial_info: dial_info.clone(), @@ -322,8 +337,8 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { let cur_ts = intf::get_timestamp(); - Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Dead, |_, e| { - e.set_seen_our_node_info(false); + Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { + v.with_mut(|e| e.set_seen_our_node_info(false)); Option::<()>::None }); } @@ -334,7 +349,7 @@ impl RoutingTable { pub fn clear_dial_info_details(&self, domain: RoutingDomain) { trace!("clearing dial info domain: {:?}", domain); - let mut inner = self.inner.lock(); + let mut inner = self.inner.write(); Self::with_routing_domain_mut(&mut *inner, domain, |rd| { rd.dial_info_details.clear(); }) @@ -357,7 +372,7 @@ impl RoutingTable { } pub async fn init(&self) -> Result<(), String> { - let mut inner = self.inner.lock(); + let mut inner = self.inner.write(); // Size the buckets (one per bit) inner.buckets.reserve(DHT_KEY_LENGTH * 8); for _ in 0..DHT_KEY_LENGTH * 8 { @@ -404,7 +419,7 @@ impl RoutingTable { error!("node_info_update_single_future not stopped"); } - *self.inner.lock() = Self::new_inner(self.network_manager()); + *self.inner.write() = Self::new_inner(self.network_manager()); debug!("finished routing table terminate"); } @@ -432,17 +447,17 @@ impl RoutingTable { // Get the list of refs to all nodes to update let node_refs = { - let mut inner = this.inner.lock(); + let inner = this.inner.read(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); let cur_ts = intf::get_timestamp(); - Self::with_entries( - &mut *inner, + Self::with_entries_unlocked( + &*inner, cur_ts, BucketEntryState::Unreliable, - |k, e| { + |k, v| { // Only update nodes that haven't seen our node info yet - if !e.has_seen_our_node_info() { - node_refs.push(NodeRef::new(this.clone(), *k, e, None)); + if !v.with(|e| e.has_seen_our_node_info()) { + node_refs.push(NodeRef::new(this.clone(), k, v, None)); } Option::<()>::None }, @@ -482,7 +497,7 @@ impl RoutingTable { // Attempt to empty the routing table // should only be performed when there are no node_refs (detached) pub fn purge(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.write(); log_rtab!( "Starting routing table purge. Table currently has {} nodes", inner.bucket_entry_count @@ -522,26 +537,26 @@ impl RoutingTable { .unwrap() } - fn get_entry_count(inner: &mut RoutingTableInner, min_state: BucketEntryState) -> usize { + fn get_entry_count(inner: &RoutingTableInner, min_state: BucketEntryState) -> usize { let mut count = 0usize; let cur_ts = intf::get_timestamp(); - Self::with_entries(inner, cur_ts, min_state, |_, _| { + Self::with_entries_unlocked(inner, cur_ts, min_state, |_, _| { count += 1; Option::<()>::None }); count } - fn with_entries Option>( - inner: &mut RoutingTableInner, + fn with_entries_unlocked) -> Option>( + inner: &RoutingTableInner, cur_ts: u64, min_state: BucketEntryState, mut f: F, ) -> Option { - for bucket in &mut inner.buckets { - for entry in bucket.entries_mut() { - if entry.1.state(cur_ts) >= min_state { - if let Some(out) = f(entry.0, entry.1) { + for bucket in &inner.buckets { + for entry in bucket.entries() { + if entry.1.with(|e| e.state(cur_ts) >= min_state) { + if let Some(out) = f(*entry.0, entry.1.clone()) { return Some(out); } } @@ -550,22 +565,58 @@ impl RoutingTable { None } - fn drop_node_ref(&self, node_id: DHTKey) { - // Reduce ref count on entry - let mut inner = self.inner.lock(); - let idx = Self::find_bucket_index(&*inner, node_id); - let new_ref_count = { - let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry_mut(&node_id).unwrap(); - entry.ref_count -= 1; - entry.ref_count - }; + // fn with_entries Option>( + // inner: &RoutingTableInner, + // cur_ts: u64, + // min_state: BucketEntryState, + // mut f: F, + // ) -> Option { + // for bucket in &inner.buckets { + // for entry in bucket.entries() { + // let out = entry.1.with(|e| { + // if e.state(cur_ts) >= min_state { + // if let Some(out) = f(entry.0, e) { + // return Some(out); + // } + // } + // None + // }); + // if out.is_some() { + // return out; + // } + // } + // } + // None + // } - // If this entry could possibly go away, kick the bucket - if new_ref_count == 0 { - // it important to do this in the same inner lock as the ref count decrease - Self::kick_bucket(&mut *inner, idx); - } + // fn with_entries_mut Option>( + // inner: &RoutingTableInner, + // cur_ts: u64, + // min_state: BucketEntryState, + // mut f: F, + // ) -> Option { + // for bucket in &inner.buckets { + // for entry in bucket.entries() { + // let out = entry.1.with_mut(|e| { + // if e.state(cur_ts) >= min_state { + // if let Some(out) = f(entry.0, e) { + // return Some(out); + // } + // } + // None + // }); + // if out.is_some() { + // return out; + // } + // } + // } + // None + // } + + fn queue_bucket_kick(&self, node_id: DHTKey) { + let mut inner = self.inner.write(); + let idx = Self::find_bucket_index(&*inner, node_id); + inner.kick_queue.insert(idx); } // Create a node reference, possibly creating a bucket entry @@ -573,7 +624,7 @@ impl RoutingTable { // in a locked fashion as to ensure the bucket entry state is always valid pub fn create_node_ref(&self, node_id: DHTKey, update_func: F) -> Result where - F: FnOnce(&mut BucketEntry), + F: FnOnce(&mut BucketEntryInner), { // Ensure someone isn't trying register this node itself if node_id == self.node_id() { @@ -581,13 +632,13 @@ impl RoutingTable { } // Lock this entire operation - let mut inner = self.inner.lock(); + let mut inner = self.inner.write(); // Look up existing entry let idx = Self::find_bucket_index(&*inner, node_id); let noderef = { - let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry_mut(&node_id); + let bucket = &inner.buckets[idx]; + let entry = bucket.entry(&node_id); entry.map(|e| NodeRef::new(self.clone(), node_id, e, None)) }; @@ -602,20 +653,22 @@ impl RoutingTable { let nr = bucket.add_entry(node_id); // Update the entry - let entry = bucket.entry_mut(&node_id); - update_func(entry.unwrap()); + let entry = bucket.entry(&node_id).unwrap(); + entry.with_mut(update_func); // Kick the bucket // It is important to do this in the same inner lock as the add_entry - Self::kick_bucket(&mut *inner, idx); + inner.kick_queue.insert(idx); nr } Some(nr) => { // Update the entry let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry_mut(&node_id); - update_func(entry.unwrap()); + let entry = bucket.entry(&node_id).unwrap(); + entry.with_mut(|e| { + update_func(e); + }); nr } @@ -625,11 +678,11 @@ impl RoutingTable { } pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option { - let mut inner = self.inner.lock(); + let inner = self.inner.read(); let idx = Self::find_bucket_index(&*inner, node_id); - let bucket = &mut inner.buckets[idx]; + let bucket = &inner.buckets[idx]; bucket - .entry_mut(&node_id) + .entry(&node_id) .map(|e| NodeRef::new(self.clone(), node_id, e, None)) } @@ -673,531 +726,49 @@ impl RoutingTable { Ok(nr) } - fn operate_on_bucket_entry_locked( - inner: &mut RoutingTableInner, - node_id: DHTKey, - f: F, - ) -> T - where - F: FnOnce(&mut BucketEntry) -> T, - { - let idx = Self::find_bucket_index(&*inner, node_id); - let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry_mut(&node_id).unwrap(); - f(entry) - } + // fn operate_on_bucket_entry_inner_locked( + // inner: &RoutingTableInner, + // node_id: DHTKey, + // f: F, + // ) -> T + // where + // F: FnOnce(&BucketEntryInner) -> T, + // { + // let idx = Self::find_bucket_index(&*inner, node_id); + // let bucket = &inner.buckets[idx]; + // let entry = bucket.entry(&node_id).unwrap(); + // entry.with(f) + // } - fn operate_on_bucket_entry(&self, node_id: DHTKey, f: F) -> T - where - F: FnOnce(&mut BucketEntry) -> T, - { - let mut inner = self.inner.lock(); - Self::operate_on_bucket_entry_locked(&mut *inner, node_id, f) - } + // fn operate_on_bucket_entry_inner_locked_mut( + // inner: &RoutingTableInner, + // node_id: DHTKey, + // f: F, + // ) -> T + // where + // F: FnOnce(&mut BucketEntryInner) -> T, + // { + // let idx = Self::find_bucket_index(&*inner, node_id); + // let bucket = &inner.buckets[idx]; + // let entry = bucket.entry(&node_id).unwrap(); + // entry.with_mut(f) + // } - pub fn find_inbound_relay(&self, cur_ts: u64) -> Option { - let mut inner = self.inner.lock(); - let inner = &mut *inner; - let mut best_inbound_relay: Option<(&DHTKey, &mut BucketEntry)> = None; + // fn operate_on_bucket_entry(&self, node_id: DHTKey, f: F) -> T + // where + // F: FnOnce(&BucketEntryInner) -> T, + // { + // let inner = self.inner.read(); + // Self::operate_on_bucket_entry_inner_locked(&mut *inner, node_id, f) + // } - // Iterate all known nodes for candidates - for bucket in &mut inner.buckets { - for (k, e) in bucket.entries_mut() { - if e.state(cur_ts) >= BucketEntryState::Unreliable { - // Ensure this node is not on our local network - if !e - .local_node_info() - .map(|l| l.has_dial_info()) - .unwrap_or(false) - { - // Ensure we have the node's status - if let Some(node_status) = &e.peer_stats().status { - // Ensure the node will relay - if node_status.will_relay { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - if BucketEntry::cmp_fastest_reliable( - cur_ts, - e, - best_inbound_relay.1, - ) == std::cmp::Ordering::Less - { - *best_inbound_relay = (k, e); - } - } else { - // Always store the first candidate - best_inbound_relay = Some((k, e)); - } - } - } - } - } - } - } - // Return the best inbound relay noderef - best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), *k, e, None)) - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result, String> { - let node_id = self.node_id(); - - // register nodes we'd found - let mut out = Vec::::with_capacity(fna.peers.len()); - for p in fna.peers { - // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table - if p.node_id.key == node_id { - continue; - } - - // register the node if it's new - let nr = self - .register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone()) - .map_err(map_to_string) - .map_err(logthru_rtab!( - "couldn't register node {} at {:?}", - p.node_id.key, - &p.signed_node_info - ))?; - out.push(nr); - } - Ok(out) - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_node( - &self, - node_ref: NodeRef, - node_id: DHTKey, - ) -> Result, String> { - let rpc_processor = self.rpc_processor(); - - let res = rpc_processor - .clone() - .rpc_call_find_node( - Destination::Direct(node_ref.clone()), - node_id, - None, - rpc_processor.make_respond_to_sender(node_ref.clone()), - ) - .await - .map_err(map_to_string) - .map_err(logthru_rtab!())?; - - // register nodes we'd found - self.register_find_node_answer(res) - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_self(&self, node_ref: NodeRef) -> Result, String> { - let node_id = self.node_id(); - self.find_node(node_ref, node_id).await - } - - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_target(&self, node_ref: NodeRef) -> Result, String> { - let node_id = node_ref.node_id(); - self.find_node(node_ref, node_id).await - } - - #[instrument(level = "trace", skip(self))] - pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { - // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves - // and then contact those nodes to inform -them- that we exist - - // Ask bootstrap server for nodes closest to our own node - let closest_nodes = match self.find_self(node_ref.clone()).await { - Err(e) => { - log_rtab!(error - "reverse_find_node: find_self failed for {:?}: {}", - &node_ref, e - ); - return; - } - Ok(v) => v, - }; - - // Ask each node near us to find us as well - if wide { - for closest_nr in closest_nodes { - match self.find_self(closest_nr.clone()).await { - Err(e) => { - log_rtab!(error - "reverse_find_node: closest node find_self failed for {:?}: {}", - &closest_nr, e - ); - return; - } - Ok(v) => v, - }; - } - } - } - - // Bootstrap lookup process - #[instrument(level = "trace", skip(self), ret, err)] - async fn resolve_bootstrap( - &self, - bootstrap: Vec, - ) -> Result { - // Resolve from bootstrap root to bootstrap hostnames - let mut bsnames = Vec::::new(); - for bh in bootstrap { - // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) - let records = intf::txt_lookup(&bh).await?; - for record in records { - // Split the bootstrap name record by commas - for rec in record.split(',') { - let rec = rec.trim(); - // If the name specified is fully qualified, go with it - let bsname = if rec.ends_with('.') { - rec.to_string() - } - // If the name is not fully qualified, prepend it to the bootstrap name - else { - format!("{}.{}", rec, bh) - }; - - // Add to the list of bootstrap name to look up - bsnames.push(bsname); - } - } - } - - // Get bootstrap nodes from hostnames concurrently - let mut unord = FuturesUnordered::new(); - for bsname in bsnames { - unord.push(async move { - // look up boostrap node txt records - let bsnirecords = match intf::txt_lookup(&bsname).await { - Err(e) => { - warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); - return None; - } - Ok(v) => v, - }; - // for each record resolve into key/bootstraprecord pairs - let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); - for bsnirecord in bsnirecords { - // Bootstrap TXT Record Format Version 0: - // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* - // - // Split bootstrap node record by commas. Example: - // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-dev-alpha.veilid.net,T5150,U5150,W5150/ws - let records: Vec = bsnirecord - .trim() - .split(',') - .map(|x| x.trim().to_owned()) - .collect(); - if records.len() < 6 { - warn!("invalid number of fields in bootstrap txt record"); - continue; - } - - // Bootstrap TXT record version - let txt_version: u8 = match records[0].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid txt_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - if txt_version != BOOTSTRAP_TXT_VERSION { - warn!("unsupported bootstrap txt record version"); - continue; - } - - // Min/Max wire protocol version - let min_version: u8 = match records[1].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid min_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - let max_version: u8 = match records[2].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid max_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - - // Node Id - let node_id_str = &records[3]; - let node_id_key = match DHTKey::try_decode(node_id_str) { - Ok(v) => v, - Err(e) => { - warn!( - "Invalid node id in bootstrap node record {}: {}", - node_id_str, e - ); - continue; - } - }; - - // Hostname - let hostname_str = &records[4]; - - // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node - if self.node_id() == node_id_key { - continue; - } - - // Resolve each record and store in node dial infos list - let mut bootstrap_record = BootstrapRecord { - min_version, - max_version, - dial_info_details: Vec::new(), - }; - for rec in &records[5..] { - let rec = rec.trim(); - let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { - Ok(dis) => dis, - Err(e) => { - warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e); - continue; - } - }; - - for di in dial_infos { - bootstrap_record.dial_info_details.push(DialInfoDetail { - dial_info: di, - class: DialInfoClass::Direct, - }); - } - } - bootstrap_records.push((node_id_key, bootstrap_record)); - } - Some(bootstrap_records) - }); - } - - let mut bsmap = BootstrapRecordMap::new(); - while let Some(bootstrap_records) = unord.next().await { - if let Some(bootstrap_records) = bootstrap_records { - for (bskey, mut bsrec) in bootstrap_records { - let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { - min_version: bsrec.min_version, - max_version: bsrec.max_version, - dial_info_details: Vec::new(), - }); - rec.dial_info_details.append(&mut bsrec.dial_info_details); - } - } - } - - Ok(bsmap) - } - - #[instrument(level = "trace", skip(self), err)] - async fn bootstrap_task_routine(self, stop_token: StopToken) -> Result<(), String> { - let (bootstrap, bootstrap_nodes) = { - let c = self.config.get(); - ( - c.network.bootstrap.clone(), - c.network.bootstrap_nodes.clone(), - ) - }; - - log_rtab!(debug "--- bootstrap_task"); - - // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) - - let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { - let mut bsmap = BootstrapRecordMap::new(); - let mut bootstrap_node_dial_infos = Vec::new(); - for b in bootstrap_nodes { - let ndis = NodeDialInfo::from_str(b.as_str()) - .map_err(map_to_string) - .map_err(logthru_rtab!( - "Invalid node dial info in bootstrap entry: {}", - b - ))?; - bootstrap_node_dial_infos.push(ndis); - } - for ndi in bootstrap_node_dial_infos { - let node_id = ndi.node_id.key; - bsmap - .entry(node_id) - .or_insert_with(|| BootstrapRecord { - min_version: MIN_VERSION, - max_version: MAX_VERSION, - dial_info_details: Vec::new(), - }) - .dial_info_details - .push(DialInfoDetail { - dial_info: ndi.dial_info, - class: DialInfoClass::Direct, // Bootstraps are always directly reachable - }); - } - bsmap - } else { - // Resolve bootstrap servers and recurse their TXT entries - self.resolve_bootstrap(bootstrap).await? - }; - - // Map all bootstrap entries to a single key with multiple dialinfo - - // Run all bootstrap operations concurrently - let mut unord = FuturesUnordered::new(); - for (k, mut v) in bsmap { - // Sort dial info so we get the preferred order correct - v.dial_info_details.sort(); - - log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); - - // Make invalid signed node info (no signature) - let nr = self - .register_node_with_signed_node_info( - k, - SignedNodeInfo::with_no_signature(NodeInfo { - network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable - outbound_protocols: ProtocolSet::empty(), // Bootstraps do not participate in relaying and will not make outbound requests - min_version: v.min_version, // Minimum protocol version specified in txt record - max_version: v.max_version, // Maximum protocol version specified in txt record - dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list - relay_peer_info: None, // Bootstraps never require a relay themselves - }), - ) - .map_err(logthru_rtab!(error "Couldn't add bootstrap node: {}", k))?; - - // Add this our futures to process in parallel - let this = self.clone(); - unord.push(async move { - // Need VALID signed peer info, so ask bootstrap to find_node of itself - // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = this.find_target(nr.clone()).await; - - // Ensure we got the signed peer info - if !nr.operate(|e| e.has_valid_signed_node_info()) { - log_rtab!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable - } else { - // otherwise this bootstrap is valid, lets ask it to find ourselves now - this.reverse_find_node(nr, true).await - } - }); - } - - // Wait for all bootstrap operations to complete before we complete the singlefuture - while unord.next().await.is_some() {} - Ok(()) - } - - /////////////////////////////////////////////////////////// - /// Peer ping validation - - // Ask our remaining peers to give us more peers before we go - // back to the bootstrap servers to keep us from bothering them too much - #[instrument(level = "trace", skip(self), err)] - async fn peer_minimum_refresh_task_routine(self, stop_token: StopToken) -> Result<(), String> { - // get list of all peers we know about, even the unreliable ones, and ask them to find nodes close to our node too - let noderefs = { - let mut inner = self.inner.lock(); - let mut noderefs = Vec::::with_capacity(inner.bucket_entry_count); - let cur_ts = intf::get_timestamp(); - Self::with_entries( - &mut *inner, - cur_ts, - BucketEntryState::Unreliable, - |k, entry| { - noderefs.push(NodeRef::new(self.clone(), *k, entry, None)); - Option::<()>::None - }, - ); - noderefs - }; - - // do peer minimum search concurrently - let mut unord = FuturesUnordered::new(); - for nr in noderefs { - log_rtab!("--- peer minimum search with {:?}", nr); - unord.push(self.reverse_find_node(nr, false)); - } - while unord.next().await.is_some() {} - - Ok(()) - } - - // Ping each node in the routing table if they need to be pinged - // to determine their reliability - #[instrument(level = "trace", skip(self), err)] - async fn ping_validator_task_routine( - self, - stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> Result<(), String> { - // log_rtab!("--- ping_validator task"); - - let rpc = self.rpc_processor(); - let netman = self.network_manager(); - let relay_node_id = netman.relay_node().map(|nr| nr.node_id()); - - let mut unord = FuturesUnordered::new(); - { - let mut inner = self.inner.lock(); - - Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| { - if e.needs_ping(k, cur_ts, relay_node_id) { - let nr = NodeRef::new(self.clone(), *k, e, None); - log_rtab!( - " --- ping validating: {:?} ({})", - nr, - e.state_debug_info(cur_ts) - ); - unord.push(MustJoinHandle::new(intf::spawn_local( - rpc.clone().rpc_call_status(nr), - ))); - } - Option::<()>::None - }); - } - - // Wait for futures to complete - while unord.next().await.is_some() {} - - Ok(()) - } - - // Compute transfer statistics to determine how 'fast' a node is - #[instrument(level = "trace", skip(self), err)] - async fn rolling_transfers_task_routine( - self, - stop_token: StopToken, - last_ts: u64, - cur_ts: u64, - ) -> Result<(), String> { - // log_rtab!("--- rolling_transfers task"); - let inner = &mut *self.inner.lock(); - - // Roll our own node's transfers - inner.self_transfer_stats_accounting.roll_transfers( - last_ts, - cur_ts, - &mut inner.self_transfer_stats, - ); - - // Roll all bucket entry transfers - for b in &mut inner.buckets { - b.roll_transfers(last_ts, cur_ts); - } - Ok(()) - } + // fn operate_on_bucket_entry_mut(&self, node_id: DHTKey, f: F) -> T + // where + // F: FnOnce(&mut BucketEntryInner) -> T, + // { + // let inner = self.inner.read(); + // Self::operate_on_bucket_entry_inner_locked_mut(&*inner, node_id, f) + // } // Ticks about once per second // to run tick tasks which may run at slower tick rates as configured @@ -1206,7 +777,10 @@ impl RoutingTable { self.unlocked_inner.rolling_transfers_task.tick().await?; // If routing table has no live entries, then add the bootstrap nodes to it - if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable) == 0 { + let live_entry_count = + Self::get_entry_count(&*self.inner.read(), BucketEntryState::Unreliable); + + if live_entry_count == 0 { self.unlocked_inner.bootstrap_task.tick().await?; } @@ -1215,15 +789,18 @@ impl RoutingTable { let c = self.config.get(); c.network.dht.min_peer_count as usize }; - if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable) - < min_peer_count - { + if live_entry_count < min_peer_count { self.unlocked_inner.peer_minimum_refresh_task.tick().await?; } + // Ping validate some nodes to groom the table self.unlocked_inner.ping_validator_task.tick().await?; - // Keepalive + // Kick buckets task + let kick_bucket_queue_count = { self.inner.read().kick_queue.len() }; + if kick_bucket_queue_count > 0 { + self.unlocked_inner.kick_buckets_task.tick().await?; + } Ok(()) } @@ -1238,51 +815,50 @@ impl RoutingTable { expects_answer: bool, ) { self.inner - .lock() + .write() .self_transfer_stats_accounting .add_up(bytes); - node_ref.operate(|e| { + node_ref.operate_mut(|e| { e.question_sent(ts, bytes, expects_answer); }) } pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner - .lock() + .write() .self_transfer_stats_accounting .add_down(bytes); - node_ref.operate(|e| { + node_ref.operate_mut(|e| { e.question_rcvd(ts, bytes); }) } pub fn stats_answer_sent(&self, node_ref: NodeRef, bytes: u64) { self.inner - .lock() + .write() .self_transfer_stats_accounting .add_up(bytes); - node_ref.operate(|e| { + node_ref.operate_mut(|e| { e.answer_sent(bytes); }) } pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { - self.inner - .lock() - .self_transfer_stats_accounting - .add_down(bytes); - self.inner - .lock() - .self_latency_stats_accounting - .record_latency(recv_ts - send_ts); - node_ref.operate(|e| { + { + let mut inner = self.inner.write(); + inner.self_transfer_stats_accounting.add_down(bytes); + inner + .self_latency_stats_accounting + .record_latency(recv_ts - send_ts); + } + node_ref.operate_mut(|e| { e.answer_rcvd(send_ts, recv_ts, bytes); }) } pub fn stats_question_lost(&self, node_ref: NodeRef) { - node_ref.operate(|e| { + node_ref.operate_mut(|e| { e.question_lost(); }) } pub fn stats_failed_to_send(&self, node_ref: NodeRef, ts: u64, expects_answer: bool) { - node_ref.operate(|e| { + node_ref.operate_mut(|e| { e.failed_to_send(ts, expects_answer); }) } @@ -1293,10 +869,10 @@ impl RoutingTable { pub fn get_routing_table_health(&self) -> RoutingTableHealth { let mut health = RoutingTableHealth::default(); let cur_ts = intf::get_timestamp(); - let inner = self.inner.lock(); + let inner = self.inner.read(); for bucket in &inner.buckets { - for entry in bucket.entries() { - match entry.1.state(cur_ts) { + for (_, v) in bucket.entries() { + match v.with(|e| e.state(cur_ts)) { BucketEntryState::Reliable => { health.reliable_entry_count += 1; } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 54082a13..c55c25f9 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -9,6 +9,7 @@ const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29; pub struct NodeRef { routing_table: RoutingTable, node_id: DHTKey, + entry: Arc, filter: Option, #[cfg(feature = "tracking")] track_id: usize, @@ -17,15 +18,16 @@ pub struct NodeRef { impl NodeRef { pub fn new( routing_table: RoutingTable, - key: DHTKey, - entry: &mut BucketEntry, + node_id: DHTKey, + entry: Arc, filter: Option, ) -> Self { - entry.ref_count += 1; + entry.ref_count.fetch_add(1u32, Ordering::Relaxed); Self { routing_table, - node_id: key, + node_id, + entry, filter, #[cfg(feature = "tracking")] track_id: entry.track(), @@ -63,9 +65,16 @@ impl NodeRef { pub fn operate(&self, f: F) -> T where - F: FnOnce(&mut BucketEntry) -> T, + F: FnOnce(&BucketEntryInner) -> T, { - self.routing_table.operate_on_bucket_entry(self.node_id, f) + self.entry.with(f) + } + + pub fn operate_mut(&self, f: F) -> T + where + F: FnOnce(&mut BucketEntryInner) -> T, + { + self.entry.with_mut(f) } pub fn peer_info(&self) -> Option { @@ -75,7 +84,7 @@ impl NodeRef { self.operate(|e| e.has_seen_our_node_info()) } pub fn set_seen_our_node_info(&self) { - self.operate(|e| e.set_seen_our_node_info(true)); + self.operate_mut(|e| e.set_seen_our_node_info(true)); } pub fn network_class(&self) -> Option { self.operate(|e| e.node_info().map(|n| n.network_class)) @@ -266,17 +275,16 @@ impl NodeRef { impl Clone for NodeRef { fn clone(&self) -> Self { - self.operate(move |e| { - e.ref_count += 1; + self.entry.ref_count.fetch_add(1u32, Ordering::Relaxed); - Self { - routing_table: self.routing_table.clone(), - node_id: self.node_id, - filter: self.filter.clone(), - #[cfg(feature = "tracking")] - track_id: e.track(), - } - }) + Self { + routing_table: self.routing_table.clone(), + node_id: self.node_id, + entry: self.entry.clone(), + filter: self.filter.clone(), + #[cfg(feature = "tracking")] + track_id: e.track(), + } } } @@ -307,6 +315,11 @@ impl Drop for NodeRef { fn drop(&mut self) { #[cfg(feature = "tracking")] self.operate(|e| e.untrack(self.track_id)); - self.routing_table.drop_node_ref(self.node_id); + + // drop the noderef and queue a bucket kick if it was the last one + let new_ref_count = self.entry.ref_count.fetch_sub(1u32, Ordering::Relaxed) - 1; + if new_ref_count == 0 { + self.routing_table.queue_bucket_kick(self.node_id); + } } } diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs new file mode 100644 index 00000000..6cb5a878 --- /dev/null +++ b/veilid-core/src/routing_table/tasks.rs @@ -0,0 +1,378 @@ +use super::*; + +use crate::dht::*; +use crate::xx::*; +use crate::*; + +impl RoutingTable { + // Compute transfer statistics to determine how 'fast' a node is + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn rolling_transfers_task_routine( + self, + stop_token: StopToken, + last_ts: u64, + cur_ts: u64, + ) -> Result<(), String> { + // log_rtab!("--- rolling_transfers task"); + let mut inner = self.inner.write(); + let inner = &mut *inner; + + // Roll our own node's transfers + inner.self_transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut inner.self_transfer_stats, + ); + + // Roll all bucket entry transfers + for b in &mut inner.buckets { + b.roll_transfers(last_ts, cur_ts); + } + Ok(()) + } + + // Bootstrap lookup process + #[instrument(level = "trace", skip(self), ret, err)] + pub(super) async fn resolve_bootstrap( + &self, + bootstrap: Vec, + ) -> Result { + // Resolve from bootstrap root to bootstrap hostnames + let mut bsnames = Vec::::new(); + for bh in bootstrap { + // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) + let records = intf::txt_lookup(&bh).await?; + for record in records { + // Split the bootstrap name record by commas + for rec in record.split(',') { + let rec = rec.trim(); + // If the name specified is fully qualified, go with it + let bsname = if rec.ends_with('.') { + rec.to_string() + } + // If the name is not fully qualified, prepend it to the bootstrap name + else { + format!("{}.{}", rec, bh) + }; + + // Add to the list of bootstrap name to look up + bsnames.push(bsname); + } + } + } + + // Get bootstrap nodes from hostnames concurrently + let mut unord = FuturesUnordered::new(); + for bsname in bsnames { + unord.push(async move { + // look up boostrap node txt records + let bsnirecords = match intf::txt_lookup(&bsname).await { + Err(e) => { + warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); + return None; + } + Ok(v) => v, + }; + // for each record resolve into key/bootstraprecord pairs + let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); + for bsnirecord in bsnirecords { + // Bootstrap TXT Record Format Version 0: + // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* + // + // Split bootstrap node record by commas. Example: + // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-dev-alpha.veilid.net,T5150,U5150,W5150/ws + let records: Vec = bsnirecord + .trim() + .split(',') + .map(|x| x.trim().to_owned()) + .collect(); + if records.len() < 6 { + warn!("invalid number of fields in bootstrap txt record"); + continue; + } + + // Bootstrap TXT record version + let txt_version: u8 = match records[0].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid txt_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + if txt_version != BOOTSTRAP_TXT_VERSION { + warn!("unsupported bootstrap txt record version"); + continue; + } + + // Min/Max wire protocol version + let min_version: u8 = match records[1].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid min_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + let max_version: u8 = match records[2].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid max_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + + // Node Id + let node_id_str = &records[3]; + let node_id_key = match DHTKey::try_decode(node_id_str) { + Ok(v) => v, + Err(e) => { + warn!( + "Invalid node id in bootstrap node record {}: {}", + node_id_str, e + ); + continue; + } + }; + + // Hostname + let hostname_str = &records[4]; + + // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node + if self.node_id() == node_id_key { + continue; + } + + // Resolve each record and store in node dial infos list + let mut bootstrap_record = BootstrapRecord { + min_version, + max_version, + dial_info_details: Vec::new(), + }; + for rec in &records[5..] { + let rec = rec.trim(); + let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { + Ok(dis) => dis, + Err(e) => { + warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e); + continue; + } + }; + + for di in dial_infos { + bootstrap_record.dial_info_details.push(DialInfoDetail { + dial_info: di, + class: DialInfoClass::Direct, + }); + } + } + bootstrap_records.push((node_id_key, bootstrap_record)); + } + Some(bootstrap_records) + }); + } + + let mut bsmap = BootstrapRecordMap::new(); + while let Some(bootstrap_records) = unord.next().await { + if let Some(bootstrap_records) = bootstrap_records { + for (bskey, mut bsrec) in bootstrap_records { + let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { + min_version: bsrec.min_version, + max_version: bsrec.max_version, + dial_info_details: Vec::new(), + }); + rec.dial_info_details.append(&mut bsrec.dial_info_details); + } + } + } + + Ok(bsmap) + } + + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> Result<(), String> { + let (bootstrap, bootstrap_nodes) = { + let c = self.config.get(); + ( + c.network.bootstrap.clone(), + c.network.bootstrap_nodes.clone(), + ) + }; + + log_rtab!(debug "--- bootstrap_task"); + + // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) + + let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { + let mut bsmap = BootstrapRecordMap::new(); + let mut bootstrap_node_dial_infos = Vec::new(); + for b in bootstrap_nodes { + let ndis = NodeDialInfo::from_str(b.as_str()) + .map_err(map_to_string) + .map_err(logthru_rtab!( + "Invalid node dial info in bootstrap entry: {}", + b + ))?; + bootstrap_node_dial_infos.push(ndis); + } + for ndi in bootstrap_node_dial_infos { + let node_id = ndi.node_id.key; + bsmap + .entry(node_id) + .or_insert_with(|| BootstrapRecord { + min_version: MIN_VERSION, + max_version: MAX_VERSION, + dial_info_details: Vec::new(), + }) + .dial_info_details + .push(DialInfoDetail { + dial_info: ndi.dial_info, + class: DialInfoClass::Direct, // Bootstraps are always directly reachable + }); + } + bsmap + } else { + // Resolve bootstrap servers and recurse their TXT entries + self.resolve_bootstrap(bootstrap).await? + }; + + // Map all bootstrap entries to a single key with multiple dialinfo + + // Run all bootstrap operations concurrently + let mut unord = FuturesUnordered::new(); + for (k, mut v) in bsmap { + // Sort dial info so we get the preferred order correct + v.dial_info_details.sort(); + + log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); + + // Make invalid signed node info (no signature) + let nr = self + .register_node_with_signed_node_info( + k, + SignedNodeInfo::with_no_signature(NodeInfo { + network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable + outbound_protocols: ProtocolSet::empty(), // Bootstraps do not participate in relaying and will not make outbound requests + min_version: v.min_version, // Minimum protocol version specified in txt record + max_version: v.max_version, // Maximum protocol version specified in txt record + dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list + relay_peer_info: None, // Bootstraps never require a relay themselves + }), + ) + .map_err(logthru_rtab!(error "Couldn't add bootstrap node: {}", k))?; + + // Add this our futures to process in parallel + let this = self.clone(); + unord.push(async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = this.find_target(nr.clone()).await; + + // Ensure we got the signed peer info + if !nr.operate(|e| e.has_valid_signed_node_info()) { + log_rtab!(warn + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // If this node info is invalid, it will time out after being unpingable + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + this.reverse_find_node(nr, true).await + } + }); + } + + // Wait for all bootstrap operations to complete before we complete the singlefuture + while unord.next().await.is_some() {} + Ok(()) + } + + // Ping each node in the routing table if they need to be pinged + // to determine their reliability + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn ping_validator_task_routine( + self, + stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> Result<(), String> { + let rpc = self.rpc_processor(); + let netman = self.network_manager(); + let relay_node_id = netman.relay_node().map(|nr| nr.node_id()); + + let mut unord = FuturesUnordered::new(); + { + let inner = self.inner.read(); + + Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { + let nr = NodeRef::new(self.clone(), k, v, None); + unord.push(MustJoinHandle::new(intf::spawn_local( + rpc.clone().rpc_call_status(nr), + ))); + } + Option::<()>::None + }); + } + + // Wait for futures to complete + while unord.next().await.is_some() {} + + Ok(()) + } + + // Ask our remaining peers to give us more peers before we go + // back to the bootstrap servers to keep us from bothering them too much + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn peer_minimum_refresh_task_routine( + self, + stop_token: StopToken, + ) -> Result<(), String> { + // get list of all peers we know about, even the unreliable ones, and ask them to find nodes close to our node too + let noderefs = { + let inner = self.inner.read(); + let mut noderefs = Vec::::with_capacity(inner.bucket_entry_count); + let cur_ts = intf::get_timestamp(); + Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + noderefs.push(NodeRef::new(self.clone(), k, v, None)); + Option::<()>::None + }); + noderefs + }; + + // do peer minimum search concurrently + let mut unord = FuturesUnordered::new(); + for nr in noderefs { + log_rtab!("--- peer minimum search with {:?}", nr); + unord.push(self.reverse_find_node(nr, false)); + } + while unord.next().await.is_some() {} + + Ok(()) + } + + // Kick the queued buckets in the routing table to free dead nodes if necessary + // Attempts to keep the size of the routing table down to the bucket depth + #[instrument(level = "trace", skip(self), err)] + pub(super) async fn kick_buckets_task_routine( + self, + _stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> Result<(), String> { + let mut inner = self.inner.write(); + let kick_queue: Vec = inner.kick_queue.iter().map(|v| *v).collect(); + inner.kick_queue.clear(); + for idx in kick_queue { + Self::kick_bucket(&mut *inner, idx) + } + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b2f7c532..6433ae35 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -839,7 +839,7 @@ impl RPCProcessor { // update node status for the requesting node to our routing table if let Some(sender_nr) = rpcreader.opt_sender_nr.clone() { // Update latest node status in routing table for the statusq sender - sender_nr.operate(|e| { + sender_nr.operate_mut(|e| { e.update_node_status(node_status); }); } @@ -917,7 +917,11 @@ impl RPCProcessor { let routing_table = self.routing_table(); let filter = DialInfoFilter::global().with_address_type(dial_info.address_type()); let sender_id = rpcreader.header.envelope.get_sender_id(); - let mut peers = routing_table.find_fast_public_nodes_filtered(&filter); + let node_count = { + let c = self.config.get(); + c.network.dht.max_find_node_count as usize + }; + let mut peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter); if peers.is_empty() { return Err(rpc_error_internal(format!( "no peers matching filter '{:?}'", @@ -939,8 +943,8 @@ impl RPCProcessor { // Ensure the peer's status is known and that it is capable of // making outbound connections for the dial info we want to verify // and if this peer can validate dial info - let can_contact_dial_info = peer.operate(|e: &mut BucketEntry| { - if let Some(ni) = &e.node_info() { + let can_contact_dial_info = peer.operate(|e: &BucketEntryInner| { + if let Some(ni) = e.node_info() { ni.outbound_protocols.contains(dial_info.protocol_type()) && ni.can_validate_dial_info() } else { false @@ -951,7 +955,7 @@ impl RPCProcessor { } // See if this peer will validate dial info - let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| { + let will_validate_dial_info = peer.operate(|e: &BucketEntryInner| { if let Some(status) = &e.peer_stats().status { status.will_validate_dial_info } else { @@ -1040,11 +1044,11 @@ impl RPCProcessor { let closest_nodes = routing_table.find_closest_nodes( target_node_id, // filter - Some(Box::new(move |kv| { - RoutingTable::filter_has_valid_signed_node_info(kv, own_peer_info_is_valid) - })), + Some(move |_k, v| { + RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) + }), // transform - |e| RoutingTable::transform_to_peer_info(e, &own_peer_info), + move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), ); log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); @@ -1567,7 +1571,7 @@ impl RPCProcessor { }; // Update latest node status in routing table - peer.operate(|e| { + peer.operate_mut(|e| { e.update_node_status(node_status.clone()); });