From 16d74b96f3d5798ec644790100417ec007652c8c Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 16 Dec 2022 21:55:03 -0500 Subject: [PATCH] alignment refactor --- veilid-cli/src/client_api_connection.rs | 8 +++-- veilid-cli/src/command_processor.rs | 12 ++++--- veilid-cli/src/peers_table_view.rs | 8 +++-- .../src/network_manager/connection_limits.rs | 10 +++--- veilid-core/src/network_manager/mod.rs | 16 ++++----- veilid-core/src/network_manager/native/mod.rs | 16 ++++----- .../src/network_manager/native/network_udp.rs | 2 +- veilid-core/src/network_manager/tasks/mod.rs | 8 +++-- veilid-core/src/routing_table/bucket_entry.rs | 34 +++++++++---------- veilid-core/src/routing_table/mod.rs | 2 +- .../src/routing_table/route_spec_store.rs | 24 +++++++------ .../src/routing_table/routing_table_inner.rs | 6 ++-- .../src/routing_table/stats_accounting.rs | 18 +++++----- veilid-core/src/routing_table/tasks/mod.rs | 14 +++++--- .../src/routing_table/tasks/ping_validator.rs | 6 ++-- .../operations/operation_cancel_tunnel.rs | 8 ++--- .../operations/operation_complete_tunnel.rs | 4 +-- .../operations/operation_start_tunnel.rs | 4 +-- .../src/rpc_processor/coders/tunnel.rs | 16 ++++----- veilid-core/src/rpc_processor/mod.rs | 14 ++++---- veilid-core/src/veilid_api/aligned_u64.rs | 12 +++++++ veilid-core/src/veilid_api/types.rs | 2 +- veilid-server/src/client_api.rs | 2 +- 23 files changed, 138 insertions(+), 108 deletions(-) diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 0c8980aa..5999ab22 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -444,7 +444,11 @@ impl ClientApiConnection { res.map_err(map_to_string) } - pub async fn server_appcall_reply(&mut self, id: u64, msg: Vec) -> Result<(), String> { + pub async fn server_appcall_reply( + &mut self, + id: OperationId, + msg: Vec, + ) -> Result<(), String> { trace!("ClientApiConnection::appcall_reply"); let server = { let inner = self.inner.borrow(); @@ -455,7 +459,7 @@ impl ClientApiConnection { .clone() }; let mut request = server.borrow().app_call_reply_request(); - request.get().set_id(id); + request.get().set_id(id.as_u64()); request.get().set_message(&msg); let response = self .cancellable(request.send().promise) diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 39d91f18..77ec149b 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -47,7 +47,7 @@ struct CommandProcessorInner { autoreconnect: bool, server_addr: Option, connection_waker: Eventual, - last_call_id: Option, + last_call_id: Option, } type Handle = Rc>; @@ -249,7 +249,7 @@ reply - reply to an AppCall not handled directly by the server } Ok(v) => v, }; - (id, second) + (OperationId::new(id), second) } else { let id = match some_last_id { None => { @@ -394,8 +394,8 @@ reply - reply to an AppCall not handled directly by the server pub fn update_network_status(&mut self, network: veilid_core::VeilidStateNetwork) { self.inner_mut().ui.set_network_status( network.started, - network.bps_down, - network.bps_up, + network.bps_down.as_u64(), + network.bps_up.as_u64(), network.peers, ); } @@ -471,7 +471,9 @@ reply - reply to an AppCall not handled directly by the server self.inner().ui.add_node_event(format!( "AppCall ({:?}) id = {:016x} : {}", - call.sender, call.id, strmsg + call.sender, + call.id.as_u64(), + strmsg )); self.inner_mut().last_call_id = Some(call.id); diff --git a/veilid-cli/src/peers_table_view.rs b/veilid-cli/src/peers_table_view.rs index 1ba95dd3..870eaea5 100644 --- a/veilid-cli/src/peers_table_view.rs +++ b/veilid-cli/src/peers_table_view.rs @@ -1,7 +1,7 @@ use super::*; use cursive_table_view::*; use std::cmp::Ordering; -use veilid_core::PeerTableData; +use veilid_core::*; #[derive(Copy, Clone, PartialEq, Eq, Hash)] pub enum PeerTableColumn { @@ -24,7 +24,8 @@ pub enum PeerTableColumn { // } // } -fn format_ts(ts: u64) -> String { +fn format_ts(ts: Timestamp) -> String { + let ts = ts.as_u64(); let secs = timestamp_to_secs(ts); if secs >= 1.0 { format!("{:.2}s", timestamp_to_secs(ts)) @@ -33,7 +34,8 @@ fn format_ts(ts: u64) -> String { } } -fn format_bps(bps: u64) -> String { +fn format_bps(bps: ByteCount) -> String { + let bps = bps.as_u64(); if bps >= 1024u64 * 1024u64 * 1024u64 { format!("{:.2}GB/s", (bps / (1024u64 * 1024u64)) as f64 / 1024.0) } else if bps >= 1024u64 * 1024u64 { diff --git a/veilid-core/src/network_manager/connection_limits.rs b/veilid-core/src/network_manager/connection_limits.rs index d62cdc01..c118c4e9 100644 --- a/veilid-core/src/network_manager/connection_limits.rs +++ b/veilid-core/src/network_manager/connection_limits.rs @@ -21,8 +21,8 @@ pub struct ConnectionLimits { max_connection_frequency_per_min: usize, conn_count_by_ip4: BTreeMap, conn_count_by_ip6_prefix: BTreeMap, - conn_timestamps_by_ip4: BTreeMap>, - conn_timestamps_by_ip6_prefix: BTreeMap>, + conn_timestamps_by_ip4: BTreeMap>, + conn_timestamps_by_ip6_prefix: BTreeMap>, } impl ConnectionLimits { @@ -48,7 +48,7 @@ impl ConnectionLimits { for (key, value) in &mut self.conn_timestamps_by_ip4 { value.retain(|v| { // keep timestamps that are less than a minute away - cur_ts.saturating_sub(*v) < 60_000_000u64 + cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) }); if value.is_empty() { dead_keys.push(*key); @@ -64,7 +64,7 @@ impl ConnectionLimits { for (key, value) in &mut self.conn_timestamps_by_ip6_prefix { value.retain(|v| { // keep timestamps that are less than a minute away - cur_ts.saturating_sub(*v) < 60_000_000u64 + cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) }); if value.is_empty() { dead_keys.push(*key); @@ -95,7 +95,7 @@ impl ConnectionLimits { let tstamps = &mut self.conn_timestamps_by_ip4.entry(v4).or_default(); tstamps.retain(|v| { // keep timestamps that are less than a minute away - ts.saturating_sub(*v) < 60_000_000u64 + ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) }); assert!(tstamps.len() <= self.max_connection_frequency_per_min); if tstamps.len() == self.max_connection_frequency_per_min { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index e5b27aeb..d23677f2 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -38,12 +38,12 @@ use wasm::*; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; -pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes +pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8; pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; -pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: u64 = 300_000_000u64; // 5 minutes -pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: u64 = 3600_000_000u64; // 60 minutes +pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes +pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration = TimestampDuration::new(3600_000_000u64); // 60 minutes pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; #[derive(Copy, Clone, Debug, Default)] @@ -138,7 +138,7 @@ struct NetworkManagerInner { public_address_check_cache: BTreeMap>, public_address_inconsistencies_table: - BTreeMap>, + BTreeMap>, } struct NetworkManagerUnlockedInner { @@ -426,7 +426,7 @@ impl NetworkManager { pub fn purge_client_whitelist(&self) { let timeout_ms = self.with_config(|c| c.network.client_whitelist_timeout_ms); let mut inner = self.inner.lock(); - let cutoff_timestamp = get_aligned_timestamp() - ((timeout_ms as u64) * 1000u64); + let cutoff_timestamp = get_aligned_timestamp() - TimestampDuration::new((timeout_ms as u64) * 1000u64); // Remove clients from the whitelist that haven't been since since our whitelist timeout while inner .client_whitelist @@ -1285,7 +1285,7 @@ impl NetworkManager { // Network accounting self.stats_packet_rcvd( connection_descriptor.remote_address().to_ip_addr(), - data.len() as u64, + ByteCount::new(data.len() as u64), ); // If this is a zero length packet, just drop it, because these are used for hole punching @@ -1447,7 +1447,7 @@ impl NetworkManager { } // Callbacks from low level network for statistics gathering - pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) { + pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) { let inner = &mut *self.inner.lock(); inner .stats @@ -1463,7 +1463,7 @@ impl NetworkManager { .add_up(bytes); } - pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: u64) { + pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) { let inner = &mut *self.inner.lock(); inner .stats diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 9a532d3a..c38f799b 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -406,7 +406,7 @@ impl Network { } // Network accounting self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); Ok(NetworkResult::Value(())) } @@ -440,7 +440,7 @@ impl Network { .await .wrap_err("send message failure")?); self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); // receive single response let mut out = vec![0u8; MAX_MESSAGE_SIZE]; @@ -454,7 +454,7 @@ impl Network { let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); self.network_manager() - .stats_packet_rcvd(recv_socket_addr.ip(), recv_len as u64); + .stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64)); // if the from address is not the same as the one we sent to, then drop this if recv_socket_addr != peer_socket_addr { @@ -481,7 +481,7 @@ impl Network { network_result_try!(pnc.send(data).await.wrap_err("send failure")?); self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) .await @@ -489,7 +489,7 @@ impl Network { .wrap_err("recv failure")?); self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + .stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64)); Ok(NetworkResult::Value(out)) } @@ -519,7 +519,7 @@ impl Network { // Network accounting self.network_manager() - .stats_packet_sent(peer_socket_addr.ip(), data_len as u64); + .stats_packet_sent(peer_socket_addr.ip(), ByteCount::new(data_len as u64)); // Data was consumed return Ok(None); @@ -536,7 +536,7 @@ impl Network { // Network accounting self.network_manager().stats_packet_sent( descriptor.remote().to_socket_addr().ip(), - data_len as u64, + ByteCount::new(data_len as u64), ); // Data was consumed @@ -595,7 +595,7 @@ impl Network { // Network accounting self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); Ok(NetworkResult::value(connection_descriptor)) } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 1174238b..71bb9ece 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -65,7 +65,7 @@ impl Network { // Network accounting network_manager.stats_packet_rcvd( descriptor.remote_address().to_ip_addr(), - size as u64, + ByteCount::new(size as u64), ); // Pass it up for processing diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index e5ec11f8..03f76a42 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -13,7 +13,7 @@ impl NetworkManager { .set_routine(move |s, l, t| { Box::pin( this.clone() - .rolling_transfers_task_routine(s, l, t) + .rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t)) .instrument(trace_span!( parent: None, "NetworkManager rolling transfers task routine" @@ -30,7 +30,11 @@ impl NetworkManager { .set_routine(move |s, l, t| { Box::pin( this.clone() - .public_address_check_task_routine(s, l, t) + .public_address_check_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + ) .instrument(trace_span!( parent: None, "public address check task routine" diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index ad8b1320..3ca93132 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -384,11 +384,11 @@ impl BucketEntryInner { rti: &RoutingTableInner, only_live: bool, filter: Option, - ) -> Vec<(ConnectionDescriptor, u64)> { + ) -> Vec<(ConnectionDescriptor, Timestamp)> { let connection_manager = rti.unlocked_inner.network_manager.connection_manager(); - let mut out: Vec<(ConnectionDescriptor, u64)> = self + let mut out: Vec<(ConnectionDescriptor, Timestamp)> = self .last_connections .iter() .filter_map(|(k, v)| { @@ -432,7 +432,7 @@ impl BucketEntryInner { // If this is not connection oriented, then we check our last seen time // to see if this mapping has expired (beyond our timeout) let cur_ts = get_aligned_timestamp(); - (v.1 + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts + (v.1 + TimestampDuration::new(CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts }; if alive { @@ -554,7 +554,7 @@ impl BucketEntryInner { match self.peer_stats.rpc_stats.first_consecutive_seen_ts { None => false, Some(ts) => { - cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) + cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) } } } @@ -569,7 +569,7 @@ impl BucketEntryInner { match self.peer_stats.rpc_stats.last_seen_ts { None => self.peer_stats.rpc_stats.recent_lost_answers < NEVER_REACHED_PING_COUNT, Some(ts) => { - cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) + cur_ts.saturating_sub(ts) >= TimestampDuration::new(UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) } } } @@ -582,7 +582,7 @@ impl BucketEntryInner { .max(self.peer_stats.rpc_stats.last_question_ts) } - fn needs_constant_ping(&self, cur_ts: Timestamp, interval: Timestamp) -> bool { + fn needs_constant_ping(&self, cur_ts: Timestamp, interval_us: TimestampDuration) -> bool { // If we have not either seen the node in the last 'interval' then we should ping it let latest_contact_time = self.latest_contact_time(); @@ -590,7 +590,7 @@ impl BucketEntryInner { None => true, Some(latest_contact_time) => { // If we haven't done anything with this node in 'interval' seconds - cur_ts.saturating_sub(latest_contact_time) >= (interval * 1000000u64) + cur_ts.saturating_sub(latest_contact_time) >= interval_us } } } @@ -603,7 +603,7 @@ impl BucketEntryInner { // If this entry needs a keepalive (like a relay node), // then we should ping it regularly to keep our association alive if needs_keepalive { - return self.needs_constant_ping(cur_ts, KEEPALIVE_PING_INTERVAL_SECS as u64); + return self.needs_constant_ping(cur_ts, TimestampDuration::new(KEEPALIVE_PING_INTERVAL_SECS as u64 * 1000000u64)); } // If we don't have node status for this node, then we should ping it to get some node status @@ -636,8 +636,8 @@ impl BucketEntryInner { latest_contact_time.saturating_sub(start_of_reliable_time); retry_falloff_log( - reliable_last, - reliable_cur, + reliable_last.as_u64(), + reliable_cur.as_u64(), RELIABLE_PING_INTERVAL_START_SECS as u64 * 1_000_000u64, RELIABLE_PING_INTERVAL_MAX_SECS as u64 * 1_000_000u64, RELIABLE_PING_INTERVAL_MULTIPLIER, @@ -647,7 +647,7 @@ impl BucketEntryInner { } BucketEntryState::Unreliable => { // If we are in an unreliable state, we need a ping every UNRELIABLE_PING_INTERVAL_SECS seconds - self.needs_constant_ping(cur_ts, UNRELIABLE_PING_INTERVAL_SECS as u64) + self.needs_constant_ping(cur_ts, TimestampDuration::new(UNRELIABLE_PING_INTERVAL_SECS as u64 * 1000000u64)) } BucketEntryState::Dead => false, } @@ -673,7 +673,7 @@ impl BucketEntryInner { { format!( "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts)) + timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts).as_u64()) ) } else { "never".to_owned() @@ -681,7 +681,7 @@ impl BucketEntryInner { let last_seen_ts_str = if let Some(last_seen_ts) = self.peer_stats.rpc_stats.last_seen_ts { format!( "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts)) + timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts).as_u64()) ) } else { "never".to_owned() @@ -698,7 +698,7 @@ impl BucketEntryInner { //////////////////////////////////////////////////////////////// /// Called when rpc processor things happen - pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: u64, expects_answer: bool) { + pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: ByteCount, expects_answer: bool) { self.transfer_stats_accounting.add_up(bytes); self.peer_stats.rpc_stats.messages_sent += 1; self.peer_stats.rpc_stats.failed_to_send = 0; @@ -707,7 +707,7 @@ impl BucketEntryInner { self.peer_stats.rpc_stats.last_question_ts = Some(ts); } } - pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: u64) { + pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: ByteCount) { self.transfer_stats_accounting.add_down(bytes); self.peer_stats.rpc_stats.messages_rcvd += 1; self.touch_last_seen(ts); @@ -755,12 +755,12 @@ impl BucketEntry { updated_since_last_network_change: false, last_connections: BTreeMap::new(), local_network: BucketEntryLocalNetwork { - last_seen_our_node_info_ts: 0, + last_seen_our_node_info_ts: Timestamp::new(0u64), signed_node_info: None, node_status: None, }, public_internet: BucketEntryPublicInternet { - last_seen_our_node_info_ts: 0, + last_seen_our_node_info_ts: Timestamp::new(0u64), signed_node_info: None, node_status: None, }, diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index e83fb9eb..054bbc6d 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -402,7 +402,7 @@ impl RoutingTable { } /// Return our current node info timestamp - pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option { + pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option { self.inner.read().get_own_node_info_ts(routing_domain) } diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 2e6d8fda..a9a9e847 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -7,7 +7,7 @@ use rkyv::{ /// The size of the remote private route cache const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; /// Remote private route cache entries expire in 5 minutes if they haven't been used -const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = 300_000_000u64.into(); +const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = TimestampDuration::new(300_000_000u64); /// Amount of time a route can remain idle before it gets tested const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000; @@ -39,16 +39,16 @@ pub struct RouteStats { #[with(Skip)] pub questions_lost: u32, /// Timestamp of when the route was created - pub created_ts: u64, + pub created_ts: Timestamp, /// Timestamp of when the route was last checked for validity #[with(Skip)] - pub last_tested_ts: Option, + pub last_tested_ts: Option, /// Timestamp of when the route was last sent to #[with(Skip)] - pub last_sent_ts: Option, + pub last_sent_ts: Option, /// Timestamp of when the route was last received over #[with(Skip)] - pub last_received_ts: Option, + pub last_received_ts: Option, /// Transfers up and down pub transfer_stats_down_up: TransferStatsDownUp, /// Latency stats @@ -63,7 +63,7 @@ pub struct RouteStats { impl RouteStats { /// Make new route stats - pub fn new(created_ts: u64) -> Self { + pub fn new(created_ts: Timestamp) -> Self { Self { created_ts, ..Default::default() @@ -143,7 +143,9 @@ impl RouteStats { // Has the route been tested within the idle time we'd want to check things? // (also if we've received successfully over the route, this will get set) if let Some(last_tested_ts) = self.last_tested_ts { - if cur_ts.saturating_sub(last_tested_ts) > (ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) { + if cur_ts.saturating_sub(last_tested_ts) + > TimestampDuration::new(ROUTE_MIN_IDLE_TIME_MS as u64 * 1000u64) + { return true; } } else { @@ -210,9 +212,9 @@ pub struct RemotePrivateRouteInfo { // The private route itself private_route: Option, /// Did this remote private route see our node info due to no safety route in use - last_seen_our_node_info_ts: u64, + last_seen_our_node_info_ts: Timestamp, /// Last time this remote private route was requested for any reason (cache expiration) - last_touched_ts: u64, + last_touched_ts: Timestamp, /// Stats stats: RouteStats, } @@ -1630,7 +1632,7 @@ impl RouteSpecStore { .and_modify(|rpr| { if cur_ts.saturating_sub(rpr.last_touched_ts) >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { // Start fresh if this had expired - rpr.last_seen_our_node_info_ts = 0; + rpr.last_seen_our_node_info_ts = Timestamp::new(0); rpr.last_touched_ts = cur_ts; rpr.stats = RouteStats::new(cur_ts); } else { @@ -1641,7 +1643,7 @@ impl RouteSpecStore { .or_insert_with(|| RemotePrivateRouteInfo { // New remote private route cache entry private_route: Some(private_route), - last_seen_our_node_info_ts: 0, + last_seen_our_node_info_ts: Timestamp::new(0), last_touched_ts: cur_ts, stats: RouteStats::new(cur_ts), }); diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index de74e230..0d83a6e2 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -262,7 +262,7 @@ impl RoutingTableInner { } /// Return our current node info timestamp - pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option { + pub fn get_own_node_info_ts(&self, routing_domain: RoutingDomain) -> Option { self.with_routing_domain(routing_domain, |rdd| { if !rdd.common().has_valid_own_node_info() { None @@ -440,7 +440,7 @@ impl RoutingTableInner { pub fn with_entries) -> Option>( &self, - cur_ts: u64, + cur_ts: Timestamp, min_state: BucketEntryState, mut f: F, ) -> Option { @@ -812,7 +812,7 @@ impl RoutingTableInner { pub fn find_peers_with_sort_and_filter( &self, node_count: usize, - cur_ts: u64, + cur_ts: Timestamp, mut filters: VecDeque, mut compare: C, mut transform: T, diff --git a/veilid-core/src/routing_table/stats_accounting.rs b/veilid-core/src/routing_table/stats_accounting.rs index 2c711493..5f0c8960 100644 --- a/veilid-core/src/routing_table/stats_accounting.rs +++ b/veilid-core/src/routing_table/stats_accounting.rs @@ -56,12 +56,12 @@ impl TransferStatsAccounting { self.current_transfer = TransferCount::default(); - transfer_stats.down.maximum = 0; - transfer_stats.up.maximum = 0; - transfer_stats.down.minimum = u64::MAX; - transfer_stats.up.minimum = u64::MAX; - transfer_stats.down.average = 0; - transfer_stats.up.average = 0; + transfer_stats.down.maximum = 0.into(); + transfer_stats.up.maximum = 0.into(); + transfer_stats.down.minimum = u64::MAX.into(); + transfer_stats.up.minimum = u64::MAX.into(); + transfer_stats.down.average = 0.into(); + transfer_stats.up.average = 0.into(); for xfer in &self.rolling_transfers { let bpsd = xfer.down * 1000u64 / dur_ms; let bpsu = xfer.up * 1000u64 / dur_ms; @@ -97,9 +97,9 @@ impl LatencyStatsAccounting { self.rolling_latencies.push_back(latency); let mut ls = LatencyStats { - fastest: u64::MAX, - average: 0, - slowest: 0, + fastest: u64::MAX.into(), + average: 0.into(), + slowest: 0.into(), }; for rl in &self.rolling_latencies { ls.fastest.min_assign(*rl); diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index d2992626..1cc3e317 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -18,7 +18,7 @@ impl RoutingTable { .set_routine(move |s, l, t| { Box::pin( this.clone() - .rolling_transfers_task_routine(s, l, t) + .rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t)) .instrument(trace_span!( parent: None, "RoutingTable rolling transfers task routine" @@ -35,7 +35,7 @@ impl RoutingTable { .set_routine(move |s, l, t| { Box::pin( this.clone() - .kick_buckets_task_routine(s, l, t) + .kick_buckets_task_routine(s, Timestamp::new(l), Timestamp::new(t)) .instrument(trace_span!(parent: None, "kick buckets task routine")), ) }); @@ -80,7 +80,7 @@ impl RoutingTable { .set_routine(move |s, l, t| { Box::pin( this.clone() - .ping_validator_task_routine(s, l, t) + .ping_validator_task_routine(s, Timestamp::new(l), Timestamp::new(t)) .instrument(trace_span!(parent: None, "ping validator task routine")), ) }); @@ -94,7 +94,7 @@ impl RoutingTable { .set_routine(move |s, l, t| { Box::pin( this.clone() - .relay_management_task_routine(s, l, t) + .relay_management_task_routine(s, Timestamp::new(l), Timestamp::new(t)) .instrument(trace_span!(parent: None, "relay management task routine")), ) }); @@ -108,7 +108,11 @@ impl RoutingTable { .set_routine(move |s, l, t| { Box::pin( this.clone() - .private_route_management_task_routine(s, l, t) + .private_route_management_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + ) .instrument(trace_span!( parent: None, "private route management task routine" diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index 46f09949..f94f603c 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -91,7 +91,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), err)] fn ping_validator_local_network( &self, - cur_ts: u64, + cur_ts: Timestamp, unord: &mut FuturesUnordered< SendPinBoxFuture>>, RPCError>>, >, @@ -122,8 +122,8 @@ impl RoutingTable { pub(crate) async fn ping_validator_task_routine( self, stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, + _last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { let mut unord = FuturesUnordered::new(); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs index d5ca9ad2..3484ab0c 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs @@ -9,7 +9,7 @@ impl RPCOperationCancelTunnelQ { pub fn decode( reader: &veilid_capnp::operation_cancel_tunnel_q::Reader, ) -> Result { - let id = reader.get_id(); + let id = TunnelId::new(reader.get_id()); Ok(RPCOperationCancelTunnelQ { id }) } @@ -17,7 +17,7 @@ impl RPCOperationCancelTunnelQ { &self, builder: &mut veilid_capnp::operation_cancel_tunnel_q::Builder, ) -> Result<(), RPCError> { - builder.set_id(self.id); + builder.set_id(self.id.as_u64()); Ok(()) } @@ -35,7 +35,7 @@ impl RPCOperationCancelTunnelA { ) -> Result { match reader.which().map_err(RPCError::protocol)? { veilid_capnp::operation_cancel_tunnel_a::Which::Tunnel(r) => { - Ok(RPCOperationCancelTunnelA::Tunnel(r)) + Ok(RPCOperationCancelTunnelA::Tunnel(TunnelId::new(r))) } veilid_capnp::operation_cancel_tunnel_a::Which::Error(r) => { let tunnel_error = decode_tunnel_error(r.map_err(RPCError::protocol)?); @@ -49,7 +49,7 @@ impl RPCOperationCancelTunnelA { ) -> Result<(), RPCError> { match self { RPCOperationCancelTunnelA::Tunnel(p) => { - builder.set_tunnel(*p); + builder.set_tunnel(p.as_u64()); } RPCOperationCancelTunnelA::Error(e) => { builder.set_error(encode_tunnel_error(*e)); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs index 49dc90a8..e4737a2d 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs @@ -12,7 +12,7 @@ impl RPCOperationCompleteTunnelQ { pub fn decode( reader: &veilid_capnp::operation_complete_tunnel_q::Reader, ) -> Result { - let id = reader.get_id(); + let id = TunnelId::new(reader.get_id()); let local_mode = match reader.get_local_mode().map_err(RPCError::protocol)? { veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, veilid_capnp::TunnelEndpointMode::Turn => TunnelMode::Turn, @@ -32,7 +32,7 @@ impl RPCOperationCompleteTunnelQ { &self, builder: &mut veilid_capnp::operation_complete_tunnel_q::Builder, ) -> Result<(), RPCError> { - builder.set_id(self.id); + builder.set_id(self.id.as_u64()); builder.set_local_mode(match self.local_mode { TunnelMode::Raw => veilid_capnp::TunnelEndpointMode::Raw, TunnelMode::Turn => veilid_capnp::TunnelEndpointMode::Turn, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs index d58e625a..274b0af8 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs @@ -11,7 +11,7 @@ impl RPCOperationStartTunnelQ { pub fn decode( reader: &veilid_capnp::operation_start_tunnel_q::Reader, ) -> Result { - let id = reader.get_id(); + let id = TunnelId::new(reader.get_id()); let local_mode = match reader.get_local_mode().map_err(RPCError::protocol)? { veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, veilid_capnp::TunnelEndpointMode::Turn => TunnelMode::Turn, @@ -28,7 +28,7 @@ impl RPCOperationStartTunnelQ { &self, builder: &mut veilid_capnp::operation_start_tunnel_q::Builder, ) -> Result<(), RPCError> { - builder.set_id(self.id); + builder.set_id(self.id.as_u64()); builder.set_local_mode(match self.local_mode { TunnelMode::Raw => veilid_capnp::TunnelEndpointMode::Raw, TunnelMode::Turn => veilid_capnp::TunnelEndpointMode::Turn, diff --git a/veilid-core/src/rpc_processor/coders/tunnel.rs b/veilid-core/src/rpc_processor/coders/tunnel.rs index 72d2470a..994cd0da 100644 --- a/veilid-core/src/rpc_processor/coders/tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/tunnel.rs @@ -58,8 +58,8 @@ pub fn encode_full_tunnel( full_tunnel: &FullTunnel, builder: &mut veilid_capnp::full_tunnel::Builder, ) -> Result<(), RPCError> { - builder.set_id(full_tunnel.id); - builder.set_timeout(full_tunnel.timeout); + builder.set_id(full_tunnel.id.as_u64()); + builder.set_timeout(full_tunnel.timeout.as_u64()); let mut l_builder = builder.reborrow().init_local(); encode_tunnel_endpoint(&full_tunnel.local, &mut l_builder)?; let mut r_builder = builder.reborrow().init_remote(); @@ -70,8 +70,8 @@ pub fn encode_full_tunnel( pub fn decode_full_tunnel( reader: &veilid_capnp::full_tunnel::Reader, ) -> Result { - let id = reader.get_id(); - let timeout = reader.get_timeout(); + let id = TunnelId::new(reader.get_id()); + let timeout = TimestampDuration::new(reader.get_timeout()); let l_reader = reader.get_local().map_err(RPCError::protocol)?; let local = decode_tunnel_endpoint(&l_reader)?; let r_reader = reader.get_remote().map_err(RPCError::protocol)?; @@ -89,8 +89,8 @@ pub fn encode_partial_tunnel( partial_tunnel: &PartialTunnel, builder: &mut veilid_capnp::partial_tunnel::Builder, ) -> Result<(), RPCError> { - builder.set_id(partial_tunnel.id); - builder.set_timeout(partial_tunnel.timeout); + builder.set_id(partial_tunnel.id.as_u64()); + builder.set_timeout(partial_tunnel.timeout.as_u64()); let mut l_builder = builder.reborrow().init_local(); encode_tunnel_endpoint(&partial_tunnel.local, &mut l_builder)?; Ok(()) @@ -99,8 +99,8 @@ pub fn encode_partial_tunnel( pub fn decode_partial_tunnel( reader: &veilid_capnp::partial_tunnel::Reader, ) -> Result { - let id = reader.get_id(); - let timeout = reader.get_timeout(); + let id = TunnelId::new(reader.get_id()); + let timeout = TimestampDuration::new(reader.get_timeout()); let l_reader = reader.get_local().map_err(RPCError::protocol)?; let local = decode_tunnel_endpoint(&l_reader)?; diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 01ee69ae..6ae3d54d 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -254,7 +254,7 @@ impl RPCProcessor { // set up channel let mut concurrency = c.network.rpc.concurrency; let queue_size = c.network.rpc.queue_size; - let timeout = ms_to_us(c.network.rpc.timeout_ms); + let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)); let max_route_hop_count = c.network.rpc.max_route_hop_count as usize; if concurrency == 0 { concurrency = get_concurrency() / 2; @@ -265,7 +265,7 @@ impl RPCProcessor { let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; RPCProcessorUnlockedInner { - timeout_us: timeout, + timeout_us, queue_size, concurrency, max_route_hop_count, @@ -879,8 +879,8 @@ impl RPCProcessor { let rss = self.routing_table.route_spec_store(); // Get latency for all local routes - let mut total_local_latency = 0u64; - let total_latency = recv_ts.saturating_sub(send_ts); + let mut total_local_latency = TimestampDuration::new(0u64); + let total_latency: TimestampDuration = recv_ts.saturating_sub(send_ts); // If safety route was used, record route there if let Some(sr_pubkey) = &safety_route { @@ -932,12 +932,12 @@ impl RPCProcessor { if let Some(sr_pubkey) = &safety_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, sr_pubkey, |s| { - s.record_latency(total_latency / 2); + s.record_latency(total_latency / 2u64); }); } if let Some(pr_pubkey) = &reply_private_route { rss.with_route_stats(send_ts, pr_pubkey, |s| { - s.record_latency(total_latency / 2); + s.record_latency(total_latency / 2u64); }); } } @@ -1365,7 +1365,7 @@ impl RPCProcessor { routing_domain, }), timestamp: get_aligned_timestamp(), - body_len: body.len() as u64, + body_len: ByteCount::new(body.len() as u64), }, data: RPCMessageData { contents: body }, }; diff --git a/veilid-core/src/veilid_api/aligned_u64.rs b/veilid-core/src/veilid_api/aligned_u64.rs index f805490d..31c161aa 100644 --- a/veilid-core/src/veilid_api/aligned_u64.rs +++ b/veilid-core/src/veilid_api/aligned_u64.rs @@ -89,6 +89,12 @@ impl> core::ops::Mul for AlignedU64 { } } +impl> core::ops::MulAssign for AlignedU64 { + fn mul_assign(&mut self, rhs: Rhs) { + self.0 *= rhs.into(); + } +} + impl> core::ops::Div for AlignedU64 { type Output = Self; @@ -97,6 +103,12 @@ impl> core::ops::Div for AlignedU64 { } } +impl> core::ops::DivAssign for AlignedU64 { + fn div_assign(&mut self, rhs: Rhs) { + self.0 /= rhs.into(); + } +} + impl AlignedU64 { pub const fn new(v: u64) -> Self { Self(v) diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index cc074b97..36e9c608 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -1961,7 +1961,7 @@ impl SignedRelayedNodeInfo { sig_bytes.append(&mut builder_to_vec(ri_msg).map_err(VeilidAPIError::internal)?); // Add timestamp to signature - sig_bytes.append(&mut timestamp.to_le_bytes().to_vec()); + sig_bytes.append(&mut timestamp.as_u64().to_le_bytes().to_vec()); Ok(sig_bytes) } diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 86d3a645..fac14b9a 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -261,7 +261,7 @@ impl veilid_server::Server for VeilidServerImpl { ) -> Promise<(), ::capnp::Error> { trace!("VeilidServerImpl::app_call_reply"); - let id = pry!(params.get()).get_id(); + let id = OperationId::new(pry!(params.get()).get_id()); let message = pry!(pry!(params.get()).get_message()).to_owned(); let veilid_api = self.veilid_api.clone();