From 68ea977d0f5f368c61f123bc5cf37ce22724bf99 Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 30 Aug 2022 21:21:16 -0400 Subject: [PATCH] refactor for routing domains --- veilid-core/proto/veilid.capnp | 3 + veilid-core/src/network_manager/mod.rs | 89 +++++---- veilid-core/src/network_manager/native/mod.rs | 19 +- .../native/network_class_discovery.rs | 16 +- veilid-core/src/network_manager/tasks.rs | 53 +++--- veilid-core/src/network_manager/wasm/mod.rs | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 101 +++++----- veilid-core/src/routing_table/debug.rs | 15 +- veilid-core/src/routing_table/find_nodes.rs | 85 ++++++--- veilid-core/src/routing_table/mod.rs | 155 ++++++++-------- veilid-core/src/routing_table/node_ref.rs | 172 ++++++++++++------ veilid-core/src/rpc_processor/mod.rs | 60 +++--- .../src/rpc_processor/rpc_find_node.rs | 14 +- .../src/rpc_processor/rpc_node_info_update.rs | 1 + veilid-core/src/rpc_processor/rpc_status.rs | 8 +- veilid-core/src/veilid_api/mod.rs | 40 +--- 16 files changed, 471 insertions(+), 362 deletions(-) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 71533bad..c8bd457f 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -200,11 +200,14 @@ struct DialInfoDetail { } struct NodeStatus { + # PublicInternet RoutingDomain Status willRoute @0 :Bool; willTunnel @1 :Bool; willSignal @2 :Bool; willRelay @3 :Bool; willValidateDialInfo @4 :Bool; + # LocalNetwork RoutingDomain Status + # TODO } struct ProtocolTypeSet { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 9099f038..2ea2bc54 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -147,7 +147,6 @@ struct NetworkManagerInner { update_callback: Option, stats: NetworkManagerStats, client_whitelist: LruCache, - relay_node: Option, public_address_check_cache: BTreeMap>, public_address_inconsistencies_table: @@ -187,7 +186,6 @@ impl NetworkManager { update_callback: None, stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), - relay_node: None, public_address_check_cache: BTreeMap::new(), public_address_inconsistencies_table: BTreeMap::new(), protocol_config: None, @@ -315,10 +313,6 @@ impl NetworkManager { .clone() } - pub fn relay_node(&self) -> Option { - self.inner.lock().relay_node.clone() - } - #[instrument(level = "debug", skip_all, err)] pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { let routing_table = RoutingTable::new(self.clone()); @@ -418,7 +412,9 @@ impl NetworkManager { } // Inform routing table entries that our dial info has changed - self.send_node_info_updates(true).await; + for rd in RoutingDomain::all() { + self.send_node_info_updates(rd, true).await; + } // Inform api clients that things have changed self.send_network_update(); @@ -478,7 +474,6 @@ impl NetworkManager { { let mut inner = self.inner.lock(); inner.components = None; - inner.relay_node = None; inner.public_inbound_dial_info_filter = None; inner.local_inbound_dial_info_filter = None; inner.public_outbound_dial_info_filter = None; @@ -601,9 +596,9 @@ impl NetworkManager { } // Return what network class we are in - pub fn get_network_class(&self) -> Option { + pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { if let Some(components) = &self.inner.lock().components { - components.net.get_network_class() + components.net.get_network_class(routing_domain) } else { None } @@ -611,7 +606,9 @@ impl NetworkManager { // Get our node's capabilities pub fn generate_node_status(&self) -> NodeStatus { - let peer_info = self.routing_table().get_own_peer_info(); + let peer_info = self + .routing_table() + .get_own_peer_info(RoutingDomain::PublicInternet); let will_route = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added let will_tunnel = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point @@ -776,6 +773,7 @@ impl NetworkManager { // Add the peer info to our routing table let peer_nr = match routing_table.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, peer_info.node_id.key, peer_info.signed_node_info, false, @@ -799,6 +797,7 @@ impl NetworkManager { // Add the peer info to our routing table let mut peer_nr = match routing_table.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, peer_info.node_id.key, peer_info.signed_node_info, false, @@ -900,8 +899,7 @@ impl NetworkManager { } // Get node's min/max version and see if we can send to it // and if so, get the max version we can use - let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version()) - { + let version = if let Some((node_min, node_max)) = node_ref.min_max_version() { #[allow(clippy::absurd_extreme_comparisons)] if node_min > MAX_VERSION || node_max < MIN_VERSION { bail!( @@ -966,7 +964,7 @@ impl NetworkManager { // Get the target's inbound relay, it must have one or it is not reachable // Note that .relay() never returns our own node. We can't relay to ourselves. - if let Some(inbound_relay_nr) = target_node_ref.relay() { + if let Some(inbound_relay_nr) = target_node_ref.relay(RoutingDomain::PublicInternet) { // Scope down to protocols we can do outbound let inbound_relay_nr = inbound_relay_nr.filtered_clone(public_outbound_dif.clone()); // Can we reach the inbound relay? @@ -975,8 +973,9 @@ impl NetworkManager { .is_some() { // Can we receive anything inbound ever? - let our_network_class = - self.get_network_class().unwrap_or(NetworkClass::Invalid); + let our_network_class = self + .get_network_class(RoutingDomain::PublicInternet) + .unwrap_or(NetworkClass::Invalid); if matches!(our_network_class, NetworkClass::InboundCapable) { let routing_table = self.routing_table(); @@ -985,7 +984,10 @@ impl NetworkManager { // Get the best match dial info for an reverse inbound connection let reverse_dif = self .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) - .filtered(target_node_ref.node_info_outbound_filter()); + .filtered( + target_node_ref + .node_info_outbound_filter(RoutingDomain::PublicInternet), + ); if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail( Some(RoutingDomain::PublicInternet), &reverse_dif, @@ -1016,7 +1018,10 @@ impl NetworkManager { // Does the self node have a direct udp dialinfo the target can reach? let inbound_udp_dif = self .get_inbound_dial_info_filter(RoutingDomain::PublicInternet) - .filtered(target_node_ref.node_info_outbound_filter()) + .filtered( + target_node_ref + .node_info_outbound_filter(RoutingDomain::PublicInternet), + ) .filtered( DialInfoFilter::global().with_protocol_type(ProtocolType::UDP), ); @@ -1046,7 +1051,9 @@ impl NetworkManager { } } // If the other node is not inbound capable at all, it needs to have an inbound relay - else if let Some(target_inbound_relay_nr) = target_node_ref.relay() { + else if let Some(target_inbound_relay_nr) = + target_node_ref.relay(RoutingDomain::PublicInternet) + { // Can we reach the full relay? if target_inbound_relay_nr .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) @@ -1056,6 +1063,14 @@ impl NetworkManager { } } + // If we can't reach the node by other means, try our outbound relay if we have one + if let Some(relay_node) = self + .routing_table() + .relay_node(RoutingDomain::PublicInternet) + { + return ContactMethod::OutboundRelay(relay_node); + } + ContactMethod::Unreachable } @@ -1093,11 +1108,6 @@ impl NetworkManager { return out; } - // If we can't reach the node by other means, try our outbound relay if we have one - if let Some(relay_node) = self.relay_node() { - return ContactMethod::OutboundRelay(relay_node); - } - // Otherwise, we can't reach this node log_net!("unable to reach node {:?}", target_node_ref); ContactMethod::Unreachable @@ -1105,6 +1115,7 @@ impl NetworkManager { // Send a reverse connection signal and wait for the return receipt over it // Then send the data across the new connection + // Only usable for PublicInternet routing domain #[instrument(level = "trace", skip(self, data), err)] pub async fn do_reverse_connect( &self, @@ -1118,7 +1129,9 @@ impl NetworkManager { let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; // Get our peer info - let peer_info = self.routing_table().get_own_peer_info(); + let peer_info = self + .routing_table() + .get_own_peer_info(RoutingDomain::PublicInternet); // Issue the signal let rpc = self.rpc_processor(); @@ -1172,6 +1185,7 @@ impl NetworkManager { // Send a hole punch signal and do a negotiating ping and wait for the return receipt // Then send the data across the new connection + // Only usable for PublicInternet routing domain #[instrument(level = "trace", skip(self, data), err)] pub async fn do_hole_punch( &self, @@ -1189,7 +1203,9 @@ impl NetworkManager { let receipt_timeout = ms_to_us(self.config.get().network.hole_punch_receipt_time_ms); let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; // Get our peer info - let peer_info = self.routing_table().get_own_peer_info(); + let peer_info = self + .routing_table() + .get_own_peer_info(RoutingDomain::PublicInternet); // Get the udp direct dialinfo for the hole punch let hole_punch_did = target_nr @@ -1358,7 +1374,7 @@ impl NetworkManager { // Serialize out peer info let bootstrap_peerinfo: Vec = bootstrap_nodes .iter() - .filter_map(|b| b.peer_info()) + .filter_map(|b| b.peer_info(RoutingDomain::PublicInternet)) .collect(); let json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec(); @@ -1567,7 +1583,7 @@ impl NetworkManager { } Some(v) => v, }; - source_noderef.operate_mut(|e| e.set_min_max_version(envelope.get_min_max_version())); + source_noderef.set_min_max_version(envelope.get_min_max_version()); // xxx: deal with spoofing and flooding here? @@ -1713,9 +1729,11 @@ impl NetworkManager { let mut bad_public_address_detection_punishment: Option< Box, > = None; - let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); + let public_internet_network_class = net + .get_network_class(RoutingDomain::PublicInternet) + .unwrap_or(NetworkClass::Invalid); let needs_public_address_detection = - if matches!(network_class, NetworkClass::InboundCapable) { + if matches!(public_internet_network_class, NetworkClass::InboundCapable) { // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed let dial_info_filter = connection_descriptor.make_dial_info_filter(); @@ -1831,7 +1849,7 @@ impl NetworkManager { } // Inform routing table entries that our dial info has changed - pub async fn send_node_info_updates(&self, all: bool) { + pub async fn send_node_info_updates(&self, routing_domain: RoutingDomain, all: bool) { let this = self.clone(); // Run in background only once @@ -1842,7 +1860,8 @@ impl NetworkManager { .single_spawn(async move { // Only update if we actually have a valid network class if matches!( - this.get_network_class().unwrap_or(NetworkClass::Invalid), + this.get_network_class(routing_domain) + .unwrap_or(NetworkClass::Invalid), NetworkClass::Invalid ) { trace!( @@ -1853,7 +1872,9 @@ impl NetworkManager { // Get the list of refs to all nodes to update let cur_ts = intf::get_timestamp(); - let node_refs = this.routing_table().get_nodes_needing_updates(cur_ts, all); + let node_refs = + this.routing_table() + .get_nodes_needing_updates(routing_domain, cur_ts, all); // Send the updates log_net!(debug "Sending node info updates to {} nodes", node_refs.len()); @@ -1872,7 +1893,7 @@ impl NetworkManager { } // Mark the node as having seen our node info - nr.set_seen_our_node_info(); + nr.set_seen_our_node_info(routing_domain); }); } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index d04309ef..95136b18 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -38,7 +38,7 @@ struct NetworkInner { network_needs_restart: bool, protocol_config: Option, static_public_dialinfo: ProtocolTypeSet, - network_class: Option, + network_class: [Option; RoutingDomain::count()], join_handles: Vec>, stop_source: Option, udp_port: u16, @@ -98,7 +98,7 @@ impl Network { public_dial_info_check_punishment: None, protocol_config: None, static_public_dialinfo: ProtocolTypeSet::empty(), - network_class: None, + network_class: [None, None], join_handles: Vec::new(), stop_source: None, udp_port: 0u16, @@ -715,7 +715,8 @@ impl Network { if !detect_address_changes { let mut inner = self.inner.lock(); if !inner.static_public_dialinfo.is_empty() { - inner.network_class = Some(NetworkClass::InboundCapable); + inner.network_class[RoutingDomain::PublicInternet as usize] = + Some(NetworkClass::InboundCapable); } } @@ -796,9 +797,9 @@ impl Network { inner.doing_public_dial_info_check } - pub fn get_network_class(&self) -> Option { + pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { let inner = self.inner.lock(); - inner.network_class + inner.network_class[routing_domain as usize] } ////////////////////////////////////////// @@ -861,9 +862,13 @@ impl Network { // If we need to figure out our network class, tick the task for it if detect_address_changes { - let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); + let public_internet_network_class = self + .get_network_class(RoutingDomain::PublicInternet) + .unwrap_or(NetworkClass::Invalid); let needs_public_dial_info_check = self.needs_public_dial_info_check(); - if network_class == NetworkClass::Invalid || needs_public_dial_info_check { + if public_internet_network_class == NetworkClass::Invalid + || needs_public_dial_info_check + { let routing_table = self.routing_table(); let rth = routing_table.get_routing_table_health(); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index ebf34a22..6504fa17 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -124,7 +124,7 @@ impl DiscoveryContext { let inbound_dial_info_entry_filter = RoutingTable::make_inbound_dial_info_entry_filter(dial_info_filter.clone()); let disallow_relays_filter = move |e: &BucketEntryInner| { - if let Some(n) = e.node_info() { + if let Some(n) = e.node_info(RoutingDomain::PublicInternet) { n.relay_peer_info.is_none() } else { false @@ -583,7 +583,8 @@ impl Network { let (protocol_config, existing_network_class, tcp_same_port) = { let inner = self.inner.lock(); let protocol_config = inner.protocol_config.unwrap_or_default(); - let existing_network_class = inner.network_class; + let existing_network_class = + inner.network_class[RoutingDomain::PublicInternet as usize]; let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) && protocol_config.inbound.contains(ProtocolType::WS) { @@ -815,14 +816,15 @@ impl Network { // Is the network class different? if existing_network_class != new_network_class { - self.inner.lock().network_class = new_network_class; + self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = + new_network_class; changed = true; - log_net!(debug "network class changed to {:?}", new_network_class); + log_net!(debug "PublicInternet network class changed to {:?}", new_network_class); } } else if existing_network_class.is_some() { // Network class could not be determined routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); - self.inner.lock().network_class = None; + self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = None; changed = true; log_net!(debug "network class cleared"); } @@ -834,7 +836,9 @@ impl Network { } } else { // Send updates to everyone - network_manager.send_node_info_updates(true).await; + network_manager + .send_node_info_updates(RoutingDomain::PublicInternet, true) + .await; } Ok(()) diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index a8b550fc..91e18588 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -187,9 +187,12 @@ impl NetworkManager { for pi in peer_info { let k = pi.node_id.key; // Register the node - if let Some(nr) = - routing_table.register_node_with_signed_node_info(k, pi.signed_node_info, false) - { + if let Some(nr) = routing_table.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, + k, + pi.signed_node_info, + false, + ) { // Add this our futures to process in parallel let routing_table = routing_table.clone(); unord.push( @@ -278,6 +281,7 @@ impl NetworkManager { // Make invalid signed node info (no signature) if let Some(nr) = routing_table.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, k, SignedNodeInfo::with_no_signature(NodeInfo { network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable @@ -298,7 +302,7 @@ impl NetworkManager { let _ = routing_table.find_target(nr.clone()).await; // Ensure we got the signed peer info - if !nr.operate(|e| e.has_valid_signed_node_info()) { + if !nr.has_valid_signed_node_info(Some(RoutingDomain::PublicInternet)) { log_net!(warn "bootstrap at {:?} did not return valid signed node info", nr @@ -329,19 +333,24 @@ impl NetworkManager { let rpc = self.rpc_processor(); let routing_table = self.routing_table(); - let relay_node_id = self.relay_node().map(|nr| nr.node_id()); + let mut unord = FuturesUnordered::new(); + + let node_refs = routing_table.get_nodes_needing_ping(cur_ts); + let mut mapped_port_info = routing_table.get_mapped_port_info(); + let opt_public_internet_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); + let opt_public_internet_relay_id = opt_public_internet_relay_nr.map(|nr| nr.node_id()); + // let opt_local_network_relay_nr = routing_table.relay_node(RoutingDomain::LocalNetwork); + // let opt_local_network_relay_id = opt_local_network_relay_nr.map(|nr| nr.node_id()); + + // Public Internet Routing Domain let dids = routing_table.all_filtered_dial_info_details( Some(RoutingDomain::PublicInternet), &DialInfoFilter::global(), ); - let mut unord = FuturesUnordered::new(); - - let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); - let mut mapped_port_info = routing_table.get_mapped_port_info(); for nr in node_refs { let rpc = rpc.clone(); - if Some(nr.node_id()) == relay_node_id { + if Some(nr.node_id()) == opt_public_internet_relay_id { // Relay nodes get pinged over all protocols we have inbound dialinfo for // This is so we can preserve the inbound NAT mappings at our router for did in &dids { @@ -414,21 +423,20 @@ impl NetworkManager { ) -> EyreResult<()> { // Get our node's current node info and network class and do the right thing let routing_table = self.routing_table(); - let node_info = routing_table.get_own_node_info(); - let network_class = self.get_network_class(); + let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet); + let network_class = self.get_network_class(RoutingDomain::PublicInternet); let mut node_info_changed = false; // Do we know our network class yet? if let Some(network_class) = network_class { // If we already have a relay, see if it is dead, or if we don't need it any more let has_relay = { - let mut inner = self.inner.lock(); - if let Some(relay_node) = inner.relay_node.clone() { - let state = relay_node.operate(|e| e.state(cur_ts)); + if let Some(relay_node) = routing_table.relay_node(RoutingDomain::PublicInternet) { + let state = relay_node.state(cur_ts); // Relay node is dead or no longer needed if matches!(state, BucketEntryState::Dead) { info!("Relay node died, dropping relay {}", relay_node); - inner.relay_node = None; + routing_table.set_relay_node(RoutingDomain::PublicInternet, None); node_info_changed = true; false } else if !node_info.requires_relay() { @@ -436,7 +444,7 @@ impl NetworkManager { "Relay node no longer required, dropping relay {}", relay_node ); - inner.relay_node = None; + routing_table.set_relay_node(RoutingDomain::PublicInternet, None); node_info_changed = true; false } else { @@ -453,16 +461,15 @@ impl NetworkManager { if network_class.outbound_wants_relay() { // The outbound relay is the host of the PWA if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { - let mut inner = self.inner.lock(); - // Register new outbound relay if let Some(nr) = routing_table.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, outbound_relay_peerinfo.node_id.key, outbound_relay_peerinfo.signed_node_info, false, ) { info!("Outbound relay node selected: {}", nr); - inner.relay_node = Some(nr); + routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr)); node_info_changed = true; } } @@ -470,9 +477,8 @@ impl NetworkManager { } else { // Find a node in our routing table that is an acceptable inbound relay if let Some(nr) = routing_table.find_inbound_relay(cur_ts) { - let mut inner = self.inner.lock(); info!("Inbound relay node selected: {}", nr); - inner.relay_node = Some(nr); + routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr)); node_info_changed = true; } } @@ -481,7 +487,8 @@ impl NetworkManager { // Re-send our node info if we selected a relay if node_info_changed { - self.send_node_info_updates(true).await; + self.send_node_info_updates(RoutingDomain::PublicInternet, true) + .await; } Ok(()) diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index cc77391d..aa300718 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -296,7 +296,7 @@ impl Network { // } - pub fn get_network_class(&self) -> Option { + pub fn get_network_class(&self, _routing_domain: PublicInternet) -> Option { // xxx eventually detect tor browser? return if self.inner.lock().network_started { Some(NetworkClass::WebApp) diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 0930e763..9f8f6bcd 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -44,11 +44,10 @@ struct LastConnectionKey(PeerScope, ProtocolType, AddressType); #[derive(Debug)] pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, - seen_our_node_info: bool, updated_since_last_network_change: bool, last_connections: BTreeMap, - opt_signed_node_info: Option, - opt_local_node_info: Option, + signed_node_info: [Option>; RoutingDomain::count()], + seen_our_node_info: [bool; RoutingDomain::count()], peer_stats: PeerStats, latency_stats_accounting: LatencyStatsAccounting, transfer_stats_accounting: TransferStatsAccounting, @@ -118,6 +117,7 @@ impl BucketEntryInner { // Retuns true if the node info changed pub fn update_signed_node_info( &mut self, + routing_domain: RoutingDomain, signed_node_info: SignedNodeInfo, allow_invalid_signature: bool, ) { @@ -128,7 +128,7 @@ impl BucketEntryInner { } // See if we have an existing signed_node_info to update or not - if let Some(current_sni) = &self.opt_signed_node_info { + if let Some(current_sni) = &self.signed_node_info[routing_domain as usize] { // If the timestamp hasn't changed or is less, ignore this update if signed_node_info.timestamp <= current_sni.timestamp { // If we received a node update with the same timestamp @@ -137,7 +137,7 @@ impl BucketEntryInner { && signed_node_info.timestamp == current_sni.timestamp { // No need to update the signednodeinfo though since the timestamp is the same - // Just return true so we can make the node not dead + // Touch the node and let it try to live again self.updated_since_last_network_change = true; self.touch_last_seen(intf::get_timestamp()); } @@ -152,43 +152,57 @@ impl BucketEntryInner { )); // Update the signed node info - self.opt_signed_node_info = Some(signed_node_info); + self.signed_node_info[routing_domain as usize] = Some(Box::new(signed_node_info)); self.updated_since_last_network_change = true; self.touch_last_seen(intf::get_timestamp()); } - pub fn update_local_node_info(&mut self, local_node_info: LocalNodeInfo) { - self.opt_local_node_info = Some(local_node_info) - } - pub fn has_node_info(&self) -> bool { - self.opt_signed_node_info.is_some() - } - - pub fn has_valid_signed_node_info(&self) -> bool { - if let Some(sni) = &self.opt_signed_node_info { - sni.is_valid() + pub fn has_node_info(&self, opt_routing_domain: Option) -> bool { + if let Some(rd) = opt_routing_domain { + self.signed_node_info[rd as usize].is_some() } else { + for rd in RoutingDomain::all() { + if self.signed_node_info[rd as usize].is_some() { + return true; + } + } false } } - pub fn has_local_node_info(&self) -> bool { - self.opt_local_node_info.is_some() + pub fn has_valid_signed_node_info(&self, opt_routing_domain: Option) -> bool { + if let Some(rd) = opt_routing_domain { + if let Some(sni) = &self.signed_node_info[rd as usize] { + if sni.is_valid() { + return true; + } + } + false + } else { + for rd in RoutingDomain::all() { + if let Some(sni) = &self.signed_node_info[rd as usize] { + if sni.is_valid() { + return true; + } + } + } + false + } } - pub fn node_info(&self) -> Option { - self.opt_signed_node_info + pub fn node_info(&self, routing_domain: RoutingDomain) -> Option { + self.signed_node_info[routing_domain as usize] .as_ref() .map(|s| s.node_info.clone()) } - pub fn local_node_info(&self) -> Option { - self.opt_local_node_info.clone() - } - pub fn peer_info(&self, key: DHTKey) -> Option { - self.opt_signed_node_info.as_ref().map(|s| PeerInfo { - node_id: NodeId::new(key), - signed_node_info: s.clone(), - }) + + pub fn peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option { + self.signed_node_info[routing_domain as usize] + .as_ref() + .map(|s| PeerInfo { + node_id: NodeId::new(key), + signed_node_info: *s.clone(), + }) } fn descriptor_to_key(last_connection: ConnectionDescriptor) -> LastConnectionKey { @@ -256,12 +270,12 @@ impl BucketEntryInner { self.peer_stats.status = Some(status); } - pub fn set_seen_our_node_info(&mut self, seen: bool) { - self.seen_our_node_info = seen; + pub fn set_seen_our_node_info(&mut self, routing_domain: RoutingDomain, seen: bool) { + self.seen_our_node_info[routing_domain as usize] = seen; } - pub fn has_seen_our_node_info(&self) -> bool { - self.seen_our_node_info + pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { + self.seen_our_node_info[routing_domain as usize] } pub fn set_updated_since_last_network_change(&mut self, updated: bool) { @@ -337,20 +351,14 @@ impl BucketEntryInner { } // Check if this node needs a ping right now to validate it is still reachable - pub(super) fn needs_ping( - &self, - node_id: &DHTKey, - cur_ts: u64, - relay_node_id: Option, - ) -> bool { + pub(super) fn needs_ping(&self, node_id: &DHTKey, cur_ts: u64, needs_keepalive: bool) -> bool { // See which ping pattern we are to use let state = self.state(cur_ts); - // If this entry is our relay node, then we should ping it regularly to keep our association alive - if let Some(relay_node_id) = relay_node_id { - if relay_node_id == *node_id { - return self.needs_constant_ping(cur_ts, KEEPALIVE_PING_INTERVAL_SECS as u64); - } + // If this entry needs a keepalive (like a relay node), + // then we should ping it regularly to keep our association alive + if needs_keepalive { + return self.needs_constant_ping(cur_ts, KEEPALIVE_PING_INTERVAL_SECS as u64); } match state { @@ -494,11 +502,10 @@ impl BucketEntry { ref_count: AtomicU32::new(0), inner: RwLock::new(BucketEntryInner { min_max_version: None, - seen_our_node_info: false, + seen_our_node_info: [false, false], updated_since_last_network_change: false, last_connections: BTreeMap::new(), - opt_signed_node_info: None, - opt_local_node_info: None, + signed_node_info: [None, None], peer_stats: PeerStats { time_added: now, rpc_stats: RPCStats::default(), @@ -547,7 +554,7 @@ impl Drop for BucketEntry { panic!( "bucket entry dropped with non-zero refcount: {:#?}", - self.inner.read().node_info() + &*self.inner.read() ) } } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 7e3cc7d6..1966bd4e 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -85,8 +85,17 @@ impl RoutingTable { out += &format!(" {:>2}: {:?}\n", n, gdi); } - out += "Own PeerInfo:\n"; - out += &format!(" {:#?}\n", self.get_own_peer_info()); + out += "LocalNetwork PeerInfo:\n"; + out += &format!( + " {:#?}\n", + self.get_own_peer_info(RoutingDomain::LocalNetwork) + ); + + out += "PublicInternet PeerInfo:\n"; + out += &format!( + " {:#?}\n", + self.get_own_peer_info(RoutingDomain::PublicInternet) + ); out } @@ -142,7 +151,7 @@ impl RoutingTable { let mut out = String::new(); out += &format!("Entry {:?}:\n", node_id); if let Some(nr) = self.lookup_node_ref(node_id) { - out += &nr.operate(|e| format!("{:#?}\n", e)); + out += &nr.operate(|_rt, e| format!("{:#?}\n", e)); } else { out += "Entry not found\n"; } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index eed17590..c6bcccb1 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -19,12 +19,19 @@ impl RoutingTable { ) -> impl FnMut(&BucketEntryInner) -> bool { // does it have matching public dial info? move |e| { - e.node_info() - .map(|n| { - n.first_filtered_dial_info_detail(|did| did.matches_filter(&dial_info_filter)) + for rd in RoutingDomain::all() { + if let Some(ni) = e.node_info(rd) { + if ni + .first_filtered_dial_info_detail(|did| { + did.matches_filter(&dial_info_filter) + }) .is_some() - }) - .unwrap_or(false) + { + return true; + } + } + } + false } } @@ -34,14 +41,17 @@ impl RoutingTable { ) -> impl FnMut(&BucketEntryInner) -> bool { // does the node's outbound capabilities match the dialinfo? move |e| { - e.node_info() - .map(|n| { + for rd in RoutingDomain::all() { + if let Some(ni) = e.node_info(rd) { let mut dif = DialInfoFilter::all(); - dif = dif.with_protocol_type_set(n.outbound_protocols); - dif = dif.with_address_type_set(n.address_types); - dial_info.matches_filter(&dif) - }) - .unwrap_or(false) + dif = dif.with_protocol_type_set(ni.outbound_protocols); + dif = dif.with_address_type_set(ni.address_types); + if dial_info.matches_filter(&dif) { + return true; + } + } + } + false } } @@ -75,14 +85,17 @@ impl RoutingTable { // count node_count, // filter - Some(move |_k: DHTKey, v: Option>| { + Some(|_k: DHTKey, v: Option>| { let entry = v.unwrap(); entry.with(|e| { - // skip nodes on our local network here - if e.local_node_info().is_some() { + // skip nodes on local network + if e.node_info(RoutingDomain::LocalNetwork).is_some() { + return false; + } + // skip nodes not on public internet + if e.node_info(RoutingDomain::PublicInternet).is_none() { return false; } - // skip nodes that dont match entry filter entry_filter(e) }) @@ -113,7 +126,7 @@ impl RoutingTable { let entry = v.unwrap(); entry.with(|e| { // skip nodes on our local network here - if e.local_node_info().is_some() { + if e.node_info(RoutingDomain::LocalNetwork).is_some() { return false; } @@ -147,7 +160,9 @@ impl RoutingTable { keep }; - e.node_info().map(filter).unwrap_or(false) + e.node_info(RoutingDomain::PublicInternet) + .map(filter) + .unwrap_or(false) }) }), // transform @@ -158,49 +173,61 @@ impl RoutingTable { } // Get our own node's peer info (public node info) so we can share it with other nodes - pub fn get_own_peer_info(&self) -> PeerInfo { - PeerInfo::new(NodeId::new(self.node_id()), self.get_own_signed_node_info()) + pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo { + PeerInfo::new( + NodeId::new(self.node_id()), + self.get_own_signed_node_info(routing_domain), + ) } - pub fn get_own_signed_node_info(&self) -> SignedNodeInfo { + pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo { let node_id = NodeId::new(self.node_id()); let secret = self.node_id_secret(); - SignedNodeInfo::with_secret(self.get_own_node_info(), node_id, &secret).unwrap() + SignedNodeInfo::with_secret(self.get_own_node_info(routing_domain), node_id, &secret) + .unwrap() } - pub fn get_own_node_info(&self) -> NodeInfo { + pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo { let netman = self.network_manager(); - let relay_node = netman.relay_node(); + let relay_node = self.relay_node(routing_domain); let pc = netman.get_protocol_config(); NodeInfo { - network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), + network_class: netman + .get_network_class(routing_domain) + .unwrap_or(NetworkClass::Invalid), outbound_protocols: pc.outbound, address_types: pc.family_global, min_version: MIN_VERSION, max_version: MAX_VERSION, - dial_info_detail_list: self.dial_info_details(RoutingDomain::PublicInternet), - relay_peer_info: relay_node.and_then(|rn| rn.peer_info().map(Box::new)), + dial_info_detail_list: self.dial_info_details(routing_domain), + relay_peer_info: relay_node.and_then(|rn| rn.peer_info(routing_domain).map(Box::new)), } } pub fn filter_has_valid_signed_node_info( + &self, v: Option>, own_peer_info_is_valid: bool, + opt_routing_domain: Option, ) -> bool { + let routing_table = self.clone(); match v { None => own_peer_info_is_valid, - Some(entry) => entry.with(|e| e.has_valid_signed_node_info()), + Some(entry) => entry.with(|e| e.has_valid_signed_node_info(opt_routing_domain)), } } pub fn transform_to_peer_info( + &self, + routing_domain: RoutingDomain, k: DHTKey, v: Option>, own_peer_info: &PeerInfo, ) -> PeerInfo { + let routing_table = self.clone(); match v { None => own_peer_info.clone(), - Some(entry) => entry.with(|e| e.peer_info(k).unwrap()), + Some(entry) => entry.with(|e| e.peer_info(k, routing_domain).unwrap()), } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 087d65a9..1aa15f05 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -15,22 +15,39 @@ use bucket::*; pub use bucket_entry::*; pub use debug::*; pub use find_nodes::*; +use hashlink::LruCache; pub use node_ref::*; pub use stats_accounting::*; +const RECENT_PEERS_TABLE_SIZE: usize = 64; + ////////////////////////////////////////////////////////////////////////// #[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq)] pub enum RoutingDomain { - PublicInternet, - LocalNetwork, + PublicInternet = 0, + LocalNetwork = 1, +} +impl RoutingDomain { + pub const fn count() -> usize { + 2 + } + pub const fn all() -> [RoutingDomain; RoutingDomain::count()] { + [RoutingDomain::PublicInternet, RoutingDomain::LocalNetwork] + } } #[derive(Debug, Default)] pub struct RoutingDomainDetail { + relay_node: Option, dial_info_details: Vec, } +#[derive(Debug, Clone, Copy)] +pub struct RecentPeersEntry { + last_connection: ConnectionDescriptor, +} + struct RoutingTableInner { network_manager: NetworkManager, node_id: DHTKey, // The current node's public DHT key @@ -46,6 +63,7 @@ struct RoutingTableInner { self_latency_stats_accounting: LatencyStatsAccounting, // Interim accounting mechanism for this node's RPC latency to any other node self_transfer_stats_accounting: TransferStatsAccounting, // Interim accounting mechanism for the total bandwidth to/from this node self_transfer_stats: TransferStatsDownUp, // Statistics about the total bandwidth to/from this node + recent_peers: LruCache, // Peers we have recently communicated with } #[derive(Clone, Debug, Default)] @@ -82,6 +100,7 @@ impl RoutingTable { self_latency_stats_accounting: LatencyStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats: TransferStatsDownUp::default(), + recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), } } fn new_unlocked_inner(_config: VeilidConfig) -> RoutingTableUnlockedInner { @@ -159,6 +178,16 @@ impl RoutingTable { } } + pub fn relay_node(&self, domain: RoutingDomain) -> Option { + let inner = self.inner.read(); + Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node.clone()) + } + + pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option) { + let inner = self.inner.write(); + Self::with_routing_domain(&mut *inner, domain, |rd| rd.relay_node = opt_relay_node); + } + pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { let inner = self.inner.read(); Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details.is_empty()) @@ -295,14 +324,14 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { - Self::reset_all_seen_our_node_info(&*inner); - Self::reset_all_updated_since_last_network_change(&*inner); + Self::reset_all_seen_our_node_info(&mut *inner); + Self::reset_all_updated_since_last_network_change(&mut *inner); } Ok(()) } - fn reset_all_seen_our_node_info(inner: &RoutingTableInner) { + fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner) { let cur_ts = intf::get_timestamp(); Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { v.with_mut(|e| e.set_seen_our_node_info(false)); @@ -310,7 +339,7 @@ impl RoutingTable { }); } - fn reset_all_updated_since_last_network_change(inner: &RoutingTableInner) { + fn reset_all_updated_since_last_network_change(inner: &mut RoutingTableInner) { let cur_ts = intf::get_timestamp(); Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { v.with_mut(|e| e.set_updated_since_last_network_change(false)); @@ -328,7 +357,7 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { - Self::reset_all_seen_our_node_info(&*inner); + Self::reset_all_seen_our_node_info(&mut *inner); } } @@ -479,12 +508,17 @@ impl RoutingTable { None } - pub fn get_nodes_needing_updates(&self, cur_ts: u64, all: bool) -> Vec { + pub fn get_nodes_needing_updates( + &self, + routing_domain: RoutingDomain, + cur_ts: u64, + all: bool, + ) -> Vec { let inner = self.inner.read(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Only update nodes that haven't seen our node info yet - if all || !v.with(|e| e.has_seen_our_node_info()) { + if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) { node_refs.push(NodeRef::new(self.clone(), k, v, None)); } Option::<()>::None @@ -492,16 +526,25 @@ impl RoutingTable { node_refs } - pub fn get_nodes_needing_ping( - &self, - cur_ts: u64, - relay_node_id: Option, - ) -> Vec { + pub fn get_nodes_needing_ping(&self, cur_ts: u64) -> Vec { let inner = self.inner.read(); + + // Collect relay nodes + let mut relays: HashSet = HashSet::new(); + for rd in RoutingDomain::all() { + let opt_relay_id = + Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { + rd.relay_node.map(|rn| rn.node_id()) + }); + if let Some(relay_id) = opt_relay_id { + relays.insert(relay_id); + } + } + + // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { - // Only update nodes that haven't seen our node info yet - if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { + if v.with(|e| e.has_node_info(None) && e.needs_ping(&k, cur_ts, relays.contains(&k))) { node_refs.push(NodeRef::new(self.clone(), k, v, None)); } Option::<()>::None @@ -600,6 +643,7 @@ impl RoutingTable { // and add the dial info we have for it, since that's pretty common pub fn register_node_with_signed_node_info( &self, + routing_domain: RoutingDomain, node_id: DHTKey, signed_node_info: SignedNodeInfo, allow_invalid_signature: bool, @@ -617,7 +661,7 @@ impl RoutingTable { } self.create_node_ref(node_id, |e| { - e.update_signed_node_info(signed_node_info, allow_invalid_signature); + e.update_signed_node_info(routing_domain, signed_node_info, allow_invalid_signature); }) } @@ -653,64 +697,6 @@ impl RoutingTable { Ok(()) } - ////////////////////////////////////////////////////////////////////// - // Stats Accounting - pub fn stats_question_sent( - &self, - node_ref: NodeRef, - ts: u64, - bytes: u64, - expects_answer: bool, - ) { - self.inner - .write() - .self_transfer_stats_accounting - .add_up(bytes); - node_ref.operate_mut(|e| { - e.question_sent(ts, bytes, expects_answer); - }) - } - pub fn stats_question_rcvd(&self, node_ref: NodeRef, ts: u64, bytes: u64) { - self.inner - .write() - .self_transfer_stats_accounting - .add_down(bytes); - node_ref.operate_mut(|e| { - e.question_rcvd(ts, bytes); - }) - } - pub fn stats_answer_sent(&self, node_ref: NodeRef, bytes: u64) { - self.inner - .write() - .self_transfer_stats_accounting - .add_up(bytes); - node_ref.operate_mut(|e| { - e.answer_sent(bytes); - }) - } - pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { - { - let mut inner = self.inner.write(); - inner.self_transfer_stats_accounting.add_down(bytes); - inner - .self_latency_stats_accounting - .record_latency(recv_ts - send_ts); - } - node_ref.operate_mut(|e| { - e.answer_rcvd(send_ts, recv_ts, bytes); - }) - } - pub fn stats_question_lost(&self, node_ref: NodeRef) { - node_ref.operate_mut(|e| { - e.question_lost(); - }) - } - pub fn stats_failed_to_send(&self, node_ref: NodeRef, ts: u64, expects_answer: bool) { - node_ref.operate_mut(|e| { - e.failed_to_send(ts, expects_answer); - }) - } - ////////////////////////////////////////////////////////////////////// // Routing Table Health Metrics @@ -735,4 +721,23 @@ impl RoutingTable { } health } + + pub fn get_recent_peers(&self) -> Vec<(DHTKey, RecentPeersEntry)> { + let inner = self.inner.read(); + inner + .recent_peers + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + } + + pub fn touch_recent_peer( + inner: &mut RoutingTableInner, + node_id: DHTKey, + last_connection: ConnectionDescriptor, + ) { + inner + .recent_peers + .insert(node_id, RecentPeersEntry { last_connection }); + } } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index f9b224ef..69859b7c 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -85,57 +85,82 @@ impl NodeRef { // .unwrap_or(true) // } - pub fn operate(&self, f: F) -> T + pub(super) fn operate(&self, f: F) -> T where - F: FnOnce(&BucketEntryInner) -> T, + F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, { - self.entry.with(f) + let inner = &*self.routing_table.inner.read(); + self.entry.with(|e| f(inner, e)) } - pub fn operate_mut(&self, f: F) -> T + pub(super) fn operate_mut(&self, f: F) -> T where - F: FnOnce(&mut BucketEntryInner) -> T, + F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T, { - self.entry.with_mut(f) + let inner = &mut *self.routing_table.inner.write(); + self.entry.with_mut(|e| f(inner, e)) } - pub fn peer_info(&self) -> Option { - self.operate(|e| e.peer_info(self.node_id())) + pub fn peer_info(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rti, e| e.peer_info(self.node_id(), routing_domain)) } - pub fn has_seen_our_node_info(&self) -> bool { - self.operate(|e| e.has_seen_our_node_info()) + pub fn has_valid_signed_node_info(&self, opt_routing_domain: Option) -> bool { + self.operate(|_rti, e| e.has_valid_signed_node_info(opt_routing_domain)) } - pub fn set_seen_our_node_info(&self) { - self.operate_mut(|e| e.set_seen_our_node_info(true)); + pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { + self.operate(|_rti, e| e.has_seen_our_node_info(routing_domain)) + } + pub fn set_seen_our_node_info(&self, routing_domain: RoutingDomain) { + self.operate_mut(|_rti, e| e.set_seen_our_node_info(routing_domain, true)); } pub fn has_updated_since_last_network_change(&self) -> bool { - self.operate(|e| e.has_updated_since_last_network_change()) + self.operate(|_rti, e| e.has_updated_since_last_network_change()) } pub fn set_updated_since_last_network_change(&self) { - self.operate_mut(|e| e.set_updated_since_last_network_change(true)); + self.operate_mut(|_rti, e| e.set_updated_since_last_network_change(true)); } - pub fn network_class(&self) -> Option { - self.operate(|e| e.node_info().map(|n| n.network_class)) + + pub fn update_node_status(&self, node_status: NodeStatus) { + self.operate_mut(|_rti, e| { + e.update_node_status(node_status); + }); } - pub fn outbound_protocols(&self) -> Option { - self.operate(|e| e.node_info().map(|n| n.outbound_protocols)) + + pub fn min_max_version(&self) -> Option<(u8, u8)> { + self.operate(|_rti, e| e.min_max_version()) } - pub fn address_types(&self) -> Option { - self.operate(|e| e.node_info().map(|n| n.address_types)) + + pub fn set_min_max_version(&self, min_max_version: (u8, u8)) { + self.operate_mut(|_rti, e| e.set_min_max_version(min_max_version)) } - pub fn node_info_outbound_filter(&self) -> DialInfoFilter { + + pub fn state(&self, cur_ts: u64) -> BucketEntryState { + self.operate(|_rti, e| e.state(cur_ts)) + } + + pub fn network_class(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.network_class)) + } + pub fn outbound_protocols(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.outbound_protocols)) + } + pub fn address_types(&self, routing_domain: RoutingDomain) -> Option { + self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.address_types)) + } + pub fn node_info_outbound_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { let mut dif = DialInfoFilter::all(); - if let Some(outbound_protocols) = self.outbound_protocols() { + if let Some(outbound_protocols) = self.outbound_protocols(routing_domain) { dif = dif.with_protocol_type_set(outbound_protocols); } - if let Some(address_types) = self.address_types() { + if let Some(address_types) = self.address_types(routing_domain) { dif = dif.with_address_type_set(address_types); } dif } - pub fn relay(&self) -> Option { - let target_rpi = self.operate(|e| e.node_info().map(|n| n.relay_peer_info))?; + pub fn relay(&self, routing_domain: RoutingDomain) -> Option { + let target_rpi = + self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.relay_peer_info))?; target_rpi.and_then(|t| { // If relay is ourselves, then return None, because we can't relay through ourselves // and to contact this node we should have had an existing inbound connection @@ -145,7 +170,12 @@ impl NodeRef { // Register relay node and return noderef self.routing_table - .register_node_with_signed_node_info(t.node_id.key, t.signed_node_info, false) + .register_node_with_signed_node_info( + routing_domain, + t.node_id.key, + t.signed_node_info, + false, + ) .map(|mut nr| { nr.set_filter(self.filter_ref().cloned()); nr @@ -156,28 +186,24 @@ impl NodeRef { &self, routing_domain: Option, ) -> Option { - self.operate(|e| { + self.operate(|_rt, e| { // Prefer local dial info first unless it is filtered out if routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork) { - e.local_node_info().and_then(|l| { - l.first_filtered_dial_info(|di| { + e.node_info(RoutingDomain::LocalNetwork).and_then(|l| { + l.first_filtered_dial_info_detail(|did| { if let Some(filter) = self.filter.as_ref() { - di.matches_filter(filter) + did.matches_filter(filter) } else { true } }) - .map(|di| DialInfoDetail { - class: DialInfoClass::Direct, - dial_info: di, - }) }) } else { None } .or_else(|| { if routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet) { - e.node_info().and_then(|n| { + e.node_info(RoutingDomain::PublicInternet).and_then(|n| { n.first_filtered_dial_info_detail(|did| { if let Some(filter) = self.filter.as_ref() { did.matches_filter(filter) @@ -198,26 +224,21 @@ impl NodeRef { routing_domain: Option, ) -> Vec { let mut out = Vec::new(); - self.operate(|e| { + self.operate(|_rt, e| { // Prefer local dial info first unless it is filtered out if routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork) { - if let Some(lni) = e.local_node_info() { - for di in lni.all_filtered_dial_info(|di| { + if let Some(ni) = e.node_info(RoutingDomain::LocalNetwork) { + out.append(&mut ni.all_filtered_dial_info_details(|did| { if let Some(filter) = self.filter.as_ref() { - di.matches_filter(filter) + did.matches_filter(filter) } else { true } - }) { - out.push(DialInfoDetail { - class: DialInfoClass::Direct, - dial_info: di, - }); - } + })) } } if routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet) { - if let Some(ni) = e.node_info() { + if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { out.append(&mut ni.all_filtered_dial_info_details(|did| { if let Some(filter) = self.filter.as_ref() { did.matches_filter(filter) @@ -235,7 +256,7 @@ impl NodeRef { pub async fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection let (last_connection, last_seen) = - self.operate(|e| e.last_connection(self.filter.clone()))?; + self.operate(|_rti, e| e.last_connection(self.filter.clone()))?; // Should we check the connection table? if last_connection.protocol_type().is_connection_oriented() { @@ -254,21 +275,60 @@ impl NodeRef { } pub fn clear_last_connections(&self) { - self.operate_mut(|e| e.clear_last_connections()) + self.operate_mut(|_rti, e| e.clear_last_connections()) } pub fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: u64) { - self.operate_mut(|e| e.set_last_connection(connection_descriptor, ts)) + self.operate_mut(|_rti, e| e.set_last_connection(connection_descriptor, ts)) } pub fn has_any_dial_info(&self) -> bool { - self.operate(|e| { - e.node_info() - .map(|n| n.has_any_dial_info()) - .unwrap_or(false) - || e.local_node_info() - .map(|l| l.has_dial_info()) - .unwrap_or(false) + self.operate(|_rti, e| { + for rtd in RoutingDomain::all() { + if let Some(ni) = e.node_info(rtd) { + if ni.has_any_dial_info() { + return true; + } + } + } + false + }) + } + + pub fn stats_question_sent(&self, ts: u64, bytes: u64, expects_answer: bool) { + self.operate_mut(|rti, e| { + rti.self_transfer_stats_accounting.add_up(bytes); + e.question_sent(ts, bytes, expects_answer); + }) + } + pub fn stats_question_rcvd(&self, ts: u64, bytes: u64) { + self.operate_mut(|rti, e| { + rti.self_transfer_stats_accounting.add_down(bytes); + e.question_rcvd(ts, bytes); + }) + } + pub fn stats_answer_sent(&self, bytes: u64) { + self.operate_mut(|rti, e| { + rti.self_transfer_stats_accounting.add_up(bytes); + e.answer_sent(bytes); + }) + } + pub fn stats_answer_rcvd(&self, send_ts: u64, recv_ts: u64, bytes: u64) { + self.operate_mut(|rti, e| { + rti.self_transfer_stats_accounting.add_down(bytes); + rti.self_latency_stats_accounting + .record_latency(recv_ts - send_ts); + e.answer_rcvd(send_ts, recv_ts, bytes); + }) + } + pub fn stats_question_lost(&self) { + self.operate_mut(|_rti, e| { + e.question_lost(); + }) + } + pub fn stats_failed_to_send(&self, ts: u64, expects_answer: bool) { + self.operate_mut(|_rti, e| { + e.failed_to_send(ts, expects_answer); }) } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index f97219e8..ead3ea1d 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -390,8 +390,7 @@ impl RPCProcessor { Err(_) | Ok(TimeoutOr::Timeout) => { self.cancel_op_id_waiter(waitable_reply.op_id); - self.routing_table() - .stats_question_lost(waitable_reply.node_ref.clone()); + waitable_reply.node_ref.stats_question_lost(); } Ok(TimeoutOr::Value((rpcreader, _))) => { // Note that the remote node definitely received this node info since we got a reply @@ -399,8 +398,7 @@ impl RPCProcessor { // Reply received let recv_ts = intf::get_timestamp(); - self.routing_table().stats_answer_rcvd( - waitable_reply.node_ref, + waitable_reply.node_ref.stats_answer_rcvd( waitable_reply.send_ts, recv_ts, rpcreader.header.body_len, @@ -595,20 +593,19 @@ impl RPCProcessor { .map_err(|e| { // If we're returning an error, clean up self.cancel_op_id_waiter(op_id); - self.routing_table() - .stats_failed_to_send(node_ref.clone(), send_ts, true); - RPCError::network(e) })? - => { + node_ref + .stats_failed_to_send(send_ts, true); + RPCError::network(e) + })? => { // If we couldn't send we're still cleaning up self.cancel_op_id_waiter(op_id); - self.routing_table() - .stats_failed_to_send(node_ref, send_ts, true); + node_ref + .stats_failed_to_send(send_ts, true); } ); // Successfully sent - self.routing_table() - .stats_question_sent(node_ref.clone(), send_ts, bytes, true); + node_ref.stats_question_sent(send_ts, bytes, true); // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { @@ -663,18 +660,18 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - self.routing_table() - .stats_failed_to_send(node_ref.clone(), send_ts, true); - RPCError::network(e) })? => { + node_ref + .stats_failed_to_send(send_ts, true); + RPCError::network(e) + })? => { // If we couldn't send we're still cleaning up - self.routing_table() - .stats_failed_to_send(node_ref, send_ts, true); + node_ref + .stats_failed_to_send(send_ts, true); } ); // Successfully sent - self.routing_table() - .stats_question_sent(node_ref.clone(), send_ts, bytes, true); + node_ref.stats_question_sent(send_ts, bytes, true); Ok(NetworkResult::value(())) } @@ -755,17 +752,18 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - self.routing_table() - .stats_failed_to_send(node_ref.clone(), send_ts, true); - RPCError::network(e) })? => { + node_ref + .stats_failed_to_send(send_ts, true); + RPCError::network(e) + })? => { // If we couldn't send we're still cleaning up - self.routing_table() - .stats_failed_to_send(node_ref.clone(), send_ts, false); + node_ref + .stats_failed_to_send(send_ts, false); } ); // Reply successfully sent - self.routing_table().stats_answer_sent(node_ref, bytes); + node_ref.stats_answer_sent(bytes); Ok(NetworkResult::value(())) } @@ -828,21 +826,13 @@ impl RPCProcessor { let kind = match msg.operation.kind() { RPCOperationKind::Question(_) => { if let Some(sender_nr) = msg.opt_sender_nr.clone() { - self.routing_table().stats_question_rcvd( - sender_nr, - msg.header.timestamp, - msg.header.body_len, - ); + sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len); } "question" } RPCOperationKind::Statement(_) => { if let Some(sender_nr) = msg.opt_sender_nr.clone() { - self.routing_table().stats_question_rcvd( - sender_nr, - msg.header.timestamp, - msg.header.body_len, - ); + sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len); } "statement" } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index e3f2213b..9fff8034 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -61,19 +61,27 @@ impl RPCProcessor { // add node information for the requesting node to our routing table let routing_table = self.routing_table(); + let rt2 = routing_table.clone(); + let rt3 = routing_table.clone(); // find N nodes closest to the target node in our routing table - let own_peer_info = routing_table.get_own_peer_info(); + let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); let closest_nodes = routing_table.find_closest_nodes( find_node_q.node_id, // filter Some(move |_k, v| { - RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) + rt2.filter_has_valid_signed_node_info( + v, + own_peer_info_is_valid, + Some(RoutingDomain::PublicInternet), + ) }), // transform - move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), + move |k, v| { + rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v, &own_peer_info) + }, ); // Make status answer diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 73cc6fa4..15b23d0c 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -10,6 +10,7 @@ impl RPCProcessor { safety_route: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { let signed_node_info = self.routing_table().get_own_signed_node_info(); + xxx add routing domain to capnp.... let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 9eb85c84..25c04bcf 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -38,9 +38,7 @@ impl RPCProcessor { }; // Update latest node status in routing table - peer.operate_mut(|e| { - e.update_node_status(status_a.node_status.clone()); - }); + peer.update_node_status(status_a.node_status.clone()); // Report sender_info IP addresses to network manager if let Some(socket_address) = status_a.sender_info.socket_address { @@ -90,9 +88,7 @@ impl RPCProcessor { // update node status for the requesting node to our routing table if let Some(sender_nr) = msg.opt_sender_nr.clone() { // Update latest node status in routing table for the statusq sender - sender_nr.operate_mut(|e| { - e.update_node_status(status_q.node_status.clone()); - }); + sender_nr.update_node_status(status_q.node_status.clone()); } // Make status answer diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 12c01697..d3059146 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -392,11 +392,14 @@ impl NetworkClass { #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct NodeStatus { + // PublicInternet RoutingDomain Status pub will_route: bool, pub will_tunnel: bool, pub will_signal: bool, pub will_relay: bool, pub will_validate_dial_info: bool, + // LocalNetwork RoutingDomain Status + // TODO } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -502,43 +505,6 @@ impl NodeInfo { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LocalNodeInfo { - pub dial_info_list: Vec, -} - -impl LocalNodeInfo { - pub fn first_filtered_dial_info(&self, filter: F) -> Option - where - F: Fn(&DialInfo) -> bool, - { - for di in &self.dial_info_list { - if filter(di) { - return Some(di.clone()); - } - } - None - } - - pub fn all_filtered_dial_info(&self, filter: F) -> Vec - where - F: Fn(&DialInfo) -> bool, - { - let mut dial_info_list = Vec::new(); - - for di in &self.dial_info_list { - if filter(di) { - dial_info_list.push(di.clone()); - } - } - dial_info_list - } - - pub fn has_dial_info(&self) -> bool { - !self.dial_info_list.is_empty() - } -} - #[allow(clippy::derive_hash_xor_eq)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] // Keep member order appropriate for sorting < preference