cleanup and stats accounting organization

This commit is contained in:
John Smith 2021-11-26 09:54:38 -05:00
parent 311dc908fe
commit a80178da54
9 changed files with 341 additions and 231 deletions

View File

@ -1,3 +1,6 @@
variables:
GIT_SUBMODULE_STRATEGY: recursive
stages: stages:
- clippy - clippy
- test - test

View File

@ -1,3 +1,4 @@
#![warn(clippy::all)]
#![cfg_attr(target_arch = "wasm32", no_std)] #![cfg_attr(target_arch = "wasm32", no_std)]
#[macro_use] #[macro_use]

View File

@ -12,7 +12,6 @@ use xx::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
const BANDWIDTH_TABLE_SIZE: usize = 10usize;
const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize;
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
@ -32,14 +31,14 @@ pub enum NetworkClass {
impl NetworkClass { impl NetworkClass {
pub fn inbound_capable(&self) -> bool { pub fn inbound_capable(&self) -> bool {
match self { matches!(
self,
Self::Server Self::Server
| Self::Mapped | Self::Mapped
| Self::FullNAT | Self::FullNAT
| Self::AddressRestrictedNAT | Self::AddressRestrictedNAT
| Self::PortRestrictedNAT => true, | Self::PortRestrictedNAT
_ => false, )
}
} }
} }
@ -59,12 +58,6 @@ pub struct NetworkManagerInner {
routing_table: Option<RoutingTable>, routing_table: Option<RoutingTable>,
components: Option<NetworkComponents>, components: Option<NetworkComponents>,
network_class: Option<NetworkClass>, network_class: Option<NetworkClass>,
incoming_avg_bandwidth: f32,
incoming_max_bandwidth: f32,
incoming_bandwidth_table: Vec<f32>,
outgoing_avg_bandwidth: f32,
outgoing_max_bandwidth: f32,
outgoing_bandwidth_table: Vec<f32>,
connection_processor_jh: Option<JoinHandle<()>>, connection_processor_jh: Option<JoinHandle<()>>,
connection_add_channel_tx: Option<utils::channel::Sender<SystemPinBoxFuture<()>>>, connection_add_channel_tx: Option<utils::channel::Sender<SystemPinBoxFuture<()>>>,
} }
@ -79,33 +72,20 @@ pub struct NetworkManager {
impl NetworkManager { impl NetworkManager {
fn new_inner() -> NetworkManagerInner { fn new_inner() -> NetworkManagerInner {
let mut inner = NetworkManagerInner { NetworkManagerInner {
routing_table: None, routing_table: None,
components: None, components: None,
network_class: None, network_class: None,
incoming_avg_bandwidth: 0.0f32,
incoming_max_bandwidth: 0.0f32,
incoming_bandwidth_table: Vec::new(),
outgoing_avg_bandwidth: 0.0f32,
outgoing_max_bandwidth: 0.0f32,
outgoing_bandwidth_table: Vec::new(),
connection_processor_jh: None, connection_processor_jh: None,
connection_add_channel_tx: None, connection_add_channel_tx: None,
}; }
inner
.incoming_bandwidth_table
.resize(BANDWIDTH_TABLE_SIZE, 0.0f32);
inner
.outgoing_bandwidth_table
.resize(BANDWIDTH_TABLE_SIZE, 0.0f32);
inner
} }
pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self { pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self {
Self { Self {
config: config.clone(), config: config.clone(),
table_store: table_store, table_store,
crypto: crypto, crypto,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }
} }

View File

@ -1,15 +1,5 @@
use super::*; use super::*;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 10;
// Transfers entries are in bytes total for the interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 10;
// Reliable pings are done with increased spacing between pings // Reliable pings are done with increased spacing between pings
// - Start secs is the number of seconds between the first two pings // - Start secs is the number of seconds between the first two pings
// - Max secs is the maximum number of seconds between consecutive pings // - Max secs is the maximum number of seconds between consecutive pings
@ -42,9 +32,7 @@ pub struct BucketEntry {
min_max_version: Option<(u8, u8)>, min_max_version: Option<(u8, u8)>,
last_connection: Option<(ConnectionDescriptor, u64)>, last_connection: Option<(ConnectionDescriptor, u64)>,
dial_info_entries: VecDeque<DialInfoEntry>, dial_info_entries: VecDeque<DialInfoEntry>,
rolling_latencies: VecDeque<u64>, stats_accounting: StatsAccounting,
rolling_transfers: VecDeque<(u64, u64)>,
current_transfer: (u64, u64),
peer_stats: PeerStats, peer_stats: PeerStats,
} }
@ -55,15 +43,13 @@ impl BucketEntry {
min_max_version: None, min_max_version: None,
last_connection: None, last_connection: None,
dial_info_entries: VecDeque::new(), dial_info_entries: VecDeque::new(),
rolling_latencies: VecDeque::new(), stats_accounting: StatsAccounting::new(),
rolling_transfers: VecDeque::new(),
current_transfer: (0, 0),
peer_stats: PeerStats { peer_stats: PeerStats {
time_added: get_timestamp(), time_added: get_timestamp(),
last_seen: None, last_seen: None,
ping_stats: PingStats::default(), ping_stats: PingStats::default(),
latency: None, latency: None,
transfer: (TransferStats::default(), TransferStats::default()), transfer: TransferStatsDownUp::default(),
node_info: None, node_info: None,
}, },
} }
@ -213,10 +199,7 @@ impl BucketEntry {
} }
pub fn last_connection(&self) -> Option<ConnectionDescriptor> { pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
match self.last_connection.as_ref() { self.last_connection.as_ref().map(|x| x.0.clone())
Some(x) => Some(x.0.clone()),
None => None,
}
} }
pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) {
@ -248,71 +231,13 @@ impl BucketEntry {
///// stats methods ///// stats methods
// called every ROLLING_TRANSFERS_INTERVAL_SECS seconds // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds
pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
let dur_ms = (cur_ts - last_ts) / 1000u64; self.stats_accounting
while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE { .roll_transfers(last_ts, cur_ts, &mut self.peer_stats.transfer);
self.rolling_transfers.pop_front();
}
self.rolling_transfers.push_back(self.current_transfer);
self.current_transfer = (0, 0);
let xd = &mut self.peer_stats.transfer.0;
let xu = &mut self.peer_stats.transfer.1;
xd.maximum = 0;
xu.maximum = 0;
xd.minimum = u64::MAX;
xu.minimum = u64::MAX;
xd.average = 0;
xu.average = 0;
for (rtd, rtu) in &self.rolling_transfers {
let bpsd = rtd * 1000u64 / dur_ms;
let bpsu = rtu * 1000u64 / dur_ms;
if bpsd > xd.maximum {
xd.maximum = bpsd;
}
if bpsu > xu.maximum {
xu.maximum = bpsu;
}
if bpsd < xd.minimum {
xd.minimum = bpsd;
}
if bpsu < xu.minimum {
xu.minimum = bpsu;
}
xd.average += bpsd;
xu.average += bpsu;
}
let len = self.rolling_transfers.len() as u64;
xd.average /= len;
xu.average /= len;
// total remains unchanged
} }
// Called for every round trip packet we receive // Called for every round trip packet we receive
fn record_latency(&mut self, latency: u64) { fn record_latency(&mut self, latency: u64) {
while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { self.peer_stats.latency = Some(self.stats_accounting.record_latency(latency));
self.rolling_latencies.pop_front();
}
self.rolling_latencies.push_back(latency);
let mut ls = LatencyStats {
fastest: 0,
average: 0,
slowest: 0,
};
for rl in &self.rolling_latencies {
if *rl < ls.fastest {
ls.fastest = *rl;
}
if *rl > ls.slowest {
ls.slowest = *rl;
}
ls.average += *rl;
}
let len = self.rolling_latencies.len() as u64;
ls.average /= len;
self.peer_stats.latency = Some(ls);
} }
///// state machine handling ///// state machine handling
@ -381,23 +306,23 @@ impl BucketEntry {
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
/// Called by RPC processor as events happen /// Called when rpc processor things happen
pub fn ping_sent(&mut self, ts: u64, bytes: u64) { pub(super) fn ping_sent(&mut self, ts: u64, bytes: u64) {
self.peer_stats.ping_stats.total_sent += 1; self.peer_stats.ping_stats.total_sent += 1;
self.current_transfer.1 += bytes; self.stats_accounting.add_up(bytes);
self.peer_stats.ping_stats.in_flight += 1; self.peer_stats.ping_stats.in_flight += 1;
self.peer_stats.ping_stats.last_pinged = Some(ts); self.peer_stats.ping_stats.last_pinged = Some(ts);
} }
pub fn ping_rcvd(&mut self, ts: u64, bytes: u64) { pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) {
self.current_transfer.0 += bytes; self.stats_accounting.add_down(bytes);
self.touch_last_seen(ts); self.touch_last_seen(ts);
} }
pub fn pong_sent(&mut self, _ts: u64, bytes: u64) { pub(super) fn pong_sent(&mut self, _ts: u64, bytes: u64) {
self.current_transfer.1 += bytes; self.stats_accounting.add_up(bytes);
} }
pub fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { pub(super) fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
self.current_transfer.0 += bytes; self.stats_accounting.add_down(bytes);
self.peer_stats.ping_stats.in_flight -= 1; self.peer_stats.ping_stats.in_flight -= 1;
self.peer_stats.ping_stats.total_returned += 1; self.peer_stats.ping_stats.total_returned += 1;
self.peer_stats.ping_stats.consecutive_pongs += 1; self.peer_stats.ping_stats.consecutive_pongs += 1;
@ -412,28 +337,28 @@ impl BucketEntry {
self.record_latency(recv_ts - send_ts); self.record_latency(recv_ts - send_ts);
self.touch_last_seen(recv_ts); self.touch_last_seen(recv_ts);
} }
pub fn ping_lost(&mut self, _ts: u64) { pub(super) fn ping_lost(&mut self, _ts: u64) {
self.peer_stats.ping_stats.in_flight -= 1; self.peer_stats.ping_stats.in_flight -= 1;
self.peer_stats.ping_stats.recent_lost_pings += 1; self.peer_stats.ping_stats.recent_lost_pings += 1;
self.peer_stats.ping_stats.consecutive_pongs = 0; self.peer_stats.ping_stats.consecutive_pongs = 0;
self.peer_stats.ping_stats.first_consecutive_pong_time = None; self.peer_stats.ping_stats.first_consecutive_pong_time = None;
} }
pub fn question_sent(&mut self, _ts: u64, bytes: u64) { pub(super) fn question_sent(&mut self, _ts: u64, bytes: u64) {
self.current_transfer.1 += bytes; self.stats_accounting.add_up(bytes);
} }
pub fn question_rcvd(&mut self, ts: u64, bytes: u64) { pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) {
self.current_transfer.0 += bytes; self.stats_accounting.add_down(bytes);
self.touch_last_seen(ts); self.touch_last_seen(ts);
} }
pub fn answer_sent(&mut self, _ts: u64, bytes: u64) { pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) {
self.current_transfer.1 += bytes; self.stats_accounting.add_up(bytes);
} }
pub fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
self.current_transfer.0 += bytes; self.stats_accounting.add_down(bytes);
self.record_latency(recv_ts - send_ts); self.record_latency(recv_ts - send_ts);
self.touch_last_seen(recv_ts); self.touch_last_seen(recv_ts);
} }
pub fn question_lost(&mut self, _ts: u64) { pub(super) fn question_lost(&mut self, _ts: u64) {
self.peer_stats.ping_stats.consecutive_pongs = 0; self.peer_stats.ping_stats.consecutive_pongs = 0;
self.peer_stats.ping_stats.first_consecutive_pong_time = None; self.peer_stats.ping_stats.first_consecutive_pong_time = None;
} }

