initial import of main veilid core
142
veilid-core/src/routing_table/bucket.rs
Normal file
@@ -0,0 +1,142 @@
use super::*;

#[derive(Clone)]
pub struct Bucket {
    routing_table: RoutingTable,
    entries: BTreeMap<DHTKey, BucketEntry>,
    newest_entry: Option<DHTKey>,
}
pub(super) type EntriesIterMut<'a> =
    alloc::collections::btree_map::IterMut<'a, DHTKey, BucketEntry>;
pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>;

fn state_ordering(state: BucketEntryState) -> usize {
    match state {
        BucketEntryState::Dead => 0,
        BucketEntryState::Unreliable => 1,
        BucketEntryState::Reliable => 2,
    }
}

impl Bucket {
    pub fn new(routing_table: RoutingTable) -> Self {
        Self {
            routing_table: routing_table,
            entries: BTreeMap::new(),
            newest_entry: None,
        }
    }

    pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef {
        info!("Node added: {}", node_id.encode());

        // Add new entry
        self.entries.insert(node_id, BucketEntry::new());

        // This is now the newest bucket entry
        self.newest_entry = Some(node_id);

        // Get a node ref to return
        let entry_ref = self.entries.get_mut(&node_id).unwrap();
        NodeRef::new(self.routing_table.clone(), node_id, entry_ref)
    }

    pub(super) fn remove_entry(&mut self, node_id: &DHTKey) {
        info!("Node removed: {}", node_id);

        // Remove the entry
        self.entries.remove(node_id);

        // newest_entry is updated by kick_bucket()
    }

    pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
        // Called every ROLLING_TRANSFERS_INTERVAL_SECS
        for entry in &mut self.entries {
            entry.1.roll_transfers(last_ts, cur_ts);
        }
    }

    pub(super) fn entry_mut(&mut self, key: &DHTKey) -> Option<&mut BucketEntry> {
        self.entries.get_mut(key)
    }

    pub(super) fn entries(&self) -> EntriesIter {
        self.entries.iter()
    }

    pub(super) fn entries_mut(&mut self) -> EntriesIterMut {
        self.entries.iter_mut()
    }

    pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<DHTKey>> {
        // Get number of entries to attempt to purge from bucket
        let bucket_len = self.entries.len();
        if bucket_len <= bucket_depth {
            return None;
        }
        // Try to purge the newest entries that overflow the bucket
        let mut dead_node_ids: BTreeSet<DHTKey> = BTreeSet::new();
        let mut extra_entries = bucket_len - bucket_depth;

        // Get the sorted list of entries by their kick order
        let mut sorted_entries: Vec<(&_, &_)> = self.entries.iter().collect();
        let cur_ts = get_timestamp();
        sorted_entries.sort_by(
            |a: &(&DHTKey, &BucketEntry), b: &(&DHTKey, &BucketEntry)| -> core::cmp::Ordering {
                let ea = a.1;
                let eb = b.1;
                let astate = state_ordering(ea.state(cur_ts));
                let bstate = state_ordering(eb.state(cur_ts));
                // first kick dead nodes, then unreliable nodes
                if astate < bstate {
                    return core::cmp::Ordering::Less;
                }
                if astate > bstate {
                    return core::cmp::Ordering::Greater;
                }
                // then kick by time added, most recent nodes are kicked first
                let ata = ea.peer_stats().time_added;
                let bta = eb.peer_stats().time_added;
                bta.cmp(&ata)
            },
        );

        self.newest_entry = None;
        for i in 0..sorted_entries.len() {
            // If we're not evicting more entries, exit, noting this may be the newest entry
            if extra_entries == 0 {
                // The first 'live' entry we find is our newest entry
                if self.newest_entry.is_none() {
                    self.newest_entry = Some(sorted_entries[i].0.clone());
                }
                break;
            }
            extra_entries -= 1;

            // if this entry has references we can't drop it yet
            if sorted_entries[i].1.ref_count > 0 {
                // The first 'live' entry we find is our newest entry
                if self.newest_entry.is_none() {
                    self.newest_entry = Some(sorted_entries[i].0.clone());
                }
                continue;
            }

            // if no references, lets evict it
            dead_node_ids.insert(sorted_entries[i].0.clone());
        }

        // Now purge the dead node ids
        for id in &dead_node_ids {
            // Remove the entry
            self.remove_entry(id);
        }

        if dead_node_ids.len() > 0 {
            Some(dead_node_ids)
        } else {
            None
        }
    }
}
446
veilid-core/src/routing_table/bucket_entry.rs
Normal file
@@ -0,0 +1,446 @@
use super::*;

// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 10;

// Transfers entries are in bytes total for the interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 10;

// Reliable pings are done with increased spacing between pings
// - Start secs is the number of seconds between the first two pings
// - Max secs is the maximum number of seconds between consecutive pings
// - Multiplier changes the number of seconds between pings over time,
//   making it longer as the node becomes more reliable
const RELIABLE_PING_INTERVAL_START_SECS: u32 = 10;
const RELIABLE_PING_INTERVAL_MAX_SECS: u32 = 10 * 60;
const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0;

// Unreliable pings are done for a fixed amount of time while the
// node is given a chance to come back online before it is made dead.
// If a node misses a single ping, it is marked unreliable and must
// return consecutive ping replies for the duration of the span before being
// marked reliable again
// - Span is the number of seconds total to attempt to validate the node
// - Interval is the number of seconds between each ping
const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BucketEntryState {
    Reliable,
    Unreliable,
    Dead,
}

#[derive(Debug, Clone)]
pub struct BucketEntry {
    pub(super) ref_count: u32,
    min_max_version: Option<(u8, u8)>,
    last_connection: Option<(ConnectionDescriptor, u64)>,
    dial_info_entries: VecDeque<DialInfoEntry>,
    rolling_latencies: VecDeque<u64>,
    rolling_transfers: VecDeque<(u64, u64)>,
    current_transfer: (u64, u64),
    peer_stats: PeerStats,
}

impl BucketEntry {
    pub(super) fn new() -> Self {
        Self {
            ref_count: 0,
            min_max_version: None,
            last_connection: None,
            dial_info_entries: VecDeque::new(),
            rolling_latencies: VecDeque::new(),
            rolling_transfers: VecDeque::new(),
            current_transfer: (0, 0),
            peer_stats: PeerStats {
                time_added: get_timestamp(),
                last_seen: None,
                ping_stats: PingStats::default(),
                latency: None,
                transfer: (TransferStats::default(), TransferStats::default()),
                node_info: None,
            },
        }
    }

