simplify ping checking

This commit is contained in:
John Smith 2022-04-18 18:49:33 -04:00
parent 9cee8c292f
commit 1d30073360
4 changed files with 74 additions and 192 deletions

View File

@ -59,7 +59,7 @@ impl BucketEntry {
peer_stats: PeerStats {
time_added: now,
last_seen: None,
ping_stats: PingStats::default(),
rpc_stats: RPCStats::default(),
latency: None,
transfer: TransferStatsDownUp::default(),
status: None,
@ -187,7 +187,7 @@ impl BucketEntry {
///// state machine handling
pub(super) fn check_reliable(&self, cur_ts: u64) -> bool {
// if we have had consecutive ping replies for longer that UNRELIABLE_PING_SPAN_SECS
match self.peer_stats.ping_stats.first_consecutive_pong_time {
match self.peer_stats.rpc_stats.first_consecutive_answer_time {
None => false,
Some(ts) => {
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
@ -206,9 +206,9 @@ impl BucketEntry {
}
fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool {
match self.peer_stats.ping_stats.last_pinged {
match self.peer_stats.last_seen {
None => true,
Some(last_pinged) => cur_ts.saturating_sub(last_pinged) >= (interval * 1000000u64),
Some(last_seen) => cur_ts.saturating_sub(last_seen) >= (interval * 1000000u64),
}
}
@ -235,25 +235,25 @@ impl BucketEntry {
match state {
BucketEntryState::Reliable => {
// If we are in a reliable state, we need a ping on an exponential scale
match self.peer_stats.ping_stats.last_pinged {
match self.peer_stats.last_seen {
None => true,
Some(last_pinged) => {
let first_consecutive_pong_time = self
Some(last_seen) => {
let first_consecutive_answer_time = self
.peer_stats
.ping_stats
.first_consecutive_pong_time
.rpc_stats
.first_consecutive_answer_time
.unwrap();
let start_of_reliable_time = first_consecutive_pong_time
let start_of_reliable_time = first_consecutive_answer_time
+ ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64
* 1000000u64);
* 1_000_000u64);
let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
let reliable_last = last_pinged.saturating_sub(start_of_reliable_time);
let reliable_last = last_seen.saturating_sub(start_of_reliable_time);
retry_falloff_log(
reliable_last,
reliable_cur,
RELIABLE_PING_INTERVAL_START_SECS as u64 * 1000000u64,
RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1000000u64,
RELIABLE_PING_INTERVAL_START_SECS as u64 * 1_000_000u64,
RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1_000_000u64,
RELIABLE_PING_INTERVAL_MULTIPLIER,
)
}
@ -269,26 +269,18 @@ impl BucketEntry {
pub(super) fn touch_last_seen(&mut self, ts: u64) {
// If we've heard from the node at all, we can always restart our lost ping count
self.peer_stats.ping_stats.recent_lost_pings = 0;
self.peer_stats.rpc_stats.recent_lost_answers = 0;
// Mark the node as seen
self.peer_stats.last_seen = Some(ts);
}
pub(super) fn state_debug_info(&self, cur_ts: u64) -> String {
let last_pinged = if let Some(last_pinged) = self.peer_stats.ping_stats.last_pinged {
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(last_pinged))
)
} else {
"never".to_owned()
};
let first_consecutive_pong_time = if let Some(first_consecutive_pong_time) =
self.peer_stats.ping_stats.first_consecutive_pong_time
let first_consecutive_answer_time = if let Some(first_consecutive_answer_time) =
self.peer_stats.rpc_stats.first_consecutive_answer_time
{
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_pong_time))
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_answer_time))
)
} else {
"never".to_owned()
@ -303,10 +295,9 @@ impl BucketEntry {
};
format!(
"state: {:?}, first_consecutive_pong_time: {}, last_pinged: {}, last_seen: {}",
"state: {:?}, first_consecutive_answer_time: {}, last_seen: {}",
self.state(cur_ts),
first_consecutive_pong_time,
last_pinged,
first_consecutive_answer_time,
last_seen
)
}
@ -314,69 +305,43 @@ impl BucketEntry {
////////////////////////////////////////////////////////////////
/// Called when rpc processor things happen
pub(super) fn ping_sent(&mut self, ts: u64, bytes: u64) {
self.peer_stats.ping_stats.total_sent += 1;
pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) {
self.transfer_stats_accounting.add_up(bytes);
self.peer_stats.ping_stats.in_flight += 1;
self.peer_stats.ping_stats.last_pinged = Some(ts);
// if we haven't heard from this node yet and it's our first attempt at contacting it
// then we set the last_seen time
if self.peer_stats.last_seen.is_none() {
self.peer_stats.last_seen = Some(ts);
self.peer_stats.rpc_stats.messages_sent += 1;
if expects_answer {
self.peer_stats.rpc_stats.questions_in_flight += 1;
}
}
pub(super) fn ping_rcvd(&mut self, ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_down(bytes);
self.touch_last_seen(ts);
}
pub(super) fn pong_sent(&mut self, _ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_up(bytes);
}
pub(super) fn pong_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_down(bytes);
self.peer_stats.ping_stats.in_flight -= 1;
self.peer_stats.ping_stats.total_returned += 1;
self.peer_stats.ping_stats.consecutive_pongs += 1;
if self
.peer_stats
.ping_stats
.first_consecutive_pong_time
.is_none()
{
self.peer_stats.ping_stats.first_consecutive_pong_time = Some(recv_ts);
}
self.record_latency(recv_ts - send_ts);
self.touch_last_seen(recv_ts);
}
pub(super) fn ping_lost(&mut self, _ts: u64) {
self.peer_stats.ping_stats.in_flight -= 1;
self.peer_stats.ping_stats.recent_lost_pings += 1;
self.peer_stats.ping_stats.consecutive_pongs = 0;
self.peer_stats.ping_stats.first_consecutive_pong_time = None;
}
pub(super) fn question_sent(&mut self, ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_up(bytes);
// if we haven't heard from this node yet and it's our first attempt at contacting it
// then we set the last_seen time
if self.peer_stats.last_seen.is_none() {
self.peer_stats.last_seen = Some(ts);
}
}
pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_down(bytes);
self.peer_stats.rpc_stats.messages_rcvd += 1;
self.touch_last_seen(ts);
}
pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_up(bytes);
self.peer_stats.rpc_stats.messages_sent += 1;
}
pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
self.transfer_stats_accounting.add_down(bytes);
self.peer_stats.rpc_stats.messages_rcvd += 1;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
if self
.peer_stats
.rpc_stats
.first_consecutive_answer_time
.is_none()
{
self.peer_stats.rpc_stats.first_consecutive_answer_time = Some(recv_ts);
}
self.record_latency(recv_ts - send_ts);
self.touch_last_seen(recv_ts);
}
pub(super) fn question_lost(&mut self, _ts: u64) {
self.peer_stats.ping_stats.consecutive_pongs = 0;
self.peer_stats.ping_stats.first_consecutive_pong_time = None;
self.peer_stats.rpc_stats.first_consecutive_answer_time = None;
self.peer_stats.rpc_stats.questions_in_flight -= 1;
}
}

View File

@ -797,59 +797,19 @@ impl RoutingTable {
//////////////////////////////////////////////////////////////////////
// Stats Accounting
pub fn stats_ping_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
pub fn stats_question_sent(
&self,
node_ref: NodeRef,
ts: u64,
bytes: u64,
expects_answer: bool,
) {
self.inner
.lock()
.self_transfer_stats_accounting
.add_up(bytes);
node_ref.operate(|e| {
e.ping_sent(ts, bytes);
})
}
pub fn stats_ping_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner
.lock()
.self_transfer_stats_accounting
.add_down(bytes);
node_ref.operate(|e| {
e.ping_rcvd(ts, bytes);
})
}
pub fn stats_pong_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner
.lock()
.self_transfer_stats_accounting
.add_up(bytes);
node_ref.operate(|e| {
e.pong_sent(ts, bytes);
})
}
pub fn stats_pong_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
self.inner
.lock()
.self_transfer_stats_accounting
.add_down(bytes);
self.inner
.lock()
.self_latency_stats_accounting
.record_latency(recv_ts - send_ts);
node_ref.operate(|e| {
e.pong_rcvd(send_ts, recv_ts, bytes);
})
}
pub fn stats_ping_lost(&self, node_ref: NodeRef, ts: u64) {
node_ref.operate(|e| {
e.ping_lost(ts);
})
}
pub fn stats_question_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
self.inner
.lock()
.self_transfer_stats_accounting
.add_up(bytes);
node_ref.operate(|e| {
e.question_sent(ts, bytes);
e.question_sent(ts, bytes, expects_answer);
})
}
pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) {

View File

@ -124,7 +124,6 @@ struct WaitableReply {
timeout: u64,
node_ref: NodeRef,
send_ts: u64,
is_ping: bool,
}
/////////////////////////////////////////////////////////////////////
@ -367,15 +366,9 @@ impl RPCProcessor {
match &out {
Err(_) => {
self.cancel_op_id_waiter(waitable_reply.op_id);
if waitable_reply.is_ping {
self.routing_table()
.stats_ping_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
} else {
self.routing_table().stats_question_lost(
waitable_reply.node_ref.clone(),
waitable_reply.send_ts,
);
}
self.routing_table()
.stats_question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
}
Ok((rpcreader, _)) => {
// Note that we definitely received this node info since we got a reply
@ -383,21 +376,12 @@ impl RPCProcessor {
// Reply received
let recv_ts = get_timestamp();
if waitable_reply.is_ping {
self.routing_table().stats_pong_rcvd(
waitable_reply.node_ref,
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
)
} else {
self.routing_table().stats_answer_rcvd(
waitable_reply.node_ref,
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
)
}
self.routing_table().stats_answer_rcvd(
waitable_reply.node_ref,
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
)
}
};
@ -416,16 +400,15 @@ impl RPCProcessor {
) -> Result<Option<WaitableReply>, RPCError> {
log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec));
let (op_id, wants_answer, is_ping) = {
let (op_id, wants_answer) = {
let operation = message
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_internal!("invalid operation"))
.map_err(logthru_rpc!(error))?;
let op_id = operation.get_op_id();
let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?;
let is_ping = operation.get_detail().has_info_q();
(op_id, wants_answer, is_ping)
(op_id, wants_answer)
};
let out_node_id;
@ -572,13 +555,8 @@ impl RPCProcessor {
// Successfully sent
let send_ts = get_timestamp();
if is_ping {
self.routing_table()
.stats_ping_sent(node_ref.clone(), send_ts, bytes);
} else {
self.routing_table()
.stats_question_sent(node_ref.clone(), send_ts, bytes);
}
self.routing_table()
.stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer);
// Pass back waitable reply completion
match eventual {
@ -592,7 +570,6 @@ impl RPCProcessor {
timeout,
node_ref,
send_ts,
is_ping,
})),
}
}
@ -610,12 +587,6 @@ impl RPCProcessor {
//
let out_node_id;
let mut out_noderef: Option<NodeRef> = None;
let is_pong = {
let operation = reply_msg
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_internal!("invalid operation"))?;
operation.get_detail().has_info_a()
};
let out = {
let out;
@ -753,13 +724,8 @@ impl RPCProcessor {
// Reply successfully sent
let send_ts = get_timestamp();
if is_pong {
self.routing_table()
.stats_pong_sent(node_ref, send_ts, bytes);
} else {
self.routing_table()
.stats_answer_sent(node_ref, send_ts, bytes);
}
self.routing_table()
.stats_answer_sent(node_ref, send_ts, bytes);
Ok(())
}
@ -1237,19 +1203,11 @@ impl RPCProcessor {
};
if let Some(sender_nr) = opt_sender_nr.clone() {
if which == 0u32 {
self.routing_table().stats_ping_rcvd(
sender_nr,
msg.header.timestamp,
msg.header.body_len,
);
} else {
self.routing_table().stats_question_rcvd(
sender_nr,
msg.header.timestamp,
msg.header.body_len,
);
}
self.routing_table().stats_question_rcvd(
sender_nr,
msg.header.timestamp,
msg.header.body_len,
);
}
};

