diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 783ba043..1ead5705 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -59,7 +59,7 @@ impl BucketEntry { peer_stats: PeerStats { time_added: now, last_seen: None, - ping_stats: PingStats::default(), + rpc_stats: RPCStats::default(), latency: None, transfer: TransferStatsDownUp::default(), status: None, @@ -187,7 +187,7 @@ impl BucketEntry { ///// state machine handling pub(super) fn check_reliable(&self, cur_ts: u64) -> bool { // if we have had consecutive ping replies for longer that UNRELIABLE_PING_SPAN_SECS - match self.peer_stats.ping_stats.first_consecutive_pong_time { + match self.peer_stats.rpc_stats.first_consecutive_answer_time { None => false, Some(ts) => { cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) @@ -206,9 +206,9 @@ impl BucketEntry { } fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool { - match self.peer_stats.ping_stats.last_pinged { + match self.peer_stats.last_seen { None => true, - Some(last_pinged) => cur_ts.saturating_sub(last_pinged) >= (interval * 1000000u64), + Some(last_seen) => cur_ts.saturating_sub(last_seen) >= (interval * 1000000u64), } } @@ -235,25 +235,25 @@ impl BucketEntry { match state { BucketEntryState::Reliable => { // If we are in a reliable state, we need a ping on an exponential scale - match self.peer_stats.ping_stats.last_pinged { + match self.peer_stats.last_seen { None => true, - Some(last_pinged) => { - let first_consecutive_pong_time = self + Some(last_seen) => { + let first_consecutive_answer_time = self .peer_stats - .ping_stats - .first_consecutive_pong_time + .rpc_stats + .first_consecutive_answer_time .unwrap(); - let start_of_reliable_time = first_consecutive_pong_time + let start_of_reliable_time = first_consecutive_answer_time + ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64 - * 1000000u64); + * 1_000_000u64); let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time); - let reliable_last = last_pinged.saturating_sub(start_of_reliable_time); + let reliable_last = last_seen.saturating_sub(start_of_reliable_time); retry_falloff_log( reliable_last, reliable_cur, - RELIABLE_PING_INTERVAL_START_SECS as u64 * 1000000u64, - RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1000000u64, + RELIABLE_PING_INTERVAL_START_SECS as u64 * 1_000_000u64, + RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1_000_000u64, RELIABLE_PING_INTERVAL_MULTIPLIER, ) } @@ -269,26 +269,18 @@ impl BucketEntry { pub(super) fn touch_last_seen(&mut self, ts: u64) { // If we've heard from the node at all, we can always restart our lost ping count - self.peer_stats.ping_stats.recent_lost_pings = 0; + self.peer_stats.rpc_stats.recent_lost_answers = 0; // Mark the node as seen self.peer_stats.last_seen = Some(ts); } pub(super) fn state_debug_info(&self, cur_ts: u64) -> String { - let last_pinged = if let Some(last_pinged) = self.peer_stats.ping_stats.last_pinged { - format!( - "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(last_pinged)) - ) - } else { - "never".to_owned() - }; - let first_consecutive_pong_time = if let Some(first_consecutive_pong_time) = - self.peer_stats.ping_stats.first_consecutive_pong_time + let first_consecutive_answer_time = if let Some(first_consecutive_answer_time) = + self.peer_stats.rpc_stats.first_consecutive_answer_time { format!( "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_pong_time)) + timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_answer_time)) ) } else { "never".to_owned() @@ -303,10 +295,9 @@ impl BucketEntry { }; format!( - "state: {:?}, first_consecutive_pong_time: {}, last_pinged: {}, last_seen: {}", + "state: {:?}, first_consecutive_answer_time: {}, last_seen: {}", self.state(cur_ts), - first_consecutive_pong_time, - last_pinged, + first_consecutive_answer_time, last_seen ) } @@ -314,69 +305,43 @@ impl BucketEntry { //////////////////////////////////////////////////////////////// /// Called when rpc processor things happen - pub(super) fn ping_sent(&mut self, ts: u64, bytes: u64) { - self.peer_stats.ping_stats.total_sent += 1; + pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) { self.transfer_stats_accounting.add_up(bytes); - self.peer_stats.ping_stats.in_flight += 1; - self.peer_stats.ping_stats.last_pinged = Some(ts); - // if we haven't heard from this node yet and it's our first attempt at contacting it - // then we set the last_seen time - if self.peer_stats.last_seen.is_none() { - self.peer_stats.last_seen = Some(ts); + self.peer_stats.rpc_stats.messages_sent += 1; + if expects_answer { + self.peer_stats.rpc_stats.questions_in_flight += 1; } - } - pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) { - self.transfer_stats_accounting.add_down(bytes); - self.touch_last_seen(ts); - } - pub(super) fn pong_sent(&mut self, _ts: u64, bytes: u64) { - self.transfer_stats_accounting.add_up(bytes); - } - pub(super) fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { - self.transfer_stats_accounting.add_down(bytes); - self.peer_stats.ping_stats.in_flight -= 1; - self.peer_stats.ping_stats.total_returned += 1; - self.peer_stats.ping_stats.consecutive_pongs += 1; - if self - .peer_stats - .ping_stats - .first_consecutive_pong_time - .is_none() - { - self.peer_stats.ping_stats.first_consecutive_pong_time = Some(recv_ts); - } - self.record_latency(recv_ts - send_ts); - self.touch_last_seen(recv_ts); - } - pub(super) fn ping_lost(&mut self, _ts: u64) { - self.peer_stats.ping_stats.in_flight -= 1; - self.peer_stats.ping_stats.recent_lost_pings += 1; - self.peer_stats.ping_stats.consecutive_pongs = 0; - self.peer_stats.ping_stats.first_consecutive_pong_time = None; - } - pub(super) fn question_sent(&mut self, ts: u64, bytes: u64) { - self.transfer_stats_accounting.add_up(bytes); - // if we haven't heard from this node yet and it's our first attempt at contacting it - // then we set the last_seen time if self.peer_stats.last_seen.is_none() { self.peer_stats.last_seen = Some(ts); } } pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) { self.transfer_stats_accounting.add_down(bytes); + self.peer_stats.rpc_stats.messages_rcvd += 1; self.touch_last_seen(ts); } pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) { self.transfer_stats_accounting.add_up(bytes); + self.peer_stats.rpc_stats.messages_sent += 1; } pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { self.transfer_stats_accounting.add_down(bytes); + self.peer_stats.rpc_stats.messages_rcvd += 1; + self.peer_stats.rpc_stats.questions_in_flight -= 1; + if self + .peer_stats + .rpc_stats + .first_consecutive_answer_time + .is_none() + { + self.peer_stats.rpc_stats.first_consecutive_answer_time = Some(recv_ts); + } self.record_latency(recv_ts - send_ts); self.touch_last_seen(recv_ts); } pub(super) fn question_lost(&mut self, _ts: u64) { - self.peer_stats.ping_stats.consecutive_pongs = 0; - self.peer_stats.ping_stats.first_consecutive_pong_time = None; + self.peer_stats.rpc_stats.first_consecutive_answer_time = None; + self.peer_stats.rpc_stats.questions_in_flight -= 1; } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0b35c51f..fe0b1675 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -797,59 +797,19 @@ impl RoutingTable { ////////////////////////////////////////////////////////////////////// // Stats Accounting - - pub fn stats_ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_question_sent( + &self, + node_ref: NodeRef, + ts: u64, + bytes: u64, + expects_answer: bool, + ) { self.inner .lock() .self_transfer_stats_accounting .add_up(bytes); node_ref.operate(|e| { - e.ping_sent(ts, bytes); - }) - } - pub fn stats_ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { - self.inner - .lock() - .self_transfer_stats_accounting - .add_down(bytes); - node_ref.operate(|e| { - e.ping_rcvd(ts, bytes); - }) - } - pub fn stats_pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { - self.inner - .lock() - .self_transfer_stats_accounting - .add_up(bytes); - node_ref.operate(|e| { - e.pong_sent(ts, bytes); - }) - } - pub fn stats_pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { - self.inner - .lock() - .self_transfer_stats_accounting - .add_down(bytes); - self.inner - .lock() - .self_latency_stats_accounting - .record_latency(recv_ts - send_ts); - node_ref.operate(|e| { - e.pong_rcvd(send_ts, recv_ts, bytes); - }) - } - pub fn stats_ping_lost(&self, node_ref: NodeRef, ts: u64) { - node_ref.operate(|e| { - e.ping_lost(ts); - }) - } - pub fn stats_question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { - self.inner - .lock() - .self_transfer_stats_accounting - .add_up(bytes); - node_ref.operate(|e| { - e.question_sent(ts, bytes); + e.question_sent(ts, bytes, expects_answer); }) } pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 3f4cd92d..1f8d114b 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -124,7 +124,6 @@ struct WaitableReply { timeout: u64, node_ref: NodeRef, send_ts: u64, - is_ping: bool, } ///////////////////////////////////////////////////////////////////// @@ -367,15 +366,9 @@ impl RPCProcessor { match &out { Err(_) => { self.cancel_op_id_waiter(waitable_reply.op_id); - if waitable_reply.is_ping { - self.routing_table() - .stats_ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); - } else { - self.routing_table().stats_question_lost( - waitable_reply.node_ref.clone(), - waitable_reply.send_ts, - ); - } + + self.routing_table() + .stats_question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); } Ok((rpcreader, _)) => { // Note that we definitely received this node info since we got a reply @@ -383,21 +376,12 @@ impl RPCProcessor { // Reply received let recv_ts = get_timestamp(); - if waitable_reply.is_ping { - self.routing_table().stats_pong_rcvd( - waitable_reply.node_ref, - waitable_reply.send_ts, - recv_ts, - rpcreader.header.body_len, - ) - } else { - self.routing_table().stats_answer_rcvd( - waitable_reply.node_ref, - waitable_reply.send_ts, - recv_ts, - rpcreader.header.body_len, - ) - } + self.routing_table().stats_answer_rcvd( + waitable_reply.node_ref, + waitable_reply.send_ts, + recv_ts, + rpcreader.header.body_len, + ) } }; @@ -416,16 +400,15 @@ impl RPCProcessor { ) -> Result, RPCError> { log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec)); - let (op_id, wants_answer, is_ping) = { + let (op_id, wants_answer) = { let operation = message .get_root::() .map_err(map_error_internal!("invalid operation")) .map_err(logthru_rpc!(error))?; let op_id = operation.get_op_id(); let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?; - let is_ping = operation.get_detail().has_info_q(); - (op_id, wants_answer, is_ping) + (op_id, wants_answer) }; let out_node_id; @@ -572,13 +555,8 @@ impl RPCProcessor { // Successfully sent let send_ts = get_timestamp(); - if is_ping { - self.routing_table() - .stats_ping_sent(node_ref.clone(), send_ts, bytes); - } else { - self.routing_table() - .stats_question_sent(node_ref.clone(), send_ts, bytes); - } + self.routing_table() + .stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer); // Pass back waitable reply completion match eventual { @@ -592,7 +570,6 @@ impl RPCProcessor { timeout, node_ref, send_ts, - is_ping, })), } } @@ -610,12 +587,6 @@ impl RPCProcessor { // let out_node_id; let mut out_noderef: Option = None; - let is_pong = { - let operation = reply_msg - .get_root::() - .map_err(map_error_internal!("invalid operation"))?; - operation.get_detail().has_info_a() - }; let out = { let out; @@ -753,13 +724,8 @@ impl RPCProcessor { // Reply successfully sent let send_ts = get_timestamp(); - if is_pong { - self.routing_table() - .stats_pong_sent(node_ref, send_ts, bytes); - } else { - self.routing_table() - .stats_answer_sent(node_ref, send_ts, bytes); - } + self.routing_table() + .stats_answer_sent(node_ref, send_ts, bytes); Ok(()) } @@ -1237,19 +1203,11 @@ impl RPCProcessor { }; if let Some(sender_nr) = opt_sender_nr.clone() { - if which == 0u32 { - self.routing_table().stats_ping_rcvd( - sender_nr, - msg.header.timestamp, - msg.header.body_len, - ); - } else { - self.routing_table().stats_question_rcvd( - sender_nr, - msg.header.timestamp, - msg.header.body_len, - ); - } + self.routing_table().stats_question_rcvd( + sender_nr, + msg.header.timestamp, + msg.header.body_len, + ); } }; diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 5782e70d..6302564c 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1099,21 +1099,20 @@ pub struct TransferStatsDownUp { } #[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct PingStats { - pub in_flight: u32, // number of pings issued that have yet to be answered - pub total_sent: u32, // number of pings that have been sent in the total_time range - pub total_returned: u32, // number of pings that have been returned by the node in the total_time range - pub consecutive_pongs: u32, // number of pongs that have been received and returned consecutively without a lost ping - pub last_pinged: Option, // when the peer was last pinged - pub first_consecutive_pong_time: Option, // the timestamp of the first pong in a series of consecutive pongs - pub recent_lost_pings: u32, // number of pings that have been lost since we lost reliability +pub struct RPCStats { + pub messages_sent: u32, // number of rpcs that have been sent in the total_time range + pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range + pub questions_in_flight: u32, // number of questions issued that have yet to be answered + //pub last_question: Option, // when the peer was last questioned and we want an answer + pub first_consecutive_answer_time: Option, // the timestamp of the first answer in a series of consecutive questions + pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct PeerStats { pub time_added: u64, // when the peer was added to the routing table pub last_seen: Option, // when the peer was last seen for any reason, including when we first attempted to reach out to it - pub ping_stats: PingStats, // information about pings + pub rpc_stats: RPCStats, // information about RPCs pub latency: Option, // latencies for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer pub status: Option, // Last known node status