    pub fn add_dial_info(&mut self, dial_info: DialInfo) -> Result<(), String> {
        let mut idx: Option<usize> = None;
        for i in 0..self.dial_info_entries.len() {
            if self.dial_info_entries[i].dial_info() == &dial_info {
                idx = Some(i);
                break;
            }
        }
        match idx {
            None => {
                self.dial_info_entries
                    .push_front(DialInfoEntry::try_new(dial_info)?);
            }
            Some(idx) => {
                let die = self.dial_info_entries.remove(idx).unwrap();
                self.dial_info_entries.push_front(die);
            }
        }
        Ok(())
    }

    pub fn best_dial_info(&self) -> Option<DialInfo> {
        self.dial_info_entries
            .front()
            .map(|die| die.dial_info().clone())
    }

    pub fn filtered_dial_info<F>(&self, filter: F) -> Option<DialInfo>
    where
        F: Fn(&DialInfoEntry) -> bool,
    {
        for die in &self.dial_info_entries {
            if filter(die) {
                return Some(die.dial_info().clone());
            }
        }
        None
    }

    pub fn dial_info_entries_as_ref(&self) -> &VecDeque<DialInfoEntry> {
        &self.dial_info_entries
    }

    pub fn dial_info(&self) -> Vec<DialInfo> {
        self.dial_info_entries
            .iter()
            .map(|e| e.dial_info().clone())
            .collect()
    }

    pub fn public_dial_info(&self) -> Vec<DialInfo> {
        self.dial_info_entries
            .iter()
            .filter_map(|e| {
                if e.is_public() {
                    Some(e.dial_info().clone())
                } else {
                    None
                }
            })
            .collect()
    }

    pub fn public_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec<DialInfo> {
        self.dial_info_entries
            .iter()
            .filter_map(|e| {
                if e.dial_info().protocol_type() != protocol_type {
                    None
                } else if e.is_public() {
                    Some(e.dial_info().clone())
                } else {
                    None
                }
            })
            .collect()
    }

    pub fn private_dial_info(&self) -> Vec<DialInfo> {
        self.dial_info_entries
            .iter()
            .filter_map(|e| {
                if e.is_private() {
                    Some(e.dial_info().clone())
                } else {
                    None
                }
            })
            .collect()
    }

    pub fn private_dial_info_for_protocol(&mut self, protocol_type: ProtocolType) -> Vec<DialInfo> {
        self.dial_info_entries
            .iter_mut()
            .filter_map(|e| {
                if e.dial_info().protocol_type() != protocol_type {
                    None
                } else if e.is_private() {
                    Some(e.dial_info().clone())
                } else {
                    None
                }
            })
            .collect()
    }

    pub fn get_peer_info(&self, key: DHTKey, scope: PeerScope) -> PeerInfo {
        PeerInfo {
            node_id: NodeId::new(key),
            dial_infos: match scope {
                PeerScope::All => self.dial_info(),
                PeerScope::Public => self.public_dial_info(),
                PeerScope::Private => self.private_dial_info(),
            },
        }
    }

    pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) {
        self.last_connection = Some((last_connection, timestamp));

        // sort the dialinfoentries by the last peer address if we have a match
        // if one particular peer address is being used and matches a dialinfoentry
        // then we should prefer it
        for i in 0..self.dial_info_entries.len() {
            let die = &mut self.dial_info_entries[i];

            // see if we have a matching address
            if RoutingTable::dial_info_peer_address_match(
                die.dial_info(),
                &self.last_connection.as_ref().unwrap().0.remote,
            ) {
                drop(die);

                // push the most recent dialinfo to the front
                let dies = &mut self.dial_info_entries;
                let die = dies.remove(i).unwrap();
                dies.push_front(die);

                break;
            }
        }
    }

    pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
        match self.last_connection.as_ref() {
            Some(x) => Some(x.0.clone()),
            None => None,
        }
    }

    pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) {
        self.min_max_version = Some(min_max_version);
    }

    pub fn min_max_version(&self) -> Option<(u8, u8)> {
        self.min_max_version
    }

    pub fn state(&self, cur_ts: u64) -> BucketEntryState {
        if self.check_reliable(cur_ts) {
            BucketEntryState::Reliable
        } else if self.check_dead(cur_ts) {
            BucketEntryState::Dead
        } else {
            BucketEntryState::Unreliable
        }
    }

    pub fn peer_stats(&self) -> &PeerStats {
        &self.peer_stats
    }

    pub fn update_node_info(&mut self, node_info: NodeInfo) {
        self.peer_stats.node_info = Some(node_info);
    }

    ///// stats methods
    // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds
    pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
        let dur_ms = (cur_ts - last_ts) / 1000u64;
        while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE {
            self.rolling_transfers.pop_front();
        }
        self.rolling_transfers.push_back(self.current_transfer);
        self.current_transfer = (0, 0);

        let xd = &mut self.peer_stats.transfer.0;
        let xu = &mut self.peer_stats.transfer.1;

        xd.maximum = 0;
        xu.maximum = 0;
        xd.minimum = u64::MAX;
        xu.minimum = u64::MAX;
        xd.average = 0;
        xu.average = 0;
        for (rtd, rtu) in &self.rolling_transfers {
            let bpsd = rtd * 1000u64 / dur_ms;
            let bpsu = rtu * 1000u64 / dur_ms;
            if bpsd > xd.maximum {
                xd.maximum = bpsd;
            }
            if bpsu > xu.maximum {
                xu.maximum = bpsu;
            }
            if bpsd < xd.minimum {
                xd.minimum = bpsd;
            }
            if bpsu < xu.minimum {
                xu.minimum = bpsu;
            }
            xd.average += bpsd;
            xu.average += bpsu;
        }
        let len = self.rolling_transfers.len() as u64;
        xd.average /= len;
        xu.average /= len;
        // total remains unchanged
    }

    // Called for every round trip packet we receive
    fn record_latency(&mut self, latency: u64) {
        while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
            self.rolling_latencies.pop_front();
        }
        self.rolling_latencies.push_back(latency);

        // start fastest at u64::MAX so the first sample always sets it
        let mut ls = LatencyStats {
            fastest: u64::MAX,
            average: 0,
            slowest: 0,
        };
        for rl in &self.rolling_latencies {
            if *rl < ls.fastest {
                ls.fastest = *rl;
            }
            if *rl > ls.slowest {
                ls.slowest = *rl;
            }
            ls.average += *rl;
        }
        let len = self.rolling_latencies.len() as u64;
        ls.average /= len;

        self.peer_stats.latency = Some(ls);
    }

    ///// state machine handling
    pub(super) fn check_reliable(&self, cur_ts: u64) -> bool {
        // if we have had consecutive ping replies for longer than UNRELIABLE_PING_SPAN_SECS
        match self.peer_stats.ping_stats.first_consecutive_pong_time {
            None => false,
            Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64),
        }
    }
    pub(super) fn check_dead(&self, cur_ts: u64) -> bool {
        // if we have not heard from the node at all for the duration of the unreliable ping span
        match self.peer_stats.last_seen {
            None => true,
            Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64),
        }
    }

    pub(super) fn needs_ping(&self, cur_ts: u64) -> bool {
        // if we need a ping right now to validate us
        match self.state(cur_ts) {
            BucketEntryState::Reliable => {
                // If we are in a reliable state, we need a ping on an exponential scale
                match self.peer_stats.ping_stats.last_pinged {
                    None => true,
                    Some(last_pinged) => {
                        let first_consecutive_pong_time = self
                            .peer_stats
                            .ping_stats
                            .first_consecutive_pong_time
                            .unwrap();
                        let start_of_reliable_time = first_consecutive_pong_time
                            + (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64);
                        let reliable_cur = cur_ts - start_of_reliable_time;
                        let reliable_last = last_pinged - start_of_reliable_time;

                        retry_falloff_log(
                            reliable_last,
                            reliable_cur,
                            RELIABLE_PING_INTERVAL_START_SECS as u64 * 1000000u64,
                            RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1000000u64,
                            RELIABLE_PING_INTERVAL_MULTIPLIER,
                        )
                    }
                }
            }
            BucketEntryState::Unreliable => {
                // If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds
                match self.peer_stats.ping_stats.last_pinged {
                    None => true,
                    Some(last_pinged) => {
                        (cur_ts - last_pinged)
                            >= (UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64)
                    }
                }
            }
            BucketEntryState::Dead => false,
        }
    }

    pub(super) fn touch_last_seen(&mut self, ts: u64) {
        // If we've heard from the node at all, we can always restart our lost ping count
        self.peer_stats.ping_stats.recent_lost_pings = 0;
        // Mark the node as seen
        self.peer_stats.last_seen = Some(ts);
    }

    ////////////////////////////////////////////////////////////////
    /// Called by RPC processor as events happen

    pub fn ping_sent(&mut self, ts: u64, bytes: u64) {
        self.peer_stats.ping_stats.total_sent += 1;
        self.current_transfer.1 += bytes;
        self.peer_stats.ping_stats.in_flight += 1;
        self.peer_stats.ping_stats.last_pinged = Some(ts);
    }
    pub fn ping_rcvd(&mut self, ts: u64, bytes: u64) {
        self.current_transfer.0 += bytes;
        self.touch_last_seen(ts);
    }
    pub fn pong_sent(&mut self, _ts: u64, bytes: u64) {
        self.current_transfer.1 += bytes;
    }
    pub fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
        self.current_transfer.0 += bytes;
        self.peer_stats.ping_stats.in_flight -= 1;
        self.peer_stats.ping_stats.total_returned += 1;
        self.peer_stats.ping_stats.consecutive_pongs += 1;
        if self
            .peer_stats
            .ping_stats
            .first_consecutive_pong_time
            .is_none()
        {
            self.peer_stats.ping_stats.first_consecutive_pong_time = Some(recv_ts);
        }
        self.record_latency(recv_ts - send_ts);
        self.touch_last_seen(recv_ts);
    }
    pub fn ping_lost(&mut self, _ts: u64) {
        self.peer_stats.ping_stats.in_flight -= 1;
        self.peer_stats.ping_stats.recent_lost_pings += 1;
        self.peer_stats.ping_stats.consecutive_pongs = 0;
        self.peer_stats.ping_stats.first_consecutive_pong_time = None;
    }
    pub fn question_sent(&mut self, _ts: u64, bytes: u64) {
        self.current_transfer.1 += bytes;
    }
    pub fn question_rcvd(&mut self, ts: u64, bytes: u64) {
        self.current_transfer.0 += bytes;
        self.touch_last_seen(ts);
    }
    pub fn answer_sent(&mut self, _ts: u64, bytes: u64) {
        self.current_transfer.1 += bytes;
    }
    pub fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
        self.current_transfer.0 += bytes;
        self.record_latency(recv_ts - send_ts);
        self.touch_last_seen(recv_ts);
    }
    pub fn question_lost(&mut self, _ts: u64) {
        self.peer_stats.ping_stats.consecutive_pongs = 0;
        self.peer_stats.ping_stats.first_consecutive_pong_time = None;
    }
}

impl Drop for BucketEntry {
    fn drop(&mut self) {
        assert_eq!(self.ref_count, 0);
    }
}
61
veilid-core/src/routing_table/dial_info_entry.rs
Normal file
@@ -0,0 +1,61 @@
use super::*;

#[derive(Debug, Clone)]
pub struct DialInfoEntry {
    dial_info: DialInfo,
    resolved_address: IpAddr,
}

impl DialInfoEntry {
    pub fn try_new(dial_info: DialInfo) -> Result<Self, String> {
        let addr = match dial_info.resolve() {
            Ok(a) => a,
            Err(_) => return Err("failed to resolve address".to_owned()),
        };
        Ok(Self {
            dial_info: dial_info,
            resolved_address: addr,
        })
    }

    pub fn dial_info(&self) -> &DialInfo {
        &self.dial_info
    }

    pub fn address(&self) -> IpAddr {
        self.resolved_address
    }

    pub fn resolve(&mut self) -> Result<IpAddr, String> {
        let addr = match self.dial_info.resolve() {
            Ok(a) => a,
            Err(_) => return Err("failed to resolve address".to_owned()),
        };
        self.resolved_address = addr;
        Ok(addr)
    }

    pub fn matches_peer_scope(&self, scope: PeerScope) -> bool {
        match scope {
            PeerScope::All => true,
            PeerScope::Public => self.is_public(),
            PeerScope::Private => self.is_private(),
        }
    }

