Christien Rioux bfc42cdd8e clean up ui
pause routing table tasks when making routing domain changes
2023-09-06 13:20:36 -04:00

241 lines
8.4 KiB
Rust

pub mod bootstrap;
pub mod kick_buckets;
pub mod peer_minimum_refresh;
pub mod ping_validator;
pub mod private_route_management;
pub mod relay_management;
pub mod rolling_transfers;
use super::*;
impl RoutingTable {
pub(crate) fn setup_tasks(&self) {
// Set rolling transfers tick task
{
let this = self.clone();
self.unlocked_inner
.rolling_transfers_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.rolling_transfers_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(
parent: None,
"RoutingTable rolling transfers task routine"
)),
)
});
}
// Set kick buckets tick task
{
let this = self.clone();
self.unlocked_inner
.kick_buckets_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.kick_buckets_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "kick buckets task routine")),
)
});
}
// Set bootstrap tick task
{
let this = self.clone();
self.unlocked_inner
.bootstrap_task
.set_routine(move |s, _l, _t| {
Box::pin(
this.clone()
.bootstrap_task_routine(s)
.instrument(trace_span!(parent: None, "bootstrap task routine")),
)
});
}
// Set peer minimum refresh tick task
{
let this = self.clone();
self.unlocked_inner
.peer_minimum_refresh_task
.set_routine(move |s, _l, _t| {
Box::pin(
this.clone()
.peer_minimum_refresh_task_routine(s)
.instrument(trace_span!(
parent: None,
"peer minimum refresh task routine"
)),
)
});
}
// Set ping validator tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.ping_validator_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "ping validator task routine")),
)
});
}
// Set relay management tick task
{
let this = self.clone();
self.unlocked_inner
.relay_management_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.relay_management_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "relay management task routine")),
)
});
}
// Set private route management tick task
{
let this = self.clone();
self.unlocked_inner
.private_route_management_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.private_route_management_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"private route management task routine"
)),
)
});
}
}
/// Ticks about once per second
/// to run tick tasks which may run at slower tick rates as configured
pub async fn tick(&self) -> EyreResult<()> {
// Don't tick if paused
if self.inner.read().tick_paused {
return Ok(());
}
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self.unlocked_inner.rolling_transfers_task.tick().await?;
// Kick buckets task
let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len();
if kick_bucket_queue_count > 0 {
self.unlocked_inner.kick_buckets_task.tick().await?;
}
// Refresh entry counts
let entry_counts = {
let mut inner = self.inner.write();
inner.refresh_cached_entry_counts()
};
let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
// Figure out which tables need bootstrap or peer minimum refresh
let mut needs_bootstrap = false;
let mut needs_peer_minimum_refresh = false;
for ck in VALID_CRYPTO_KINDS {
let eckey = (RoutingDomain::PublicInternet, ck);
let cnt = entry_counts.get(&eckey).copied().unwrap_or_default();
if cnt == 0 {
needs_bootstrap = true;
} else if cnt < min_peer_count {
needs_peer_minimum_refresh = true;
}
}
if needs_bootstrap {
self.unlocked_inner.bootstrap_task.tick().await?;
}
if needs_peer_minimum_refresh {
self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
}
// Ping validate some nodes to groom the table
self.unlocked_inner.ping_validator_task.tick().await?;
// Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?;
// Run the private route management task
// If we don't know our network class then don't do this yet
if self.has_valid_network_class(RoutingDomain::PublicInternet) {
self.unlocked_inner
.private_route_management_task
.tick()
.await?;
}
Ok(())
}
pub(crate) async fn pause_tasks(&self, paused: bool) {
let cancel = {
let mut inner = self.inner.write();
if !inner.tick_paused && paused {
inner.tick_paused = true;
true
} else if inner.tick_paused && !paused {
inner.tick_paused = false;
false
} else {
false
}
};
if cancel {
self.cancel_tasks().await;
}
}
pub(crate) async fn cancel_tasks(&self) {
// Cancel all tasks being ticked
debug!("stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
error!("rolling_transfers_task not stopped: {}", e);
}
debug!("stopping kick buckets task");
if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await {
error!("kick_buckets_task not stopped: {}", e);
}
debug!("stopping bootstrap task");
if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await {
error!("bootstrap_task not stopped: {}", e);
}
debug!("stopping peer minimum refresh task");
if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await {
error!("peer_minimum_refresh_task not stopped: {}", e);
}
debug!("stopping ping_validator task");
if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await {
error!("ping_validator_task not stopped: {}", e);
}
debug!("stopping relay management task");
if let Err(e) = self.unlocked_inner.relay_management_task.stop().await {
warn!("relay_management_task not stopped: {}", e);
}
debug!("stopping private route management task");
if let Err(e) = self
.unlocked_inner
.private_route_management_task
.stop()
.await
{
warn!("private_route_management_task not stopped: {}", e);
}
}
}