diff --git a/external/keyring-manager b/external/keyring-manager index b127b2d3..c153eb30 160000 --- a/external/keyring-manager +++ b/external/keyring-manager @@ -1 +1 @@ -Subproject commit b127b2d3c653fea163a776dd58b3798f28aeeee3 +Subproject commit c153eb3015d6d118e5d467865510d053ddd84533 diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index c47b9f52..751cde8f 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -103,7 +103,7 @@ impl TryFrom for AttachmentState { pub struct AttachmentManagerInner { attachment_machine: CallbackStateMachine, maintain_peers: bool, - attach_timestamp: Option, + attach_ts: Option, update_callback: Option, attachment_maintainer_jh: Option>, } @@ -142,7 +142,7 @@ impl AttachmentManager { AttachmentManagerInner { attachment_machine: CallbackStateMachine::new(), maintain_peers: false, - attach_timestamp: None, + attach_ts: None, update_callback: None, attachment_maintainer_jh: None, } @@ -183,8 +183,8 @@ impl AttachmentManager { matches!(s, AttachmentState::Detached) } - pub fn get_attach_timestamp(&self) -> Option { - self.inner.lock().attach_timestamp + pub fn get_attach_timestamp(&self) -> Option { + self.inner.lock().attach_ts } fn translate_routing_table_health( @@ -252,7 +252,7 @@ impl AttachmentManager { #[instrument(level = "debug", skip(self))] async fn attachment_maintainer(self) { debug!("attachment starting"); - self.inner.lock().attach_timestamp = Some(get_timestamp()); + self.inner.lock().attach_ts = Some(get_aligned_timestamp()); let netman = self.network_manager(); let mut restart; @@ -306,7 +306,7 @@ impl AttachmentManager { .consume(&AttachmentInput::AttachmentStopped) .await; debug!("attachment stopped"); - self.inner.lock().attach_timestamp = None; + self.inner.lock().attach_ts = None; } #[instrument(level = "debug", skip_all, err)] diff --git a/veilid-core/src/crypto/envelope.rs b/veilid-core/src/crypto/envelope.rs index 
efdec697..7d0b6734 100644 --- a/veilid-core/src/crypto/envelope.rs +++ b/veilid-core/src/crypto/envelope.rs @@ -44,7 +44,7 @@ pub struct Envelope { version: u8, min_version: u8, max_version: u8, - timestamp: u64, + timestamp: Timestamp, nonce: EnvelopeNonce, sender_id: DHTKey, recipient_id: DHTKey, @@ -53,7 +53,7 @@ pub struct Envelope { impl Envelope { pub fn new( version: u8, - timestamp: u64, + timestamp: Timestamp, nonce: EnvelopeNonce, sender_id: DHTKey, recipient_id: DHTKey, @@ -128,11 +128,12 @@ impl Envelope { } // Get the timestamp - let timestamp: u64 = u64::from_le_bytes( + let timestamp: Timestamp = u64::from_le_bytes( data[0x0A..0x12] .try_into() .map_err(VeilidAPIError::internal)?, - ); + ) + .into(); // Get nonce and sender node id let nonce: EnvelopeNonce = data[0x12..0x2A] @@ -217,7 +218,7 @@ impl Envelope { // Write size data[0x08..0x0A].copy_from_slice(&(envelope_size as u16).to_le_bytes()); // Write timestamp - data[0x0A..0x12].copy_from_slice(&self.timestamp.to_le_bytes()); + data[0x0A..0x12].copy_from_slice(&self.timestamp.as_u64().to_le_bytes()); // Write nonce data[0x12..0x2A].copy_from_slice(&self.nonce); // Write sender node id @@ -260,7 +261,7 @@ impl Envelope { } } - pub fn get_timestamp(&self) -> u64 { + pub fn get_timestamp(&self) -> Timestamp { self.timestamp } diff --git a/veilid-core/src/crypto/tests/test_envelope_receipt.rs b/veilid-core/src/crypto/tests/test_envelope_receipt.rs index 723eeaff..a5b2e45e 100644 --- a/veilid-core/src/crypto/tests/test_envelope_receipt.rs +++ b/veilid-core/src/crypto/tests/test_envelope_receipt.rs @@ -12,7 +12,7 @@ pub async fn test_envelope_round_trip() { let crypto = api.crypto().unwrap(); // Create envelope - let ts = 0x12345678ABCDEF69u64; + let ts = Timestamp::from(0x12345678ABCDEF69u64); let nonce = Crypto::get_random_nonce(); let (sender_id, sender_secret) = generate_secret(); let (recipient_id, recipient_secret) = generate_secret(); diff --git 
a/veilid-core/src/network_manager/connection_handle.rs b/veilid-core/src/network_manager/connection_handle.rs index 9eeb1b63..3dc0adb5 100644 --- a/veilid-core/src/network_manager/connection_handle.rs +++ b/veilid-core/src/network_manager/connection_handle.rs @@ -2,7 +2,7 @@ use super::*; #[derive(Clone, Debug)] pub struct ConnectionHandle { - id: u64, + id: NetworkConnectionId, descriptor: ConnectionDescriptor, channel: flume::Sender<(Option, Vec)>, } @@ -15,7 +15,7 @@ pub enum ConnectionHandleSendResult { impl ConnectionHandle { pub(super) fn new( - id: u64, + id: NetworkConnectionId, descriptor: ConnectionDescriptor, channel: flume::Sender<(Option, Vec)>, ) -> Self { @@ -26,7 +26,7 @@ impl ConnectionHandle { } } - pub fn connection_id(&self) -> u64 { + pub fn connection_id(&self) -> NetworkConnectionId { self.id } diff --git a/veilid-core/src/network_manager/connection_limits.rs b/veilid-core/src/network_manager/connection_limits.rs index f25f1654..d62cdc01 100644 --- a/veilid-core/src/network_manager/connection_limits.rs +++ b/veilid-core/src/network_manager/connection_limits.rs @@ -41,7 +41,7 @@ impl ConnectionLimits { } } - fn purge_old_timestamps(&mut self, cur_ts: u64) { + fn purge_old_timestamps(&mut self, cur_ts: Timestamp) { // v4 { let mut dead_keys = Vec::::new(); @@ -78,7 +78,7 @@ impl ConnectionLimits { pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> { let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); self.purge_old_timestamps(ts); @@ -134,7 +134,7 @@ impl ConnectionLimits { pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> { let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); self.purge_old_timestamps(ts); match ipblock { diff --git a/veilid-core/src/network_manager/connection_manager.rs 
b/veilid-core/src/network_manager/connection_manager.rs index b8c62e84..342925c9 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -48,9 +48,9 @@ impl ConnectionManager { async_processor_jh: MustJoinHandle<()>, ) -> ConnectionManagerInner { ConnectionManagerInner { - next_id: 0, + next_id: 0.into(), stop_source: Some(stop_source), - sender: sender, + sender, async_processor_jh: Some(async_processor_jh), } } @@ -149,7 +149,7 @@ impl ConnectionManager { ) -> EyreResult> { // Get next connection id to use let id = inner.next_id; - inner.next_id += 1; + inner.next_id += 1u64; log_net!( "on_new_protocol_network_connection: id={} prot_conn={:?}", id, @@ -398,7 +398,7 @@ impl ConnectionManager { // Callback from network connection receive loop when it exits // cleans up the entry in the connection table #[instrument(level = "trace", skip(self))] - pub(super) async fn report_connection_finished(&self, connection_id: u64) { + pub(super) async fn report_connection_finished(&self, connection_id: NetworkConnectionId) { // Get channel sender let sender = { let mut inner = self.arc.inner.lock(); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 60d26957..e5b27aeb 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -67,7 +67,7 @@ struct NetworkComponents { // Statistics per address #[derive(Clone, Default)] pub struct PerAddressStats { - last_seen_ts: u64, + last_seen_ts: Timestamp, transfer_stats_accounting: TransferStatsAccounting, transfer_stats: TransferStatsDownUp, } @@ -99,7 +99,7 @@ impl Default for NetworkManagerStats { #[derive(Debug)] struct ClientWhitelistEntry { - last_seen_ts: u64, + last_seen_ts: Timestamp, } #[derive(Copy, Clone, Debug)] @@ -400,11 +400,11 @@ impl NetworkManager { let mut inner = self.inner.lock(); match inner.client_whitelist.entry(client) { 
hashlink::lru_cache::Entry::Occupied(mut entry) => { - entry.get_mut().last_seen_ts = get_timestamp() + entry.get_mut().last_seen_ts = get_aligned_timestamp() } hashlink::lru_cache::Entry::Vacant(entry) => { entry.insert(ClientWhitelistEntry { - last_seen_ts: get_timestamp(), + last_seen_ts: get_aligned_timestamp(), }); } } @@ -416,7 +416,7 @@ impl NetworkManager { match inner.client_whitelist.entry(client) { hashlink::lru_cache::Entry::Occupied(mut entry) => { - entry.get_mut().last_seen_ts = get_timestamp(); + entry.get_mut().last_seen_ts = get_aligned_timestamp(); true } hashlink::lru_cache::Entry::Vacant(_) => false, @@ -426,7 +426,7 @@ impl NetworkManager { pub fn purge_client_whitelist(&self) { let timeout_ms = self.with_config(|c| c.network.client_whitelist_timeout_ms); let mut inner = self.inner.lock(); - let cutoff_timestamp = get_timestamp() - ((timeout_ms as u64) * 1000u64); + let cutoff_timestamp = get_aligned_timestamp() - ((timeout_ms as u64) * 1000u64); // Remove clients from the whitelist that haven't been since since our whitelist timeout while inner .client_whitelist @@ -526,7 +526,7 @@ impl NetworkManager { .wrap_err("failed to generate signed receipt")?; // Record the receipt for later - let exp_ts = get_timestamp() + expiration_us; + let exp_ts = get_aligned_timestamp() + expiration_us; receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback); Ok(out) @@ -550,7 +550,7 @@ impl NetworkManager { .wrap_err("failed to generate signed receipt")?; // Record the receipt for later - let exp_ts = get_timestamp() + expiration_us; + let exp_ts = get_aligned_timestamp() + expiration_us; let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled)); let instance = eventual.instance(); receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual); @@ -717,7 +717,7 @@ impl NetworkManager { // XXX: do we need a delay here? or another hole punch packet? 
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch - peer_nr.set_last_connection(connection_descriptor, get_timestamp()); + peer_nr.set_last_connection(connection_descriptor, get_aligned_timestamp()); // Return the receipt using the same dial info send the receipt to it rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) @@ -741,7 +741,7 @@ impl NetworkManager { let node_id_secret = routing_table.node_id_secret(); // Get timestamp, nonce - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); let nonce = Crypto::get_random_nonce(); // Encode envelope @@ -1136,7 +1136,7 @@ impl NetworkManager { // ); // Update timestamp for this last connection since we just sent to it - node_ref.set_last_connection(connection_descriptor, get_timestamp()); + node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); return Ok(NetworkResult::value(SendDataKind::Existing( connection_descriptor, @@ -1168,7 +1168,7 @@ impl NetworkManager { this.net().send_data_to_dial_info(dial_info, data).await? 
); // If we connected to this node directly, save off the last connection so we can use it again - node_ref.set_last_connection(connection_descriptor, get_timestamp()); + node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); Ok(NetworkResult::value(SendDataKind::Direct( connection_descriptor, @@ -1337,28 +1337,28 @@ impl NetworkManager { // Get timestamp range let (tsbehind, tsahead) = self.with_config(|c| { ( - c.network.rpc.max_timestamp_behind_ms.map(ms_to_us), - c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us), + c.network.rpc.max_timestamp_behind_ms.map(ms_to_us).map(TimestampDuration::new), + c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us).map(TimestampDuration::new), ) }); // Validate timestamp isn't too old - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); let ets = envelope.get_timestamp(); if let Some(tsbehind) = tsbehind { - if tsbehind > 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) { + if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) { log_net!(debug "envelope time was too far in the past: {}ms ", - timestamp_to_secs(ts.saturating_sub(ets)) * 1000f64 + timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64 ); return Ok(false); } } if let Some(tsahead) = tsahead { - if tsahead > 0 && (ts < ets && ets.saturating_sub(ts) > tsahead) { + if tsahead.as_u64() != 0 && (ts < ets && ets.saturating_sub(ts) > tsahead) { log_net!(debug "envelope time was too far in the future: {}ms", - timestamp_to_secs(ets.saturating_sub(ts)) * 1000f64 + timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64 ); return Ok(false); } @@ -1497,8 +1497,8 @@ impl NetworkManager { if !has_state { return VeilidStateNetwork { started: false, - bps_down: 0, - bps_up: 0, + bps_down: 0.into(), + bps_up: 0.into(), peers: Vec::new(), }; } @@ -1650,7 +1650,7 @@ impl NetworkManager { // public dialinfo let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT { - let exp_ts = 
get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US; + let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US; for i in &inconsistencies { pait.insert(*i, exp_ts); } @@ -1664,7 +1664,7 @@ impl NetworkManager { .entry(key) .or_insert_with(|| HashMap::new()); let exp_ts = - get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US; + get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US; for i in inconsistencies { pait.insert(i, exp_ts); } diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index 3765a4ae..ecf60618 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -6,7 +6,7 @@ use std::net::UdpSocket; const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000; const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000; const UPNP_MAPPING_ATTEMPTS: u32 = 3; -const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64; +const UPNP_MAPPING_LIFETIME_US:TimestampDuration = TimestampDuration::new(UPNP_MAPPING_LIFETIME_MS as u64 * 1000u64); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] struct PortMapKey { @@ -19,8 +19,8 @@ struct PortMapKey { struct PortMapValue { ext_ip: IpAddr, mapped_port: u16, - timestamp: u64, - renewal_lifetime: u64, + timestamp: Timestamp, + renewal_lifetime: TimestampDuration, renewal_attempts: u32, } @@ -276,7 +276,7 @@ impl IGDManager { }; // Add to mapping list to keep alive - let timestamp = get_timestamp(); + let timestamp = get_aligned_timestamp(); inner.port_maps.insert(PortMapKey { llpt, at, @@ -285,7 +285,7 @@ impl IGDManager { ext_ip, mapped_port, timestamp, - renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, + renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(), renewal_attempts: 0, }); @@ -302,7 +302,7 @@ impl IGDManager { let mut renews: Vec<(PortMapKey, 
PortMapValue)> = Vec::new(); { let inner = self.inner.lock(); - let now = get_timestamp(); + let now = get_aligned_timestamp(); for (k, v) in &inner.port_maps { let mapping_lifetime = now.saturating_sub(v.timestamp); @@ -357,8 +357,8 @@ impl IGDManager { inner.port_maps.insert(k, PortMapValue { ext_ip: v.ext_ip, mapped_port, - timestamp: get_timestamp(), - renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, + timestamp: get_aligned_timestamp(), + renewal_lifetime: TimestampDuration::new((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64), renewal_attempts: 0, }); }, @@ -398,8 +398,8 @@ impl IGDManager { inner.port_maps.insert(k, PortMapValue { ext_ip: v.ext_ip, mapped_port: v.mapped_port, - timestamp: get_timestamp(), - renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64, + timestamp: get_aligned_timestamp(), + renewal_lifetime: ((UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64).into(), renewal_attempts: 0, }); }, @@ -407,7 +407,7 @@ impl IGDManager { log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e); // Get closer to the maximum renewal timeline by a factor of two each time - v.renewal_lifetime = (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2; + v.renewal_lifetime = (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2u64; v.renewal_attempts += 1; // Store new value to try again diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 3b573835..6c832c42 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -78,19 +78,19 @@ enum RecvLoopAction { #[derive(Debug, Clone)] pub struct NetworkConnectionStats { - last_message_sent_time: Option, - last_message_recv_time: Option, + last_message_sent_time: Option, + last_message_recv_time: Option, } -pub type NetworkConnectionId = u64; +pub type NetworkConnectionId = AlignedU64; #[derive(Debug)] pub struct NetworkConnection { 
connection_id: NetworkConnectionId, descriptor: ConnectionDescriptor, processor: Option>, - established_time: u64, + established_time: Timestamp, stats: Arc>, sender: flume::Sender<(Option, Vec)>, stop_source: Option, @@ -105,7 +105,7 @@ impl NetworkConnection { connection_id: id, descriptor, processor: None, - established_time: get_timestamp(), + established_time: get_aligned_timestamp(), stats: Arc::new(Mutex::new(NetworkConnectionStats { last_message_sent_time: None, last_message_recv_time: None, @@ -153,7 +153,7 @@ impl NetworkConnection { connection_id, descriptor, processor: Some(processor), - established_time: get_timestamp(), + established_time: get_aligned_timestamp(), stats, sender, stop_source: Some(stop_source), @@ -185,7 +185,7 @@ impl NetworkConnection { stats: Arc>, message: Vec, ) -> io::Result> { - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); let out = network_result_try!(protocol_connection.send(message).await?); let mut stats = stats.lock(); @@ -199,7 +199,7 @@ impl NetworkConnection { protocol_connection: &ProtocolNetworkConnection, stats: Arc>, ) -> io::Result>> { - let ts = get_timestamp(); + let ts = get_aligned_timestamp(); let out = network_result_try!(protocol_connection.recv().await?); let mut stats = stats.lock(); @@ -217,7 +217,7 @@ impl NetworkConnection { } #[allow(dead_code)] - pub fn established_time(&self) -> u64 { + pub fn established_time(&self) -> Timestamp { self.established_time } diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs index 2f7ccc46..91507e37 100644 --- a/veilid-core/src/network_manager/tasks/public_address_check.rs +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -6,8 +6,8 @@ impl NetworkManager { pub(crate) async fn public_address_check_task_routine( self, stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, + _last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { // go 
through public_address_inconsistencies_table and time out things that have expired let mut inner = self.inner.lock(); diff --git a/veilid-core/src/network_manager/tasks/rolling_transfers.rs b/veilid-core/src/network_manager/tasks/rolling_transfers.rs index 0e924024..219790ec 100644 --- a/veilid-core/src/network_manager/tasks/rolling_transfers.rs +++ b/veilid-core/src/network_manager/tasks/rolling_transfers.rs @@ -6,8 +6,8 @@ impl NetworkManager { pub(crate) async fn rolling_transfers_task_routine( self, _stop_token: StopToken, - last_ts: u64, - cur_ts: u64, + last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { // log_net!("--- network manager rolling_transfers task"); { diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 6f720107..8a48c79d 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -50,13 +50,13 @@ pub async fn test_add_get_remove() { ))), ); - let c1 = NetworkConnection::dummy(1, a1); - let c1b = NetworkConnection::dummy(10, a1); + let c1 = NetworkConnection::dummy(1.into(), a1); + let c1b = NetworkConnection::dummy(10.into(), a1); let c1h = c1.get_handle(); - let c2 = NetworkConnection::dummy(2, a2); - let c3 = NetworkConnection::dummy(3, a3); - let c4 = NetworkConnection::dummy(4, a4); - let c5 = NetworkConnection::dummy(5, a5); + let c2 = NetworkConnection::dummy(2.into(), a2); + let c3 = NetworkConnection::dummy(3.into(), a3); + let c4 = NetworkConnection::dummy(4.into(), a4); + let c5 = NetworkConnection::dummy(5.into(), a5); assert_eq!(a1, c2.connection_descriptor()); assert_ne!(a3, c4.connection_descriptor()); @@ -68,8 +68,8 @@ pub async fn test_add_get_remove() { assert!(table.add_connection(c1b).is_err()); assert_eq!(table.connection_count(), 1); - assert!(table.remove_connection_by_id(4).is_none()); - 
assert!(table.remove_connection_by_id(5).is_none()); + assert!(table.remove_connection_by_id(4.into()).is_none()); + assert!(table.remove_connection_by_id(5.into()).is_none()); assert_eq!(table.connection_count(), 1); assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.get_connection_by_descriptor(a1), Some(c1h.clone())); @@ -81,41 +81,41 @@ pub async fn test_add_get_remove() { assert_eq!(table.connection_count(), 1); assert_eq!( table - .remove_connection_by_id(1) + .remove_connection_by_id(1.into()) .map(|c| c.connection_descriptor()) .unwrap(), a1 ); assert_eq!(table.connection_count(), 0); - assert!(table.remove_connection_by_id(2).is_none()); + assert!(table.remove_connection_by_id(2.into()).is_none()); assert_eq!(table.connection_count(), 0); assert_eq!(table.get_connection_by_descriptor(a2), None); assert_eq!(table.get_connection_by_descriptor(a1), None); assert_eq!(table.connection_count(), 0); - let c1 = NetworkConnection::dummy(6, a1); + let c1 = NetworkConnection::dummy(6.into(), a1); table.add_connection(c1).unwrap(); - let c2 = NetworkConnection::dummy(7, a2); + let c2 = NetworkConnection::dummy(7.into(), a2); assert_err!(table.add_connection(c2)); table.add_connection(c3).unwrap(); table.add_connection(c4).unwrap(); assert_eq!(table.connection_count(), 3); assert_eq!( table - .remove_connection_by_id(6) + .remove_connection_by_id(6.into()) .map(|c| c.connection_descriptor()) .unwrap(), a2 ); assert_eq!( table - .remove_connection_by_id(3) + .remove_connection_by_id(3.into()) .map(|c| c.connection_descriptor()) .unwrap(), a3 ); assert_eq!( table - .remove_connection_by_id(4) + .remove_connection_by_id(4.into()) .map(|c| c.connection_descriptor()) .unwrap(), a4 diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index be548ff1..17fcf000 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -70,7 +70,7 @@ impl fmt::Debug for 
ReceiptRecordCallbackType { } pub struct ReceiptRecord { - expiration_ts: u64, + expiration_ts: Timestamp, receipt: Receipt, expected_returns: u32, returns_so_far: u32, @@ -92,7 +92,7 @@ impl fmt::Debug for ReceiptRecord { impl ReceiptRecord { pub fn new( receipt: Receipt, - expiration_ts: u64, + expiration_ts: Timestamp, expected_returns: u32, receipt_callback: impl ReceiptCallback, ) -> Self { @@ -107,7 +107,7 @@ impl ReceiptRecord { pub fn new_single_shot( receipt: Receipt, - expiration_ts: u64, + expiration_ts: Timestamp, eventual: ReceiptSingleShotType, ) -> Self { Self { @@ -123,7 +123,7 @@ impl ReceiptRecord { /* XXX: may be useful for O(1) timestamp expiration #[derive(Clone, Debug)] struct ReceiptRecordTimestampSort { - expiration_ts: u64, + expiration_ts: Timestamp, record: Arc>, } @@ -150,7 +150,7 @@ impl PartialOrd for ReceiptRecordTimestampSort { pub struct ReceiptManagerInner { network_manager: NetworkManager, records_by_nonce: BTreeMap>>, - next_oldest_ts: Option, + next_oldest_ts: Option, stop_source: Option, timeout_task: MustJoinSingleFuture<()>, } @@ -219,9 +219,9 @@ impl ReceiptManager { } #[instrument(level = "trace", skip(self))] - pub async fn timeout_task_routine(self, now: u64, stop_token: StopToken) { + pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) { // Go through all receipts and build a list of expired nonces - let mut new_next_oldest_ts: Option = None; + let mut new_next_oldest_ts: Option = None; let mut expired_records = Vec::new(); { let mut inner = self.inner.lock(); @@ -280,7 +280,7 @@ impl ReceiptManager { }; (inner.next_oldest_ts, inner.timeout_task.clone(), stop_token) }; - let now = get_timestamp(); + let now = get_aligned_timestamp(); // If we have at least one timestamp to expire, lets do it if let Some(next_oldest_ts) = next_oldest_ts { if now >= next_oldest_ts { @@ -318,7 +318,7 @@ impl ReceiptManager { pub fn record_receipt( &self, receipt: Receipt, - expiration: u64, + expiration: 
Timestamp, expected_returns: u32, callback: impl ReceiptCallback, ) { @@ -339,7 +339,7 @@ impl ReceiptManager { pub fn record_single_shot_receipt( &self, receipt: Receipt, - expiration: u64, + expiration: Timestamp, eventual: ReceiptSingleShotType, ) { let receipt_nonce = receipt.get_nonce(); @@ -356,7 +356,7 @@ impl ReceiptManager { fn update_next_oldest_timestamp(inner: &mut ReceiptManagerInner) { // Update the next oldest timestamp - let mut new_next_oldest_ts: Option = None; + let mut new_next_oldest_ts: Option = None; for v in inner.records_by_nonce.values() { let receipt_inner = v.lock(); if new_next_oldest_ts.is_none() diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index d1a93d2e..ce0794bf 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -120,7 +120,7 @@ impl Bucket { .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); sorted_entries.sort_by(|a, b| -> core::cmp::Ordering { if a.0 == b.0 { return core::cmp::Ordering::Equal; diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c64cb7ee..ad8b1320 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -51,7 +51,7 @@ pub struct BucketEntryPublicInternet { /// The PublicInternet node info signed_node_info: Option>, /// The last node info timestamp of ours that this entry has seen - last_seen_our_node_info_ts: u64, + last_seen_our_node_info_ts: Timestamp, /// Last known node status node_status: Option, } @@ -63,7 +63,7 @@ pub struct BucketEntryLocalNetwork { /// The LocalNetwork node info signed_node_info: Option>, /// The last node info timestamp of ours that this entry has seen - last_seen_our_node_info_ts: u64, + last_seen_our_node_info_ts: Timestamp, /// Last known node status node_status: Option, } @@ -93,7 +93,7 @@ 
pub struct BucketEntryInner { updated_since_last_network_change: bool, /// The last connection descriptors used to contact this node, per protocol type #[with(Skip)] - last_connections: BTreeMap, + last_connections: BTreeMap, /// The node info for this entry on the publicinternet routing domain public_internet: BucketEntryPublicInternet, /// The node info for this entry on the localnetwork routing domain @@ -148,7 +148,7 @@ impl BucketEntryInner { } // Less is more reliable then faster - pub fn cmp_fastest_reliable(cur_ts: u64, e1: &Self, e2: &Self) -> std::cmp::Ordering { + pub fn cmp_fastest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering { // Reverse compare so most reliable is at front let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); if ret != std::cmp::Ordering::Equal { @@ -170,7 +170,7 @@ impl BucketEntryInner { } // Less is more reliable then older - pub fn cmp_oldest_reliable(cur_ts: u64, e1: &Self, e2: &Self) -> std::cmp::Ordering { + pub fn cmp_oldest_reliable(cur_ts: Timestamp, e1: &Self, e2: &Self) -> std::cmp::Ordering { // Reverse compare so most reliable is at front let ret = e2.state(cur_ts).cmp(&e1.state(cur_ts)); if ret != std::cmp::Ordering::Equal { @@ -191,7 +191,7 @@ impl BucketEntryInner { } } - pub fn sort_fastest_reliable_fn(cur_ts: u64) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { + pub fn sort_fastest_reliable_fn(cur_ts: Timestamp) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) } @@ -231,7 +231,7 @@ impl BucketEntryInner { // No need to update the signednodeinfo though since the timestamp is the same // Touch the node and let it try to live again self.updated_since_last_network_change = true; - self.touch_last_seen(get_timestamp()); + self.touch_last_seen(get_aligned_timestamp()); } return; } @@ -258,7 +258,7 @@ impl BucketEntryInner { // Update the signed node info *opt_current_sni = Some(Box::new(signed_node_info)); 
self.updated_since_last_network_change = true; - self.touch_last_seen(get_timestamp()); + self.touch_last_seen(get_aligned_timestamp()); } pub fn has_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool { @@ -367,7 +367,7 @@ impl BucketEntryInner { } // Stores a connection descriptor in this entry's table of last connections - pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) { + pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) { let key = self.descriptor_to_key(last_connection); self.last_connections .insert(key, (last_connection, timestamp)); @@ -431,7 +431,7 @@ impl BucketEntryInner { } else { // If this is not connection oriented, then we check our last seen time // to see if this mapping has expired (beyond our timeout) - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); (v.1 + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) >= cur_ts }; @@ -455,7 +455,7 @@ impl BucketEntryInner { self.min_max_version } - pub fn state(&self, cur_ts: u64) -> BucketEntryState { + pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState { if self.check_reliable(cur_ts) { BucketEntryState::Reliable } else if self.check_dead(cur_ts) { @@ -494,7 +494,7 @@ impl BucketEntryInner { } } - pub fn set_our_node_info_ts(&mut self, routing_domain: RoutingDomain, seen_ts: u64) { + pub fn set_our_node_info_ts(&mut self, routing_domain: RoutingDomain, seen_ts: Timestamp) { match routing_domain { RoutingDomain::LocalNetwork => { self.local_network.last_seen_our_node_info_ts = seen_ts; @@ -508,7 +508,7 @@ impl BucketEntryInner { pub fn has_seen_our_node_info_ts( &self, routing_domain: RoutingDomain, - our_node_info_ts: u64, + our_node_info_ts: Timestamp, ) -> bool { match routing_domain { RoutingDomain::LocalNetwork => { @@ -530,7 +530,7 @@ impl BucketEntryInner { ///// stats methods // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds - pub(super) fn 
roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { + pub(super) fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { self.transfer_stats_accounting.roll_transfers( last_ts, cur_ts, @@ -539,12 +539,12 @@ impl BucketEntryInner { } // Called for every round trip packet we receive - fn record_latency(&mut self, latency: u64) { + fn record_latency(&mut self, latency: TimestampDuration) { self.peer_stats.latency = Some(self.latency_stats_accounting.record_latency(latency)); } ///// state machine handling - pub(super) fn check_reliable(&self, cur_ts: u64) -> bool { + pub(super) fn check_reliable(&self, cur_ts: Timestamp) -> bool { // If we have had any failures to send, this is not reliable if self.peer_stats.rpc_stats.failed_to_send > 0 { return false; @@ -558,7 +558,7 @@ impl BucketEntryInner { } } } - pub(super) fn check_dead(&self, cur_ts: u64) -> bool { + pub(super) fn check_dead(&self, cur_ts: Timestamp) -> bool { // If we have failured to send NEVER_REACHED_PING_COUNT times in a row, the node is dead if self.peer_stats.rpc_stats.failed_to_send >= NEVER_REACHED_PING_COUNT { return true; @@ -575,14 +575,14 @@ impl BucketEntryInner { } /// Return the last time we either saw a node, or asked it a question - fn latest_contact_time(&self) -> Option { + fn latest_contact_time(&self) -> Option { self.peer_stats .rpc_stats .last_seen_ts - .max(self.peer_stats.rpc_stats.last_question) + .max(self.peer_stats.rpc_stats.last_question_ts) } - fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool { + fn needs_constant_ping(&self, cur_ts: Timestamp, interval: Timestamp) -> bool { // If we have not either seen the node in the last 'interval' then we should ping it let latest_contact_time = self.latest_contact_time(); @@ -596,7 +596,7 @@ impl BucketEntryInner { } // Check if this node needs a ping right now to validate it is still reachable - pub(super) fn needs_ping(&self, cur_ts: u64, needs_keepalive: bool) -> bool { + pub(super) fn 
needs_ping(&self, cur_ts: Timestamp, needs_keepalive: bool) -> bool { // See which ping pattern we are to use let state = self.state(cur_ts); @@ -653,7 +653,7 @@ impl BucketEntryInner { } } - pub(super) fn touch_last_seen(&mut self, ts: u64) { + pub(super) fn touch_last_seen(&mut self, ts: Timestamp) { // Mark the node as seen if self .peer_stats @@ -667,7 +667,7 @@ impl BucketEntryInner { self.peer_stats.rpc_stats.last_seen_ts = Some(ts); } - pub(super) fn _state_debug_info(&self, cur_ts: u64) -> String { + pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String { let first_consecutive_seen_ts = if let Some(first_consecutive_seen_ts) = self.peer_stats.rpc_stats.first_consecutive_seen_ts { @@ -698,26 +698,26 @@ impl BucketEntryInner { //////////////////////////////////////////////////////////////// /// Called when rpc processor things happen - pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) { + pub(super) fn question_sent(&mut self, ts: Timestamp, bytes: u64, expects_answer: bool) { self.transfer_stats_accounting.add_up(bytes); self.peer_stats.rpc_stats.messages_sent += 1; self.peer_stats.rpc_stats.failed_to_send = 0; if expects_answer { self.peer_stats.rpc_stats.questions_in_flight += 1; - self.peer_stats.rpc_stats.last_question = Some(ts); + self.peer_stats.rpc_stats.last_question_ts = Some(ts); } } - pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) { + pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: u64) { self.transfer_stats_accounting.add_down(bytes); self.peer_stats.rpc_stats.messages_rcvd += 1; self.touch_last_seen(ts); } - pub(super) fn answer_sent(&mut self, bytes: u64) { + pub(super) fn answer_sent(&mut self, bytes: ByteCount) { self.transfer_stats_accounting.add_up(bytes); self.peer_stats.rpc_stats.messages_sent += 1; self.peer_stats.rpc_stats.failed_to_send = 0; } - pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { + pub(super) fn answer_rcvd(&mut self, 
send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) { self.transfer_stats_accounting.add_down(bytes); self.peer_stats.rpc_stats.messages_rcvd += 1; self.peer_stats.rpc_stats.questions_in_flight -= 1; @@ -730,9 +730,9 @@ impl BucketEntryInner { self.peer_stats.rpc_stats.questions_in_flight -= 1; self.peer_stats.rpc_stats.recent_lost_answers += 1; } - pub(super) fn failed_to_send(&mut self, ts: u64, expects_answer: bool) { + pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) { if expects_answer { - self.peer_stats.rpc_stats.last_question = Some(ts); + self.peer_stats.rpc_stats.last_question_ts = Some(ts); } self.peer_stats.rpc_stats.failed_to_send += 1; self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; @@ -747,7 +747,7 @@ pub struct BucketEntry { impl BucketEntry { pub(super) fn new() -> Self { - let now = get_timestamp(); + let now = get_aligned_timestamp(); Self { ref_count: AtomicU32::new(0), inner: RwLock::new(BucketEntryInner { diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 45779587..64299c78 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -104,7 +104,7 @@ impl RoutingTable { pub(crate) fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String { let inner = self.inner.read(); let inner = &*inner; - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); let mut out = String::new(); @@ -164,7 +164,7 @@ impl RoutingTable { pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { let inner = self.inner.read(); let inner = &*inner; - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); let mut out = String::new(); const COLS: usize = 16; diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 32ff20d3..e83fb9eb 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ 
-468,14 +468,14 @@ impl RoutingTable { pub fn get_nodes_needing_ping( &self, routing_domain: RoutingDomain, - cur_ts: u64, + cur_ts: Timestamp, ) -> Vec { self.inner .read() .get_nodes_needing_ping(self.clone(), routing_domain, cur_ts) } - pub fn get_all_nodes(&self, cur_ts: u64) -> Vec { + pub fn get_all_nodes(&self, cur_ts: Timestamp) -> Vec { let inner = self.inner.read(); inner.get_all_nodes(self.clone(), cur_ts) } @@ -542,7 +542,7 @@ impl RoutingTable { &self, node_id: DHTKey, descriptor: ConnectionDescriptor, - timestamp: u64, + timestamp: Timestamp, ) -> Option { self.inner.write().register_node_with_existing_connection( self.clone(), @@ -774,7 +774,7 @@ impl RoutingTable { pub fn find_peers_with_sort_and_filter( &self, node_count: usize, - cur_ts: u64, + cur_ts: Timestamp, filters: VecDeque, compare: C, transform: T, @@ -969,7 +969,7 @@ impl RoutingTable { pub fn find_inbound_relay( &self, routing_domain: RoutingDomain, - cur_ts: u64, + cur_ts: Timestamp, ) -> Option { // Get relay filter function let relay_node_filter = match routing_domain { diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 8ae8929a..7e53effe 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -119,7 +119,7 @@ pub trait NodeRefBase: Sized { fn set_min_max_version(&self, min_max_version: VersionRange) { self.operate_mut(|_rti, e| e.set_min_max_version(min_max_version)) } - fn state(&self, cur_ts: u64) -> BucketEntryState { + fn state(&self, cur_ts: Timestamp) -> BucketEntryState { self.operate(|_rti, e| e.state(cur_ts)) } fn peer_stats(&self) -> PeerStats { @@ -140,21 +140,21 @@ pub trait NodeRefBase: Sized { .unwrap_or(false) }) } - fn node_info_ts(&self, routing_domain: RoutingDomain) -> u64 { + fn node_info_ts(&self, routing_domain: RoutingDomain) -> Timestamp { self.operate(|_rti, e| { e.signed_node_info(routing_domain) .map(|sni| sni.timestamp()) - .unwrap_or(0u64) + 
.unwrap_or(0u64.into()) }) } fn has_seen_our_node_info_ts( &self, routing_domain: RoutingDomain, - our_node_info_ts: u64, + our_node_info_ts: Timestamp, ) -> bool { self.operate(|_rti, e| e.has_seen_our_node_info_ts(routing_domain, our_node_info_ts)) } - fn set_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: u64) { + fn set_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: Timestamp) { self.operate_mut(|_rti, e| e.set_our_node_info_ts(routing_domain, seen_ts)); } fn network_class(&self, routing_domain: RoutingDomain) -> Option { @@ -277,7 +277,7 @@ pub trait NodeRefBase: Sized { self.operate_mut(|_rti, e| e.clear_last_connections()) } - fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: u64) { + fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) { self.operate_mut(|rti, e| { e.set_last_connection(connection_descriptor, ts); rti.touch_recent_peer(self.common().node_id, connection_descriptor); @@ -297,25 +297,25 @@ pub trait NodeRefBase: Sized { }) } - fn stats_question_sent(&self, ts: u64, bytes: u64, expects_answer: bool) { + fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_up(bytes); e.question_sent(ts, bytes, expects_answer); }) } - fn stats_question_rcvd(&self, ts: u64, bytes: u64) { + fn stats_question_rcvd(&self, ts: Timestamp, bytes: ByteCount) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_down(bytes); e.question_rcvd(ts, bytes); }) } - fn stats_answer_sent(&self, bytes: u64) { + fn stats_answer_sent(&self, bytes: ByteCount) { self.operate_mut(|rti, e| { rti.transfer_stats_accounting().add_up(bytes); e.answer_sent(bytes); }) } - fn stats_answer_rcvd(&self, send_ts: u64, recv_ts: u64, bytes: u64) { + fn stats_answer_rcvd(&self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) { self.operate_mut(|rti, e| { 
rti.transfer_stats_accounting().add_down(bytes); rti.latency_stats_accounting() @@ -328,7 +328,7 @@ pub trait NodeRefBase: Sized { e.question_lost(); }) } - fn stats_failed_to_send(&self, ts: u64, expects_answer: bool) { + fn stats_failed_to_send(&self, ts: Timestamp, expects_answer: bool) { self.operate_mut(|_rti, e| { e.failed_to_send(ts, expects_answer); }) diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index fbf21951..2e6d8fda 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -7,7 +7,7 @@ use rkyv::{ /// The size of the remote private route cache const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; /// Remote private route cache entries expire in 5 minutes if they haven't been used -const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: u64 = 300_000_000u64; +const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: TimestampDuration = 300_000_000u64.into(); /// Amount of time a route can remain idle before it gets tested const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000; @@ -80,25 +80,25 @@ impl RouteStats { } /// Mark a route as having received something - pub fn record_received(&mut self, cur_ts: u64, bytes: u64) { + pub fn record_received(&mut self, cur_ts: Timestamp, bytes: ByteCount) { self.last_received_ts = Some(cur_ts); self.last_tested_ts = Some(cur_ts); self.transfer_stats_accounting.add_down(bytes); } /// Mark a route as having been sent to - pub fn record_sent(&mut self, cur_ts: u64, bytes: u64) { + pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) { self.last_sent_ts = Some(cur_ts); self.transfer_stats_accounting.add_up(bytes); } /// Mark a route as having been sent to - pub fn record_latency(&mut self, latency: u64) { + pub fn record_latency(&mut self, latency: TimestampDuration) { self.latency_stats = self.latency_stats_accounting.record_latency(latency); } /// Mark a route as having been tested - pub fn record_tested(&mut self, 
cur_ts: u64) { + pub fn record_tested(&mut self, cur_ts: Timestamp) { self.last_tested_ts = Some(cur_ts); // Reset question_lost and failed_to_send if we test clean @@ -107,7 +107,7 @@ impl RouteStats { } /// Roll transfers for these route stats - pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { + pub fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) { self.transfer_stats_accounting.roll_transfers( last_ts, cur_ts, @@ -133,7 +133,7 @@ impl RouteStats { } /// Check if a route needs testing - pub fn needs_testing(&self, cur_ts: u64) -> bool { + pub fn needs_testing(&self, cur_ts: Timestamp) -> bool { // Has the route had any failures lately? if self.questions_lost > 0 || self.failed_to_send > 0 { // If so, always test @@ -634,7 +634,7 @@ impl RouteSpecStore { .map(|nr| nr.node_id()); // Get list of all nodes, and sort them for selection - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); let filter = Box::new( move |rti: &RoutingTableInner, k: DHTKey, v: Option>| -> bool { // Exclude our own node from routes @@ -872,22 +872,25 @@ impl RouteSpecStore { Ok(Some(public_key)) } - #[instrument(level = "trace", skip(self, data), ret, err)] + #[instrument(level = "trace", skip(self, data), ret)] pub fn validate_signatures( &self, public_key: &DHTKey, signatures: &[DHTSignature], data: &[u8], last_hop_id: DHTKey, - ) -> EyreResult> { + ) -> Option<(DHTKeySecret, SafetySpec)> { let inner = &*self.inner.lock(); - let rsd = Self::detail(inner, &public_key).ok_or_else(|| eyre!("route does not exist"))?; + let Some(rsd) = Self::detail(inner, &public_key) else { + log_rpc!(debug "route does not exist: {:?}", public_key); + return None; + }; // Ensure we have the right number of signatures if signatures.len() != rsd.hops.len() - 1 { // Wrong number of signatures log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", signatures.len(), rsd.hops.len() - 1, public_key); - return 
Ok(None); + return None; } // Validate signatures to ensure the route was handled by the nodes and not messed with // This is in private route (reverse) order as we are receiving over the route @@ -897,18 +900,18 @@ impl RouteSpecStore { // Verify the node we received the routed operation from is the last hop in our route if *hop_public_key != last_hop_id { log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), last_hop_id.encode(), public_key); - return Ok(None); + return None; } } else { // Verify a signature for a hop node along the route if let Err(e) = verify(hop_public_key, data, &signatures[hop_n]) { log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); - return Ok(None); + return None; } } } // We got the correct signatures, return a key and response safety spec - Ok(Some(( + Some(( rsd.secret_key, SafetySpec { preferred_route: Some(*public_key), @@ -916,7 +919,7 @@ impl RouteSpecStore { stability: rsd.stability, sequencing: rsd.sequencing, }, - ))) + )) } #[instrument(level = "trace", skip(self), ret, err)] @@ -1002,7 +1005,7 @@ impl RouteSpecStore { pub async fn test_route(&self, key: &DHTKey) -> EyreResult { let is_remote = { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some() }; if is_remote { @@ -1066,7 +1069,7 @@ impl RouteSpecStore { pub fn release_route(&self, key: &DHTKey) -> bool { let is_remote = { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); Self::with_peek_remote_private_route(inner, cur_ts, key, |_| {}).is_some() }; if is_remote { @@ -1087,7 +1090,7 @@ impl RouteSpecStore { directions: DirectionSet, avoid_node_ids: &[DHTKey], ) -> Option { - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); 
let mut routes = Vec::new(); @@ -1167,7 +1170,7 @@ impl RouteSpecStore { /// Get the debug description of a route pub fn debug_route(&self, key: &DHTKey) -> Option { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); // If this is a remote route, print it if let Some(s) = Self::with_peek_remote_private_route(inner, cur_ts, key, |rpi| format!("{:#?}", rpi)) @@ -1570,7 +1573,7 @@ impl RouteSpecStore { } // store the private route in our cache - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| { r.private_route.as_ref().unwrap().public_key.clone() }); @@ -1593,7 +1596,7 @@ impl RouteSpecStore { /// Retrieve an imported remote private route by its public key pub fn get_remote_private_route(&self, key: &DHTKey) -> Option { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); Self::with_get_remote_private_route(inner, cur_ts, key, |r| { r.private_route.as_ref().unwrap().clone() }) @@ -1602,7 +1605,7 @@ impl RouteSpecStore { /// Retrieve an imported remote private route by its public key but don't 'touch' it pub fn peek_remote_private_route(&self, key: &DHTKey) -> Option { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); Self::with_peek_remote_private_route(inner, cur_ts, key, |r| { r.private_route.as_ref().unwrap().clone() }) @@ -1611,7 +1614,7 @@ impl RouteSpecStore { // get or create a remote private route cache entry fn with_create_remote_private_route( inner: &mut RouteSpecStoreInner, - cur_ts: u64, + cur_ts: Timestamp, private_route: PrivateRoute, f: F, ) -> R @@ -1660,7 +1663,7 @@ impl RouteSpecStore { // get a remote private route cache entry fn with_get_remote_private_route( inner: &mut RouteSpecStoreInner, - cur_ts: u64, + cur_ts: Timestamp, key: &DHTKey, f: F, ) -> Option @@ -1680,7 
+1683,7 @@ impl RouteSpecStore { // peek a remote private route cache entry fn with_peek_remote_private_route( inner: &mut RouteSpecStoreInner, - cur_ts: u64, + cur_ts: Timestamp, key: &DHTKey, f: F, ) -> Option @@ -1714,7 +1717,7 @@ impl RouteSpecStore { let opt_rpr_node_info_ts = { let inner = &mut *self.inner.lock(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); Self::with_peek_remote_private_route(inner, cur_ts, key, |rpr| { rpr.last_seen_our_node_info_ts }) @@ -1736,7 +1739,7 @@ impl RouteSpecStore { pub fn mark_remote_private_route_seen_our_node_info( &self, key: &DHTKey, - cur_ts: u64, + cur_ts: Timestamp, ) -> EyreResult<()> { let our_node_info_ts = { let rti = &*self.unlocked_inner.routing_table.inner.read(); @@ -1765,7 +1768,7 @@ impl RouteSpecStore { } /// Get the route statistics for any route we know about, local or remote - pub fn with_route_stats(&self, cur_ts: u64, key: &DHTKey, f: F) -> Option + pub fn with_route_stats(&self, cur_ts: Timestamp, key: &DHTKey, f: F) -> Option where F: FnOnce(&mut RouteStats) -> R, { @@ -1822,7 +1825,7 @@ impl RouteSpecStore { } /// Process transfer statistics to get averages - pub fn roll_transfers(&self, last_ts: u64, cur_ts: u64) { + pub fn roll_transfers(&self, last_ts: Timestamp, cur_ts: Timestamp) { let inner = &mut *self.inner.lock(); // Roll transfers for locally allocated routes diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index b867bc03..de74e230 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -227,7 +227,7 @@ impl RoutingTableInner { } pub fn reset_all_updated_since_last_network_change(&mut self) { - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, v| { v.with_mut(rti, |_rti, e| { e.set_updated_since_last_network_change(false) @@ -347,7 +347,7 @@ 
impl RoutingTableInner { // If the local network topology has changed, nuke the existing local node info and let new local discovery happen if changed { - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, e| { e.with_mut(rti, |_rti, e| { e.clear_signed_node_info(RoutingDomain::LocalNetwork); @@ -426,7 +426,7 @@ impl RoutingTableInner { min_state: BucketEntryState, ) -> usize { let mut count = 0usize; - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); self.with_entries(cur_ts, min_state, |rti, _, e| { if e.with(rti, |rti, e| e.best_routing_domain(rti, routing_domain_set)) .is_some() @@ -466,7 +466,7 @@ impl RoutingTableInner { F: FnMut(&mut RoutingTableInner, DHTKey, Arc) -> Option, >( &mut self, - cur_ts: u64, + cur_ts: Timestamp, min_state: BucketEntryState, mut f: F, ) -> Option { @@ -491,7 +491,7 @@ impl RoutingTableInner { &self, outer_self: RoutingTable, routing_domain: RoutingDomain, - cur_ts: u64, + cur_ts: Timestamp, ) -> Vec { // Collect relay nodes let opt_relay_id = self.with_routing_domain(routing_domain, |rd| { @@ -531,7 +531,7 @@ impl RoutingTableInner { node_refs } - pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: u64) -> Vec { + pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec { let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, k, v| { node_refs.push(NodeRef::new(outer_self.clone(), k, v, None)); @@ -700,7 +700,7 @@ impl RoutingTableInner { outer_self: RoutingTable, node_id: DHTKey, descriptor: ConnectionDescriptor, - timestamp: u64, + timestamp: Timestamp, ) -> Option { let out = self.create_node_ref(outer_self, node_id, |_rti, e| { // this node is live because it literally just connected to us @@ -719,7 +719,7 @@ impl RoutingTableInner { pub fn get_routing_table_health(&self) -> RoutingTableHealth { let mut health = 
RoutingTableHealth::default(); - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); for bucket in &self.buckets { for (_, v) in bucket.entries() { match v.with(self, |_rti, e| e.state(cur_ts)) { @@ -876,7 +876,7 @@ impl RoutingTableInner { where T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O, { - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); // Add filter to remove dead nodes always let filter_dead = Box::new( @@ -961,7 +961,7 @@ impl RoutingTableInner { where T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option>) -> O, { - let cur_ts = get_timestamp(); + let cur_ts = get_aligned_timestamp(); let node_count = { let config = self.config(); let c = config.get(); diff --git a/veilid-core/src/routing_table/stats_accounting.rs b/veilid-core/src/routing_table/stats_accounting.rs index c02ab6ca..2c711493 100644 --- a/veilid-core/src/routing_table/stats_accounting.rs +++ b/veilid-core/src/routing_table/stats_accounting.rs @@ -13,8 +13,8 @@ pub const ROLLING_TRANSFERS_INTERVAL_SECS: u32 = 1; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct TransferCount { - down: u64, - up: u64, + down: ByteCount, + up: ByteCount, } #[derive(Debug, Clone, Default)] @@ -31,18 +31,18 @@ impl TransferStatsAccounting { } } - pub fn add_down(&mut self, bytes: u64) { + pub fn add_down(&mut self, bytes: ByteCount) { self.current_transfer.down += bytes; } - pub fn add_up(&mut self, bytes: u64) { + pub fn add_up(&mut self, bytes: ByteCount) { self.current_transfer.up += bytes; } pub fn roll_transfers( &mut self, - last_ts: u64, - cur_ts: u64, + last_ts: Timestamp, + cur_ts: Timestamp, transfer_stats: &mut TransferStatsDownUp, ) { let dur_ms = cur_ts.saturating_sub(last_ts) / 1000u64; @@ -80,7 +80,7 @@ impl TransferStatsAccounting { #[derive(Debug, Clone, Default)] pub struct LatencyStatsAccounting { - rolling_latencies: VecDeque, + rolling_latencies: VecDeque, } impl LatencyStatsAccounting { @@ -90,7 +90,7 @@ impl 
LatencyStatsAccounting { } } - pub fn record_latency(&mut self, latency: u64) -> veilid_api::LatencyStats { + pub fn record_latency(&mut self, latency: TimestampDuration) -> veilid_api::LatencyStats { while self.rolling_latencies.len() >= ROLLING_LATENCIES_SIZE { self.rolling_latencies.pop_front(); } diff --git a/veilid-core/src/routing_table/tasks/kick_buckets.rs b/veilid-core/src/routing_table/tasks/kick_buckets.rs index 38eef8af..318f1915 100644 --- a/veilid-core/src/routing_table/tasks/kick_buckets.rs +++ b/veilid-core/src/routing_table/tasks/kick_buckets.rs @@ -7,8 +7,8 @@ impl RoutingTable { pub(crate) async fn kick_buckets_task_routine( self, _stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, + _last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { let kick_queue: Vec = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) .into_iter() diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index 2460d29f..46f09949 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -10,7 +10,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), err)] fn ping_validator_public_internet( &self, - cur_ts: u64, + cur_ts: Timestamp, unord: &mut FuturesUnordered< SendPinBoxFuture>>, RPCError>>, >, diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index f790afe0..7ae002ef 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -72,8 +72,8 @@ impl RoutingTable { pub(crate) async fn private_route_management_task_routine( self, stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, + _last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { // Get our node's current node info and network class and do the right thing let 
network_class = self diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 86cfdd3b..4e685c02 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -6,8 +6,8 @@ impl RoutingTable { pub(crate) async fn relay_management_task_routine( self, _stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, + _last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { // Get our node's current node info and network class and do the right thing let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else { diff --git a/veilid-core/src/routing_table/tasks/rolling_transfers.rs b/veilid-core/src/routing_table/tasks/rolling_transfers.rs index 04177c01..436381ec 100644 --- a/veilid-core/src/routing_table/tasks/rolling_transfers.rs +++ b/veilid-core/src/routing_table/tasks/rolling_transfers.rs @@ -6,8 +6,8 @@ impl RoutingTable { pub(crate) async fn rolling_transfers_task_routine( self, _stop_token: StopToken, - last_ts: u64, - cur_ts: u64, + last_ts: Timestamp, + cur_ts: Timestamp, ) -> EyreResult<()> { // log_rtab!("--- rolling_transfers task"); { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 46651748..f23c1df8 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -53,9 +53,9 @@ impl RPCOperationKind { #[derive(Debug, Clone)] pub struct RPCOperation { - op_id: u64, + op_id: OperationId, sender_node_info: Option, - target_node_info_ts: u64, + target_node_info_ts: Timestamp, kind: RPCOperationKind, } @@ -65,7 +65,7 @@ impl RPCOperation { sender_signed_node_info: SenderSignedNodeInfo, ) -> Self { Self { - op_id: get_random_u64(), + op_id: OperationId::new(get_random_u64()), sender_node_info: 
sender_signed_node_info.signed_node_info, target_node_info_ts: sender_signed_node_info.target_node_info_ts, kind: RPCOperationKind::Question(question), @@ -76,7 +76,7 @@ impl RPCOperation { sender_signed_node_info: SenderSignedNodeInfo, ) -> Self { Self { - op_id: get_random_u64(), + op_id: OperationId::new(get_random_u64()), sender_node_info: sender_signed_node_info.signed_node_info, target_node_info_ts: sender_signed_node_info.target_node_info_ts, kind: RPCOperationKind::Statement(statement), @@ -96,14 +96,14 @@ impl RPCOperation { } } - pub fn op_id(&self) -> u64 { + pub fn op_id(&self) -> OperationId { self.op_id } pub fn sender_node_info(&self) -> Option<&SignedNodeInfo> { self.sender_node_info.as_ref() } - pub fn target_node_info_ts(&self) -> u64 { + pub fn target_node_info_ts(&self) -> Timestamp { self.target_node_info_ts } @@ -119,7 +119,7 @@ impl RPCOperation { operation_reader: &veilid_capnp::operation::Reader, opt_sender_node_id: Option<&DHTKey>, ) -> Result { - let op_id = operation_reader.get_op_id(); + let op_id = OperationId::new(operation_reader.get_op_id()); let sender_node_info = if operation_reader.has_sender_node_info() { if let Some(sender_node_id) = opt_sender_node_id { @@ -135,7 +135,7 @@ impl RPCOperation { None }; - let target_node_info_ts = operation_reader.get_target_node_info_ts(); + let target_node_info_ts = Timestamp::new(operation_reader.get_target_node_info_ts()); let kind_reader = operation_reader.get_kind(); let kind = RPCOperationKind::decode(&kind_reader)?; @@ -149,12 +149,12 @@ impl RPCOperation { } pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { - builder.set_op_id(self.op_id); + builder.set_op_id(self.op_id.as_u64()); if let Some(sender_info) = &self.sender_node_info { let mut si_builder = builder.reborrow().init_sender_node_info(); encode_signed_node_info(&sender_info, &mut si_builder)?; } - builder.set_target_node_info_ts(self.target_node_info_ts); + 
builder.set_target_node_info_ts(self.target_node_info_ts.as_u64()); let mut k_builder = builder.reborrow().init_kind(); self.kind.encode(&mut k_builder)?; Ok(()) diff --git a/veilid-core/src/rpc_processor/coders/signed_direct_node_info.rs b/veilid-core/src/rpc_processor/coders/signed_direct_node_info.rs index 7b583e21..bca21bb5 100644 --- a/veilid-core/src/rpc_processor/coders/signed_direct_node_info.rs +++ b/veilid-core/src/rpc_processor/coders/signed_direct_node_info.rs @@ -10,7 +10,7 @@ pub fn encode_signed_direct_node_info( builder .reborrow() - .set_timestamp(signed_direct_node_info.timestamp); + .set_timestamp(signed_direct_node_info.timestamp.into()); let mut sig_builder = builder.reborrow().init_signature(); let Some(signature) = &signed_direct_node_info.signature else { @@ -36,7 +36,7 @@ pub fn decode_signed_direct_node_info( .get_signature() .map_err(RPCError::protocol)?; - let timestamp = reader.reborrow().get_timestamp(); + let timestamp = reader.reborrow().get_timestamp().into(); let signature = decode_signature(&sig_reader); diff --git a/veilid-core/src/rpc_processor/coders/signed_relayed_node_info.rs b/veilid-core/src/rpc_processor/coders/signed_relayed_node_info.rs index 924a00ad..21d3266b 100644 --- a/veilid-core/src/rpc_processor/coders/signed_relayed_node_info.rs +++ b/veilid-core/src/rpc_processor/coders/signed_relayed_node_info.rs @@ -16,7 +16,7 @@ pub fn encode_signed_relayed_node_info( builder .reborrow() - .set_timestamp(signed_relayed_node_info.timestamp); + .set_timestamp(signed_relayed_node_info.timestamp.into()); let mut sig_builder = builder.reborrow().init_signature(); encode_signature(&signed_relayed_node_info.signature, &mut sig_builder); @@ -50,7 +50,7 @@ pub fn decode_signed_relayed_node_info( .reborrow() .get_signature() .map_err(RPCError::protocol)?; - let timestamp = reader.reborrow().get_timestamp(); + let timestamp = reader.reborrow().get_timestamp().into(); let signature = decode_signature(&sig_reader); diff --git 
a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 149c3b8c..01ee69ae 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -37,8 +37,6 @@ use stop_token::future::FutureExt; ///////////////////////////////////////////////////////////////////// -type OperationId = u64; - #[derive(Debug, Clone)] struct RPCMessageHeaderDetailDirect { /// The decoded header of the envelope @@ -82,9 +80,9 @@ enum RPCMessageHeaderDetail { #[derive(Debug, Clone)] struct RPCMessageHeader { /// Time the message was received, not sent - timestamp: u64, + timestamp: Timestamp, /// The length in bytes of the rpc message body - body_len: u64, + body_len: ByteCount, /// The header detail depending on which way the message was received detail: RPCMessageHeaderDetail, } @@ -139,9 +137,9 @@ where #[derive(Debug)] struct WaitableReply { handle: OperationWaitHandle, - timeout: u64, + timeout_us: TimestampDuration, node_ref: NodeRef, - send_ts: u64, + send_ts: Timestamp, send_data_kind: SendDataKind, safety_route: Option, remote_private_route: Option, @@ -152,11 +150,11 @@ struct WaitableReply { #[derive(Clone, Debug, Default)] pub struct Answer { - pub latency: u64, // how long it took to get this answer - pub answer: T, // the answer itself + pub latency: TimestampDuration, // how long it took to get this answer + pub answer: T, // the answer itself } impl Answer { - pub fn new(latency: u64, answer: T) -> Self { + pub fn new(latency: TimestampDuration, answer: T) -> Self { Self { latency, answer } } } @@ -185,16 +183,16 @@ pub struct SenderSignedNodeInfo { /// The current signed node info of the sender if required signed_node_info: Option, /// The last timestamp of the target's node info to assist remote node with sending its latest node info - target_node_info_ts: u64, + target_node_info_ts: Timestamp, } impl SenderSignedNodeInfo { - pub fn new_no_sni(target_node_info_ts: u64) -> Self { + pub fn 
new_no_sni(target_node_info_ts: Timestamp) -> Self { Self { signed_node_info: None, target_node_info_ts, } } - pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: u64) -> Self { + pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: Timestamp) -> Self { Self { signed_node_info: Some(sender_signed_node_info), target_node_info_ts, @@ -218,7 +216,7 @@ pub struct RPCProcessorInner { } pub struct RPCProcessorUnlockedInner { - timeout: u64, + timeout_us: TimestampDuration, queue_size: u32, concurrency: u32, max_route_hop_count: usize, @@ -267,7 +265,7 @@ impl RPCProcessor { let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; RPCProcessorUnlockedInner { - timeout, + timeout_us: timeout, queue_size, concurrency, max_route_hop_count, @@ -445,11 +443,11 @@ impl RPCProcessor { async fn wait_for_reply( &self, waitable_reply: WaitableReply, - ) -> Result, RPCError> { + ) -> Result, RPCError> { let out = self .unlocked_inner .waiting_rpc_table - .wait_for_op(waitable_reply.handle, waitable_reply.timeout) + .wait_for_op(waitable_reply.handle, waitable_reply.timeout_us) .await; match &out { Err(_) | Ok(TimeoutOr::Timeout) => { @@ -463,7 +461,7 @@ impl RPCProcessor { } Ok(TimeoutOr::Value((rpcreader, _))) => { // Reply received - let recv_ts = get_timestamp(); + let recv_ts = get_aligned_timestamp(); // Record answer received self.record_answer_received( @@ -759,7 +757,7 @@ impl RPCProcessor { fn record_send_failure( &self, rpc_kind: RPCKind, - send_ts: u64, + send_ts: Timestamp, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, @@ -788,7 +786,7 @@ impl RPCProcessor { /// Record question lost to node or route fn record_question_lost( &self, - send_ts: u64, + send_ts: Timestamp, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, @@ -827,8 +825,8 @@ impl RPCProcessor { fn record_send_success( &self, rpc_kind: RPCKind, - send_ts: u64, - bytes: u64, + send_ts: 
Timestamp, + bytes: ByteCount, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, @@ -863,9 +861,9 @@ impl RPCProcessor { /// Record answer received from node or route fn record_answer_received( &self, - send_ts: u64, - recv_ts: u64, - bytes: u64, + send_ts: Timestamp, + recv_ts: Timestamp, + bytes: ByteCount, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, @@ -1004,7 +1002,7 @@ impl RPCProcessor { let op_id = operation.op_id(); // Log rpc send - trace!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest); + trace!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1019,14 +1017,14 @@ impl RPCProcessor { // Calculate answer timeout // Timeout is number of hops times the timeout per hop - let timeout = self.unlocked_inner.timeout * (hop_count as u64); + let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64); // Set up op id eventual let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id); // Send question - let bytes = message.len() as u64; - let send_ts = get_timestamp(); + let bytes: ByteCount = (message.len() as u64).into(); + let send_ts = get_aligned_timestamp(); let send_data_kind = network_result_try!(self .network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) @@ -1054,7 +1052,7 @@ impl RPCProcessor { // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { handle, - timeout, + timeout_us, node_ref, send_ts, send_data_kind, @@ -1078,7 +1076,7 @@ impl RPCProcessor { let operation = RPCOperation::new_statement(statement, ssni); // Log rpc send - trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), 
desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1092,8 +1090,8 @@ } = network_result_try!(self.render_operation(dest, &operation)?); // Send statement - let bytes = message.len() as u64; - let send_ts = get_timestamp(); + let bytes: ByteCount = (message.len() as u64).into(); + let send_ts = get_aligned_timestamp(); let _send_data_kind = network_result_try!(self .network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) @@ -1139,7 +1137,7 @@ let operation = RPCOperation::new_answer(&request.operation, answer, ssni); // Log rpc send - trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -1153,8 +1151,8 @@ } = network_result_try!(self.render_operation(dest, &operation)?); // Send the reply - let bytes = message.len() as u64; - let send_ts = get_timestamp(); + let bytes: ByteCount = (message.len() as u64).into(); + let send_ts = get_aligned_timestamp(); network_result_try!(self.network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) .await @@ -1284,7 +1282,7 @@ }; // Log rpc receive - trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id(), desc = msg.operation.kind().desc(), header = ?msg.header); + trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header); // Process specific message kind match msg.operation.kind() { @@ -1366,7 +1364,7 @@ connection_descriptor, routing_domain, }), - timestamp: get_timestamp(), - body_len: body.len() as u64, + timestamp: get_aligned_timestamp(), + body_len: (body.len() as u64).into(), }, data: RPCMessageData { 
contents: body }, @@ -1395,8 +1393,8 @@ impl RPCProcessor { remote_safety_route, sequencing, }), - timestamp: get_timestamp(), - body_len: body.len() as u64, + timestamp: get_aligned_timestamp(), + body_len: (body.len() as u64).into(), }, data: RPCMessageData { contents: body }, }; @@ -1428,8 +1426,8 @@ impl RPCProcessor { safety_spec, }, ), - timestamp: get_timestamp(), - body_len: body.len() as u64, + timestamp: get_aligned_timestamp(), + body_len: (body.len() as u64).into(), }, data: RPCMessageData { contents: body }, }; diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 3f56e339..fb276942 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -104,9 +104,9 @@ where pub async fn wait_for_op( &self, mut handle: OperationWaitHandle, - timeout_us: u64, - ) -> Result, RPCError> { - let timeout_ms = u32::try_from(timeout_us / 1000u64) + timeout_us: TimestampDuration, + ) -> Result, RPCError> { + let timeout_ms = u32::try_from(timeout_us.as_u64() / 1000u64) .map_err(|e| RPCError::map_internal("invalid timeout")(e))?; // Take the instance @@ -114,7 +114,7 @@ where let eventual_instance = handle.eventual_instance.take().unwrap(); // wait for eventualvalue - let start_ts = get_timestamp(); + let start_ts = get_aligned_timestamp(); let res = timeout(timeout_ms, eventual_instance) .await .into_timeout_or(); @@ -125,7 +125,7 @@ where }) .map(|res| { let (_span_id, ret) = res.take_value().unwrap(); - let end_ts = get_timestamp(); + let end_ts = get_aligned_timestamp(); //xxx: causes crash (Missing otel data span extensions) // Span::current().follows_from(span_id); diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index e068e133..b0506be9 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -73,7 +73,7 @@ impl RPCProcessor { 
let res = self .unlocked_inner .waiting_app_call_table - .wait_for_op(handle, self.unlocked_inner.timeout) + .wait_for_op(handle, self.unlocked_inner.timeout_us) .await?; let (message, _latency) = match res { TimeoutOr::Timeout => { @@ -93,7 +93,7 @@ impl RPCProcessor { } /// Exposed to API for apps to return app call answers - pub async fn app_call_reply(&self, id: u64, message: Vec) -> Result<(), RPCError> { + pub async fn app_call_reply(&self, id: OperationId, message: Vec) -> Result<(), RPCError> { self.unlocked_inner .waiting_app_call_table .complete_op_waiter(id, message) diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 1602858a..78a12696 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -196,7 +196,6 @@ impl RPCProcessor { &routed_operation.data, sender_id, ) - .map_err(RPCError::protocol)? else { return Ok(NetworkResult::invalid_message("signatures did not validate for private route")); }; diff --git a/veilid-core/src/veilid_api/aligned_u64.rs b/veilid-core/src/veilid_api/aligned_u64.rs new file mode 100644 index 00000000..f805490d --- /dev/null +++ b/veilid-core/src/veilid_api/aligned_u64.rs @@ -0,0 +1,110 @@ +use super::*; + +/// Aligned u64 +/// Required on 32-bit platforms for serialization because Rust aligns u64 on 4 byte boundaries +/// And zero-copy serialization with Rkyv requires 8-byte alignment + +#[derive( + Clone, + Default, + PartialEq, + Eq, + PartialOrd, + Ord, + Copy, + Hash, + Serialize, + Deserialize, + RkyvArchive, + RkyvSerialize, + RkyvDeserialize, +)] +#[repr(C, align(8))] +#[archive_attr(repr(C, align(8)), derive(CheckBytes))] +pub struct AlignedU64(u64); + +impl From for AlignedU64 { + fn from(v: u64) -> Self { + AlignedU64(v) + } +} +impl From for u64 { + fn from(v: AlignedU64) -> Self { + v.0 + } +} + +impl fmt::Display for AlignedU64 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (&self.0 as &dyn 
fmt::Display).fmt(f) + } +} + +impl fmt::Debug for AlignedU64 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (&self.0 as &dyn fmt::Debug).fmt(f) + } +} + +impl FromStr for AlignedU64 { + type Err = ::Err; + fn from_str(s: &str) -> Result { + Ok(AlignedU64(u64::from_str(s)?)) + } +} + +impl> core::ops::Add for AlignedU64 { + type Output = Self; + + fn add(self, rhs: Rhs) -> Self { + Self(self.0 + rhs.into()) + } +} + +impl> core::ops::AddAssign for AlignedU64 { + fn add_assign(&mut self, rhs: Rhs) { + self.0 += rhs.into(); + } +} + +impl> core::ops::Sub for AlignedU64 { + type Output = Self; + + fn sub(self, rhs: Rhs) -> Self { + Self(self.0 - rhs.into()) + } +} + +impl> core::ops::SubAssign for AlignedU64 { + fn sub_assign(&mut self, rhs: Rhs) { + self.0 -= rhs.into(); + } +} + +impl> core::ops::Mul for AlignedU64 { + type Output = Self; + + fn mul(self, rhs: Rhs) -> Self { + Self(self.0 * rhs.into()) + } +} + +impl> core::ops::Div for AlignedU64 { + type Output = Self; + + fn div(self, rhs: Rhs) -> Self { + Self(self.0 / rhs.into()) + } +} + +impl AlignedU64 { + pub const fn new(v: u64) -> Self { + Self(v) + } + pub fn as_u64(self) -> u64 { + self.0 + } + pub fn saturating_sub(self, rhs: Self) -> Self { + Self(self.0.saturating_sub(rhs.0)) + } +} diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index 4ef5e5e6..88d00f81 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -247,7 +247,11 @@ impl VeilidAPI { // App Calls #[instrument(level = "debug", skip(self))] - pub async fn app_call_reply(&self, id: u64, message: Vec) -> Result<(), VeilidAPIError> { + pub async fn app_call_reply( + &self, + id: OperationId, + message: Vec, + ) -> Result<(), VeilidAPIError> { let rpc_processor = self.rpc_processor()?; rpc_processor .app_call_reply(id, message) diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 9dd2cebe..d2d81c45 100644 --- 
a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +mod aligned_u64; mod api; mod debug; mod error; @@ -7,6 +8,7 @@ mod routing_context; mod serialize_helpers; mod types; +pub use aligned_u64::*; pub use api::*; pub use debug::*; pub use error::*; diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index baa3c9cb..cc074b97 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -2,6 +2,20 @@ use super::*; ///////////////////////////////////////////////////////////////////////////////////////////////////// +/// Microseconds since epoch +pub type Timestamp = AlignedU64; +pub fn get_aligned_timestamp() -> Timestamp { + get_timestamp().into() +} +/// Microseconds duration +pub type TimestampDuration = AlignedU64; +/// Request/Response matching id +pub type OperationId = AlignedU64; +/// Number of bytes +pub type ByteCount = AlignedU64; +/// Tunnel identifier +pub type TunnelId = AlignedU64; + #[derive( Debug, Clone, @@ -113,7 +127,7 @@ pub struct VeilidAppCall { pub message: Vec, /// The id to reply to #[serde(with = "json_as_string")] - pub id: u64, + pub id: OperationId, } #[derive( @@ -141,9 +155,9 @@ pub struct PeerTableData { pub struct VeilidStateNetwork { pub started: bool, #[serde(with = "json_as_string")] - pub bps_down: u64, + pub bps_down: ByteCount, #[serde(with = "json_as_string")] - pub bps_up: u64, + pub bps_up: ByteCount, pub peers: Vec, } @@ -1801,7 +1815,7 @@ impl MatchesDialInfoFilter for DialInfo { #[archive_attr(repr(C), derive(CheckBytes))] pub struct SignedDirectNodeInfo { pub node_info: NodeInfo, - pub timestamp: u64, + pub timestamp: Timestamp, pub signature: Option, } @@ -1809,7 +1823,7 @@ impl SignedDirectNodeInfo { pub fn new( node_id: NodeId, node_info: NodeInfo, - timestamp: u64, + timestamp: Timestamp, signature: DHTSignature, ) -> Result { let node_info_bytes = Self::make_signature_bytes(&node_info, 
timestamp)?; @@ -1826,7 +1840,7 @@ impl SignedDirectNodeInfo { node_info: NodeInfo, secret: &DHTKeySecret, ) -> Result { - let timestamp = get_timestamp(); + let timestamp = get_aligned_timestamp(); let node_info_bytes = Self::make_signature_bytes(&node_info, timestamp)?; let signature = sign(&node_id.key, secret, &node_info_bytes)?; Ok(Self { @@ -1838,7 +1852,7 @@ impl SignedDirectNodeInfo { fn make_signature_bytes( node_info: &NodeInfo, - timestamp: u64, + timestamp: Timestamp, ) -> Result, VeilidAPIError> { let mut node_info_bytes = Vec::new(); @@ -1849,7 +1863,7 @@ impl SignedDirectNodeInfo { node_info_bytes.append(&mut builder_to_vec(ni_msg).map_err(VeilidAPIError::internal)?); // Add timestamp to signature - node_info_bytes.append(&mut timestamp.to_le_bytes().to_vec()); + node_info_bytes.append(&mut timestamp.as_u64().to_le_bytes().to_vec()); Ok(node_info_bytes) } @@ -1858,7 +1872,7 @@ impl SignedDirectNodeInfo { Self { node_info, signature: None, - timestamp: get_timestamp(), + timestamp: get_aligned_timestamp(), } } @@ -1874,7 +1888,7 @@ pub struct SignedRelayedNodeInfo { pub node_info: NodeInfo, pub relay_id: NodeId, pub relay_info: SignedDirectNodeInfo, - pub timestamp: u64, + pub timestamp: Timestamp, pub signature: DHTSignature, } @@ -1884,7 +1898,7 @@ impl SignedRelayedNodeInfo { node_info: NodeInfo, relay_id: NodeId, relay_info: SignedDirectNodeInfo, - timestamp: u64, + timestamp: Timestamp, signature: DHTSignature, ) -> Result { let node_info_bytes = @@ -1906,7 +1920,7 @@ impl SignedRelayedNodeInfo { relay_info: SignedDirectNodeInfo, secret: &DHTKeySecret, ) -> Result { - let timestamp = get_timestamp(); + let timestamp = get_aligned_timestamp(); let node_info_bytes = Self::make_signature_bytes(&node_info, &relay_id, &relay_info, timestamp)?; let signature = sign(&node_id.key, secret, &node_info_bytes)?; @@ -1923,7 +1937,7 @@ impl SignedRelayedNodeInfo { node_info: &NodeInfo, relay_id: &NodeId, relay_info: &SignedDirectNodeInfo, - timestamp: u64, + 
timestamp: Timestamp, ) -> Result, VeilidAPIError> { let mut sig_bytes = Vec::new(); @@ -1968,7 +1982,7 @@ impl SignedNodeInfo { } } - pub fn timestamp(&self) -> u64 { + pub fn timestamp(&self) -> Timestamp { match self { SignedNodeInfo::Direct(d) => d.timestamp, SignedNodeInfo::Relayed(r) => r.timestamp, @@ -2201,11 +2215,11 @@ impl MatchesDialInfoFilter for ConnectionDescriptor { #[archive_attr(repr(C), derive(CheckBytes))] pub struct LatencyStats { #[serde(with = "json_as_string")] - pub fastest: u64, // fastest latency in the ROLLING_LATENCIES_SIZE last latencies + pub fastest: TimestampDuration, // fastest latency in the ROLLING_LATENCIES_SIZE last latencies #[serde(with = "json_as_string")] - pub average: u64, // average latency over the ROLLING_LATENCIES_SIZE last latencies + pub average: TimestampDuration, // average latency over the ROLLING_LATENCIES_SIZE last latencies #[serde(with = "json_as_string")] - pub slowest: u64, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies + pub slowest: TimestampDuration, // slowest latency in the ROLLING_LATENCIES_SIZE last latencies } #[derive( @@ -2223,13 +2237,13 @@ pub struct LatencyStats { #[archive_attr(repr(C), derive(CheckBytes))] pub struct TransferStats { #[serde(with = "json_as_string")] - pub total: u64, // total amount transferred ever + pub total: ByteCount, // total amount transferred ever #[serde(with = "json_as_string")] - pub maximum: u64, // maximum rate over the ROLLING_TRANSFERS_SIZE last amounts + pub maximum: ByteCount, // maximum rate over the ROLLING_TRANSFERS_SIZE last amounts #[serde(with = "json_as_string")] - pub average: u64, // average rate over the ROLLING_TRANSFERS_SIZE last amounts + pub average: ByteCount, // average rate over the ROLLING_TRANSFERS_SIZE last amounts #[serde(with = "json_as_string")] - pub minimum: u64, // minimum rate over the ROLLING_TRANSFERS_SIZE last amounts + pub minimum: ByteCount, // minimum rate over the ROLLING_TRANSFERS_SIZE last amounts } 
#[derive( @@ -2268,11 +2282,11 @@ pub struct RPCStats { pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range pub questions_in_flight: u32, // number of questions issued that have yet to be answered #[serde(with = "opt_json_as_string")] - pub last_question: Option, // when the peer was last questioned (either successfully or not) and we wanted an answer + pub last_question_ts: Option, // when the peer was last questioned (either successfully or not) and we wanted an answer #[serde(with = "opt_json_as_string")] - pub last_seen_ts: Option, // when the peer was last seen for any reason, including when we first attempted to reach out to it + pub last_seen_ts: Option, // when the peer was last seen for any reason, including when we first attempted to reach out to it #[serde(with = "opt_json_as_string")] - pub first_consecutive_seen_ts: Option, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question) + pub first_consecutive_seen_ts: Option, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question) pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability pub failed_to_send: u32, // number of messages that have failed to send since we last successfully sent one } @@ -2292,7 +2306,7 @@ pub struct RPCStats { #[archive_attr(repr(C), derive(CheckBytes))] pub struct PeerStats { #[serde(with = "json_as_string")] - pub time_added: u64, // when the peer was added to the routing table + pub time_added: Timestamp, // when the peer was added to the routing table pub rpc_stats: RPCStats, // information about RPCs pub latency: Option, // latencies for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer @@ -2362,8 +2376,6 @@ pub enum TunnelError { NoCapacity, // Endpoint is full } -pub type TunnelId = u64; - #[derive(Clone, Debug, Serialize, Deserialize, RkyvArchive, 
RkyvSerialize, RkyvDeserialize)] #[archive_attr(repr(C), derive(CheckBytes))] pub struct TunnelEndpoint { @@ -2386,7 +2398,7 @@ impl Default for TunnelEndpoint { #[archive_attr(repr(C), derive(CheckBytes))] pub struct FullTunnel { pub id: TunnelId, - pub timeout: u64, + pub timeout: TimestampDuration, pub local: TunnelEndpoint, pub remote: TunnelEndpoint, } @@ -2397,6 +2409,6 @@ pub struct FullTunnel { #[archive_attr(repr(C), derive(CheckBytes))] pub struct PartialTunnel { pub id: TunnelId, - pub timeout: u64, + pub timeout: TimestampDuration, pub local: TunnelEndpoint, } diff --git a/veilid-flutter/example/web/index.html b/veilid-flutter/example/web/index.html index 128d80a0..373dae36 100644 --- a/veilid-flutter/example/web/index.html +++ b/veilid-flutter/example/web/index.html @@ -47,6 +47,7 @@