View File

@ -1099,21 +1099,20 @@ pub struct TransferStatsDownUp {
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PingStats {
pub in_flight: u32, // number of pings issued that have yet to be answered
pub total_sent: u32, // number of pings that have been sent in the total_time range
pub total_returned: u32, // number of pings that have been returned by the node in the total_time range
pub consecutive_pongs: u32, // number of pongs that have been received and returned consecutively without a lost ping
pub last_pinged: Option<u64>, // when the peer was last pinged
pub first_consecutive_pong_time: Option<u64>, // the timestamp of the first pong in a series of consecutive pongs
pub recent_lost_pings: u32, // number of pings that have been lost since we lost reliability
pub struct RPCStats {
pub messages_sent: u32, // number of rpcs that have been sent in the total_time range
pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range
pub questions_in_flight: u32, // number of questions issued that have yet to be answered
//pub last_question: Option<u64>, // when the peer was last questioned and we want an answer
pub first_consecutive_answer_time: Option<u64>, // the timestamp of the first answer in a series of consecutive questions
pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PeerStats {
pub time_added: u64, // when the peer was added to the routing table
pub last_seen: Option<u64>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
pub ping_stats: PingStats, // information about pings
pub rpc_stats: RPCStats, // information about RPCs
pub latency: Option<LatencyStats>, // latencies for communications with the peer
pub transfer: TransferStatsDownUp, // Stats for communications with the peer
pub status: Option<NodeStatus>, // Last known node status