diff --git a/veilid-core/src/network_manager/direct_boot.rs b/veilid-core/src/network_manager/direct_boot.rs new file mode 100644 index 00000000..da33fc39 --- /dev/null +++ b/veilid-core/src/network_manager/direct_boot.rs @@ -0,0 +1,67 @@ +use super::*; + +impl NetworkManager { + // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) + #[instrument(level = "trace", skip(self), ret, err)] + pub(crate) async fn handle_boot_request( + &self, + descriptor: ConnectionDescriptor, + ) -> EyreResult> { + let routing_table = self.routing_table(); + + // Get a bunch of nodes with the various + let bootstrap_nodes = routing_table.find_bootstrap_nodes_filtered(2); + + // Serialize out peer info + let bootstrap_peerinfo: Vec = bootstrap_nodes + .iter() + .filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet)) + .collect(); + let mut json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec(); + + self.apply_network_key(&mut json_bytes); + + // Reply with a chunk of signed routing table + match self + .net() + .send_data_to_existing_connection(descriptor, json_bytes) + .await? + { + None => { + // Bootstrap reply was sent + Ok(NetworkResult::value(())) + } + Some(_) => Ok(NetworkResult::no_connection_other( + "bootstrap reply could not be sent", + )), + } + } + + // Direct bootstrap request + #[instrument(level = "trace", err, skip(self))] + pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult> { + let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms); + // Send boot magic to requested peer address + let mut data = BOOT_MAGIC.to_vec(); + + // Apply network key + self.apply_network_key(&mut data); + + let mut out_data: Vec = network_result_value_or_log!(self + .net() + .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) + .await? => + { + return Ok(Vec::new()); + }); + + // Apply network key + self.apply_network_key(&mut out_data); + + let bootstrap_peerinfo: Vec = + deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?) + .wrap_err("failed to deserialize boot peerinfo")?; + + Ok(bootstrap_peerinfo) + } +} diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 9c64cf2e..f6f94ab1 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -5,6 +5,8 @@ mod native; #[cfg(target_arch = "wasm32")] mod wasm; +mod direct_boot; +mod send_data; mod connection_handle; mod connection_limits; mod connection_manager; @@ -12,6 +14,7 @@ mod connection_table; mod network_connection; mod tasks; mod types; +mod stats; pub mod tests; @@ -20,6 +23,9 @@ pub mod tests; pub use connection_manager::*; pub use network_connection::*; pub use types::*; +pub use send_data::*; +pub use direct_boot::*; +pub use stats::*; //////////////////////////////////////////////////////////////////////////////////////// use connection_handle::*; @@ -42,6 +48,7 @@ use wasm::*; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes +pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024; pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3; pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8; pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; @@ -67,38 +74,6 @@ struct NetworkComponents { receipt_manager: ReceiptManager, } -// Statistics per address -#[derive(Clone, Default)] -pub struct PerAddressStats { - last_seen_ts: Timestamp, - transfer_stats_accounting: TransferStatsAccounting, - transfer_stats: TransferStatsDownUp, -} - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct PerAddressStatsKey(IpAddr); - -impl Default for PerAddressStatsKey { - fn default() -> Self { - Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - } -} - -// Statistics about the low-level network -#[derive(Clone)] -pub struct NetworkManagerStats { - self_stats: PerAddressStats, - per_address_stats: LruCache, -} - -impl Default for NetworkManagerStats { - fn default() -> Self { - Self { - self_stats: PerAddressStats::default(), - per_address_stats: LruCache::new(IPADDR_TABLE_SIZE), - } - } -} #[derive(Debug)] struct ClientWhitelistEntry { @@ -130,6 +105,12 @@ pub(crate) enum NodeContactMethod { /// Must use outbound relay to reach the node OutboundRelay(NodeRef), } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] +struct NodeContactMethodCacheKey { + own_node_info_ts: Option, + target_node_info_ts: Timestamp, + target_node_ref_filter: Option, +} #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] struct PublicAddressCheckCacheKey(ProtocolType, AddressType); @@ -138,6 +119,7 @@ struct PublicAddressCheckCacheKey(ProtocolType, AddressType); struct NetworkManagerInner { stats: NetworkManagerStats, client_whitelist: LruCache, + node_contact_method_cache: LruCache, public_address_check_cache: BTreeMap>, public_address_inconsistencies_table: @@ -175,6 +157,7 @@ impl NetworkManager { NetworkManagerInner { stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), + node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE), public_address_check_cache: BTreeMap::new(), public_address_inconsistencies_table: BTreeMap::new(), } @@ -889,463 +872,6 @@ impl NetworkManager { Ok(()) } - /// Send a reverse connection signal and wait for the return receipt over it - /// Then send the data across the new connection - /// Only usable for PublicInternet routing domain - #[instrument(level = "trace", skip(self, data), err)] - pub async fn do_reverse_connect( - &self, - relay_nr: NodeRef, - target_nr: NodeRef, - data: Vec, - ) -> EyreResult> { - // Build a return receipt for the signal - let receipt_timeout = ms_to_us( - self.unlocked_inner - .config - .get() - .network - .reverse_connection_receipt_time_ms, - ); - let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; - - // Get target routing domain - let Some(routing_domain) = target_nr.best_routing_domain() else { - return Ok(NetworkResult::no_connection_other("No routing domain for target")); - }; - - // Get our peer info - let Some(peer_info) = self - .routing_table() - .get_own_peer_info(routing_domain) else { - return Ok(NetworkResult::no_connection_other("Own peer info not available")); - }; - - // Issue the signal - let rpc = self.rpc_processor(); - network_result_try!(rpc - .rpc_call_signal( - Destination::relay(relay_nr, target_nr.clone()), - SignalInfo::ReverseConnect { receipt, peer_info }, - ) - .await - .wrap_err("failed to send signal")?); - - // Wait for the return receipt - let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand - | ReceiptEvent::ReturnedSafety => { - return Ok(NetworkResult::invalid_message( - "reverse connect receipt should be returned in-band", - )); - } - ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, - ReceiptEvent::Expired => { - return Ok(NetworkResult::timeout()); - } - ReceiptEvent::Cancelled => { - return Ok(NetworkResult::no_connection_other(format!("reverse connect receipt cancelled from {}", target_nr))) - } - }; - - // We expect the inbound noderef to be the same as the target noderef - // if they aren't the same, we should error on this and figure out what then hell is up - if !target_nr.same_entry(&inbound_nr) { - bail!("unexpected noderef mismatch on reverse connect"); - } - - // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { - match self - .net() - .send_data_to_existing_connection(descriptor, data) - .await? - { - None => Ok(NetworkResult::value(descriptor)), - Some(_) => Ok(NetworkResult::no_connection_other( - "unable to send over reverse connection", - )), - } - } else { - bail!("no reverse connection available") - } - } - - /// Send a hole punch signal and do a negotiating ping and wait for the return receipt - /// Then send the data across the new connection - /// Only usable for PublicInternet routing domain - #[instrument(level = "trace", skip(self, data), err)] - pub async fn do_hole_punch( - &self, - relay_nr: NodeRef, - target_nr: NodeRef, - data: Vec, - ) -> EyreResult> { - // Ensure we are filtered down to UDP (the only hole punch protocol supported today) - assert!(target_nr - .filter_ref() - .map(|nrf| nrf.dial_info_filter.protocol_type_set - == ProtocolTypeSet::only(ProtocolType::UDP)) - .unwrap_or_default()); - - // Build a return receipt for the signal - let receipt_timeout = ms_to_us( - self.unlocked_inner - .config - .get() - .network - .hole_punch_receipt_time_ms, - ); - let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; - - // Get target routing domain - let Some(routing_domain) = target_nr.best_routing_domain() else { - return Ok(NetworkResult::no_connection_other("No routing domain for target")); - }; - - // Get our peer info - let Some(peer_info) = self - .routing_table() - .get_own_peer_info(routing_domain) else { - return Ok(NetworkResult::no_connection_other("Own peer info not available")); - }; - - // Get the udp direct dialinfo for the hole punch - let hole_punch_did = target_nr - .first_filtered_dial_info_detail() - .ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?; - - // Do our half of the hole punch by sending an empty packet - // Both sides will do this and then the receipt will get sent over the punched hole - // Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole - // punch should come through and create a real 'last connection' for us if this succeeds - network_result_try!( - self.net() - .send_data_to_dial_info(hole_punch_did.dial_info, Vec::new()) - .await? - ); - - // Issue the signal - let rpc = self.rpc_processor(); - network_result_try!(rpc - .rpc_call_signal( - Destination::relay(relay_nr, target_nr.clone()), - SignalInfo::HolePunch { receipt, peer_info }, - ) - .await - .wrap_err("failed to send signal")?); - - // Wait for the return receipt - let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand - | ReceiptEvent::ReturnedSafety => { - return Ok(NetworkResult::invalid_message( - "hole punch receipt should be returned in-band", - )); - } - ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, - ReceiptEvent::Expired => { - return Ok(NetworkResult::timeout()); - } - ReceiptEvent::Cancelled => { - return Ok(NetworkResult::no_connection_other(format!("hole punch receipt cancelled from {}", target_nr))) - } - }; - - // We expect the inbound noderef to be the same as the target noderef - // if they aren't the same, we should error on this and figure out what then hell is up - if !target_nr.same_entry(&inbound_nr) { - bail!( - "unexpected noderef mismatch on hole punch {}, expected {}", - inbound_nr, - target_nr - ); - } - - // And now use the existing connection to send over - if let Some(descriptor) = inbound_nr.last_connection() { - match self - .net() - .send_data_to_existing_connection(descriptor, data) - .await? - { - None => Ok(NetworkResult::value(descriptor)), - Some(_) => Ok(NetworkResult::no_connection_other( - "unable to send over hole punch", - )), - } - } else { - bail!("no hole punch available") - } - } - - /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access - /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not - /// allowed to use NodeRefs due to recursive locking - #[instrument(level = "trace", skip(self), ret)] - pub(crate) fn get_node_contact_method( - &self, - target_node_ref: NodeRef, - ) -> EyreResult { - let routing_table = self.routing_table(); - - // Figure out the best routing domain to get the contact method over - let routing_domain = match target_node_ref.best_routing_domain() { - Some(rd) => rd, - None => { - log_net!("no routing domain for node {:?}", target_node_ref); - return Ok(NodeContactMethod::Unreachable); - } - }; - - // Node A is our own node - // Use whatever node info we've calculated so far - let peer_a = routing_table.get_best_effort_own_peer_info(routing_domain); - - // Node B is the target node - let peer_b = match target_node_ref.make_peer_info(routing_domain) { - Some(ni) => ni, - None => { - log_net!("no node info for node {:?}", target_node_ref); - return Ok(NodeContactMethod::Unreachable); - } - }; - - // Dial info filter comes from the target node ref - let dial_info_filter = target_node_ref.dial_info_filter(); - let mut sequencing = target_node_ref.sequencing(); - - // If the node has had lost questions or failures to send, prefer sequencing - // to improve reliability. The node may be experiencing UDP fragmentation drops - // or other firewalling issues and may perform better with TCP. - let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2; - if unreliable && sequencing < Sequencing::PreferOrdered { - log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); - sequencing = Sequencing::PreferOrdered; - } - - // Get the best contact method with these parameters from the routing domain - let cm = routing_table.get_contact_method( - routing_domain, - &peer_a, - &peer_b, - dial_info_filter, - sequencing, - ); - - // Translate the raw contact method to a referenced contact method - Ok(match cm { - ContactMethod::Unreachable => NodeContactMethod::Unreachable, - ContactMethod::Existing => NodeContactMethod::Existing, - ContactMethod::Direct(di) => NodeContactMethod::Direct(di), - ContactMethod::SignalReverse(relay_key, target_key) => { - let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? - .ok_or_else(|| eyre!("couldn't look up relay"))?; - if !target_node_ref.node_ids().contains(&target_key) { - bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); - } - NodeContactMethod::SignalReverse(relay_nr, target_node_ref) - } - ContactMethod::SignalHolePunch(relay_key, target_key) => { - let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? - .ok_or_else(|| eyre!("couldn't look up relay"))?; - if !target_node_ref.node_ids().contains(&target_key) { - bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); - } - // if any other protocol were possible here we could update this and do_hole_punch - // but tcp hole punch is very very unreliable it seems - let udp_target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::new().with_protocol_type(ProtocolType::UDP)); - - NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref) - } - ContactMethod::InboundRelay(relay_key) => { - let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? - .ok_or_else(|| eyre!("couldn't look up relay"))?; - NodeContactMethod::InboundRelay(relay_nr) - } - ContactMethod::OutboundRelay(relay_key) => { - let relay_nr = routing_table - .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? - .ok_or_else(|| eyre!("couldn't look up relay"))?; - NodeContactMethod::OutboundRelay(relay_nr) - } - }) - } - - /// Send raw data to a node - /// - /// We may not have dial info for a node, but have an existing connection for it - /// because an inbound connection happened first, and no FindNodeQ has happened to that - /// node yet to discover its dial info. The existing connection should be tried first - /// in this case. - /// - /// Sending to a node requires determining a NetworkClass compatible mechanism - pub fn send_data( - &self, - node_ref: NodeRef, - data: Vec, - ) -> SendPinBoxFuture>> { - let this = self.clone(); - Box::pin( - async move { - // info!("{}", format!("send_data to: {:?}", node_ref).red()); - - // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = node_ref.last_connection() { - // info!( - // "{}", - // format!("last_connection to: {:?}", connection_descriptor).red() - // ); - - match this - .net() - .send_data_to_existing_connection(connection_descriptor, data) - .await? - { - None => { - // info!( - // "{}", - // format!("sent to existing connection: {:?}", connection_descriptor) - // .red() - // ); - - // Update timestamp for this last connection since we just sent to it - node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); - - return Ok(NetworkResult::value(SendDataKind::Existing( - connection_descriptor, - ))); - } - Some(d) => d, - } - } else { - data - }; - - // info!("{}", "no existing connection".red()); - - // If we don't have last_connection, try to reach out to the peer via its dial info - let contact_method = this.get_node_contact_method(node_ref.clone())?; - log_net!( - "send_data via {:?} to dialinfo {:?}", - contact_method, - node_ref - ); - match contact_method { - NodeContactMethod::OutboundRelay(relay_nr) - | NodeContactMethod::InboundRelay(relay_nr) => { - network_result_try!(this.send_data(relay_nr, data).await?); - Ok(NetworkResult::value(SendDataKind::Indirect)) - } - NodeContactMethod::Direct(dial_info) => { - let connection_descriptor = network_result_try!( - this.net().send_data_to_dial_info(dial_info, data).await? - ); - // If we connected to this node directly, save off the last connection so we can use it again - node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); - - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => { - let connection_descriptor = network_result_try!( - this.do_reverse_connect(relay_nr, target_node_ref, data) - .await? - ); - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { - let connection_descriptor = network_result_try!( - this.do_hole_punch(relay_nr, target_node_ref, data).await? - ); - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - NodeContactMethod::Existing => Ok(NetworkResult::no_connection_other( - "should have found an existing connection", - )), - NodeContactMethod::Unreachable => Ok(NetworkResult::no_connection_other( - "Can't send to this node", - )), - } - } - .instrument(trace_span!("send_data")), - ) - } - - // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) - #[instrument(level = "trace", skip(self), ret, err)] - async fn handle_boot_request( - &self, - descriptor: ConnectionDescriptor, - ) -> EyreResult> { - let routing_table = self.routing_table(); - - // Get a bunch of nodes with the various - let bootstrap_nodes = routing_table.find_bootstrap_nodes_filtered(2); - - // Serialize out peer info - let bootstrap_peerinfo: Vec = bootstrap_nodes - .iter() - .filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet)) - .collect(); - let mut json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec(); - - self.apply_network_key(&mut json_bytes); - - // Reply with a chunk of signed routing table - match self - .net() - .send_data_to_existing_connection(descriptor, json_bytes) - .await? - { - None => { - // Bootstrap reply was sent - Ok(NetworkResult::value(())) - } - Some(_) => Ok(NetworkResult::no_connection_other( - "bootstrap reply could not be sent", - )), - } - } - - // Direct bootstrap request - #[instrument(level = "trace", err, skip(self))] - pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult> { - let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms); - // Send boot magic to requested peer address - let mut data = BOOT_MAGIC.to_vec(); - - // Apply network key - self.apply_network_key(&mut data); - - let mut out_data: Vec = network_result_value_or_log!(self - .net() - .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) - .await? => - { - return Ok(Vec::new()); - }); - - // Apply network key - self.apply_network_key(&mut out_data); - - let bootstrap_peerinfo: Vec = - deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?) - .wrap_err("failed to deserialize boot peerinfo")?; - - Ok(bootstrap_peerinfo) - } - // Network isolation encryption fn apply_network_key(&self, data: &mut [u8]) { if let Some(network_key) = self.unlocked_inner.network_key { @@ -1576,108 +1102,6 @@ impl NetworkManager { Ok(true) } - // Callbacks from low level network for statistics gathering - pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) { - let inner = &mut *self.inner.lock(); - inner - .stats - .self_stats - .transfer_stats_accounting - .add_up(bytes); - inner - .stats - .per_address_stats - .entry(PerAddressStatsKey(addr), |_k,_v| { - // do nothing on LRU evict - }) - .or_insert(PerAddressStats::default()) - .transfer_stats_accounting - .add_up(bytes); - } - - pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) { - let inner = &mut *self.inner.lock(); - inner - .stats - .self_stats - .transfer_stats_accounting - .add_down(bytes); - inner - .stats - .per_address_stats - .entry(PerAddressStatsKey(addr), |_k,_v| { - // do nothing on LRU evict - }) - .or_insert(PerAddressStats::default()) - .transfer_stats_accounting - .add_down(bytes); - } - - // Get stats - pub fn get_stats(&self) -> NetworkManagerStats { - let inner = self.inner.lock(); - inner.stats.clone() - } - - pub fn get_veilid_state(&self) -> VeilidStateNetwork { - let has_state = self - .unlocked_inner - .components - .read() - .as_ref() - .map(|c| c.net.is_started()) - .unwrap_or(false); - - if !has_state { - return VeilidStateNetwork { - started: false, - bps_down: 0.into(), - bps_up: 0.into(), - peers: Vec::new(), - - }; - } - let routing_table = self.routing_table(); - - let (bps_down, bps_up) = { - let inner = self.inner.lock(); - ( - inner.stats.self_stats.transfer_stats.down.average, - inner.stats.self_stats.transfer_stats.up.average, - ) - }; - - VeilidStateNetwork { - started: true, - bps_down, - bps_up, - peers: { - let mut out = Vec::new(); - for (k, v) in routing_table.get_recent_peers() { - if let Ok(Some(nr)) = routing_table.lookup_node_ref(k) { - let peer_stats = nr.peer_stats(); - let peer = PeerTableData { - node_ids: nr.node_ids().iter().copied().collect(), - peer_address: v.last_connection.remote().to_string(), - peer_stats, - }; - out.push(peer); - } - } - out - }, - } - } - - fn send_network_update(&self) { - let update_cb = self.unlocked_inner.update_callback.read().clone(); - if update_cb.is_none() { - return; - } - let state = self.get_veilid_state(); - (update_cb.unwrap())(VeilidUpdate::Network(state)); - } - // Determine if a local IP address has changed // this means we should restart the low level network and and recreate all of our dial info // Wait until we have received confirmation from N different peers diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs new file mode 100644 index 00000000..02de25c5 --- /dev/null +++ b/veilid-core/src/network_manager/send_data.rs @@ -0,0 +1,553 @@ +use super::*; + +impl NetworkManager { + /// Send raw data to a node + /// + /// We may not have dial info for a node, but have an existing connection for it + /// because an inbound connection happened first, and no FindNodeQ has happened to that + /// node yet to discover its dial info. The existing connection should be tried first + /// in this case, if it matches the node ref's filters and no more permissive connection + /// could be established. + /// + /// Sending to a node requires determining a NetworkClass compatible mechanism + pub fn send_data( + &self, + target_node_ref: NodeRef, + data: Vec, + ) -> SendPinBoxFuture>> { + let this = self.clone(); + Box::pin( + async move { + // Get the best way to contact this node + let contact_method = this.get_node_contact_method(target_node_ref.clone())?; + + // If we need to relay, do it + let (contact_method, node_ref, relayed) = match contact_method { + NodeContactMethod::OutboundRelay(relay_nr) + | NodeContactMethod::InboundRelay(relay_nr) => { + let cm = this.get_node_contact_method(relay_nr.clone())?; + (cm, relay_nr, true) + } + cm => (cm, target_node_ref.clone(), false), + }; + + // Try the contact method + let sdk = match contact_method { + NodeContactMethod::OutboundRelay(relay_nr) + | NodeContactMethod::InboundRelay(relay_nr) => { + // Relay loop or multiple relays + bail!( + "Relay loop or multiple relays detected: {} -> {} -> {}", + target_node_ref, + node_ref, + relay_nr + ); + } + NodeContactMethod::Direct(dial_info) => { + network_result_try!( + this.send_data_ncm_direct(node_ref, dial_info, data).await? + ) + } + NodeContactMethod::SignalReverse(relay_nr, target_node_ref) => { + network_result_try!( + this.send_data_ncm_signal_reverse(relay_nr, target_node_ref, data) + .await? + ) + } + NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { + network_result_try!( + this.send_data_ncm_signal_hole_punch(relay_nr, target_node_ref, data) + .await? + ) + } + NodeContactMethod::Existing => { + network_result_try!( + this.send_data_ncm_existing(target_node_ref, data).await? + ) + } + NodeContactMethod::Unreachable => { + return Ok(NetworkResult::no_connection_other( + "Can't send to this node", + )); + } + }; + + if relayed { + return Ok(NetworkResult::value(SendDataKind::Indirect)); + } + Ok(NetworkResult::value(sdk)) + } + .instrument(trace_span!("send_data")), + ) + } + + /// Send data using NodeContactMethod::Existing + async fn send_data_ncm_existing( + &self, + target_node_ref: NodeRef, + data: Vec, + ) -> EyreResult> { + // First try to send data to the last socket we've seen this peer on + let Some(connection_descriptor) = target_node_ref.last_connection() else { + return Ok(NetworkResult::no_connection_other( + "should have found an existing connection", + )); + }; + + if self + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + .is_some() + { + return Ok(NetworkResult::no_connection_other( + "failed to send to existing connection", + )); + } + + // Update timestamp for this last connection since we just sent to it + target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + + Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))) + } + + /// Send data using NodeContactMethod::SignalReverse + async fn send_data_ncm_signal_reverse( + &self, + relay_nr: NodeRef, + target_node_ref: NodeRef, + data: Vec, + ) -> EyreResult> { + // First try to send data to the last socket we've seen this peer on + let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { + match self + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + { + None => { + // Update timestamp for this last connection since we just sent to it + target_node_ref + .set_last_connection(connection_descriptor, get_aligned_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))); + } + Some(data) => { + // Couldn't send data to existing connection + // so pass the data back out + data + } + } + } else { + // No last connection + data + }; + + let connection_descriptor = network_result_try!( + self.do_reverse_connect(relay_nr, target_node_ref, data) + .await? + ); + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) + } + + /// Send data using NodeContactMethod::SignalHolePunch + async fn send_data_ncm_signal_hole_punch( + &self, + relay_nr: NodeRef, + target_node_ref: NodeRef, + data: Vec, + ) -> EyreResult> { + // First try to send data to the last socket we've seen this peer on + let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { + match self + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + { + None => { + // Update timestamp for this last connection since we just sent to it + target_node_ref + .set_last_connection(connection_descriptor, get_aligned_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))); + } + Some(data) => { + // Couldn't send data to existing connection + // so pass the data back out + data + } + } + } else { + // No last connection + data + }; + + let connection_descriptor = + network_result_try!(self.do_hole_punch(relay_nr, target_node_ref, data).await?); + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) + } + + /// Send data using NodeContactMethod::Direct + async fn send_data_ncm_direct( + &self, + node_ref: NodeRef, + dial_info: DialInfo, + data: Vec, + ) -> EyreResult> { + // Since we have the best dial info already, we can find a connection to use by protocol type + let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter())); + + // First try to send data to the last socket we've seen this peer on + let data = if let Some(connection_descriptor) = node_ref.last_connection() { + match self + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + { + None => { + // Update timestamp for this last connection since we just sent to it + node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))); + } + Some(d) => d, + } + } else { + data + }; + + // New direct connection was necessary for this dial info + let connection_descriptor = + network_result_try!(self.net().send_data_to_dial_info(dial_info, data).await?); + + // If we connected to this node directly, save off the last connection so we can use it again + node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); + + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) + } + + /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access + /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not + /// allowed to use NodeRefs due to recursive locking + pub(crate) fn get_node_contact_method( + &self, + target_node_ref: NodeRef, + ) -> EyreResult { + let routing_table = self.routing_table(); + + // Figure out the best routing domain to get the contact method over + let routing_domain = match target_node_ref.best_routing_domain() { + Some(rd) => rd, + None => { + log_net!("no routing domain for node {:?}", target_node_ref); + return Ok(NodeContactMethod::Unreachable); + } + }; + + // Get cache key + let ncm_key = NodeContactMethodCacheKey { + own_node_info_ts: routing_table.get_own_node_info_ts(routing_domain), + target_node_info_ts: target_node_ref.node_info_ts(routing_domain), + target_node_ref_filter: target_node_ref.filter_ref().cloned(), + }; + if let Some(ncm) = self.inner.lock().node_contact_method_cache.get(&ncm_key) { + return Ok(ncm.clone()); + } + + // Node A is our own node + // Use whatever node info we've calculated so far + let peer_a = routing_table.get_best_effort_own_peer_info(routing_domain); + + // Node B is the target node + let peer_b = match target_node_ref.make_peer_info(routing_domain) { + Some(ni) => ni, + None => { + log_net!("no node info for node {:?}", target_node_ref); + return Ok(NodeContactMethod::Unreachable); + } + }; + + // Dial info filter comes from the target node ref + let dial_info_filter = target_node_ref.dial_info_filter(); + let sequencing = target_node_ref.sequencing(); + + // If the node has had lost questions or failures to send, prefer sequencing + // to improve reliability. The node may be experiencing UDP fragmentation drops + // or other firewalling issues and may perform better with TCP. + // let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2; + // if unreliable && sequencing < Sequencing::PreferOrdered { + // log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); + // sequencing = Sequencing::PreferOrdered; + // } + + // Get the best contact method with these parameters from the routing domain + let cm = routing_table.get_contact_method( + routing_domain, + &peer_a, + &peer_b, + dial_info_filter, + sequencing, + ); + + // Translate the raw contact method to a referenced contact method + let ncm = match cm { + ContactMethod::Unreachable => NodeContactMethod::Unreachable, + ContactMethod::Existing => NodeContactMethod::Existing, + ContactMethod::Direct(di) => NodeContactMethod::Direct(di), + ContactMethod::SignalReverse(relay_key, target_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? + .ok_or_else(|| eyre!("couldn't look up relay"))?; + if !target_node_ref.node_ids().contains(&target_key) { + bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); + } + NodeContactMethod::SignalReverse(relay_nr, target_node_ref) + } + ContactMethod::SignalHolePunch(relay_key, target_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? + .ok_or_else(|| eyre!("couldn't look up relay"))?; + if !target_node_ref.node_ids().contains(&target_key) { + bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); + } + // if any other protocol were possible here we could update this and do_hole_punch + // but tcp hole punch is very very unreliable it seems + let udp_target_node_ref = target_node_ref + .filtered_clone(NodeRefFilter::new().with_protocol_type(ProtocolType::UDP)); + + NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref) + } + ContactMethod::InboundRelay(relay_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? + .ok_or_else(|| eyre!("couldn't look up relay"))?; + NodeContactMethod::InboundRelay(relay_nr) + } + ContactMethod::OutboundRelay(relay_key) => { + let relay_nr = routing_table + .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? + .ok_or_else(|| eyre!("couldn't look up relay"))?; + NodeContactMethod::OutboundRelay(relay_nr) + } + }; + + // Cache this + self.inner + .lock() + .node_contact_method_cache + .insert(ncm_key, ncm.clone(), |_, _| {}); + Ok(ncm) + } + + /// Send a reverse connection signal and wait for the return receipt over it + /// Then send the data across the new connection + /// Only usable for PublicInternet routing domain + #[instrument(level = "trace", skip(self, data), err)] + async fn do_reverse_connect( + &self, + relay_nr: NodeRef, + target_nr: NodeRef, + data: Vec, + ) -> EyreResult> { + // Build a return receipt for the signal + let receipt_timeout = ms_to_us( + self.unlocked_inner + .config + .get() + .network + .reverse_connection_receipt_time_ms, + ); + let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; + + // Get target routing domain + let Some(routing_domain) = target_nr.best_routing_domain() else { + return Ok(NetworkResult::no_connection_other("No routing domain for target")); + }; + + // Get our peer info + let Some(peer_info) = self + .routing_table() + .get_own_peer_info(routing_domain) else { + return Ok(NetworkResult::no_connection_other("Own peer info not available")); + }; + + // Issue the signal + let rpc = self.rpc_processor(); + network_result_try!(rpc + .rpc_call_signal( + Destination::relay(relay_nr, target_nr.clone()), + SignalInfo::ReverseConnect { receipt, peer_info }, + ) + .await + .wrap_err("failed to send signal")?); + + // Wait for the return receipt + let inbound_nr = match eventual_value.await.take_value().unwrap() { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { + return Ok(NetworkResult::invalid_message( + "reverse connect receipt should be returned in-band", + )); + } + ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, + ReceiptEvent::Expired => { + return Ok(NetworkResult::timeout()); + } + ReceiptEvent::Cancelled => { + return Ok(NetworkResult::no_connection_other(format!( + "reverse connect receipt cancelled from {}", + target_nr + ))) + } + }; + + // We expect the inbound noderef to be the same as the target noderef + // if they aren't the same, we should error on this and figure out what then hell is up + if !target_nr.same_entry(&inbound_nr) { + bail!("unexpected noderef mismatch on reverse connect"); + } + + // And now use the existing connection to send over + if let Some(descriptor) = inbound_nr.last_connection() { + match self + .net() + .send_data_to_existing_connection(descriptor, data) + .await? + { + None => Ok(NetworkResult::value(descriptor)), + Some(_) => Ok(NetworkResult::no_connection_other( + "unable to send over reverse connection", + )), + } + } else { + bail!("no reverse connection available") + } + } + + /// Send a hole punch signal and do a negotiating ping and wait for the return receipt + /// Then send the data across the new connection + /// Only usable for PublicInternet routing domain + #[instrument(level = "trace", skip(self, data), err)] + async fn do_hole_punch( + &self, + relay_nr: NodeRef, + target_nr: NodeRef, + data: Vec, + ) -> EyreResult> { + // Ensure we are filtered down to UDP (the only hole punch protocol supported today) + assert!(target_nr + .filter_ref() + .map(|nrf| nrf.dial_info_filter.protocol_type_set + == ProtocolTypeSet::only(ProtocolType::UDP)) + .unwrap_or_default()); + + // Build a return receipt for the signal + let receipt_timeout = ms_to_us( + self.unlocked_inner + .config + .get() + .network + .hole_punch_receipt_time_ms, + ); + let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; + + // Get target routing domain + let Some(routing_domain) = target_nr.best_routing_domain() else { + return Ok(NetworkResult::no_connection_other("No routing domain for target")); + }; + + // Get our peer info + let Some(peer_info) = self + .routing_table() + .get_own_peer_info(routing_domain) else { + return Ok(NetworkResult::no_connection_other("Own peer info not available")); + }; + + // Get the udp direct dialinfo for the hole punch + let hole_punch_did = target_nr + .first_filtered_dial_info_detail() + .ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?; + + // Do our half of the hole punch by sending an empty packet + // Both sides will do this and then the receipt will get sent over the punched hole + // Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole + // punch should come through and create a real 'last connection' for us if this succeeds + network_result_try!( + self.net() + .send_data_to_dial_info(hole_punch_did.dial_info, Vec::new()) + .await? + ); + + // Issue the signal + let rpc = self.rpc_processor(); + network_result_try!(rpc + .rpc_call_signal( + Destination::relay(relay_nr, target_nr.clone()), + SignalInfo::HolePunch { receipt, peer_info }, + ) + .await + .wrap_err("failed to send signal")?); + + // Wait for the return receipt + let inbound_nr = match eventual_value.await.take_value().unwrap() { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { + return Ok(NetworkResult::invalid_message( + "hole punch receipt should be returned in-band", + )); + } + ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, + ReceiptEvent::Expired => { + return Ok(NetworkResult::timeout()); + } + ReceiptEvent::Cancelled => { + return Ok(NetworkResult::no_connection_other(format!( + "hole punch receipt cancelled from {}", + target_nr + ))) + } + }; + + // We expect the inbound noderef to be the same as the target noderef + // if they aren't the same, we should error on this and figure out what then hell is up + if !target_nr.same_entry(&inbound_nr) { + bail!( + "unexpected noderef mismatch on hole punch {}, expected {}", + inbound_nr, + target_nr + ); + } + + // And now use the existing connection to send over + if let Some(descriptor) = inbound_nr.last_connection() { + match self + .net() + .send_data_to_existing_connection(descriptor, data) + .await? + { + None => Ok(NetworkResult::value(descriptor)), + Some(_) => Ok(NetworkResult::no_connection_other( + "unable to send over hole punch", + )), + } + } else { + bail!("no hole punch available") + } + } +} diff --git a/veilid-core/src/network_manager/stats.rs b/veilid-core/src/network_manager/stats.rs new file mode 100644 index 00000000..d5660470 --- /dev/null +++ b/veilid-core/src/network_manager/stats.rs @@ -0,0 +1,137 @@ +use super::*; + +// Statistics per address +#[derive(Clone, Default)] +pub struct PerAddressStats { + pub last_seen_ts: Timestamp, + pub transfer_stats_accounting: TransferStatsAccounting, + pub transfer_stats: TransferStatsDownUp, +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct PerAddressStatsKey(IpAddr); + +impl Default for PerAddressStatsKey { + fn default() -> Self { + Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) + } +} + +// Statistics about the low-level network +#[derive(Clone)] +pub struct NetworkManagerStats { + pub self_stats: PerAddressStats, + pub per_address_stats: LruCache, +} + +impl Default for NetworkManagerStats { + fn default() -> Self { + Self { + self_stats: PerAddressStats::default(), + per_address_stats: LruCache::new(IPADDR_TABLE_SIZE), + } + } +} + +impl NetworkManager { + // Callbacks from low level network for statistics gathering + pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) { + let inner = &mut *self.inner.lock(); + inner + .stats + .self_stats + .transfer_stats_accounting + .add_up(bytes); + inner + .stats + .per_address_stats + .entry(PerAddressStatsKey(addr), |_k, _v| { + // do nothing on LRU evict + }) + .or_insert(PerAddressStats::default()) + .transfer_stats_accounting + .add_up(bytes); + } + + pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) { + let inner = &mut *self.inner.lock(); + inner + .stats + .self_stats + .transfer_stats_accounting + .add_down(bytes); + inner + .stats + .per_address_stats + .entry(PerAddressStatsKey(addr), |_k, _v| { + // do nothing on LRU evict + }) + .or_insert(PerAddressStats::default()) + .transfer_stats_accounting + .add_down(bytes); + } + + // Get stats + pub fn get_stats(&self) -> NetworkManagerStats { + let inner = self.inner.lock(); + inner.stats.clone() + } + + pub fn get_veilid_state(&self) -> VeilidStateNetwork { + let has_state = self + .unlocked_inner + .components + .read() + .as_ref() + .map(|c| c.net.is_started()) + .unwrap_or(false); + + if !has_state { + return VeilidStateNetwork { + started: false, + bps_down: 0.into(), + bps_up: 0.into(), + peers: Vec::new(), + }; + } + let routing_table = self.routing_table(); + + let (bps_down, bps_up) = { + let inner = self.inner.lock(); + ( + inner.stats.self_stats.transfer_stats.down.average, + inner.stats.self_stats.transfer_stats.up.average, + ) + }; + + VeilidStateNetwork { + started: true, + bps_down, + bps_up, + peers: { + let mut out = Vec::new(); + for (k, v) in routing_table.get_recent_peers() { + if let Ok(Some(nr)) = routing_table.lookup_node_ref(k) { + let peer_stats = nr.peer_stats(); + let peer = PeerTableData { + node_ids: nr.node_ids().iter().copied().collect(), + peer_address: v.last_connection.remote().to_string(), + peer_stats, + }; + out.push(peer); + } + } + out + }, + } + } + + pub(super) fn send_network_update(&self) { + let update_cb = self.unlocked_inner.update_callback.read().clone(); + if update_cb.is_none() { + return; + } + let state = self.get_veilid_state(); + (update_cb.unwrap())(VeilidUpdate::Network(state)); + } +} diff --git a/veilid-core/src/network_manager/types/dial_info_filter.rs b/veilid-core/src/network_manager/types/dial_info_filter.rs index a6867789..c51a5acb 100644 --- a/veilid-core/src/network_manager/types/dial_info_filter.rs +++ b/veilid-core/src/network_manager/types/dial_info_filter.rs @@ -7,6 +7,7 @@ use super::*; Eq, PartialOrd, Ord, + Hash, Serialize, Deserialize, RkyvArchive, @@ -61,7 +62,7 @@ impl DialInfoFilter { pub fn is_dead(&self) -> bool { self.protocol_type_set.is_empty() || self.address_type_set.is_empty() } - pub fn with_sequencing(mut self, sequencing: Sequencing) -> (bool, DialInfoFilter) { + pub fn with_sequencing(self, sequencing: Sequencing) -> (bool, DialInfoFilter) { // Get first filtered dialinfo match sequencing { Sequencing::NoPreference => (false, self), diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 4315a176..909f9394 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -317,7 +317,6 @@ impl BucketEntryInner { rti, true, NodeRefFilter::from(routing_domain), - false, ); !last_connections.is_empty() } @@ -372,7 +371,6 @@ impl BucketEntryInner { rti, true, NodeRefFilter::from(routing_domain_set), - false ); for lc in last_connections { if let Some(rd) = @@ -415,7 +413,6 @@ impl BucketEntryInner { rti: &RoutingTableInner, only_live: bool, filter: NodeRefFilter, - ordered: bool, ) -> Vec<(ConnectionDescriptor, Timestamp)> { let connection_manager = rti.unlocked_inner.network_manager.connection_manager(); @@ -461,14 +458,8 @@ impl BucketEntryInner { } }) .collect(); - // Sort with ordering preference first and then sort with newest timestamps + // Sort with newest timestamps out.sort_by(|a, b| { - if ordered { - let s = ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type()); - if s != core::cmp::Ordering::Equal { - return s; - } - } b.1.cmp(&a.1) }); out diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 2044ee0a..09c968a1 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -277,15 +277,22 @@ pub trait NodeRefBase: Sized { out } + /// Get the most recent 'last connection' to this node + /// Filtered first and then sorted by ordering preference and then by most recent fn last_connection(&self) -> Option { - // Get the last connections and the last time we saw anything with this connection - // Filtered first and then sorted by sequencing and then by most recent self.operate(|rti, e| { // apply sequencing to filter and get sort let sequencing = self.common().sequencing; let filter = self.common().filter.clone().unwrap_or_default(); let (ordered, filter) = filter.with_sequencing(sequencing); - let last_connections = e.last_connections(rti, true, filter, ordered); + let mut last_connections = e.last_connections(rti, true, filter); + + if ordered { + last_connections.sort_by(|a, b| { + ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type()) + }); + } + last_connections.first().map(|x| x.0) }) } diff --git a/veilid-core/src/routing_table/node_ref_filter.rs b/veilid-core/src/routing_table/node_ref_filter.rs index 9c0936c5..9c4d44cb 100644 --- a/veilid-core/src/routing_table/node_ref_filter.rs +++ b/veilid-core/src/routing_table/node_ref_filter.rs @@ -1,6 +1,6 @@ use super::*; -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodeRefFilter { pub routing_domain_set: RoutingDomainSet, pub dial_info_filter: DialInfoFilter, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 1c49ec4a..d57cfc00 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -529,6 +529,7 @@ impl RPCProcessor { async fn wait_for_reply( &self, waitable_reply: WaitableReply, + debug_string: String, ) -> Result, RPCError> { let out = self .unlocked_inner @@ -536,7 +537,20 @@ impl RPCProcessor { .wait_for_op(waitable_reply.handle, waitable_reply.timeout_us) .await; match &out { - Err(_) | Ok(TimeoutOr::Timeout) => { + Err(e) => { + let msg = format!("RPC Lost ({}): {}", debug_string, e); + log_rpc!(debug "{}", msg.bright_magenta()); + self.record_question_lost( + waitable_reply.send_ts, + waitable_reply.node_ref.clone(), + waitable_reply.safety_route, + waitable_reply.remote_private_route, + waitable_reply.reply_private_route, + ); + } + Ok(TimeoutOr::Timeout) => { + let msg = format!("RPC Lost ({}): Timeout", debug_string); + log_rpc!(debug "{}", msg.bright_cyan()); self.record_question_lost( waitable_reply.send_ts, waitable_reply.node_ref.clone(), @@ -878,8 +892,6 @@ impl RPCProcessor { ) { // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { - log_rpc!(debug "RPC Question Lost: {:?}", node_ref); - node_ref.stats_question_lost(); // Also clear the last_connections for the entry so we make a new connection next time diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 6e0a3e43..c19b27dd 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -9,6 +9,8 @@ impl RPCProcessor { dest: Destination, message: Vec, ) -> Result>>, RPCError> { + let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest); + let app_call_q = RPCOperationAppCallQ::new(message)?; let question = RPCQuestion::new( network_result_try!(self.get_destination_respond_to(&dest)?), @@ -19,7 +21,7 @@ impl RPCProcessor { let waitable_reply = network_result_try!(self.question(dest, question, None).await?); // Wait for reply - let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Value(v) => v, }; diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 8b7ded6b..fadbefd2 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -32,11 +32,13 @@ impl RPCProcessor { find_node_q_detail, ); + let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest); + // Send the find_node request let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?); // Wait for reply - let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Value(v) => v, }; diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index cd82f067..611f0e6c 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -39,6 +39,18 @@ impl RPCProcessor { return Err(RPCError::internal("No node id for crypto kind")); }; + let debug_string = format!( + "GetValue(key={} subkey={} last_descriptor={}) => {}", + key, + subkey, + if last_descriptor.is_some() { + "Some" + } else { + "None" + }, + dest + ); + // Send the getvalue question let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none()); let question = RPCQuestion::new( @@ -58,7 +70,7 @@ impl RPCProcessor { ); // Wait for reply - let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Value(v) => v, }; diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index e36852af..17c2ab87 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -39,6 +39,16 @@ impl RPCProcessor { return Err(RPCError::internal("No node id for crypto kind")); }; + let debug_string = format!( + "SetValue(key={} subkey={} value_data(writer)={} value_data(len)={} send_descriptor={}) => {}", + key, + subkey, + value.value_data().writer(), + value.value_data().data().len(), + send_descriptor, + dest + ); + // Send the setvalue question let set_value_q = RPCOperationSetValueQ::new( key, @@ -59,13 +69,14 @@ impl RPCProcessor { subkey, vcrypto: vcrypto.clone(), }); + let waitable_reply = network_result_try!( self.question(dest, question, Some(question_context)) .await? ); // Wait for reply - let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Value(v) => v, }; diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 323789b3..53af4629 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -101,6 +101,8 @@ impl RPCProcessor { RPCQuestionDetail::StatusQ(status_q), ); + let debug_string = format!("Status => {}", dest); + // Send the info request let waitable_reply = network_result_try!(self.question(dest.clone(), question, None).await?); @@ -109,7 +111,7 @@ impl RPCProcessor { let send_data_kind = waitable_reply.send_data_kind; // Wait for reply - let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), TimeoutOr::Value(v) => v, };