    pub fn is_public(&self) -> bool {
        ipaddr_is_global(&self.resolved_address)
    }
    pub fn is_private(&self) -> bool {
        match self.resolved_address {
            IpAddr::V4(a) => ipv4addr_is_private(&a),
            IpAddr::V6(a) => ipv6addr_is_unicast_site_local(&a),
        }
    }
    pub fn is_valid(&self) -> bool {
        self.is_public() || self.is_private()
    }
    pub fn is_loopback(&self) -> bool {
        ipaddr_is_loopback(&self.resolved_address)
    }
}
280
veilid-core/src/routing_table/find_nodes.rs
Normal file
@@ -0,0 +1,280 @@
use super::*;

use crate::dht::*;
use crate::intf::*;
use crate::xx::*;
use crate::*;

impl RoutingTable {
    // Retrieve the fastest nodes in the routing table with a particular kind of protocol address type
    // Returned noderefs are scoped to that address type only
    pub fn get_fast_nodes_of_type(
        &self,
        protocol_address_type: ProtocolAddressType,
    ) -> Vec<NodeRef> {
        self.find_fastest_nodes(
            // filter
            Some(Box::new(
                move |params: &(&DHTKey, Option<&mut BucketEntry>)| {
                    // Only interested in nodes with node info
                    if let Some(node_info) = &params.1.as_ref().unwrap().peer_stats().node_info {
                        // Will the node validate dial info?
                        // and does it have a UDPv4, public scope, dial info?
                        if node_info.will_validate_dial_info
                            && params
                                .1
                                .as_ref()
                                .unwrap()
                                .dial_info_entries_as_ref()
                                .iter()
                                .find_map(|die| {
                                    if die.matches_peer_scope(PeerScope::Public)
                                        && die.dial_info().protocol_address_type()
                                            == protocol_address_type
                                    {
                                        Some(())
                                    } else {
                                        None
                                    }
                                })
                                .is_some()
                        {
                            // If so return true and include this node
                            return true;
                        }
                    }
                    false
                },
            )),
            // transform
            |e| {
                NodeRef::new_filtered(
                    self.clone(),
                    *e.0,
                    e.1.as_mut().unwrap(),
                    protocol_address_type,
                )
            },
        )
    }

    pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo {
        let dial_infos = match scope {
            PeerScope::All => {
                let mut divec = self.public_dial_info();
                divec.append(&mut self.local_dial_info());
                divec.dedup();
                divec
            }
            PeerScope::Public => self.public_dial_info(),
            PeerScope::Private => self.local_dial_info(),
        };

        PeerInfo {
            node_id: NodeId::new(self.node_id()),
            dial_infos: dial_infos.iter().map(|x| x.dial_info.clone()).collect(),
        }
    }

    pub fn transform_to_peer_info(
        kv: &mut (&DHTKey, Option<&mut BucketEntry>),
        scope: PeerScope,
        own_peer_info: &PeerInfo,
    ) -> PeerInfo {
        match &kv.1 {
            None => own_peer_info.clone(),
            Some(entry) => entry.get_peer_info(*kv.0, scope),
        }
    }

    pub fn find_peers_with_sort_and_filter<F, C, T, O>(
        &self,
        node_count: usize,
        cur_ts: u64,
        filter: F,
        compare: C,
        transform: T,
    ) -> Vec<O>
    where
        F: Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool,
        C: Fn(
            &(&DHTKey, Option<&mut BucketEntry>),
            &(&DHTKey, Option<&mut BucketEntry>),
        ) -> core::cmp::Ordering,
        T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O,
    {
        let mut inner = self.inner.lock();

        // collect all the nodes for sorting
        let mut nodes =
            Vec::<(&DHTKey, Option<&mut BucketEntry>)>::with_capacity(inner.bucket_entry_count + 1);
        // add our own node (the only one with a None entry)
        let self_node_id = inner.node_id.clone();
        let selfkv = (&self_node_id, None);
        if filter(&selfkv) {
            nodes.push(selfkv);
        }
        // add all nodes from buckets
        for b in &mut inner.buckets {
            for (k, v) in b.entries_mut() {
                // Don't bother with dead nodes
                if !v.check_dead(cur_ts) {
                    // Apply filter
                    let kv = (k, Some(v));
                    if filter(&kv) {
                        nodes.push(kv);
                    }
                }
            }
        }

        // sort by preference for returning nodes
        nodes.sort_by(compare);

        // return transformed vector for filtered+sorted nodes
        let cnt = usize::min(node_count, nodes.len());
        let mut out = Vec::<O>::with_capacity(cnt);
        for i in 0..cnt {
            let val = transform(&mut nodes[i]);
            out.push(val);
        }

        out
    }

    pub fn find_fastest_nodes<T, O>(
        &self,
        filter: Option<Box<dyn Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool>>,
        transform: T,
    ) -> Vec<O>
    where
        T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O,
    {
        let cur_ts = get_timestamp();
        let node_count = {
            let c = self.config.get();
            c.network.dht.max_find_node_count as usize
        };
        let out = self.find_peers_with_sort_and_filter(
            node_count,
            cur_ts,
            // filter
            |kv| {
                if kv.1.is_none() {
                    // filter out self peer, as it is irrelevant to the 'fastest nodes' search
                    false
                } else if filter.is_some() && !filter.as_ref().unwrap()(kv) {
                    false
                } else {
                    true
                }
            },
            // sort
            |(a_key, a_entry), (b_key, b_entry)| {
                // same nodes are always the same
                if a_key == b_key {
                    return core::cmp::Ordering::Equal;
                }
                // our own node always comes last (should not happen, here for completeness)
                if a_entry.is_none() {
                    return core::cmp::Ordering::Greater;
                }
                if b_entry.is_none() {
                    return core::cmp::Ordering::Less;
                }
                // reliable nodes come first
                let ae = a_entry.as_ref().unwrap();
                let be = b_entry.as_ref().unwrap();

                let ra = ae.check_reliable(cur_ts);
                let rb = be.check_reliable(cur_ts);
                if ra != rb {
                    if ra {
                        return core::cmp::Ordering::Less;
                    } else {
                        return core::cmp::Ordering::Greater;
                    }
                }

                // latency is the next metric, closer nodes first
                let a_latency = match ae.peer_stats().latency.as_ref() {
                    None => {
                        // treat unknown latency as slow
                        return core::cmp::Ordering::Greater;
                    }
                    Some(l) => l,
                };
                let b_latency = match be.peer_stats().latency.as_ref() {
                    None => {
                        // treat unknown latency as slow
                        return core::cmp::Ordering::Less;
                    }
                    Some(l) => l,
                };
                // Sort by average latency
                a_latency.average.cmp(&b_latency.average)
            },
            // transform,
            transform,
        );
        trace!(">> find_fastest_nodes: node count = {}", out.len());
        out
    }

