From ac0280e0b6ff2309be8ca8b0b66f001155bdd7b1 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 20 Mar 2022 10:52:03 -0400 Subject: [PATCH] add bandwidth tracking --- Cargo.lock | 22 ++++--- Cargo.toml | 2 +- external/hashlink | 2 +- veilid-core/Cargo.toml | 3 +- veilid-core/src/dht/crypto.rs | 57 +++++++++--------- veilid-core/src/dht/key.rs | 9 +++ veilid-core/src/intf/native/network/mod.rs | 41 +++++++++++-- .../src/intf/native/network/network_udp.rs | 7 +++ veilid-core/src/intf/wasm/network/mod.rs | 58 ++++++++++++------- veilid-core/src/network_manager.rs | 51 ++++++++++------ veilid-core/src/routing_table/debug.rs | 4 ++ veilid-core/src/routing_table/mod.rs | 28 +++++---- veilid-core/src/rpc_processor/mod.rs | 26 +++++---- 13 files changed, 203 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a108738..7f62cebf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1782,6 +1782,14 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "hashlink" +version = "0.8.0" +dependencies = [ + "hashbrown 0.12.0", + "serde 1.0.136", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -3293,7 +3301,7 @@ dependencies = [ "bitflags", "fallible-iterator", "fallible-streaming-iterator", - "hashlink", + "hashlink 0.7.0", "libsqlite3-sys", "memchr", "smallvec", @@ -4042,15 +4050,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "uluru" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "794a32261a1f5eb6a4462c81b59cec87b5c27d5deea7dd1ac8fc781c41d226db" -dependencies = [ - "arrayvec 0.7.2", -] - [[package]] name = "unicode-bidi" version = "0.3.7" @@ -4201,7 +4200,7 @@ dependencies = [ "generic-array 0.14.5", "getrandom 0.2.5", "hashbrown 0.12.0", - "hashlink", + "hashlink 0.8.0", "hex", "ifstructs", "jni", @@ -4239,7 +4238,6 @@ dependencies = [ "socket2", "static_assertions", "thiserror", - "uluru", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", diff --git a/Cargo.toml b/Cargo.toml index 6880d9bf..733f27c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "veilid-wasm", ] -exclude = [ "./external/keyring-rs", "./external/netlink", "./external/cursive" ] +exclude = [ "./external/keyring-rs", "./external/netlink", "./external/cursive", "./external/hashlink" ] [patch.crates-io] cursive = { path = "./external/cursive/cursive" } diff --git a/external/hashlink b/external/hashlink index f5846ec3..9841db4f 160000 --- a/external/hashlink +++ b/external/hashlink @@ -1 +1 @@ -Subproject commit f5846ec3ff06865a204114bd710253e7e67e4498 +Subproject commit 9841db4f9a8b4d6bd46af78927cd88e70befd3f4 diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 9e67e71d..a260be3a 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -25,8 +25,7 @@ hex = "^0" generic-array = "^0" secrecy = "^0" chacha20poly1305 = "^0" -uluru = "^3" -hashlink = "^0" +hashlink = { path = "../external/hashlink", features = ["serde_impl"] } serde-big-array = "^0" futures-util = { version = "^0", default_features = false, features = ["alloc"] } parking_lot = "^0" diff --git a/veilid-core/src/dht/crypto.rs b/veilid-core/src/dht/crypto.rs index a6051500..4b444326 100644 --- a/veilid-core/src/dht/crypto.rs +++ b/veilid-core/src/dht/crypto.rs @@ -7,8 +7,9 @@ use chacha20poly1305::aead::{AeadInPlace, NewAead}; use core::convert::TryInto; use curve25519_dalek as cd; use ed25519_dalek as ed; +use hashlink::linked_hash_map::Entry; +use hashlink::LruCache; use serde::{Deserialize, Serialize}; -use uluru; use x25519_dalek as xd; pub type SharedSecret = [u8; 32]; @@ -17,22 +18,25 @@ pub type Nonce = [u8; 24]; const DH_CACHE_SIZE: usize = 1024; pub const ENCRYPTION_OVERHEAD: usize = 16; -type DHCache = uluru::LRUCache; - -#[derive(Serialize, Deserialize)] -struct DHCacheEntry { +#[derive(Serialize, Deserialize, PartialEq, Eq, Hash)] +struct DHCacheKey { key: DHTKey, secret: DHTKeySecret, +} + +#[derive(Serialize, Deserialize)] +struct DHCacheValue { shared_secret: SharedSecret, } +type DHCache = LruCache; fn cache_to_bytes(cache: &DHCache) -> Vec { let cnt: usize = cache.len(); let mut out: Vec = Vec::with_capacity(cnt * (32 + 32 + 32)); for e in cache.iter() { - out.extend(&e.key.bytes); - out.extend(&e.secret.bytes); - out.extend(&e.shared_secret); + out.extend(&e.0.key.bytes); + out.extend(&e.0.secret.bytes); + out.extend(&e.1.shared_secret); } let mut rev: Vec = Vec::with_capacity(out.len()); for d in out.chunks(32 + 32 + 32).rev() { @@ -43,12 +47,14 @@ fn cache_to_bytes(cache: &DHCache) -> Vec { fn bytes_to_cache(bytes: &[u8], cache: &mut DHCache) { for d in bytes.chunks(32 + 32 + 32) { - let e = DHCacheEntry { + let k = DHCacheKey { key: DHTKey::new(d[0..32].try_into().expect("asdf")), secret: DHTKeySecret::new(d[32..64].try_into().expect("asdf")), + }; + let v = DHCacheValue { shared_secret: d[64..96].try_into().expect("asdf"), }; - cache.insert(e); + cache.insert(k, v); } } @@ -72,7 +78,7 @@ impl Crypto { table_store, node_id: Default::default(), node_id_secret: Default::default(), - dh_cache: DHCache::default(), + dh_cache: DHCache::new(DH_CACHE_SIZE), flush_future: None, } } @@ -176,22 +182,19 @@ impl Crypto { } pub fn cached_dh(&self, key: &DHTKey, secret: &DHTKeySecret) -> Result { - if let Some(c) = self - .inner - .lock() - .dh_cache - .find(|entry| entry.key == *key && entry.secret == *secret) - { - return Ok(c.shared_secret); - } - - let shared_secret = Self::compute_dh(key, secret)?; - self.inner.lock().dh_cache.insert(DHCacheEntry { - key: *key, - secret: *secret, - shared_secret, - }); - Ok(shared_secret) + Ok( + match self.inner.lock().dh_cache.entry(DHCacheKey { + key: *key, + secret: *secret, + }) { + Entry::Occupied(e) => e.get().shared_secret, + Entry::Vacant(e) => { + let shared_secret = Self::compute_dh(key, secret)?; + e.insert(DHCacheValue { shared_secret }); + shared_secret + } + }, + ) } /////////// diff --git a/veilid-core/src/dht/key.rs b/veilid-core/src/dht/key.rs index afee7204..fba0c927 100644 --- a/veilid-core/src/dht/key.rs +++ b/veilid-core/src/dht/key.rs @@ -2,6 +2,7 @@ use crate::xx::*; use core::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; use core::convert::{TryFrom, TryInto}; use core::fmt; +use core::hash::{Hash, Hasher}; use hex; use crate::veilid_rng::*; @@ -203,6 +204,14 @@ macro_rules! byte_array_type { } } impl Eq for $name {} + impl Hash for $name { + fn hash(&self, state: &mut H) { + self.valid.hash(state); + if self.valid { + self.bytes.hash(state); + } + } + } impl Default for $name { fn default() -> Self { let mut this = $name::new([0u8; $size]); diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 0263f216..94fabff6 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -261,7 +261,8 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> Result<(), String> { - match dial_info.protocol_type() { + let data_len = data.len(); + let res = match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data) @@ -275,11 +276,17 @@ impl Network { .map_err(logthru_net!()) } ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_unbound_message(dial_info, data) + WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data) .await .map_err(logthru_net!()) } + }; + if res.is_ok() { + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); } + res } async fn send_data_to_existing_connection( @@ -287,6 +294,8 @@ impl Network { descriptor: ConnectionDescriptor, data: Vec, ) -> Result>, String> { + let data_len = data.len(); + // Handle connectionless protocol if descriptor.protocol_type() == ProtocolType::UDP { // send over the best udp socket we have bound since UDP is not connection oriented @@ -299,6 +308,11 @@ impl Network { .send_message(data, peer_socket_addr) .await .map_err(logthru_net!())?; + + // Network accounting + self.network_manager() + .stats_packet_sent(peer_socket_addr.ip(), data_len as u64); + // Data was consumed return Ok(None); } @@ -311,6 +325,10 @@ impl Network { // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; + // Network accounting + self.network_manager() + .stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64); + // Data was consumed Ok(None) } else { @@ -326,14 +344,21 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> Result<(), String> { + let data_len = data.len(); // Handle connectionless protocol if dial_info.protocol_type() == ProtocolType::UDP { let peer_socket_addr = dial_info.to_socket_addr(); if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { - return ph + let res = ph .send_message(data, peer_socket_addr) .await .map_err(logthru_net!()); + if res.is_ok() { + // Network accounting + self.network_manager() + .stats_packet_sent(peer_socket_addr.ip(), data_len as u64); + } + return res; } return Err("no appropriate UDP protocol handler for dial_info".to_owned()) .map_err(logthru_net!(error)); @@ -343,10 +368,16 @@ impl Network { let local_addr = self.get_preferred_local_address(&dial_info); let conn = self .connection_manager() - .get_or_create_connection(Some(local_addr), dial_info) + .get_or_create_connection(Some(local_addr), dial_info.clone()) .await?; - conn.send(data).await.map_err(logthru_net!(error)) + let res = conn.send(data).await.map_err(logthru_net!(error)); + if res.is_ok() { + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + } + res } // Send data to node diff --git a/veilid-core/src/intf/native/network/network_udp.rs b/veilid-core/src/intf/native/network/network_udp.rs index 7894138d..a7213b49 100644 --- a/veilid-core/src/intf/native/network/network_udp.rs +++ b/veilid-core/src/intf/native/network/network_udp.rs @@ -54,6 +54,13 @@ impl Network { // XXX: Limit the number of packets from the same IP address? log_net!("UDP packet: {:?}", descriptor); + // Network accounting + network_manager.stats_packet_rcvd( + descriptor.remote.to_socket_addr().ip(), + size as u64, + ); + + // Pass it up for processing if let Err(e) = network_manager .on_recv_envelope(&data[..size], descriptor) .await diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 5e5ef819..ee79e9a1 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -49,6 +49,32 @@ impl Network { ///////////////////////////////////////////////////////////////// + pub async fn send_data_unbound_to_dial_info( + &self, + dial_info: DialInfo, + data: Vec, + ) -> Result<(), String> { + let res = match dial_info.protocol_type() { + ProtocolType::UDP => { + return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error)) + } + ProtocolType::TCP => { + return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error)) + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::send_unbound_message(dial_info, data) + .await + .map_err(logthru_net!()) + } + }; + if res.is_ok() { + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + } + res + } + async fn send_data_to_existing_connection( &self, descriptor: ConnectionDescriptor, @@ -71,6 +97,10 @@ impl Network { // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; + // Network accounting + self.network_manager() + .stats_packet_sent(descriptor.remote.to_socket_addr().ip(), data_len as u64); + // Data was consumed Ok(None) } else { @@ -80,26 +110,6 @@ impl Network { } } - pub async fn send_data_unbound_to_dial_info( - &self, - dial_info: DialInfo, - data: Vec, - ) -> Result<(), String> { - match dial_info.protocol_type() { - ProtocolType::UDP => { - return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error)) - } - ProtocolType::TCP => { - return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error)) - } - ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_unbound_message(dial_info, data) - .await - .map_err(logthru_net!()) - } - } - } - pub async fn send_data_to_dial_info( &self, dial_info: DialInfo, @@ -118,7 +128,13 @@ impl Network { .get_or_create_connection(None, dial_info) .await?; - conn.send(data).await.map_err(logthru_net!(error)) + let res = conn.send(data).await.map_err(logthru_net!(error)); + if res.is_ok() { + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + } + res } pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 062fd989..9ad6ba7e 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -1,6 +1,7 @@ use crate::*; use connection_manager::*; use dht::*; +use hashlink::LruCache; use intf::*; use lease_manager::*; use receipt_manager::*; @@ -89,14 +90,30 @@ pub struct PerAddressStats { transfer_stats: TransferStatsDownUp, } -// Statistics about the low-level network -#[derive(Clone, Default)] -pub struct NetworkManagerStats { - self_stats: PerAddressStats, - per_address_stats: HashMap, - recent_addresses: VecDeque, +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct PerAddressStatsKey(IpAddr); + +impl Default for PerAddressStatsKey { + fn default() -> Self { + Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) + } } +// Statistics about the low-level network +#[derive(Clone)] +pub struct NetworkManagerStats { + self_stats: PerAddressStats, + per_address_stats: LruCache, +} + +impl Default for NetworkManagerStats { + fn default() -> Self { + Self { + self_stats: PerAddressStats::default(), + per_address_stats: LruCache::new(IPADDR_TABLE_SIZE), + } + } +} // The mutable state of the network manager struct NetworkManagerInner { routing_table: Option, @@ -487,6 +504,10 @@ impl NetworkManager { data.len(), descriptor ); + + // Network accounting + self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64); + // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { self.process_receipt(data).await?; @@ -603,7 +624,7 @@ impl NetworkManager { .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); // Roll all per-address transfers - let mut dead_addrs: HashSet = HashSet::new(); + let mut dead_addrs: HashSet = HashSet::new(); for (addr, stats) in &mut inner.stats.per_address_stats { stats.transfer_stats_accounting.roll_transfers( last_ts, @@ -622,15 +643,11 @@ impl NetworkManager { for da in &dead_addrs { inner.stats.per_address_stats.remove(da); } - inner - .stats - .recent_addresses - .retain(|a| !dead_addrs.contains(a)); Ok(()) } // Callbacks from low level network for statistics gathering - fn packet_sent(&self, addr: IpAddr, bytes: u64) { + pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) { let inner = &mut *self.inner.lock(); inner .stats @@ -640,13 +657,13 @@ impl NetworkManager { inner .stats .per_address_stats - .entry(addr) - .or_default() + .entry(PerAddressStatsKey(addr)) + .or_insert(PerAddressStats::default()) .transfer_stats_accounting .add_up(bytes); } - fn packet_rcvd(&self, addr: IpAddr, bytes: u64) { + pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: u64) { let inner = &mut *self.inner.lock(); inner .stats @@ -656,8 +673,8 @@ impl NetworkManager { inner .stats .per_address_stats - .entry(addr) - .or_default() + .entry(PerAddressStatsKey(addr)) + .or_insert(PerAddressStats::default()) .transfer_stats_accounting .add_down(bytes); } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index fb37e0b6..67c3f598 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -7,6 +7,10 @@ impl RoutingTable { out += "Routing Table Info:\n"; out += &format!(" Node Id: {}\n", inner.node_id.encode()); + out += &format!( + " Self Latency Stats Accounting: {:#?}\n\n", + inner.self_latency_stats_accounting + ); out += &format!( " Self Transfer Stats Accounting: {:#?}\n\n", inner.self_transfer_stats_accounting diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0ed18164..ee9239ba 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -653,7 +653,7 @@ impl RoutingTable { ////////////////////////////////////////////////////////////////////// // Stats Accounting - pub fn ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -662,7 +662,7 @@ impl RoutingTable { e.ping_sent(ts, bytes); }) } - pub fn ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -671,7 +671,7 @@ impl RoutingTable { e.ping_rcvd(ts, bytes); }) } - pub fn pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -680,21 +680,25 @@ impl RoutingTable { e.pong_sent(ts, bytes); }) } - pub fn pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { + pub fn stats_pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting .add_down(bytes); + self.inner + .lock() + .self_latency_stats_accounting + .record_latency(recv_ts - send_ts); node_ref.operate(|e| { e.pong_rcvd(send_ts, recv_ts, bytes); }) } - pub fn ping_lost(&self, node_ref: NodeRef, ts: u64) { + pub fn stats_ping_lost(&self, node_ref: NodeRef, ts: u64) { node_ref.operate(|e| { e.ping_lost(ts); }) } - pub fn question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -703,7 +707,7 @@ impl RoutingTable { e.question_sent(ts, bytes); }) } - pub fn question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -712,7 +716,7 @@ impl RoutingTable { e.question_rcvd(ts, bytes); }) } - pub fn answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting @@ -721,16 +725,20 @@ impl RoutingTable { e.answer_sent(ts, bytes); }) } - pub fn answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { + pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting .add_down(bytes); + self.inner + .lock() + .self_latency_stats_accounting + .record_latency(recv_ts - send_ts); node_ref.operate(|e| { e.answer_rcvd(send_ts, recv_ts, bytes); }) } - pub fn question_lost(&self, node_ref: NodeRef, ts: u64) { + pub fn stats_question_lost(&self, node_ref: NodeRef, ts: u64) { node_ref.operate(|e| { e.question_lost(ts); }) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 17e7ebed..415f9dc8 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -348,24 +348,26 @@ impl RPCProcessor { self.cancel_op_id_waiter(waitable_reply.op_id); if waitable_reply.is_ping { self.routing_table() - .ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); + .stats_ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); } else { - self.routing_table() - .question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); + self.routing_table().stats_question_lost( + waitable_reply.node_ref.clone(), + waitable_reply.send_ts, + ); } } Ok((rpcreader, _)) => { // Reply received let recv_ts = get_timestamp(); if waitable_reply.is_ping { - self.routing_table().pong_rcvd( + self.routing_table().stats_pong_rcvd( waitable_reply.node_ref, waitable_reply.send_ts, recv_ts, rpcreader.header.body_len, ) } else { - self.routing_table().answer_rcvd( + self.routing_table().stats_answer_rcvd( waitable_reply.node_ref, waitable_reply.send_ts, recv_ts, @@ -538,10 +540,10 @@ impl RPCProcessor { let send_ts = get_timestamp(); if is_ping { self.routing_table() - .ping_sent(node_ref.clone(), send_ts, bytes); + .stats_ping_sent(node_ref.clone(), send_ts, bytes); } else { self.routing_table() - .question_sent(node_ref.clone(), send_ts, bytes); + .stats_question_sent(node_ref.clone(), send_ts, bytes); } // Pass back waitable reply completion @@ -718,9 +720,11 @@ impl RPCProcessor { let send_ts = get_timestamp(); if is_pong { - self.routing_table().pong_sent(node_ref, send_ts, bytes); + self.routing_table() + .stats_pong_sent(node_ref, send_ts, bytes); } else { - self.routing_table().answer_sent(node_ref, send_ts, bytes); + self.routing_table() + .stats_answer_sent(node_ref, send_ts, bytes); } Ok(()) @@ -1190,13 +1194,13 @@ impl RPCProcessor { .lookup_node_ref(rpcreader.header.envelope.get_sender_id()) { if which == 0u32 { - self.routing_table().ping_rcvd( + self.routing_table().stats_ping_rcvd( sender_nr, rpcreader.header.timestamp, rpcreader.header.body_len, ); } else { - self.routing_table().question_rcvd( + self.routing_table().stats_question_rcvd( sender_nr, rpcreader.header.timestamp, rpcreader.header.body_len,