View File

@ -3,12 +3,7 @@ mod bucket_entry;
mod dial_info_entry; mod dial_info_entry;
mod find_nodes; mod find_nodes;
mod node_ref; mod node_ref;
mod stats_accounting;
use bucket::*;
pub use bucket_entry::*;
pub use dial_info_entry::*;
pub use find_nodes::*;
pub use node_ref::*;
use crate::dht::*; use crate::dht::*;
use crate::intf::*; use crate::intf::*;
@ -18,7 +13,13 @@ use crate::xx::*;
use crate::*; use crate::*;
use alloc::collections::VecDeque; use alloc::collections::VecDeque;
use alloc::str::FromStr; use alloc::str::FromStr;
use bucket::*;
pub use bucket_entry::*;
pub use dial_info_entry::*;
pub use find_nodes::*;
use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::stream::{FuturesUnordered, StreamExt};
pub use node_ref::*;
pub use stats_accounting::*;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -42,16 +43,15 @@ struct RoutingTableInner {
node_id: DHTKey, node_id: DHTKey,
node_id_secret: DHTKeySecret, node_id_secret: DHTKeySecret,
buckets: Vec<Bucket>, buckets: Vec<Bucket>,
//recent_nodes: VecDeque<DHTKey>,
//closest_reliable_nodes: Vec<DHTKey>,
//fastest_reliable_nodes: Vec<DHTKey>,
//closest_nodes: Vec<DHTKey>,
//fastest_nodes: Vec<DHTKey>,
local_dial_info: Vec<DialInfoDetail>, local_dial_info: Vec<DialInfoDetail>,
public_dial_info: Vec<DialInfoDetail>, public_dial_info: Vec<DialInfoDetail>,
bucket_entry_count: usize, bucket_entry_count: usize,
// Waiters // Waiters
eventual_changed_dial_info: Eventual, eventual_changed_dial_info: Eventual,
// Transfer stats for this node
stats_accounting: StatsAccounting,
// latency: Option<LatencyStats>,
transfer_stats: TransferStatsDownUp,
} }
struct RoutingTableUnlockedInner { struct RoutingTableUnlockedInner {
@ -72,25 +72,22 @@ pub struct RoutingTable {
impl RoutingTable { impl RoutingTable {
fn new_inner(network_manager: NetworkManager) -> RoutingTableInner { fn new_inner(network_manager: NetworkManager) -> RoutingTableInner {
RoutingTableInner { RoutingTableInner {
network_manager: network_manager, network_manager,
node_id: DHTKey::default(), node_id: DHTKey::default(),
node_id_secret: DHTKeySecret::default(), node_id_secret: DHTKeySecret::default(),
buckets: Vec::new(), buckets: Vec::new(),
//recent_nodes: VecDeque::new(),
//closest_reliable_nodes: Vec::new(),
//fastest_reliable_nodes: Vec::new(),
//closest_nodes: Vec::new(),
//fastest_nodes: Vec::new(),
local_dial_info: Vec::new(), local_dial_info: Vec::new(),
public_dial_info: Vec::new(), public_dial_info: Vec::new(),
bucket_entry_count: 0, bucket_entry_count: 0,
eventual_changed_dial_info: Eventual::new(), eventual_changed_dial_info: Eventual::new(),
stats_accounting: StatsAccounting::new(),
transfer_stats: TransferStatsDownUp::default(),
} }
} }
fn new_unlocked_inner(config: VeilidConfig) -> RoutingTableUnlockedInner { fn new_unlocked_inner(config: VeilidConfig) -> RoutingTableUnlockedInner {
let c = config.get(); let c = config.get();
RoutingTableUnlockedInner { RoutingTableUnlockedInner {
rolling_transfers_task: TickTask::new(bucket_entry::ROLLING_TRANSFERS_INTERVAL_SECS), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
bootstrap_task: TickTask::new(1), bootstrap_task: TickTask::new(1),
peer_minimum_refresh_task: TickTask::new_us(c.network.dht.min_peer_refresh_time), peer_minimum_refresh_task: TickTask::new_us(c.network.dht.min_peer_refresh_time),
ping_validator_task: TickTask::new(1), ping_validator_task: TickTask::new(1),
@ -155,7 +152,7 @@ impl RoutingTable {
pub fn has_local_dial_info(&self) -> bool { pub fn has_local_dial_info(&self) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.local_dial_info.len() > 0 !inner.local_dial_info.is_empty()
} }
pub fn local_dial_info(&self) -> Vec<DialInfoDetail> { pub fn local_dial_info(&self) -> Vec<DialInfoDetail> {
@ -197,14 +194,14 @@ impl RoutingTable {
} }
pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) {
let ts = get_timestamp(); let timestamp = get_timestamp();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.local_dial_info.push(DialInfoDetail { inner.local_dial_info.push(DialInfoDetail {
dial_info: dial_info.clone(), dial_info: dial_info.clone(),
origin: origin, origin,
network_class: None, network_class: None,
timestamp: ts, timestamp,
}); });
info!( info!(
@ -224,7 +221,7 @@ impl RoutingTable {
pub fn has_public_dial_info(&self) -> bool { pub fn has_public_dial_info(&self) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.public_dial_info.len() > 0 !inner.public_dial_info.is_empty()
} }
pub fn public_dial_info(&self) -> Vec<DialInfoDetail> { pub fn public_dial_info(&self) -> Vec<DialInfoDetail> {
@ -278,8 +275,8 @@ impl RoutingTable {
inner.public_dial_info.push(DialInfoDetail { inner.public_dial_info.push(DialInfoDetail {
dial_info: dial_info.clone(), dial_info: dial_info.clone(),
origin: origin, origin,
network_class: network_class, network_class,
timestamp: ts, timestamp: ts,
}); });
@ -466,10 +463,9 @@ impl RoutingTable {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let idx = Self::find_bucket_index(&*inner, node_id); let idx = Self::find_bucket_index(&*inner, node_id);
let bucket = &mut inner.buckets[idx]; let bucket = &mut inner.buckets[idx];
match bucket.entry_mut(&node_id) { bucket
None => None, .entry_mut(&node_id)
Some(e) => Some(NodeRef::new(self.clone(), node_id, e)), .map(|e| NodeRef::new(self.clone(), node_id, e))
}
} }
// Shortcut function to add a node to our routing table if it doesn't exist // Shortcut function to add a node to our routing table if it doesn't exist
@ -629,7 +625,7 @@ impl RoutingTable {
let node_id = ndis.node_id.key; let node_id = ndis.node_id.key;
bsmap bsmap
.entry(node_id) .entry(node_id)
.or_insert(Vec::new()) .or_insert_with(Vec::new)
.push(ndis.dial_info); .push(ndis.dial_info);
} }
@ -697,7 +693,14 @@ impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is // Compute transfer statistics to determine how 'fast' a node is
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
let mut inner = self.inner.lock(); let inner = &mut *self.inner.lock();
// Roll our own node's transfers
inner
.stats_accounting
.roll_transfers(last_ts, cur_ts, &mut inner.transfer_stats);
// Roll all bucket entry transfers
for b in &mut inner.buckets { for b in &mut inner.buckets {
b.roll_transfers(last_ts, cur_ts); b.roll_transfers(last_ts, cur_ts);
} }
@ -728,4 +731,66 @@ impl RoutingTable {
Ok(()) Ok(())
} }
//////////////////////////////////////////////////////////////////////
// Stats Accounting
pub fn ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_up(bytes);
node_ref.operate(|e| {
e.ping_sent(ts, bytes);
})
}
pub fn ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_down(bytes);
node_ref.operate(|e| {
e.ping_rcvd(ts, bytes);
})
}
pub fn pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_up(bytes);
node_ref.operate(|e| {
e.pong_sent(ts, bytes);
})
}
pub fn pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_down(bytes);
node_ref.operate(|e| {
e.pong_rcvd(send_ts, recv_ts, bytes);
})
}
pub fn ping_lost(&self, node_ref: NodeRef, ts: u64) {
node_ref.operate(|e| {
e.ping_lost(ts);
})
}
pub fn question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_up(bytes);
node_ref.operate(|e| {
e.question_sent(ts, bytes);
})
}
pub fn question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_down(bytes);
node_ref.operate(|e| {
e.question_rcvd(ts, bytes);
})
}
pub fn answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_up(bytes);
node_ref.operate(|e| {
e.answer_sent(ts, bytes);
})
}
pub fn answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
self.inner.lock().stats_accounting.add_down(bytes);
node_ref.operate(|e| {
e.answer_rcvd(send_ts, recv_ts, bytes);
})
}
pub fn question_lost(&self, node_ref: NodeRef, ts: u64) {
node_ref.operate(|e| {
e.question_lost(ts);
})
}
} }

View File

@ -0,0 +1,104 @@
use crate::xx::*;
use crate::*;
use alloc::collections::VecDeque;
// Latency entry is per round-trip packet (ping or data)
// - Size is number of entries
const ROLLING_LATENCIES_SIZE: usize = 10;
// Transfers entries are in bytes total for the interval
// - Size is number of entries
// - Interval is number of seconds in each entry
const ROLLING_TRANSFERS_SIZE: usize = 10;
pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 10;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransferCount {
down: u64,
up: u64,
}
#[derive(Debug, Clone, Default)]
pub struct StatsAccounting {
rolling_latencies: VecDeque<u64>,
rolling_transfers: VecDeque<TransferCount>,
current_transfer: TransferCount,
}
impl StatsAccounting {
pub fn new() -> Self {
Self {
rolling_latencies: VecDeque::new(),
rolling_transfers: VecDeque::new(),
current_transfer: TransferCount::default(),
}
}
pub fn add_down(&mut self, bytes: u64) {
self.current_transfer.down += bytes;
}
pub fn add_up(&mut self, bytes: u64) {
self.current_transfer.up += bytes;
}
pub fn roll_transfers(
&mut self,
last_ts: u64,
cur_ts: u64,
transfer_stats: &mut TransferStatsDownUp,
) {
let dur_ms = (cur_ts - last_ts) / 1000u64;
while self.rolling_transfers.len() >= ROLLING_TRANSFERS_SIZE {
self.rolling_transfers.pop_front();
}
self.rolling_transfers.push_back(self.current_transfer);
transfer_stats.down.total += self.current_transfer.down;
transfer_stats.up.total += self.current_transfer.up;
self.current_transfer = TransferCount::default();
transfer_stats.down.maximum = 0;
transfer_stats.up.maximum = 0;
transfer_stats.down.minimum = u64::MAX;
transfer_stats.up.minimum = u64::MAX;
transfer_stats.down.average = 0;
transfer_stats.up.average = 0;
for xfer in &self.rolling_transfers {
let bpsd = xfer.down * 1000u64 / dur_ms;
let bpsu = xfer.up * 1000u64 / dur_ms;
transfer_stats.down.maximum.max_assign(bpsd);
transfer_stats.up.maximum.max_assign(bpsu);
transfer_stats.down.minimum.min_assign(bpsd);
transfer_stats.up.minimum.min_assign(bpsu);
transfer_stats.down.average += bpsd;
transfer_stats.down.average += bpsu;
}
let len = self.rolling_transfers.len() as u64;
transfer_stats.down.average /= len;
transfer_stats.up.average /= len;
}
pub fn record_latency(&mut self, latency: u64) -> veilid_api::LatencyStats {
while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE {
self.rolling_latencies.pop_front();
}
self.rolling_latencies.push_back(latency);
let mut ls = LatencyStats {
fastest: 0,
average: 0,
slowest: 0,
};
for rl in &self.rolling_latencies {
ls.fastest.min_assign(*rl);
ls.slowest.max_assign(*rl);
ls.average += *rl;
}
let len = self.rolling_latencies.len() as u64;
ls.average /= len;
ls
}
}

View File

@ -575,24 +575,32 @@ impl RPCProcessor {
match &out { match &out {
Err(_) => { Err(_) => {
self.cancel_op_id_waiter(waitable_reply.op_id); self.cancel_op_id_waiter(waitable_reply.op_id);
waitable_reply.node_ref.operate(|e| {
if waitable_reply.is_ping { if waitable_reply.is_ping {
e.ping_lost(waitable_reply.send_ts); self.routing_table()
.ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
} else { } else {
e.question_lost(waitable_reply.send_ts); self.routing_table()
.question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
} }
});
} }
Ok((rpcreader, _)) => { Ok((rpcreader, _)) => {
// Reply received // Reply received
let recv_ts = get_timestamp(); let recv_ts = get_timestamp();
waitable_reply.node_ref.operate(|e| {
if waitable_reply.is_ping { if waitable_reply.is_ping {
e.pong_rcvd(waitable_reply.send_ts, recv_ts, rpcreader.header.body_len); self.routing_table().pong_rcvd(
waitable_reply.node_ref,
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
)
} else { } else {
e.answer_rcvd(waitable_reply.send_ts, recv_ts, rpcreader.header.body_len); self.routing_table().answer_rcvd(
waitable_reply.node_ref,
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
)
} }
});
} }
}; };
@ -749,13 +757,13 @@ impl RPCProcessor {
// Successfully sent // Successfully sent
let send_ts = get_timestamp(); let send_ts = get_timestamp();
node_ref.operate(|e| {
if is_ping { if is_ping {
e.ping_sent(send_ts, bytes); self.routing_table()
.ping_sent(node_ref.clone(), send_ts, bytes);
} else { } else {
e.question_sent(send_ts, bytes); self.routing_table()
.question_sent(node_ref.clone(), send_ts, bytes);
} }
});
// Pass back waitable reply completion // Pass back waitable reply completion
match eventual { match eventual {
@ -763,13 +771,13 @@ impl RPCProcessor {
// if we don't want an answer, don't wait for one // if we don't want an answer, don't wait for one
Ok(None) Ok(None)
} }
Some(e) => Ok(Some(WaitableReply { Some(eventual) => Ok(Some(WaitableReply {
op_id: op_id, op_id,
eventual: e, eventual,
timeout: timeout, timeout,
node_ref: node_ref, node_ref,
send_ts: send_ts, send_ts,
is_ping: is_ping, is_ping,
})), })),
} }
} }
@ -916,13 +924,11 @@ impl RPCProcessor {
// Reply successfully sent // Reply successfully sent
let send_ts = get_timestamp(); let send_ts = get_timestamp();
node_ref.operate(|e| {
if is_pong { if is_pong {
e.pong_sent(send_ts, bytes); self.routing_table().pong_sent(node_ref, send_ts, bytes);
} else { } else {
e.answer_sent(send_ts, bytes); self.routing_table().answer_sent(node_ref, send_ts, bytes);
} }
});
Ok(()) Ok(())
} }
@ -985,16 +991,16 @@ impl RPCProcessor {
let will_validate_dial_info = self.will_validate_dial_info(); let will_validate_dial_info = self.will_validate_dial_info();
NodeInfo { NodeInfo {
can_route: can_route, can_route,
will_route: will_route, will_route,
can_tunnel: can_tunnel, can_tunnel,
will_tunnel: will_tunnel, will_tunnel,
can_signal_lease: can_signal_lease, can_signal_lease,
will_signal_lease: will_signal_lease, will_signal_lease,
can_relay_lease: can_relay_lease, can_relay_lease,
will_relay_lease: will_relay_lease, will_relay_lease,
can_validate_dial_info: can_validate_dial_info, can_validate_dial_info,
will_validate_dial_info: will_validate_dial_info, will_validate_dial_info,
} }
} }
@ -1007,9 +1013,7 @@ impl RPCProcessor {
None => None, None => None,
Some(c) => c.remote.to_socket_addr().ok(), Some(c) => c.remote.to_socket_addr().ok(),
}); });
SenderInfo { SenderInfo { socket_address }
socket_address: socket_address,
}
} }
async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
@ -1329,7 +1333,7 @@ impl RPCProcessor {
let reader = capnp::message::Reader::new(msg.data, Default::default()); let reader = capnp::message::Reader::new(msg.data, Default::default());
let rpcreader = RPCMessageReader { let rpcreader = RPCMessageReader {
header: msg.header, header: msg.header,
reader: reader, reader,
}; };
let (which, is_q) = { let (which, is_q) = {
@ -1379,13 +1383,17 @@ impl RPCProcessor {
.lookup_node_ref(rpcreader.header.envelope.get_sender_id()) .lookup_node_ref(rpcreader.header.envelope.get_sender_id())
{ {
if which == 0u32 { if which == 0u32 {
sender_nr.operate(|e| { self.routing_table().ping_rcvd(
e.ping_rcvd(rpcreader.header.timestamp, rpcreader.header.body_len); sender_nr,
}); rpcreader.header.timestamp,
rpcreader.header.body_len,
);
} else { } else {
sender_nr.operate(|e| { self.routing_table().question_rcvd(
e.question_rcvd(rpcreader.header.timestamp, rpcreader.header.body_len); sender_nr,
}); rpcreader.header.timestamp,
rpcreader.header.body_len,
);
} }
} }
}; };
@ -1508,9 +1516,9 @@ impl RPCProcessor {
let msg = RPCMessage { let msg = RPCMessage {
header: RPCMessageHeader { header: RPCMessageHeader {
timestamp: get_timestamp(), timestamp: get_timestamp(),
envelope: envelope, envelope,
body_len: body.len() as u64, body_len: body.len() as u64,
peer_noderef: peer_noderef, peer_noderef,
}, },
data: RPCMessageData { contents: body }, data: RPCMessageData { contents: body },
}; };
@ -1586,9 +1594,9 @@ impl RPCProcessor {
// Return the answer for anyone who may care // Return the answer for anyone who may care
let out = InfoAnswer { let out = InfoAnswer {
latency: latency, latency,
node_info: node_info, node_info,
sender_info: sender_info, sender_info,
}; };
Ok(out) Ok(out)
@ -1713,7 +1721,7 @@ impl RPCProcessor {
let peers_reader = find_node_a let peers_reader = find_node_a
.get_peers() .get_peers()
.map_err(map_error_internal!("Missing peers"))?; .map_err(map_error_internal!("Missing peers"))?;
let mut peers_vec = Vec::<PeerInfo>::with_capacity( let mut peers = Vec::<PeerInfo>::with_capacity(
peers_reader peers_reader
.len() .len()
.try_into() .try_into()
@ -1732,13 +1740,10 @@ impl RPCProcessor {
} }
} }
peers_vec.push(peer_info); peers.push(peer_info);
} }
let out = FindNodeAnswer { let out = FindNodeAnswer { latency, peers };
latency: latency,
peers: peers_vec,
};
Ok(out) Ok(out)
} }