    pub fn find_closest_nodes<T, O>(
        &self,
        node_id: DHTKey,
        filter: Option<Box<dyn Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool>>,
        transform: T,
    ) -> Vec<O>
    where
        T: Fn(&mut (&DHTKey, Option<&mut BucketEntry>)) -> O,
    {
        let cur_ts = get_timestamp();
        let node_count = {
            let c = self.config.get();
            c.network.dht.max_find_node_count as usize
        };
        let out = self.find_peers_with_sort_and_filter(
            node_count,
            cur_ts,
            // filter
            |kv| {
                if kv.1.is_none() {
                    // include self peer, as it is relevant to the 'closest nodes' search
                    true
                } else if filter.is_some() && !filter.as_ref().unwrap()(kv) {
                    false
                } else {
                    true
                }
            },
            // sort
            |(a_key, a_entry), (b_key, b_entry)| {
                // same nodes are always the same
                if a_key == b_key {
                    return core::cmp::Ordering::Equal;
                }
                // reliable nodes come first, pessimistically treating our own node as unreliable
                let ra = a_entry.as_ref().map_or(false, |x| x.check_reliable(cur_ts));
                let rb = b_entry.as_ref().map_or(false, |x| x.check_reliable(cur_ts));
                if ra != rb {
                    if ra {
                        return core::cmp::Ordering::Less;
                    } else {
                        return core::cmp::Ordering::Greater;
                    }
                }

                // distance is the next metric, closer nodes first
                let da = distance(a_key, &node_id);
                let db = distance(b_key, &node_id);
                da.cmp(&db)
            },
            // transform,
            transform,
        );
        trace!(">> find_closest_nodes: node count = {}", out.len());
        out
    }
}
731
veilid-core/src/routing_table/mod.rs
Normal file
@@ -0,0 +1,731 @@
mod bucket;
mod bucket_entry;
mod dial_info_entry;
mod find_nodes;
mod node_ref;

use bucket::*;
pub use bucket_entry::*;
pub use dial_info_entry::*;
pub use find_nodes::*;
pub use node_ref::*;

use crate::dht::*;
use crate::intf::*;
use crate::network_manager::*;
use crate::rpc_processor::*;
use crate::xx::*;
use crate::*;
use alloc::collections::VecDeque;
use alloc::str::FromStr;
use futures_util::stream::{FuturesUnordered, StreamExt};

//////////////////////////////////////////////////////////////////////////

#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub enum DialInfoOrigin {
    Static,
    Discovered,
    Mapped,
}

#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub struct DialInfoDetail {
    pub dial_info: DialInfo,
    pub origin: DialInfoOrigin,
    pub network_class: Option<NetworkClass>,
    pub timestamp: u64,
}

struct RoutingTableInner {
    network_manager: NetworkManager,
    node_id: DHTKey,
    node_id_secret: DHTKeySecret,
    buckets: Vec<Bucket>,
    //recent_nodes: VecDeque<DHTKey>,
    //closest_reliable_nodes: Vec<DHTKey>,
    //fastest_reliable_nodes: Vec<DHTKey>,
    //closest_nodes: Vec<DHTKey>,
    //fastest_nodes: Vec<DHTKey>,
    local_dial_info: Vec<DialInfoDetail>,
    public_dial_info: Vec<DialInfoDetail>,
    bucket_entry_count: usize,
    // Waiters
    eventual_changed_dial_info: Eventual,
}

struct RoutingTableUnlockedInner {
    // Background processes
    rolling_transfers_task: TickTask,
    bootstrap_task: TickTask,
    peer_minimum_refresh_task: TickTask,
    ping_validator_task: TickTask,
}

#[derive(Clone)]
pub struct RoutingTable {
    config: VeilidConfig,
    inner: Arc<Mutex<RoutingTableInner>>,
    unlocked_inner: Arc<RoutingTableUnlockedInner>,
}

impl RoutingTable {
    fn new_inner(network_manager: NetworkManager) -> RoutingTableInner {
        RoutingTableInner {
            network_manager: network_manager,
            node_id: DHTKey::default(),
            node_id_secret: DHTKeySecret::default(),
            buckets: Vec::new(),
            //recent_nodes: VecDeque::new(),
            //closest_reliable_nodes: Vec::new(),
            //fastest_reliable_nodes: Vec::new(),
            //closest_nodes: Vec::new(),
            //fastest_nodes: Vec::new(),
            local_dial_info: Vec::new(),
            public_dial_info: Vec::new(),
            bucket_entry_count: 0,
            eventual_changed_dial_info: Eventual::new(),
        }
    }
    fn new_unlocked_inner(config: VeilidConfig) -> RoutingTableUnlockedInner {
        let c = config.get();
        RoutingTableUnlockedInner {
            rolling_transfers_task: TickTask::new(bucket_entry::ROLLING_TRANSFERS_INTERVAL_SECS),
            bootstrap_task: TickTask::new(1),
            peer_minimum_refresh_task: TickTask::new_us(c.network.dht.min_peer_refresh_time),
            ping_validator_task: TickTask::new(1),
        }
    }
    pub fn new(network_manager: NetworkManager) -> Self {
        let config = network_manager.config();
        let this = Self {
            config: config.clone(),
            inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
            unlocked_inner: Arc::new(Self::new_unlocked_inner(config)),
        };
        // Set rolling transfers tick task
        {
            let this2 = this.clone();
            this.unlocked_inner
                .rolling_transfers_task
                .set_routine(move |l, t| {
                    Box::pin(this2.clone().rolling_transfers_task_routine(l, t))
                });
        }
        // Set bootstrap tick task
        {
            let this2 = this.clone();
            this.unlocked_inner
                .bootstrap_task
                .set_routine(move |_l, _t| Box::pin(this2.clone().bootstrap_task_routine()));
        }
        // Set peer minimum refresh tick task
        {
            let this2 = this.clone();
            this.unlocked_inner
                .peer_minimum_refresh_task
                .set_routine(move |_l, _t| {
                    Box::pin(this2.clone().peer_minimum_refresh_task_routine())
                });
        }
        // Set ping validator tick task
        {
            let this2 = this.clone();
            this.unlocked_inner
                .ping_validator_task
                .set_routine(move |l, t| Box::pin(this2.clone().ping_validator_task_routine(l, t)));
        }
        this
    }

