alignment refactor

This commit is contained in:
John Smith
2022-12-16 21:55:03 -05:00
parent 221c09b555
commit 16d74b96f3
23 changed files with 138 additions and 108 deletions
+17 -17
View File
@@ -384,11 +384,11 @@ impl BucketEntryInner {
rti: &RoutingTableInner,
only_live: bool,
filter: Option<NodeRefFilter>,
) -> Vec<(ConnectionDescriptor, u64)> {
) -> Vec<(ConnectionDescriptor, Timestamp)> {
let connection_manager =
rti.unlocked_inner.network_manager.connection_manager();
let mut out: Vec<(ConnectionDescriptor, u64)> = self
let mut out: Vec<(ConnectionDescriptor, Timestamp)> = self
.last_connections
.iter()
.filter_map(|(k, v)| {
@@ -432,7 +432,7 @@ impl BucketEntryInner {
// If this is not connection oriented, then we check our last seen time
// to see if this mapping has expired (beyond our timeout)
let cur_ts = get_aligned_timestamp();
(v.1 + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts
(v.1 + TimestampDuration::new(CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts
};
if alive {
@@ -554,7 +554,7 @@ impl BucketEntryInner {
match self.peer_stats.rpc_stats.first_consecutive_seen_ts {
None => false,
Some(ts) => {
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
}
}
}
@@ -569,7 +569,7 @@ impl BucketEntryInner {
match self.peer_stats.rpc_stats.last_seen_ts {
None => self.peer_stats.rpc_stats.recent_lost_answers < NEVER_REACHED_PING_COUNT,
Some(ts) => {
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
}
}
}
@@ -582,7 +582,7 @@ impl BucketEntryInner {
.max(self.peer_stats.rpc_stats.last_question_ts)
}
fn needs_constant_ping(&self, cur_ts: Timestamp, interval: Timestamp) -> bool {
fn needs_constant_ping(&self, cur_ts: Timestamp, interval_us: TimestampDuration) -> bool {
// If we have not either seen the node in the last 'interval' then we should ping it
let latest_contact_time = self.latest_contact_time();
@@ -590,7 +590,7 @@ impl BucketEntryInner {
None => true,
Some(latest_contact_time) => {
// If we haven't done anything with this node in 'interval' seconds
cur_ts.saturating_sub(latest_contact_time) >= (interval * 1000000u64)
cur_ts.saturating_sub(latest_contact_time) >= interval_us
}
}
}
@@ -603,7 +603,7 @@ impl BucketEntryInner {
// If this entry needs a keepalive (like a relay node),
// then we should ping it regularly to keep our association alive
if needs_keepalive {
return self.needs_constant_ping(cur_ts, KEEPALIVE_PING_INTERVAL_SECS as u64);
return self.needs_constant_ping(cur_ts, TimestampDuration::new(KEEPALIVE_PING_INTERVAL_SECS as u64 * 1000000u64));
}
// If we don't have node status for this node, then we should ping it to get some node status
@@ -636,8 +636,8 @@ impl BucketEntryInner {
latest_contact_time.saturating_sub(start_of_reliable_time);
retry_falloff_log(
reliable_last,
reliable_cur,
reliable_last.as_u64(),
reliable_cur.as_u64(),
RELIABLE_PING_INTERVAL_START_SECS as u64 * 1_000_000u64,
RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1_000_000u64,
RELIABLE_PING_INTERVAL_MULTIPLIER,
@@ -647,7 +647,7 @@ impl BucketEntryInner {
}
BucketEntryState::Unreliable => {
// If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds
self.needs_constant_ping(cur_ts, UNRELIABLE_PING_INTERVAL_SECS as u64)
self.needs_constant_ping(cur_ts, TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64))
}
BucketEntryState::Dead => false,
}
@@ -673,7 +673,7 @@ impl BucketEntryInner {
{
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts))
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts).as_u64())
)
} else {
"never".to_owned()
@@ -681,7 +681,7 @@ impl BucketEntryInner {
let last_seen_ts_str = if let Some(last_seen_ts) = self.peer_stats.rpc_stats.last_seen_ts {
format!(
"{}s ago",
timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts))
timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts).as_u64())
)
} else {
"never".to_owned()
@@ -698,7 +698,7 @@ impl BucketEntryInner {
////////////////////////////////////////////////////////////////
/// Called when rpc processor things happen
pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: u64, expects_answer: bool) {
pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) {
self.transfer_stats_accounting.add_up(bytes);
self.peer_stats.rpc_stats.messages_sent += 1;
self.peer_stats.rpc_stats.failed_to_send = 0;
@@ -707,7 +707,7 @@ impl BucketEntryInner {
self.peer_stats.rpc_stats.last_question_ts = Some(ts);
}
}
pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: u64) {
pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: ByteCount) {
self.transfer_stats_accounting.add_down(bytes);
self.peer_stats.rpc_stats.messages_rcvd += 1;
self.touch_last_seen(ts);
@@ -755,12 +755,12 @@ impl BucketEntry {
updated_since_last_network_change: false,
last_connections: BTreeMap::new(),
local_network: BucketEntryLocalNetwork {
last_seen_our_node_info_ts: 0,
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
public_internet: BucketEntryPublicInternet {
last_seen_our_node_info_ts: 0,
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
+1 -1
View File
@@ -402,7 +402,7 @@ impl RoutingTable {
}
/// Return our current node info timestamp
pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option<u64> {
pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option<Timestamp> {
self.inner.read().get_own_node_info_ts(routing_domain)
}
@@ -7,7 +7,7 @@ use rkyv::{
/// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
/// Remote private route cache entries expire in 5 minutes if they haven't been used
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = 300_000_000u64.into();
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64);
/// Amount of time a route can remain idle before it gets tested
const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000;
@@ -39,16 +39,16 @@ pub struct RouteStats {
#[with(Skip)]
pub questions_lost: u32,
/// Timestamp of when the route was created
pub created_ts: u64,
pub created_ts: Timestamp,
/// Timestamp of when the route was last checked for validity
#[with(Skip)]
pub last_tested_ts: Option<u64>,
pub last_tested_ts: Option<Timestamp>,
/// Timestamp of when the route was last sent to
#[with(Skip)]
pub last_sent_ts: Option<u64>,
pub last_sent_ts: Option<Timestamp>,
/// Timestamp of when the route was last received over
#[with(Skip)]
pub last_received_ts: Option<u64>,
pub last_received_ts: Option<Timestamp>,
/// Transfers up and down
pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
@@ -63,7 +63,7 @@ pub struct RouteStats {
impl RouteStats {
/// Make new route stats
pub fn new(created_ts: u64) -> Self {
pub fn new(created_ts: Timestamp) -> Self {
Self {
created_ts,
..Default::default()
@@ -143,7 +143,9 @@ impl RouteStats {
// Has the route been tested within the idle time we'd want to check things?
// (also if we've received successfully over the route, this will get set)
if let Some(last_tested_ts) = self.last_tested_ts {
if cur_ts.saturating_sub(last_tested_ts) > (ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) {
if cur_ts.saturating_sub(last_tested_ts)
> TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64)
{
return true;
}
} else {
@@ -210,9 +212,9 @@ pub struct RemotePrivateRouteInfo {
// The private route itself
private_route: Option<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use
last_seen_our_node_info_ts: u64,
last_seen_our_node_info_ts: Timestamp,
/// Last time this remote private route was requested for any reason (cache expiration)
last_touched_ts: u64,
last_touched_ts: Timestamp,
/// Stats
stats: RouteStats,
}
@@ -1630,7 +1632,7 @@ impl RouteSpecStore {
.and_modify(|rpr| {
if cur_ts.saturating_sub(rpr.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
// Start fresh if this had expired
rpr.last_seen_our_node_info_ts = 0;
rpr.last_seen_our_node_info_ts = Timestamp::new(0);
rpr.last_touched_ts = cur_ts;
rpr.stats = RouteStats::new(cur_ts);
} else {
@@ -1641,7 +1643,7 @@ impl RouteSpecStore {
.or_insert_with(|| RemotePrivateRouteInfo {
// New remote private route cache entry
private_route: Some(private_route),
last_seen_our_node_info_ts: 0,
last_seen_our_node_info_ts: Timestamp::new(0),
last_touched_ts: cur_ts,
stats: RouteStats::new(cur_ts),
});
@@ -262,7 +262,7 @@ impl RoutingTableInner {
}
/// Return our current node info timestamp
pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option<u64> {
pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option<Timestamp> {
self.with_routing_domain(routing_domain, |rdd| {
if !rdd.common().has_valid_own_node_info() {
None
@@ -440,7 +440,7 @@ impl RoutingTableInner {
pub fn with_entries<T, F: FnMut(&RoutingTableInner, DHTKey, Arc<BucketEntry>) -> Option<T>>(
&self,
cur_ts: u64,
cur_ts: Timestamp,
min_state: BucketEntryState,
mut f: F,
) -> Option<T> {
@@ -812,7 +812,7 @@ impl RoutingTableInner {
pub fn find_peers_with_sort_and_filter<C, T, O>(
&self,
node_count: usize,
cur_ts: u64,
cur_ts: Timestamp,
mut filters: VecDeque<RoutingTableEntryFilter>,
mut compare: C,
mut transform: T,
@@ -56,12 +56,12 @@ impl TransferStatsAccounting {
self.current_transfer = TransferCount::default();
transfer_stats.down.maximum = 0;
transfer_stats.up.maximum = 0;
transfer_stats.down.minimum = u64::MAX;
transfer_stats.up.minimum = u64::MAX;
transfer_stats.down.average = 0;
transfer_stats.up.average = 0;
transfer_stats.down.maximum = 0.into();
transfer_stats.up.maximum = 0.into();
transfer_stats.down.minimum = u64::MAX.into();
transfer_stats.up.minimum = u64::MAX.into();
transfer_stats.down.average = 0.into();
transfer_stats.up.average = 0.into();
for xfer in &self.rolling_transfers {
let bpsd = xfer.down * 1000u64 / dur_ms;
let bpsu = xfer.up * 1000u64 / dur_ms;
@@ -97,9 +97,9 @@ impl LatencyStatsAccounting {
self.rolling_latencies.push_back(latency);
let mut ls = LatencyStats {
fastest: u64::MAX,
average: 0,
slowest: 0,
fastest: u64::MAX.into(),
average: 0.into(),
slowest: 0.into(),
};
for rl in &self.rolling_latencies {
ls.fastest.min_assign(*rl);
+9 -5
View File
@@ -18,7 +18,7 @@ impl RoutingTable {
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.rolling_transfers_task_routine(s, l, t)
.rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(
parent: None,
"RoutingTable rolling transfers task routine"
@@ -35,7 +35,7 @@ impl RoutingTable {
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.kick_buckets_task_routine(s, l, t)
.kick_buckets_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "kick buckets task routine")),
)
});
@@ -80,7 +80,7 @@ impl RoutingTable {
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.ping_validator_task_routine(s, l, t)
.ping_validator_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "ping validator task routine")),
)
});
@@ -94,7 +94,7 @@ impl RoutingTable {
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.relay_management_task_routine(s, l, t)
.relay_management_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "relay management task routine")),
)
});
@@ -108,7 +108,11 @@ impl RoutingTable {
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.private_route_management_task_routine(s, l, t)
.private_route_management_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"private route management task routine"
@@ -91,7 +91,7 @@ impl RoutingTable {
#[instrument(level = "trace", skip(self), err)]
fn ping_validator_local_network(
&self,
cur_ts: u64,
cur_ts: Timestamp,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>,
>,
@@ -122,8 +122,8 @@ impl RoutingTable {
pub(crate) async fn ping_validator_task_routine(
self,
stop_token: StopToken,
_last_ts: u64,
cur_ts: u64,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();