From 444f65d76dc616407bfe4f9435ebcf594adc49f4 Mon Sep 17 00:00:00 2001
From: John Smith
Date: Wed, 11 May 2022 12:20:33 -0400
Subject: [PATCH] add node_info_update calls

---
 veilid-core/src/intf/native/network/mod.rs    |  4 +
 .../native/network/network_class_discovery.rs |  3 +
 veilid-core/src/routing_table/mod.rs          | 92 +++++++++++++++++++
 3 files changed, 99 insertions(+)

diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs
index 4ac4845c..d814dd3e 100644
--- a/veilid-core/src/intf/native/network/mod.rs
+++ b/veilid-core/src/intf/native/network/mod.rs
@@ -464,6 +464,10 @@ impl Network {
         info!("network started");
         self.inner.lock().network_started = true;
+
+        // Inform routing table entries that our dial info has changed
+        self.routing_table().send_node_info_updates();
+
         Ok(())
     }
 
diff --git a/veilid-core/src/intf/native/network/network_class_discovery.rs b/veilid-core/src/intf/native/network/network_class_discovery.rs
index 4a68eb6e..117095aa 100644
--- a/veilid-core/src/intf/native/network/network_class_discovery.rs
+++ b/veilid-core/src/intf/native/network/network_class_discovery.rs
@@ -474,6 +474,9 @@ impl Network {
         log_net!(debug "network class set to {:?}", network_class);
 
+        // send updates to everyone
+        self.routing_table().send_node_info_updates();
+
         Ok(())
     }
 }
diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index 701d294d..557bf658 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -61,6 +61,7 @@ struct RoutingTableUnlockedInner {
     bootstrap_task: TickTask,
     peer_minimum_refresh_task: TickTask,
     ping_validator_task: TickTask,
+    node_info_update_single_future: SingleFuture<()>,
 }
 
 #[derive(Clone)]
@@ -92,6 +93,7 @@ impl RoutingTable {
             bootstrap_task: TickTask::new(1),
             peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
             ping_validator_task: TickTask::new(1),
+            node_info_update_single_future: SingleFuture::new(),
         }
     }
     pub fn new(network_manager: NetworkManager) -> Self {
@@ -306,6 +308,16 @@ impl RoutingTable {
                 .to_string(),
         );
         debug!("    Class: {:?}", class);
+
+        // Public dial info changed, go through all nodes and reset their 'seen our node info' bit
+        if matches!(domain, RoutingDomain::PublicInternet) {
+            for bucket in &mut inner.buckets {
+                for entry in bucket.entries_mut() {
+                    entry.1.set_seen_our_node_info(false);
+                }
+            }
+        }
+
         Ok(())
     }
 
@@ -355,6 +367,86 @@ impl RoutingTable {
         *self.inner.lock() = Self::new_inner(self.network_manager());
     }
 
+    // Inform routing table entries that our dial info has changed
+    pub fn send_node_info_updates(&self) {
+        let this = self.clone();
+        // Run in background
+        intf::spawn(async move {
+            // Run in background only once
+            this.clone()
+                .unlocked_inner
+                .node_info_update_single_future
+                .single_spawn(async move {
+                    // Only update if we actually have a valid network class
+                    let netman = this.network_manager();
+                    if matches!(
+                        netman.get_network_class().unwrap_or(NetworkClass::Invalid),
+                        NetworkClass::Invalid
+                    ) {
+                        trace!("not sending node info update because our network class is not yet valid");
+                        return;
+                    }
+
+                    // Get the list of refs to all nodes to update
+                    let node_refs = {
+                        let mut inner = this.inner.lock();
+                        let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
+                        let cur_ts = intf::get_timestamp();
+                        for bucket in &mut inner.buckets {
+                            for entry in bucket.entries_mut() {
+                                match entry.1.state(cur_ts) {
+                                    BucketEntryState::Reliable | BucketEntryState::Unreliable => {
+                                        // Only update nodes that haven't seen our node info yet
+                                        if !entry.1.has_seen_our_node_info() {
+                                            node_refs.push(NodeRef::new(
+                                                this.clone(),
+                                                *entry.0,
+                                                entry.1,
+                                                None,
+                                            ));
+                                        }
+                                    }
+                                    BucketEntryState::Dead => {
+                                        // do nothing
+                                    }
+                                }
+                            }
+                        }
+                        node_refs
+                    };
+
+                    // Send the updates
+                    log_rtab!("Sending node info updates to {} nodes", node_refs.len());
+                    let mut unord = FuturesUnordered::new();
+                    for nr in node_refs {
+                        let rpc = this.rpc_processor();
+                        unord.push(async move {
+                            // Update the node
+                            if let Err(e) = rpc
+                                .rpc_call_node_info_update(Destination::Direct(nr.clone()), None)
+                                .await
+                            {
+                                // Not fatal, but we should be able to see if this is happening
+                                trace!("failed to send node info update to {:?}: {}", nr, e);
+                                return;
+                            }
+
+                            // Mark the node as updated
+                            nr.set_seen_our_node_info();
+                        });
+                    }
+
+                    // Wait for futures to complete
+                    while unord.next().await.is_some() {}
+
+                    log_rtab!("Finished sending node updates");
+                })
+                .await
+        })
+        .detach()
+    }
+
     // Attempt to empty the routing table
     // should only be performed when there are no node_refs (detached)
     pub fn purge(&self) {
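
Reviewer note: send_node_info_updates() leans on a "spawn at most once"
primitive so that overlapping triggers (network start plus network class
discovery) cannot run two update rounds at the same time. Below is a minimal
sketch of that pattern under assumed semantics -- a new future is simply
dropped while a previous one is still in flight. The SingleSpawner type and
the tokio runtime are illustrative stand-ins, not Veilid's actual
SingleFuture implementation.

    // Sketch of a "run at most one at a time" spawner (assumed semantics).
    use std::future::Future;
    use std::sync::{Arc, Mutex};
    use tokio::task::JoinHandle;

    #[derive(Clone, Default)]
    struct SingleSpawner {
        handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    }

    impl SingleSpawner {
        // Spawn `fut` only if no previously spawned future is still running;
        // otherwise drop it, since an update round is already in flight.
        fn single_spawn<F>(&self, fut: F)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            let mut guard = self.handle.lock().unwrap();
            let idle = guard.as_ref().map(|h| h.is_finished()).unwrap_or(true);
            if idle {
                *guard = Some(tokio::spawn(fut));
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let spawner = SingleSpawner::default();
        for _ in 0..3 {
            // Only the first of these overlapping calls actually runs.
            spawner.single_spawn(async {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                println!("node info update round finished");
            });
        }
        // Give the single spawned round time to complete.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }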
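
Reviewer note: inside the update round, the patch fans out one RPC per node
and drains a FuturesUnordered so all updates proceed concurrently and
failures stay per-node. A self-contained sketch of that drain pattern
follows; notify_peer() is a hypothetical stand-in for
rpc_call_node_info_update(), and the peer list is fabricated for
illustration.

    use futures::stream::{FuturesUnordered, StreamExt};

    // Hypothetical stand-in for the real RPC call; always succeeds here.
    async fn notify_peer(peer: u32) -> Result<(), String> {
        println!("sending node info to peer {}", peer);
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        let mut unord = FuturesUnordered::new();
        for peer in [1u32, 2, 3] {
            unord.push(async move {
                if let Err(e) = notify_peer(peer).await {
                    // Per-node failure is logged and skipped, not fatal,
                    // mirroring the trace!() branch in the patch.
                    println!("failed to update peer {}: {}", peer, e);
                    return;
                }
                println!("peer {} has seen our node info", peer);
            });
        }
        // Drain the set: done once every concurrent update has completed.
        while unord.next().await.is_some() {}
    }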