John Smith
2021-12-10 20:14:33 -05:00
parent 7e967b22af
commit c5113623be
24 changed files with 596 additions and 262 deletions

View File

@@ -8,7 +8,7 @@ pub struct Bucket {
}
pub(super) type EntriesIterMut<'a> =
alloc::collections::btree_map::IterMut<'a, DHTKey, BucketEntry>;
- //pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>;
+ pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, BucketEntry>;
fn state_ordering(state: BucketEntryState) -> usize {
match state {
@@ -61,9 +61,9 @@ impl Bucket {
self.entries.get_mut(key)
}
- // pub(super) fn entries(&self) -> EntriesIter {
- //     self.entries.iter()
- // }
+ pub(super) fn entries(&self) -> EntriesIter {
+     self.entries.iter()
+ }
pub(super) fn entries_mut(&mut self) -> EntriesIterMut {
self.entries.iter_mut()
@@ -72,9 +72,12 @@ impl Bucket {
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<DHTKey>> {
// Get number of entries to attempt to purge from bucket
let bucket_len = self.entries.len();
// Don't bother kicking bucket unless it is full
if bucket_len <= bucket_depth {
return None;
}
// Try to purge the newest entries that overflow the bucket
let mut dead_node_ids: BTreeSet<DHTKey> = BTreeSet::new();
let mut extra_entries = bucket_len - bucket_depth;
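The hunk cuts off before the purge loop itself. As a rough self-contained sketch of the shape of the operation (u64 keys and a bare state enum standing in for DHTKey/BucketEntry; the comment says the real code purges the newest overflow entries, while this simplification purges by state, worst first):

```rust
use std::collections::{BTreeMap, BTreeSet};

// Simplified stand-ins: u64 keys for DHTKey, a bare state for BucketEntry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum State {
    Dead,
    Unreliable,
    Reliable,
}

fn kick(entries: &mut BTreeMap<u64, State>, bucket_depth: usize) -> Option<BTreeSet<u64>> {
    // Get number of entries to attempt to purge from bucket
    let bucket_len = entries.len();
    // Don't bother kicking bucket unless it is full
    if bucket_len <= bucket_depth {
        return None;
    }
    let mut extra_entries = bucket_len - bucket_depth;

    // Purge worst-state entries first: derived Ord gives Dead < Unreliable < Reliable.
    let mut candidates: Vec<u64> = entries.keys().copied().collect();
    candidates.sort_by_key(|k| entries[k]);

    let mut dead_node_ids: BTreeSet<u64> = BTreeSet::new();
    for k in candidates {
        if extra_entries == 0 {
            break;
        }
        entries.remove(&k);
        dead_node_ids.insert(k);
        extra_entries -= 1;
    }
    Some(dead_node_ids)
}

fn main() {
    let mut entries: BTreeMap<u64, State> =
        [(1, State::Reliable), (2, State::Dead), (3, State::Unreliable)].into();
    // Depth 2 forces one purge; the Dead entry goes first.
    assert_eq!(kick(&mut entries, 2), Some(BTreeSet::from([2])));
    assert_eq!(entries.len(), 2);
}
```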

View File

@@ -19,11 +19,11 @@ const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0;
const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
- #[derive(Debug, Clone, PartialEq, Eq)]
+ #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketEntryState {
- Reliable,
- Unreliable,
Dead,
+ Unreliable,
+ Reliable,
}
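The variant order flips because the newly derived PartialOrd/Ord follow declaration order: with Dead first, Dead < Unreliable < Reliable, which is exactly what the `>= min_state` filter in the new debug_info code below relies on. A quick illustration:

```rust
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketEntryState {
    Dead,
    Unreliable,
    Reliable,
}

fn main() {
    // Derived Ord follows declaration order, so Dead < Unreliable < Reliable.
    assert!(BucketEntryState::Dead < BucketEntryState::Unreliable);
    assert!(BucketEntryState::Unreliable < BucketEntryState::Reliable);
    // A ">= min_state" filter keeps everything at least as healthy
    // as the minimum requested state.
    let min_state = BucketEntryState::Unreliable;
    assert!(BucketEntryState::Reliable >= min_state);
    assert!(!(BucketEntryState::Dead >= min_state));
}
```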
#[derive(Debug, Clone)]
@@ -243,14 +243,18 @@ impl BucketEntry {
// if we have had consecutive ping replies for longer than UNRELIABLE_PING_SPAN_SECS
match self.peer_stats.ping_stats.first_consecutive_pong_time {
None => false,
- Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64),
+ Some(ts) => {
+     cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
+ }
}
}
pub(super) fn check_dead(&self, cur_ts: u64) -> bool {
// if we have not heard from the node at all for the duration of the unreliable ping span
match self.peer_stats.last_seen {
None => true,
- Some(ts) => (cur_ts - ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64),
+ Some(ts) => {
+     cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
+ }
}
}
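These timestamps are u64 microseconds, so the move to saturating_sub is a real fix rather than a style change: if a stored timestamp is even slightly ahead of cur_ts, plain `-` underflows, panicking in debug builds and wrapping to an enormous "age" in release builds. A small demonstration:

```rust
fn main() {
    let cur_ts: u64 = 1_000_000; // microseconds
    let ts: u64 = 1_500_000;     // recorded slightly in the future

    // Plain subtraction would underflow here: panic in debug builds,
    // wrap to ~u64::MAX in release builds, making the node look ancient.
    // let age = cur_ts - ts;

    // saturating_sub clamps at zero instead, so the comparison stays sane.
    let age = cur_ts.saturating_sub(ts);
    assert_eq!(age, 0);

    const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
    let span_us = UNRELIABLE_PING_SPAN_SECS as u64 * 1_000_000u64;
    assert!(age < span_us); // neither dead nor yet reliable
}
```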
@@ -268,9 +272,10 @@ impl BucketEntry {
.first_consecutive_pong_time
.unwrap();
let start_of_reliable_time = first_consecutive_pong_time
- + (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64);
- let reliable_cur = cur_ts - start_of_reliable_time;
- let reliable_last = last_pinged - start_of_reliable_time;
+ + ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64
+     * 1000000u64);
+ let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
+ let reliable_last = last_pinged.saturating_sub(start_of_reliable_time);
retry_falloff_log(
reliable_last,
@@ -287,7 +292,7 @@ impl BucketEntry {
match self.peer_stats.ping_stats.last_pinged {
None => true,
Some(last_pinged) => {
- (cur_ts - last_pinged)
+ cur_ts.saturating_sub(last_pinged)
>= (UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64)
}
}
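The retry_falloff_log call above is truncated by the hunk boundary, so its actual signature is not visible here. Purely as a hypothetical sketch of a logarithmic-falloff predicate of this kind (every name and parameter below is assumed, not the real API): ping deadlines for reliable nodes are spaced geometrically, growing by a factor such as RELIABLE_PING_INTERVAL_MULTIPLIER up to a cap, so long-reliable nodes get pinged less and less often:

```rust
// Hypothetical log-falloff predicate; NOT the real retry_falloff_log, whose
// arguments are cut off by the hunk above. All names here are assumptions.
fn retry_falloff_log(
    last_us: u64,    // when the node was last pinged
    cur_us: u64,     // current time
    base_us: u64,    // starting ping interval
    max_us: u64,     // cap on the interval
    multiplier: f64, // growth factor per interval
) -> bool {
    // Boundaries spaced geometrically: base, then base * multiplier, ... capped at max.
    let mut interval = base_us as f64;
    let mut boundary = base_us;
    // Skip boundaries already covered by the last ping.
    while boundary <= last_us {
        interval = (interval * multiplier).min(max_us as f64);
        boundary = boundary.saturating_add(interval as u64);
    }
    // Ping again once current time crosses the next uncovered boundary.
    cur_us >= boundary
}

fn main() {
    let s = 1_000_000u64; // one second in microseconds
    // Never pinged since t=0: due as soon as the first 5s boundary passes.
    assert!(retry_falloff_log(0, 5 * s, 5 * s, 600 * s, 2.0));
    // Pinged at t=100s: boundaries fall at 5s, 15s, 35s, 75s, 155s,
    // so the node is not due again at t=101s.
    assert!(!retry_falloff_log(100 * s, 101 * s, 5 * s, 600 * s, 2.0));
}
```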
@@ -303,6 +308,43 @@ impl BucketEntry {
self.peer_stats.last_seen = Some(ts);
}
pub(super) fn state_debug_info(&self, cur_ts: u64) -> String {
let last_pinged = if let Some(last_pinged) = self.peer_stats.ping_stats.last_pinged {
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(last_pinged))
)
} else {
"never".to_owned()
};
let first_consecutive_pong_time = if let Some(first_consecutive_pong_time) =
self.peer_stats.ping_stats.first_consecutive_pong_time
{
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_pong_time))
)
} else {
"never".to_owned()
};
let last_seen = if let Some(last_seen) = self.peer_stats.last_seen {
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(last_seen))
)
} else {
"never".to_owned()
};
format!(
"state: {:?}, first_consecutive_pong_time: {}, last_pinged: {}, last_seen: {}",
self.state(cur_ts),
first_consecutive_pong_time,
last_pinged,
last_seen
)
}
////////////////////////////////////////////////////////////////
/// Called when rpc processor things happen

View File

@@ -213,10 +213,14 @@ impl RoutingTable {
.to_string(),
);
debug!(" Origin: {:?}", origin);
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn clear_local_dial_info(&self) {
- self.inner.lock().local_dial_info.clear();
+ let mut inner = self.inner.lock();
+ inner.local_dial_info.clear();
+ Self::trigger_changed_dial_info(&mut *inner);
}
pub fn has_global_dial_info(&self) -> bool {
@@ -290,10 +294,13 @@ impl RoutingTable {
);
debug!(" Origin: {:?}", origin);
debug!(" Network Class: {:?}", network_class);
Self::trigger_changed_dial_info(&mut *inner);
}
pub fn clear_global_dial_info(&self) {
- self.inner.lock().global_dial_info.clear();
+ let mut inner = self.inner.lock();
+ inner.global_dial_info.clear();
+ Self::trigger_changed_dial_info(&mut *inner);
}
pub async fn wait_changed_dial_info(&self) {
@@ -304,14 +311,10 @@ impl RoutingTable {
.instance_empty();
inst.await;
}
- pub async fn trigger_changed_dial_info(&self) {
-     let eventual = {
-         let mut inner = self.inner.lock();
-         let mut new_eventual = Eventual::new();
-         core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
-         new_eventual
-     };
-     eventual.resolve().await;
+ fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
+     let mut new_eventual = Eventual::new();
+     core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
+     spawn(new_eventual.resolve()).detach();
}
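The design change here: trigger_changed_dial_info was an async method that had to be awaited outside the lock; it is now a plain fn that runs under the lock, swapping in a fresh Eventual and detaching a task that resolves the old one, which wakes every wait_changed_dial_info caller. Eventual's API isn't shown in this diff, so the following stand-in sketches the same wake-all-and-reset pairing on tokio's watch channel (tokio and all type names here are assumptions for illustration):

```rust
// Stand-in for the Eventual wait/trigger pairing using tokio's watch channel;
// the real Eventual type is not shown in this diff, so this is illustrative only.
use std::sync::Arc;
use tokio::sync::watch;

struct DialInfoSignal {
    tx: watch::Sender<u64>,
    rx: watch::Receiver<u64>,
}

impl DialInfoSignal {
    fn new() -> Self {
        let (tx, rx) = watch::channel(0u64);
        Self { tx, rx }
    }

    // Synchronous, like the new trigger_changed_dial_info: safe to call while
    // holding a lock, because nothing here needs to be awaited.
    fn trigger(&self) {
        let next = *self.rx.borrow() + 1;
        let _ = self.tx.send(next);
    }

    // Like wait_changed_dial_info: returns after the next trigger.
    async fn wait(&self) {
        let mut rx = self.rx.clone();
        let seen = *rx.borrow();
        while *rx.borrow() == seen {
            if rx.changed().await.is_err() {
                return; // sender dropped
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let sig = Arc::new(DialInfoSignal::new());
    let sig2 = sig.clone();
    let trigger = async move {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        sig2.trigger();
    };
    // join! polls the waiter first, so it records the current generation
    // before the trigger fires, then observes the bump.
    tokio::join!(sig.wait(), trigger);
    println!("dial info change observed");
}
```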
fn bucket_depth(index: usize) -> usize {
@@ -351,6 +354,38 @@ impl RoutingTable {
*self.inner.lock() = Self::new_inner(self.network_manager());
}
// debugging info
pub fn debug_info(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.lock();
let cur_ts = get_timestamp();
let mut out = String::new();
const COLS: usize = 16;
let rows = inner.buckets.len() / COLS;
let mut r = 0;
let mut b = 0;
out += "Buckets:\n";
while r < rows {
let mut c = 0;
out += format!(" {:>3}: ", b).as_str();
while c < COLS {
let mut cnt = 0;
for e in inner.buckets[b].entries() {
if e.1.state(cur_ts) >= min_state {
cnt += 1;
}
}
out += format!("{:>3} ", cnt).as_str();
b += 1;
c += 1;
}
out += "\n";
r += 1;
}
out
}
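With COLS = 16 this prints a grid of per-bucket entry counts, one row per 16 buckets, each row labeled with the index of its first bucket. Illustrative output only (the counts are made up):

```
Buckets:
   0:   0   0   0   0   0   1   0   2   3   7  12  20  20  19  20  20
  16:  20  18  20  17   9   5   2   1   0   0   0   0   0   0   0   0
```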
// Just match address and port to help sort dialinfoentries for buckets
// because inbound connections will not have dialinfo associated with them
// but should have ip addresses if they have changed
@@ -613,7 +648,7 @@ impl RoutingTable {
c.network.bootstrap.clone()
};
trace!("Bootstrap task with: {:?}", bootstrap);
debug!("--- bootstrap_task");
// Map all bootstrap entries to a single key with multiple dialinfo
let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new();
@@ -630,6 +665,7 @@ impl RoutingTable {
.or_insert_with(Vec::new)
.push(ndis.dial_info);
}
trace!(" bootstrap list: {:?}", bsmap);
// Run all bootstrap operations concurrently
let mut unord = FuturesUnordered::new();
@@ -641,7 +677,7 @@ impl RoutingTable {
}
};
info!("Bootstrapping {} with {:?}", k.encode(), &v);
trace!(" bootstrapping {} with {:?}", k.encode(), &v);
unord.push(self.reverse_find_node(nr, true));
}
while unord.next().await.is_some() {}
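Both the bootstrap and peer-refresh routines share this concurrency shape: push one future per peer into a FuturesUnordered and drain it, so requests run concurrently while completions are observed one at a time. A minimal standalone version (tokio as the executor is an assumption, since the crate uses its own intf::spawn wrappers, and query_peer merely stands in for reverse_find_node):

```rust
use futures::stream::FuturesUnordered;
use futures::StreamExt;

// Stand-in for reverse_find_node(nr, ...); just echoes the peer id.
async fn query_peer(id: u64) -> Result<u64, String> {
    Ok(id)
}

#[tokio::main]
async fn main() {
    let peers = vec![1u64, 2, 3];
    let mut unord = FuturesUnordered::new();
    for p in peers {
        unord.push(query_peer(p));
    }
    // Matches the `while unord.next().await.is_some() {}` drain above,
    // except here each result is inspected as it completes.
    while let Some(res) = unord.next().await {
        println!("peer answered: {:?}", res);
    }
}
```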
@@ -654,6 +690,8 @@ impl RoutingTable {
// Ask our remaining peers to give us more peers before we go
// back to the bootstrap servers to keep us from bothering them too much
async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> {
trace!("--- peer_minimum_refresh task");
// get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too
let noderefs = {
let mut inner = self.inner.lock();
@@ -665,11 +703,12 @@ impl RoutingTable {
}
noderefs
};
trace!(" refreshing with nodes: {:?}", noderefs);
// do peer minimum search concurrently
let mut unord = FuturesUnordered::new();
for nr in noderefs {
debug!("Peer minimum search with {:?}", nr);
debug!(" --- peer minimum search with {:?}", nr);
unord.push(self.reverse_find_node(nr, false));
}
while unord.next().await.is_some() {}
@@ -680,12 +719,18 @@ impl RoutingTable {
// Ping each node in the routing table if they need to be pinged
// to determine their reliability
async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
trace!("--- ping_validator task");
let rpc = self.rpc_processor();
let mut inner = self.inner.lock();
for b in &mut inner.buckets {
for (k, entry) in b.entries_mut() {
if entry.needs_ping(cur_ts) {
let nr = NodeRef::new(self.clone(), *k, entry);
debug!(
" --- ping validating: {:?} ({})",
nr,
entry.state_debug_info(cur_ts)
);
intf::spawn_local(rpc.clone().rpc_call_info(nr)).detach();
}
}
@@ -695,6 +740,7 @@ impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
trace!("--- rolling_transfers task");
let inner = &mut *self.inner.lock();
// Roll our own node's transfers

View File

@@ -1502,7 +1502,7 @@ impl RPCProcessor {
body: Vec<u8>,
peer_noderef: NodeRef,
) -> Result<(), ()> {
debug!("enqueue_message: body len = {}", body.len());
trace!("enqueue_message: body len = {}", body.len());
let msg = RPCMessage {
header: RPCMessageHeader {
timestamp: get_timestamp(),

View File

@@ -3,6 +3,7 @@ use crate::*;
use attachment_manager::AttachmentManager;
use core::fmt;
use network_manager::NetworkManager;
use routing_table::*;
use rpc_processor::{RPCError, RPCProcessor};
use xx::*;
@@ -961,6 +962,51 @@ impl VeilidAPI {
self.inner.lock().core.is_none()
}
////////////////////////////////////////////////////////////////
// Debugging
async fn debug_buckets(&self, mut debug_args: Vec<String>) -> Result<String, VeilidAPIError> {
let min_state = {
if let Some(min_state) = debug_args.pop() {
if min_state == "dead" {
BucketEntryState::Dead
} else if min_state == "reliable" {
BucketEntryState::Reliable
} else {
return Err(VeilidAPIError::Internal(format!(
"Invalid argument '{}'",
min_state
)));
}
} else {
BucketEntryState::Unreliable
}
};
// Dump routing table bucket info
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info(min_state))
}
pub async fn debug(&self, what: String) -> Result<String, VeilidAPIError> {
trace!("VeilidCore::debug");
let mut out = String::new();
let mut debug_args: Vec<String> = what
.split_ascii_whitespace()
.map(|s| s.to_owned())
.collect();
if let Some(arg) = debug_args.pop() {
if arg == "buckets" {
out += self.debug_buckets(debug_args).await?.as_str();
} else {
out += ">>> Unknown command\n";
}
} else {
out += ">>> Debug commands:\n buckets [dead|reliable]\n";
}
Ok(out)
}
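A hypothetical caller, assuming api is a live VeilidAPI handle. Note that debug() pops the last whitespace token as the command word, so a bare "buckets" reaches debug_buckets (min_state then defaults to Unreliable), while a trailing argument such as "buckets dead" would itself be popped and read as the command:

```rust
// Hypothetical usage sketch; `api: &VeilidAPI` is assumed to be attached.
async fn show_buckets(api: &VeilidAPI) -> Result<(), VeilidAPIError> {
    // "buckets" is popped as the command word and dispatched to debug_buckets.
    let out = api.debug("buckets".to_owned()).await?;
    println!("{}", out);
    Ok(())
}
```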
////////////////////////////////////////////////////////////////
// Attach/Detach