add node_info_update calls
commit 444f65d76d
parent d7ba221b48
@@ -464,6 +464,10 @@ impl Network {

         info!("network started");
         self.inner.lock().network_started = true;
+
+        // Inform routing table entries that our dial info has changed
+        self.routing_table().send_node_info_updates();
+
         Ok(())
     }

@@ -474,6 +474,9 @@ impl Network {

         log_net!(debug "network class set to {:?}", network_class);

+        // send updates to everyone
+        self.routing_table().send_node_info_updates();
+
         Ok(())
     }
 }
@@ -61,6 +61,7 @@ struct RoutingTableUnlockedInner {
     bootstrap_task: TickTask,
     peer_minimum_refresh_task: TickTask,
     ping_validator_task: TickTask,
+    node_info_update_single_future: SingleFuture<()>,
 }

 #[derive(Clone)]
@@ -92,6 +93,7 @@ impl RoutingTable {
             bootstrap_task: TickTask::new(1),
             peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
             ping_validator_task: TickTask::new(1),
+            node_info_update_single_future: SingleFuture::new(),
         }
     }
     pub fn new(network_manager: NetworkManager) -> Self {
@@ -306,6 +308,16 @@ impl RoutingTable {
                 .to_string(),
         );
         debug!(" Class: {:?}", class);
+
+        // Public dial info changed, go through all nodes and reset their 'seen our node info' bit
+        if matches!(domain, RoutingDomain::PublicInternet) {
+            for bucket in &mut inner.buckets {
+                for entry in bucket.entries_mut() {
+                    entry.1.set_seen_our_node_info(false);
+                }
+            }
+        }
+
         Ok(())
     }

@@ -355,6 +367,86 @@ impl RoutingTable {
         *self.inner.lock() = Self::new_inner(self.network_manager());
     }
+
+    // Inform routing table entries that our dial info has changed
+    pub fn send_node_info_updates(&self) {
+        let this = self.clone();
+        // Run in background
+        intf::spawn(async move {
+            // Run in background only once
+            this.clone()
+                .unlocked_inner
+                .node_info_update_single_future
+                .single_spawn(async move {
+
+                    // Only update if we actually have a valid network class
+                    let netman = this.network_manager();
+                    if matches!(
+                        netman.get_network_class().unwrap_or(NetworkClass::Invalid),
+                        NetworkClass::Invalid
+                    ) {
+                        trace!("not sending node info update because our network class is not yet valid");
+                        return;
+                    }
+
+                    // Get the list of refs to all nodes to update
+                    let node_refs = {
+                        let mut inner = this.inner.lock();
+                        let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
+                        let cur_ts = intf::get_timestamp();
+                        for bucket in &mut inner.buckets {
+                            for entry in bucket.entries_mut() {
+                                match entry.1.state(cur_ts) {
+                                    BucketEntryState::Reliable | BucketEntryState::Unreliable => {
+                                        // Only update nodes that haven't seen our node info yet
+                                        if !entry.1.has_seen_our_node_info() {
+                                            node_refs.push(NodeRef::new(
+                                                this.clone(),
+                                                *entry.0,
+                                                entry.1,
+                                                None,
+                                            ));
+                                        }
+                                    }
+                                    BucketEntryState::Dead => {
+                                        // do nothing
+                                    }
+                                }
+                            }
+                        }
+                        node_refs
+                    };
+
+                    // Send the updates
+                    log_rtab!("Sending node info updates to {} nodes", node_refs.len());
+                    let mut unord = FuturesUnordered::new();
+                    for nr in node_refs {
+                        let rpc = this.rpc_processor();
+                        unord.push(async move {
+                            // Update the node
+                            if let Err(e) = rpc
+                                .rpc_call_node_info_update(Destination::Direct(nr.clone()), None)
+                                .await
+                            {
+                                // Not fatal, but we should be able to see if this is happening
+                                trace!("failed to send node info update to {:?}: {}", nr, e);
+                                return;
+                            }
+
+                            // Mark the node as updated
+                            nr.set_seen_our_node_info();
+                        });
+                    }
+
+                    // Wait for futures to complete
+                    while unord.next().await.is_some() {}
+
+                    log_rtab!("Finished sending node updates");
+                })
+                .await
+        })
+        .detach()
+    }

     // Attempt to empty the routing table
     // should only be performed when there are no node_refs (detached)
     pub fn purge(&self) {
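The new send_node_info_updates() above combines two ideas: only one update broadcast runs at a time (guarded by node_info_update_single_future), and each peer is contacted at most once per dial-info change by skipping entries whose 'seen our node info' flag is already set and marking it only after the RPC succeeds. Below is a minimal, self-contained sketch of that second pattern (concurrent sends via FuturesUnordered, then marking on success). It is not Veilid code: Peer, send_update, and the tokio/futures dependencies are assumptions made for the illustration only.

// Illustrative sketch, not part of this commit.
// Assumed dependencies: futures = "0.3", tokio = { features = ["macros", "rt"] }.
use futures::stream::{FuturesUnordered, StreamExt};

#[derive(Debug, Clone)]
struct Peer {
    id: u64,
    seen_our_node_info: bool,
}

// Stand-in for an RPC such as rpc_call_node_info_update(); always succeeds here.
async fn send_update(peer: &Peer) -> Result<(), String> {
    println!("sending node info update to peer {}", peer.id);
    Ok(())
}

async fn send_node_info_updates(peers: &mut [Peer]) {
    // Queue a send future only for peers that have not yet seen our node info.
    let mut unord = FuturesUnordered::new();
    for (idx, peer) in peers.iter().enumerate() {
        if !peer.seen_our_node_info {
            let peer = peer.clone();
            unord.push(async move { (idx, send_update(&peer).await) });
        }
    }

    // Drive all sends concurrently; mark a peer only when its send succeeded,
    // so a failed peer is picked up again on the next broadcast.
    while let Some((idx, result)) = unord.next().await {
        match result {
            Ok(()) => peers[idx].seen_our_node_info = true,
            Err(e) => eprintln!("update to peer {} failed: {}", peers[idx].id, e),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut peers = vec![
        Peer { id: 1, seen_our_node_info: false },
        Peer { id: 2, seen_our_node_info: true },
        Peer { id: 3, seen_our_node_info: false },
    ];
    send_node_info_updates(&mut peers).await;
}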