    pub fn network_manager(&self) -> NetworkManager {
        self.inner.lock().network_manager.clone()
    }
    pub fn rpc_processor(&self) -> RPCProcessor {
        self.network_manager().rpc_processor()
    }

    pub fn node_id(&self) -> DHTKey {
        self.inner.lock().node_id
    }

    pub fn node_id_secret(&self) -> DHTKeySecret {
        self.inner.lock().node_id_secret
    }

    pub fn has_local_dial_info(&self) -> bool {
        let inner = self.inner.lock();
        inner.local_dial_info.len() > 0
    }

    pub fn local_dial_info(&self) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner.local_dial_info.clone()
    }

    pub fn local_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner
            .local_dial_info
            .iter()
            .filter_map(|di| {
                if di.dial_info.protocol_type() != protocol_type {
                    None
                } else {
                    Some(di.clone())
                }
            })
            .collect()
    }

    pub fn local_dial_info_for_protocol_address_type(
        &self,
        protocol_address_type: ProtocolAddressType,
    ) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner
            .local_dial_info
            .iter()
            .filter_map(|di| {
                if di.dial_info.protocol_address_type() != protocol_address_type {
                    None
                } else {
                    Some(di.clone())
                }
            })
            .collect()
    }

    pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) {
        let ts = get_timestamp();
        let mut inner = self.inner.lock();

        inner.local_dial_info.push(DialInfoDetail {
            dial_info: dial_info.clone(),
            origin: origin,
            network_class: None,
            timestamp: ts,
        });

        info!(
            "Local Dial Info: {} ({:?})",
            NodeDialInfoSingle {
                node_id: NodeId::new(inner.node_id),
                dial_info: dial_info.clone()
            }
            .to_string(),
            origin,
        );
    }

    pub fn clear_local_dial_info(&self) {
        self.inner.lock().local_dial_info.clear();
    }

    pub fn has_public_dial_info(&self) -> bool {
        let inner = self.inner.lock();
        inner.public_dial_info.len() > 0
    }

    pub fn public_dial_info(&self) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner.public_dial_info.clone()
    }

    pub fn public_dial_info_for_protocol(
        &self,
        protocol_type: ProtocolType,
    ) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner
            .public_dial_info
            .iter()
            .filter_map(|di| {
                if di.dial_info.protocol_type() != protocol_type {
                    None
                } else {
                    Some(di.clone())
                }
            })
            .collect()
    }
    pub fn public_dial_info_for_protocol_address_type(
        &self,
        protocol_address_type: ProtocolAddressType,
    ) -> Vec<DialInfoDetail> {
        let inner = self.inner.lock();
        inner
            .public_dial_info
            .iter()
            .filter_map(|di| {
                if di.dial_info.protocol_address_type() != protocol_address_type {
                    None
                } else {
                    Some(di.clone())
                }
            })
            .collect()
    }

    pub fn register_public_dial_info(
        &self,
        dial_info: DialInfo,
        network_class: Option<NetworkClass>,
        origin: DialInfoOrigin,
    ) {
        let ts = get_timestamp();
        let mut inner = self.inner.lock();

        inner.public_dial_info.push(DialInfoDetail {
            dial_info: dial_info.clone(),
            origin: origin,
            network_class: network_class,
            timestamp: ts,
        });

        info!(
            "Public Dial Info: {} ({:?}#{:?})",
            NodeDialInfoSingle {
                node_id: NodeId::new(inner.node_id),
                dial_info: dial_info.clone()
            }
            .to_string(),
            origin,
            network_class,
        );
    }

    pub fn clear_public_dial_info(&self) {
        self.inner.lock().public_dial_info.clear();
    }

    pub async fn wait_changed_dial_info(&self) {
        let inst = self
            .inner
            .lock()
            .eventual_changed_dial_info
            .instance_empty();
        inst.await;
    }
    pub async fn trigger_changed_dial_info(&self) {
        let eventual = {
            let mut inner = self.inner.lock();
            let mut new_eventual = Eventual::new();
            core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
            new_eventual
        };
        eventual.resolve().await;
    }

    // Desired maximum number of entries per bucket, by bucket index
    fn bucket_depth(index: usize) -> usize {
        match index {
            0 => 256,
            1 => 128,
            2 => 64,
            3 => 32,
            4 => 16,
            5 => 8,
            6 => 4,
            7 => 4,
            8 => 4,
            9 => 4,
            _ => 4,
        }
    }

    pub async fn init(&self) -> Result<(), String> {
        let mut inner = self.inner.lock();
        // Size the buckets (one per bit)
        inner.buckets.reserve(DHT_KEY_LENGTH * 8);
        for _ in 0..DHT_KEY_LENGTH * 8 {
            let bucket = Bucket::new(self.clone());
            inner.buckets.push(bucket);
        }

        // make local copy of node id for easy access
        let c = self.config.get();
        inner.node_id = c.network.node_id;
        inner.node_id_secret = c.network.node_id_secret;

        Ok(())
    }

    pub async fn terminate(&self) {
        *self.inner.lock() = Self::new_inner(self.network_manager());
    }

    // Just match address and port to help sort dialinfoentries for buckets
    // because inbound connections will not have dialinfo associated with them
    // but should have ip addresses if they have changed
    fn dial_info_peer_address_match(dial_info: &DialInfo, peer_addr: &PeerAddress) -> bool {
        match dial_info {
            DialInfo::UDP(_) => {
                peer_addr.protocol_type == ProtocolType::UDP
                    && peer_addr.port == dial_info.port()
                    && peer_addr.address.address_string() == dial_info.address_string()
            }
            DialInfo::TCP(_) => {
                peer_addr.protocol_type == ProtocolType::TCP
                    && peer_addr.port == dial_info.port()
                    && peer_addr.address.address_string() == dial_info.address_string()
            }
            DialInfo::WS(_) => {
                peer_addr.protocol_type == ProtocolType::WS
                    && peer_addr.port == dial_info.port()
                    && peer_addr.address.address_string() == dial_info.address_string()
            }
            DialInfo::WSS(_) => {
                peer_addr.protocol_type == ProtocolType::WSS
                    && peer_addr.port == dial_info.port()
                    && peer_addr.address.address_string() == dial_info.address_string()
            }
        }
    }

    // Attempt to settle buckets and remove entries down to the desired number,
    // which may not be possible due to extant NodeRefs
    fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) {
        let bucket = &mut inner.buckets[idx];
        let bucket_depth = Self::bucket_depth(idx);

        if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
            // Remove counts
            inner.bucket_entry_count -= dead_node_ids.len();
            debug!("Routing table now has {} nodes", inner.bucket_entry_count);

            // Now purge the routing table inner vectors
            //let filter = |k: &DHTKey| dead_node_ids.contains(k);
            //inner.closest_reliable_nodes.retain(filter);
            //inner.fastest_reliable_nodes.retain(filter);
            //inner.closest_nodes.retain(filter);
            //inner.fastest_nodes.retain(filter);
        }
    }

    fn find_bucket_index(inner: &RoutingTableInner, node_id: DHTKey) -> usize {
        distance(&node_id, &inner.node_id)
            .first_nonzero_bit()
            .unwrap()
    }

    fn drop_node_ref(&self, node_id: DHTKey) {
        // Reduce ref count on entry
        let mut inner = self.inner.lock();
        let idx = Self::find_bucket_index(&*inner, node_id);
        let new_ref_count = {
            let bucket = &mut inner.buckets[idx];
            let entry = bucket.entry_mut(&node_id).unwrap();
            entry.ref_count -= 1;
            entry.ref_count
        };

        // If this entry could possibly go away, kick the bucket
        if new_ref_count == 0 {
            // it is important to do this in the same inner lock as the ref count decrease
            Self::kick_bucket(&mut *inner, idx);
        }
    }

    pub fn create_node_ref(&self, node_id: DHTKey) -> Result<NodeRef, String> {
        // Ensure someone isn't trying to register this node itself
        if node_id == self.node_id() {
            return Err("can't register own node".to_owned());
        }

        // Insert into bucket, possibly evicting the newest bucket member
        let noderef = match self.lookup_node_ref(node_id) {
            None => {
                // Make new entry
                let mut inner = self.inner.lock();
                let idx = Self::find_bucket_index(&*inner, node_id);
                let nr = {
                    // Get the bucket for the entry
                    let bucket = &mut inner.buckets[idx];
                    // Add new entry
                    let nr = bucket.add_entry(node_id);

                    // Update count
                    inner.bucket_entry_count += 1;
                    debug!("Routing table now has {} nodes", inner.bucket_entry_count);
                    nr
                };

                // Kick the bucket
                // It is important to do this in the same inner lock as the add_entry
                Self::kick_bucket(&mut *inner, idx);

                nr
            }
            Some(nr) => nr,
        };

        Ok(noderef)
    }

    pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option<NodeRef> {
        let mut inner = self.inner.lock();
        let idx = Self::find_bucket_index(&*inner, node_id);
        let bucket = &mut inner.buckets[idx];
        match bucket.entry_mut(&node_id) {
            None => None,
            Some(e) => Some(NodeRef::new(self.clone(), node_id, e)),
        }
    }

    // Shortcut function to add a node to our routing table if it doesn't exist
    // and add the dial info we have for it, since that's pretty common
    pub fn register_node_with_dial_info(
        &self,
        node_id: DHTKey,
        dial_infos: &[DialInfo],
    ) -> Result<NodeRef, String> {
        let nr = match self.create_node_ref(node_id) {
            Err(e) => {
                return Err(format!("Couldn't create node reference: {}", e));
            }
            Ok(v) => v,
        };

        nr.operate(move |e| -> Result<(), String> {
            for di in dial_infos {
                e.add_dial_info(di.clone())?;
            }
            Ok(())
        })?;

        Ok(nr)
    }

    // Shortcut function to add a node to our routing table if it doesn't exist
    // and add the last peer address we have for it, since that's pretty common
    pub fn register_node_with_existing_connection(
        &self,
        node_id: DHTKey,
        descriptor: ConnectionDescriptor,
        timestamp: u64,
    ) -> Result<NodeRef, String> {
        let nr = match self.create_node_ref(node_id) {
            Err(e) => {
                return Err(format!("Couldn't create node reference: {}", e));
            }
            Ok(v) => v,
        };

        nr.operate(move |e| {
            // set the most recent node address for connection finding and udp replies
            e.set_last_connection(descriptor, timestamp);
        });

        Ok(nr)
    }

    fn operate_on_bucket_entry<T, F>(&self, node_id: DHTKey, f: F) -> T
    where
        F: FnOnce(&mut BucketEntry) -> T,
    {
        let mut inner = self.inner.lock();
        let idx = Self::find_bucket_index(&*inner, node_id);
        let bucket = &mut inner.buckets[idx];
        let entry = bucket.entry_mut(&node_id).unwrap();
        f(entry)
    }

    pub async fn find_self(&self, node_ref: NodeRef) -> Result<Vec<NodeRef>, String> {
        let node_id = self.node_id();
        let rpc_processor = self.rpc_processor();

        let res = match rpc_processor
            .rpc_call_find_node(
                Destination::Direct(node_ref.clone()),
                node_id,
                None,
                RespondTo::Sender,
            )
            .await
        {
            Ok(v) => v,
            Err(e) => {
                return Err(format!("couldn't contact node at {:?}: {}", &node_ref, e));
            }
        };
        trace!(
            "find_self at {:?} answered in {}ms",
            &node_ref,
            timestamp_to_secs(res.latency) * 1000.0f64
        );

        // register nodes we'd found
        let mut out = Vec::<NodeRef>::with_capacity(res.peers.len());
        for p in res.peers {
            // if our own node id is in the list then ignore it, as we don't add ourselves to our own routing table
            if p.node_id.key == node_id {
                // however, it is useful to note when
                continue;
            }

            // register the node if it's new
            let nr = match self.register_node_with_dial_info(p.node_id.key, &p.dial_infos) {
                Ok(v) => v,
                Err(e) => {
                    return Err(format!(
                        "couldn't register node {} at {:?}: {}",
                        p.node_id.key, &p.dial_infos, e
                    ));
                }
            };
            out.push(nr);
        }
        Ok(out)
    }

    pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) {
        // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves
        // and then contact those nodes to inform -them- that we exist

        // Ask bootstrap server for nodes closest to our own node
        let closest_nodes = match self.find_self(node_ref.clone()).await {
            Err(e) => {
                error!(
                    "reverse_find_node: find_self failed for {:?}: {}",
                    &node_ref, e
                );
                return;
            }
            Ok(v) => v,
        };

        // Ask each node near us to find us as well
        if wide {
            for closest_nr in closest_nodes {
                match self.find_self(closest_nr.clone()).await {
                    Err(e) => {
                        error!(
                            "reverse_find_node: closest node find_self failed for {:?}: {}",
                            &closest_nr, e
                        );
                        return;
                    }
                    Ok(v) => v,
                };
            }
        }
    }

    async fn bootstrap_task_routine(self) -> Result<(), String> {
        let bootstrap = {
            let c = self.config.get();
            c.network.bootstrap.clone()
        };

        // Map all bootstrap entries to a single key with multiple dialinfo
        let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new();
        for b in bootstrap {
            let ndis = match NodeDialInfoSingle::from_str(b.as_str()) {
                Err(_) => {
                    return Err(format!("Invalid dial info in bootstrap entry: {}", b));
                }
                Ok(v) => v,
            };
            let node_id = ndis.node_id.key;
            bsmap
                .entry(node_id)
                .or_insert(Vec::new())
                .push(ndis.dial_info);
        }

        // Run all bootstrap operations concurrently
        let mut unord = FuturesUnordered::new();
        for (k, v) in bsmap {
            let nr = match self.register_node_with_dial_info(k, &v) {
                Ok(nr) => nr,
                Err(e) => {
                    return Err(format!("Couldn't add bootstrap node: {}", e));
                }
            };

            info!("Bootstrapping {} with {:?}", k.encode(), &v);
            unord.push(self.reverse_find_node(nr, true));
        }
        while unord.next().await.is_some() {}
        Ok(())
    }

    ///////////////////////////////////////////////////////////
    /// Peer ping validation

    // Ask our remaining peers to give us more peers before we go
    // back to the bootstrap servers to keep us from bothering them too much
    async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> {
        // get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too
        let noderefs = {
            let mut inner = self.inner.lock();
            let mut noderefs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
            for b in &mut inner.buckets {
                for (k, entry) in b.entries_mut() {
                    noderefs.push(NodeRef::new(self.clone(), *k, entry))
                }
            }
            noderefs
        };

        // do peer minimum search concurrently
        let mut unord = FuturesUnordered::new();
        for nr in noderefs {
            debug!("Peer minimum search with {:?}", nr);
            unord.push(self.reverse_find_node(nr, false));
        }
        while unord.next().await.is_some() {}

        Ok(())
    }

    // Ping each node in the routing table if they need to be pinged
    // to determine their reliability
    async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
        let rpc = self.rpc_processor();
        let mut inner = self.inner.lock();
        for b in &mut inner.buckets {
            for (k, entry) in b.entries_mut() {
                if entry.needs_ping(cur_ts) {
                    let nr = NodeRef::new(self.clone(), *k, entry);
                    intf::spawn_local(rpc.clone().rpc_call_info(nr)).detach();
                }
            }
        }
        Ok(())
    }

    // Compute transfer statistics to determine how 'fast' a node is
    async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
        let mut inner = self.inner.lock();
        for b in &mut inner.buckets {
            b.roll_transfers(last_ts, cur_ts);
        }
        Ok(())
    }

    // Ticks about once per second
    // to run tick tasks which may run at slower tick rates as configured
    pub async fn tick(&self) -> Result<(), String> {
        // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
        self.unlocked_inner.rolling_transfers_task.tick().await?;

        // If routing table is empty, then add the bootstrap nodes to it
        if self.inner.lock().bucket_entry_count == 0 {
            self.unlocked_inner.bootstrap_task.tick().await?;
        }

        // If we still don't have enough peers, find nodes until we do
        let min_peer_count = {
            let c = self.config.get();
            c.network.dht.min_peer_count as usize
        };
        if self.inner.lock().bucket_entry_count < min_peer_count {
            self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
        }
        // Ping validate some nodes to groom the table
        self.unlocked_inner.ping_validator_task.tick().await?;

        Ok(())
    }
}
108
veilid-core/src/routing_table/node_ref.rs
Normal file
@@ -0,0 +1,108 @@
use super::*;
use crate::dht::*;
use alloc::fmt;