View File

@ -671,6 +671,12 @@ pub struct LatencyStats {
pub slowest: u64, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies pub slowest: u64, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies
} }
#[derive(Clone, Debug, Default)]
pub struct TransferStatsDownUp {
pub down: TransferStats,
pub up: TransferStats,
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct TransferStats { pub struct TransferStats {
pub total: u64, // total amount transferred ever pub total: u64, // total amount transferred ever
@ -696,8 +702,8 @@ pub struct PeerStats {
pub last_seen: Option<u64>, // when the peer was last seen for any reason pub last_seen: Option<u64>, // when the peer was last seen for any reason
pub ping_stats: PingStats, // information about pings pub ping_stats: PingStats, // information about pings
pub latency: Option<LatencyStats>, // latencies for communications with the peer pub latency: Option<LatencyStats>, // latencies for communications with the peer
pub transfer: (TransferStats, TransferStats), // (download, upload) stats for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer
pub node_info: Option<NodeInfo>, // last known node info pub node_info: Option<NodeInfo>, // Last known node info
} }
cfg_if! { cfg_if! {

View File

@ -99,3 +99,24 @@ where
} }
None None
} }
pub trait CmpAssign {
fn min_assign(&mut self, other: Self);
fn max_assign(&mut self, other: Self);
}
impl<T> CmpAssign for T
where
T: core::cmp::Ord,
{
fn min_assign(&mut self, other: Self) {
if &other < self {
*self = other;
}
}
fn max_assign(&mut self, other: Self) {
if &other > self {
*self = other;
}
}
}