pub struct NodeRef {
    routing_table: RoutingTable,
    node_id: DHTKey,
    protocol_address_type: Option<ProtocolAddressType>,
}

impl NodeRef {
    pub fn new(routing_table: RoutingTable, key: DHTKey, entry: &mut BucketEntry) -> Self {
        entry.ref_count += 1;
        Self {
            routing_table: routing_table,
            node_id: key,
            protocol_address_type: None,
        }
    }
    pub fn new_filtered(
        routing_table: RoutingTable,
        key: DHTKey,
        entry: &mut BucketEntry,
        protocol_address_type: ProtocolAddressType,
    ) -> Self {
        entry.ref_count += 1;
        Self {
            routing_table: routing_table,
            node_id: key,
            protocol_address_type: Some(protocol_address_type),
        }
    }

    pub fn node_id(&self) -> DHTKey {
        self.node_id
    }

    pub fn protocol_address_type(&self) -> Option<ProtocolAddressType> {
        self.protocol_address_type
    }

    pub fn set_protocol_address_type(
        &mut self,
        protocol_address_type: Option<ProtocolAddressType>,
    ) {
        self.protocol_address_type = protocol_address_type;
    }

    pub fn operate<T, F>(&self, f: F) -> T
    where
        F: FnOnce(&mut BucketEntry) -> T,
    {
        self.routing_table.operate_on_bucket_entry(self.node_id, f)
    }

    pub fn dial_info(&self) -> Option<DialInfo> {
        match self.protocol_address_type {
            None => self.operate(|e| e.best_dial_info()),
            Some(pat) => self.operate(|e| {
                e.filtered_dial_info(|die| die.dial_info().protocol_address_type() == pat)
            }),
        }
    }
    pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
        match self.operate(|e| e.last_connection()) {
            None => None,
            Some(c) => {
                if let Some(protocol_address_type) = self.protocol_address_type {
                    if c.remote.protocol_address_type() == protocol_address_type {
                        Some(c)
                    } else {
                        None
                    }
                } else {
                    Some(c)
                }
            }
        }
    }
}

impl Clone for NodeRef {
    fn clone(&self) -> Self {
        self.operate(move |e| {
            e.ref_count += 1;
        });
        Self {
            routing_table: self.routing_table.clone(),
            node_id: self.node_id.clone(),
            protocol_address_type: self.protocol_address_type,
        }
    }
}

impl fmt::Debug for NodeRef {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.protocol_address_type {
            None => write!(f, "{}", self.node_id.encode()),
            Some(pat) => write!(f, "{}#{:?}", self.node_id.encode(), pat),
        }
    }
}

impl Drop for NodeRef {
    fn drop(&mut self) {
        self.routing_table.drop_node_ref(self.node_id);
    }
}