refactor
This commit is contained in:
		| @@ -23,7 +23,7 @@ pub use network_connection::*; | ||||
| use connection_handle::*; | ||||
| use connection_limits::*; | ||||
| use crypto::*; | ||||
| use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt}; | ||||
| use futures_util::stream::{FuturesUnordered, StreamExt}; | ||||
| use hashlink::LruCache; | ||||
| use intf::*; | ||||
| #[cfg(not(target_arch = "wasm32"))] | ||||
| @@ -37,8 +37,6 @@ use xx::*; | ||||
|  | ||||
| //////////////////////////////////////////////////////////////////////////////////////// | ||||
|  | ||||
| pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; | ||||
| pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; | ||||
| pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; | ||||
| pub const IPADDR_TABLE_SIZE: usize = 1024; | ||||
| pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes | ||||
| @@ -48,15 +46,6 @@ pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; | ||||
| pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: u64 = 300_000_000u64; // 5 minutes | ||||
| pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: u64 = 3600_000_000u64; // 60 minutes | ||||
| pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; | ||||
| pub const BOOTSTRAP_TXT_VERSION: u8 = 0; | ||||
|  | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct BootstrapRecord { | ||||
|     min_version: u8, | ||||
|     max_version: u8, | ||||
|     dial_info_details: Vec<DialInfoDetail>, | ||||
| } | ||||
| pub type BootstrapRecordMap = BTreeMap<DHTKey, BootstrapRecord>; | ||||
|  | ||||
| #[derive(Copy, Clone, Debug, Default)] | ||||
| pub struct ProtocolConfig { | ||||
| @@ -166,11 +155,6 @@ struct NetworkManagerUnlockedInner { | ||||
|     update_callback: RwLock<Option<UpdateCallback>>, | ||||
|     // Background processes | ||||
|     rolling_transfers_task: TickTask<EyreReport>, | ||||
|     relay_management_task: TickTask<EyreReport>, | ||||
|     private_route_management_task: TickTask<EyreReport>, | ||||
|     bootstrap_task: TickTask<EyreReport>, | ||||
|     peer_minimum_refresh_task: TickTask<EyreReport>, | ||||
|     ping_validator_task: TickTask<EyreReport>, | ||||
|     public_address_check_task: TickTask<EyreReport>, | ||||
|     node_info_update_single_future: MustJoinSingleFuture<()>, | ||||
| } | ||||
| @@ -197,7 +181,6 @@ impl NetworkManager { | ||||
|         block_store: BlockStore, | ||||
|         crypto: Crypto, | ||||
|     ) -> NetworkManagerUnlockedInner { | ||||
|         let min_peer_refresh_time_ms = config.get().network.dht.min_peer_refresh_time_ms; | ||||
|         NetworkManagerUnlockedInner { | ||||
|             config, | ||||
|             protected_store, | ||||
| @@ -208,11 +191,6 @@ impl NetworkManager { | ||||
|             components: RwLock::new(None), | ||||
|             update_callback: RwLock::new(None), | ||||
|             rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), | ||||
|             relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), | ||||
|             private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), | ||||
|             bootstrap_task: TickTask::new(1), | ||||
|             peer_minimum_refresh_task: TickTask::new_ms(min_peer_refresh_time_ms), | ||||
|             ping_validator_task: TickTask::new(1), | ||||
|             public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), | ||||
|             node_info_update_single_future: MustJoinSingleFuture::new(), | ||||
|         } | ||||
| @@ -235,116 +213,9 @@ impl NetworkManager { | ||||
|                 crypto, | ||||
|             )), | ||||
|         }; | ||||
|         // Set rolling transfers tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .rolling_transfers_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .rolling_transfers_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "NetworkManager rolling transfers task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set relay management tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .relay_management_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .relay_management_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "relay management task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set private route management tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .private_route_management_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .private_route_management_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "private route management task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set bootstrap tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .bootstrap_task | ||||
|                 .set_routine(move |s, _l, _t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .bootstrap_task_routine(s) | ||||
|                             .instrument(trace_span!(parent: None, "bootstrap task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set peer minimum refresh tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .peer_minimum_refresh_task | ||||
|                 .set_routine(move |s, _l, _t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .peer_minimum_refresh_task_routine(s) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "peer minimum refresh task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set ping validator tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .ping_validator_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .ping_validator_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "ping validator task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         // Set public address check task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .public_address_check_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .public_address_check_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "public address check task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         this.start_tasks(); | ||||
|  | ||||
|         this | ||||
|     } | ||||
|     pub fn config(&self) -> VeilidConfig { | ||||
| @@ -492,36 +363,7 @@ impl NetworkManager { | ||||
|         debug!("starting network manager shutdown"); | ||||
|  | ||||
|         // Cancel all tasks | ||||
|         debug!("stopping rolling transfers task"); | ||||
|         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { | ||||
|             warn!("rolling_transfers_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping relay management task"); | ||||
|         if let Err(e) = self.unlocked_inner.relay_management_task.stop().await { | ||||
|             warn!("relay_management_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping bootstrap task"); | ||||
|         if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await { | ||||
|             error!("bootstrap_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping peer minimum refresh task"); | ||||
|         if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await { | ||||
|             error!("peer_minimum_refresh_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping ping_validator task"); | ||||
|         if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await { | ||||
|             error!("ping_validator_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping node info update singlefuture"); | ||||
|         if self | ||||
|             .unlocked_inner | ||||
|             .node_info_update_single_future | ||||
|             .join() | ||||
|             .await | ||||
|             .is_err() | ||||
|         { | ||||
|             error!("node_info_update_single_future not stopped"); | ||||
|         } | ||||
|         self.stop_tasks().await; | ||||
|  | ||||
|         // Shutdown network components if they started up | ||||
|         debug!("shutting down network components"); | ||||
| @@ -597,53 +439,6 @@ impl NetworkManager { | ||||
|         net.needs_restart() | ||||
|     } | ||||
|  | ||||
|     pub async fn tick(&self) -> EyreResult<()> { | ||||
|         let routing_table = self.routing_table(); | ||||
|         let net = self.net(); | ||||
|         let receipt_manager = self.receipt_manager(); | ||||
|  | ||||
|         // Run the rolling transfers task | ||||
|         self.unlocked_inner.rolling_transfers_task.tick().await?; | ||||
|  | ||||
|         // Run the relay management task | ||||
|         self.unlocked_inner.relay_management_task.tick().await?; | ||||
|  | ||||
|         // See how many live PublicInternet entries we have | ||||
|         let live_public_internet_entry_count = routing_table.get_entry_count( | ||||
|             RoutingDomain::PublicInternet.into(), | ||||
|             BucketEntryState::Unreliable, | ||||
|         ); | ||||
|         let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); | ||||
|  | ||||
|         // If none, then add the bootstrap nodes to it | ||||
|         if live_public_internet_entry_count == 0 { | ||||
|             self.unlocked_inner.bootstrap_task.tick().await?; | ||||
|         } | ||||
|         // If we still don't have enough peers, find nodes until we do | ||||
|         else if !self.unlocked_inner.bootstrap_task.is_running() | ||||
|             && live_public_internet_entry_count < min_peer_count | ||||
|         { | ||||
|             self.unlocked_inner.peer_minimum_refresh_task.tick().await?; | ||||
|         } | ||||
|  | ||||
|         // Ping validate some nodes to groom the table | ||||
|         self.unlocked_inner.ping_validator_task.tick().await?; | ||||
|  | ||||
|         // Run the routing table tick | ||||
|         routing_table.tick().await?; | ||||
|  | ||||
|         // Run the low level network tick | ||||
|         net.tick().await?; | ||||
|  | ||||
|         // Run the receipt manager tick | ||||
|         receipt_manager.tick().await?; | ||||
|  | ||||
|         // Purge the client whitelist | ||||
|         self.purge_client_whitelist(); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Get our node's capabilities in the PublicInternet routing domain | ||||
|     fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus { | ||||
|         let own_peer_info = self | ||||
|   | ||||
| @@ -1,699 +0,0 @@ | ||||
| use super::*; | ||||
|  | ||||
| use crate::crypto::*; | ||||
| use crate::xx::*; | ||||
| use futures_util::FutureExt; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| impl NetworkManager { | ||||
|     // Bootstrap lookup process | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub(super) async fn resolve_bootstrap( | ||||
|         &self, | ||||
|         bootstrap: Vec<String>, | ||||
|     ) -> EyreResult<BootstrapRecordMap> { | ||||
|         // Resolve from bootstrap root to bootstrap hostnames | ||||
|         let mut bsnames = Vec::<String>::new(); | ||||
|         for bh in bootstrap { | ||||
|             // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) | ||||
|             let records = intf::txt_lookup(&bh).await?; | ||||
|             for record in records { | ||||
|                 // Split the bootstrap name record by commas | ||||
|                 for rec in record.split(',') { | ||||
|                     let rec = rec.trim(); | ||||
|                     // If the name specified is fully qualified, go with it | ||||
|                     let bsname = if rec.ends_with('.') { | ||||
|                         rec.to_string() | ||||
|                     } | ||||
|                     // If the name is not fully qualified, prepend it to the bootstrap name | ||||
|                     else { | ||||
|                         format!("{}.{}", rec, bh) | ||||
|                     }; | ||||
|  | ||||
|                     // Add to the list of bootstrap name to look up | ||||
|                     bsnames.push(bsname); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Get bootstrap nodes from hostnames concurrently | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         for bsname in bsnames { | ||||
|             unord.push( | ||||
|                 async move { | ||||
|                     // look up boostrap node txt records | ||||
|                     let bsnirecords = match intf::txt_lookup(&bsname).await { | ||||
|                         Err(e) => { | ||||
|                             warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         Ok(v) => v, | ||||
|                     }; | ||||
|                     // for each record resolve into key/bootstraprecord pairs | ||||
|                     let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); | ||||
|                     for bsnirecord in bsnirecords { | ||||
|                         // Bootstrap TXT Record Format Version 0: | ||||
|                         // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* | ||||
|                         // | ||||
|                         // Split bootstrap node record by commas. Example: | ||||
|                         // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws | ||||
|                         let records: Vec<String> = bsnirecord | ||||
|                             .trim() | ||||
|                             .split(',') | ||||
|                             .map(|x| x.trim().to_owned()) | ||||
|                             .collect(); | ||||
|                         if records.len() < 6 { | ||||
|                             warn!("invalid number of fields in bootstrap txt record"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Bootstrap TXT record version | ||||
|                         let txt_version: u8 = match records[0].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid txt_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         if txt_version != BOOTSTRAP_TXT_VERSION { | ||||
|                             warn!("unsupported bootstrap txt record version"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Min/Max wire protocol version | ||||
|                         let min_version: u8 = match records[1].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid min_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         let max_version: u8 = match records[2].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid max_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Node Id | ||||
|                         let node_id_str = &records[3]; | ||||
|                         let node_id_key = match DHTKey::try_decode(node_id_str) { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                     "Invalid node id in bootstrap node record {}: {}", | ||||
|                                     node_id_str, e | ||||
|                                 ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Hostname | ||||
|                         let hostname_str = &records[4]; | ||||
|  | ||||
|                         // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node | ||||
|                         if self.routing_table().node_id() == node_id_key { | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Resolve each record and store in node dial infos list | ||||
|                         let mut bootstrap_record = BootstrapRecord { | ||||
|                             min_version, | ||||
|                             max_version, | ||||
|                             dial_info_details: Vec::new(), | ||||
|                         }; | ||||
|                         for rec in &records[5..] { | ||||
|                             let rec = rec.trim(); | ||||
|                             let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { | ||||
|                                 Ok(dis) => dis, | ||||
|                                 Err(e) => { | ||||
|                                     warn!( | ||||
|                                         "Couldn't resolve bootstrap node dial info {}: {}", | ||||
|                                         rec, e | ||||
|                                     ); | ||||
|                                     continue; | ||||
|                                 } | ||||
|                             }; | ||||
|  | ||||
|                             for di in dial_infos { | ||||
|                                 bootstrap_record.dial_info_details.push(DialInfoDetail { | ||||
|                                     dial_info: di, | ||||
|                                     class: DialInfoClass::Direct, | ||||
|                                 }); | ||||
|                             } | ||||
|                         } | ||||
|                         bootstrap_records.push((node_id_key, bootstrap_record)); | ||||
|                     } | ||||
|                     Some(bootstrap_records) | ||||
|                 } | ||||
|                 .instrument(Span::current()), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let mut bsmap = BootstrapRecordMap::new(); | ||||
|         while let Some(bootstrap_records) = unord.next().await { | ||||
|             if let Some(bootstrap_records) = bootstrap_records { | ||||
|                 for (bskey, mut bsrec) in bootstrap_records { | ||||
|                     let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: bsrec.min_version, | ||||
|                         max_version: bsrec.max_version, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }); | ||||
|                     rec.dial_info_details.append(&mut bsrec.dial_info_details); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(bsmap) | ||||
|     } | ||||
|  | ||||
|     // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn direct_bootstrap_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         bootstrap_dialinfos: Vec<DialInfo>, | ||||
|     ) -> EyreResult<()> { | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         let routing_table = self.routing_table(); | ||||
|  | ||||
|         for bootstrap_di in bootstrap_dialinfos { | ||||
|             log_net!(debug "direct bootstrap with: {}", bootstrap_di); | ||||
|  | ||||
|             let peer_info = self.boot_request(bootstrap_di).await?; | ||||
|  | ||||
|             log_net!(debug "  direct bootstrap peerinfo: {:?}", peer_info); | ||||
|  | ||||
|             // Got peer info, let's add it to the routing table | ||||
|             for pi in peer_info { | ||||
|                 let k = pi.node_id.key; | ||||
|                 // Register the node | ||||
|                 if let Some(nr) = routing_table.register_node_with_signed_node_info( | ||||
|                     RoutingDomain::PublicInternet, | ||||
|                     k, | ||||
|                     pi.signed_node_info, | ||||
|                     false, | ||||
|                 ) { | ||||
|                     // Add this our futures to process in parallel | ||||
|                     let routing_table = routing_table.clone(); | ||||
|                     unord.push( | ||||
|                         // lets ask bootstrap to find ourselves now | ||||
|                         async move { routing_table.reverse_find_node(nr, true).await } | ||||
|                             .instrument(Span::current()), | ||||
|                     ); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Wait for all bootstrap operations to complete before we complete the singlefuture | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { | ||||
|         let (bootstrap, bootstrap_nodes) = { | ||||
|             let c = self.unlocked_inner.config.get(); | ||||
|             ( | ||||
|                 c.network.bootstrap.clone(), | ||||
|                 c.network.bootstrap_nodes.clone(), | ||||
|             ) | ||||
|         }; | ||||
|         let routing_table = self.routing_table(); | ||||
|  | ||||
|         log_net!(debug "--- bootstrap_task"); | ||||
|  | ||||
|         // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism | ||||
|         if !bootstrap.is_empty() && bootstrap_nodes.is_empty() { | ||||
|             let mut bootstrap_dialinfos = Vec::<DialInfo>::new(); | ||||
|             for b in &bootstrap { | ||||
|                 if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { | ||||
|                     for bootstrap_di in bootstrap_di_vec { | ||||
|                         bootstrap_dialinfos.push(bootstrap_di); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             if bootstrap_dialinfos.len() > 0 { | ||||
|                 return self | ||||
|                     .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) | ||||
|                     .await; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) | ||||
|         let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { | ||||
|             let mut bsmap = BootstrapRecordMap::new(); | ||||
|             let mut bootstrap_node_dial_infos = Vec::new(); | ||||
|             for b in bootstrap_nodes { | ||||
|                 let (id_str, di_str) = b | ||||
|                     .split_once('@') | ||||
|                     .ok_or_else(|| eyre!("Invalid node dial info in bootstrap entry"))?; | ||||
|                 let node_id = | ||||
|                     NodeId::from_str(id_str).wrap_err("Invalid node id in bootstrap entry")?; | ||||
|                 let dial_info = | ||||
|                     DialInfo::from_str(di_str).wrap_err("Invalid dial info in bootstrap entry")?; | ||||
|                 bootstrap_node_dial_infos.push((node_id, dial_info)); | ||||
|             } | ||||
|             for (node_id, dial_info) in bootstrap_node_dial_infos { | ||||
|                 bsmap | ||||
|                     .entry(node_id.key) | ||||
|                     .or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: MIN_CRYPTO_VERSION, | ||||
|                         max_version: MAX_CRYPTO_VERSION, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }) | ||||
|                     .dial_info_details | ||||
|                     .push(DialInfoDetail { | ||||
|                         dial_info, | ||||
|                         class: DialInfoClass::Direct, // Bootstraps are always directly reachable | ||||
|                     }); | ||||
|             } | ||||
|             bsmap | ||||
|         } else { | ||||
|             // Resolve bootstrap servers and recurse their TXT entries | ||||
|             self.resolve_bootstrap(bootstrap).await? | ||||
|         }; | ||||
|  | ||||
|         // Map all bootstrap entries to a single key with multiple dialinfo | ||||
|  | ||||
|         // Run all bootstrap operations concurrently | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         for (k, mut v) in bsmap { | ||||
|             // Sort dial info so we get the preferred order correct | ||||
|             v.dial_info_details.sort(); | ||||
|  | ||||
|             log_net!("--- bootstrapping {} with {:?}", k.encode(), &v); | ||||
|  | ||||
|             // Make invalid signed node info (no signature) | ||||
|             if let Some(nr) = routing_table.register_node_with_signed_node_info( | ||||
|                 RoutingDomain::PublicInternet, | ||||
|                 k, | ||||
|                 SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { | ||||
|                     network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable | ||||
|                     outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled | ||||
|                     address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable | ||||
|                     min_version: v.min_version, // Minimum crypto version specified in txt record | ||||
|                     max_version: v.max_version, // Maximum crypto version specified in txt record | ||||
|                     dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list | ||||
|                 })), | ||||
|                 true, | ||||
|             ) { | ||||
|                 // Add this our futures to process in parallel | ||||
|                 let routing_table = routing_table.clone(); | ||||
|                 unord.push( | ||||
|                     async move { | ||||
|                         // Need VALID signed peer info, so ask bootstrap to find_node of itself | ||||
|                         // which will ensure it has the bootstrap's signed peer info as part of the response | ||||
|                         let _ = routing_table.find_target(nr.clone()).await; | ||||
|  | ||||
|                         // Ensure we got the signed peer info | ||||
|                         if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { | ||||
|                             log_net!(warn | ||||
|                                 "bootstrap at {:?} did not return valid signed node info", | ||||
|                                 nr | ||||
|                             ); | ||||
|                             // If this node info is invalid, it will time out after being unpingable | ||||
|                         } else { | ||||
|                             // otherwise this bootstrap is valid, lets ask it to find ourselves now | ||||
|                             routing_table.reverse_find_node(nr, true).await | ||||
|                         } | ||||
|                     } | ||||
|                     .instrument(Span::current()), | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Wait for all bootstrap operations to complete before we complete the singlefuture | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ping each node in the routing table if they need to be pinged | ||||
|     // to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     fn ping_validator_public_internet( | ||||
|         &self, | ||||
|         cur_ts: u64, | ||||
|         unord: &mut FuturesUnordered< | ||||
|             SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>, | ||||
|         >, | ||||
|     ) -> EyreResult<()> { | ||||
|         let rpc = self.rpc_processor(); | ||||
|         let routing_table = self.routing_table(); | ||||
|  | ||||
|         // Get all nodes needing pings in the PublicInternet routing domain | ||||
|         let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); | ||||
|  | ||||
|         // Look up any NAT mappings we may need to try to preserve with keepalives | ||||
|         let mut mapped_port_info = routing_table.get_low_level_port_info(); | ||||
|  | ||||
|         // Get the PublicInternet relay if we are using one | ||||
|         let opt_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); | ||||
|         let opt_relay_id = opt_relay_nr.map(|nr| nr.node_id()); | ||||
|  | ||||
|         // Get our publicinternet dial info | ||||
|         let dids = routing_table.all_filtered_dial_info_details( | ||||
|             RoutingDomain::PublicInternet.into(), | ||||
|             &DialInfoFilter::all(), | ||||
|         ); | ||||
|  | ||||
|         // For all nodes needing pings, figure out how many and over what protocols | ||||
|         for nr in node_refs { | ||||
|             // If this is a relay, let's check for NAT keepalives | ||||
|             let mut did_pings = false; | ||||
|             if Some(nr.node_id()) == opt_relay_id { | ||||
|                 // Relay nodes get pinged over all protocols we have inbound dialinfo for | ||||
|                 // This is so we can preserve the inbound NAT mappings at our router | ||||
|                 for did in &dids { | ||||
|                     // Do we need to do this ping? | ||||
|                     // Check if we have already pinged over this low-level-protocol/address-type/port combo | ||||
|                     // We want to ensure we do the bare minimum required here | ||||
|                     let pt = did.dial_info.protocol_type(); | ||||
|                     let at = did.dial_info.address_type(); | ||||
|                     let needs_ping = if let Some((llpt, port)) = | ||||
|                         mapped_port_info.protocol_to_port.get(&(pt, at)) | ||||
|                     { | ||||
|                         mapped_port_info | ||||
|                             .low_level_protocol_ports | ||||
|                             .remove(&(*llpt, at, *port)) | ||||
|                     } else { | ||||
|                         false | ||||
|                     }; | ||||
|                     if needs_ping { | ||||
|                         let rpc = rpc.clone(); | ||||
|                         let dif = did.dial_info.make_filter(); | ||||
|                         let nr_filtered = | ||||
|                             nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); | ||||
|                         log_net!("--> Keepalive ping to {:?}", nr_filtered); | ||||
|                         unord.push( | ||||
|                             async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await } | ||||
|                                 .instrument(Span::current()) | ||||
|                                 .boxed(), | ||||
|                         ); | ||||
|                         did_pings = true; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             // Just do a single ping with the best protocol for all the other nodes, | ||||
|             // ensuring that we at least ping a relay with -something- even if we didnt have | ||||
|             // any mapped ports to preserve | ||||
|             if !did_pings { | ||||
|                 let rpc = rpc.clone(); | ||||
|                 unord.push( | ||||
|                     async move { rpc.rpc_call_status(Destination::direct(nr)).await } | ||||
|                         .instrument(Span::current()) | ||||
|                         .boxed(), | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ping each node in the LocalNetwork routing domain if they | ||||
|     // need to be pinged to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     fn ping_validator_local_network( | ||||
|         &self, | ||||
|         cur_ts: u64, | ||||
|         unord: &mut FuturesUnordered< | ||||
|             SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>, | ||||
|         >, | ||||
|     ) -> EyreResult<()> { | ||||
|         let rpc = self.rpc_processor(); | ||||
|         let routing_table = self.routing_table(); | ||||
|  | ||||
|         // Get all nodes needing pings in the LocalNetwork routing domain | ||||
|         let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); | ||||
|  | ||||
|         // For all nodes needing pings, figure out how many and over what protocols | ||||
|         for nr in node_refs { | ||||
|             let rpc = rpc.clone(); | ||||
|  | ||||
|             // Just do a single ping with the best protocol for all the nodes | ||||
|             unord.push( | ||||
|                 async move { rpc.rpc_call_status(Destination::direct(nr)).await } | ||||
|                     .instrument(Span::current()) | ||||
|                     .boxed(), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ping each node in the routing table if they need to be pinged | ||||
|     // to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn ping_validator_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|  | ||||
|         // PublicInternet | ||||
|         self.ping_validator_public_internet(cur_ts, &mut unord)?; | ||||
|  | ||||
|         // LocalNetwork | ||||
|         self.ping_validator_local_network(cur_ts, &mut unord)?; | ||||
|  | ||||
|         // Wait for ping futures to complete in parallel | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ask our remaining peers to give us more peers before we go | ||||
|     // back to the bootstrap servers to keep us from bothering them too much | ||||
|     // This only adds PublicInternet routing domain peers. The discovery | ||||
|     // mechanism for LocalNetwork suffices for locating all the local network | ||||
|     // peers that are available. This, however, may query other LocalNetwork | ||||
|     // nodes for their PublicInternet peers, which is a very fast way to get | ||||
|     // a new node online. | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn peer_minimum_refresh_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|     ) -> EyreResult<()> { | ||||
|         let routing_table = self.routing_table(); | ||||
|         let mut ord = FuturesOrdered::new(); | ||||
|         let min_peer_count = { | ||||
|             let c = self.unlocked_inner.config.get(); | ||||
|             c.network.dht.min_peer_count as usize | ||||
|         }; | ||||
|  | ||||
|         // For the PublicInternet routing domain, get list of all peers we know about | ||||
|         // even the unreliable ones, and ask them to find nodes close to our node too | ||||
|         let noderefs = routing_table.find_fastest_nodes( | ||||
|             min_peer_count, | ||||
|             VecDeque::new(), | ||||
|             |_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| { | ||||
|                 NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) | ||||
|             }, | ||||
|         ); | ||||
|         for nr in noderefs { | ||||
|             let routing_table = routing_table.clone(); | ||||
|             ord.push_back( | ||||
|                 async move { routing_table.reverse_find_node(nr, false).await } | ||||
|                     .instrument(Span::current()), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         // do peer minimum search in order from fastest to slowest | ||||
|         while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Keep relays assigned and accessible | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn relay_management_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // Get our node's current node info and network class and do the right thing | ||||
|         let routing_table = self.routing_table(); | ||||
|         let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); | ||||
|         let own_node_info = own_peer_info.signed_node_info.node_info(); | ||||
|         let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Get routing domain editor | ||||
|         let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Do we know our network class yet? | ||||
|         if let Some(network_class) = network_class { | ||||
|             // If we already have a relay, see if it is dead, or if we don't need it any more | ||||
|             let has_relay = { | ||||
|                 if let Some(relay_node) = routing_table.relay_node(RoutingDomain::PublicInternet) { | ||||
|                     let state = relay_node.state(cur_ts); | ||||
|                     // Relay node is dead or no longer needed | ||||
|                     if matches!(state, BucketEntryState::Dead) { | ||||
|                         info!("Relay node died, dropping relay {}", relay_node); | ||||
|                         editor.clear_relay_node(); | ||||
|                         false | ||||
|                     } else if !own_node_info.requires_relay() { | ||||
|                         info!( | ||||
|                             "Relay node no longer required, dropping relay {}", | ||||
|                             relay_node | ||||
|                         ); | ||||
|                         editor.clear_relay_node(); | ||||
|                         false | ||||
|                     } else { | ||||
|                         true | ||||
|                     } | ||||
|                 } else { | ||||
|                     false | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             // Do we need a relay? | ||||
|             if !has_relay && own_node_info.requires_relay() { | ||||
|                 // Do we want an outbound relay? | ||||
|                 let mut got_outbound_relay = false; | ||||
|                 if network_class.outbound_wants_relay() { | ||||
|                     // The outbound relay is the host of the PWA | ||||
|                     if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { | ||||
|                         // Register new outbound relay | ||||
|                         if let Some(nr) = routing_table.register_node_with_signed_node_info( | ||||
|                             RoutingDomain::PublicInternet, | ||||
|                             outbound_relay_peerinfo.node_id.key, | ||||
|                             outbound_relay_peerinfo.signed_node_info, | ||||
|                             false, | ||||
|                         ) { | ||||
|                             info!("Outbound relay node selected: {}", nr); | ||||
|                             editor.set_relay_node(nr); | ||||
|                             got_outbound_relay = true; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 if !got_outbound_relay { | ||||
|                     // Find a node in our routing table that is an acceptable inbound relay | ||||
|                     if let Some(nr) = | ||||
|                         routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) | ||||
|                     { | ||||
|                         info!("Inbound relay node selected: {}", nr); | ||||
|                         editor.set_relay_node(nr); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Commit the changes | ||||
|         editor.commit().await; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Keep private routes assigned and accessible | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn private_route_management_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // Get our node's current node info and network class and do the right thing | ||||
|         let routing_table = self.routing_table(); | ||||
|         let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); | ||||
|         let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Get routing domain editor | ||||
|         let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Do we know our network class yet? | ||||
|         if let Some(network_class) = network_class { | ||||
|  | ||||
|             // see if we have any routes that need testing | ||||
|         } | ||||
|  | ||||
|         // Commit the changes | ||||
|         editor.commit().await; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Compute transfer statistics for the low level network | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn rolling_transfers_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // log_net!("--- network manager rolling_transfers task"); | ||||
|         { | ||||
|             let inner = &mut *self.inner.lock(); | ||||
|  | ||||
|             // Roll the low level network transfer stats for our address | ||||
|             inner | ||||
|                 .stats | ||||
|                 .self_stats | ||||
|                 .transfer_stats_accounting | ||||
|                 .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); | ||||
|  | ||||
|             // Roll all per-address transfers | ||||
|             let mut dead_addrs: HashSet<PerAddressStatsKey> = HashSet::new(); | ||||
|             for (addr, stats) in &mut inner.stats.per_address_stats { | ||||
|                 stats.transfer_stats_accounting.roll_transfers( | ||||
|                     last_ts, | ||||
|                     cur_ts, | ||||
|                     &mut stats.transfer_stats, | ||||
|                 ); | ||||
|  | ||||
|                 // While we're here, lets see if this address has timed out | ||||
|                 if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US { | ||||
|                     // it's dead, put it in the dead list | ||||
|                     dead_addrs.insert(*addr); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // Remove the dead addresses from our tables | ||||
|             for da in &dead_addrs { | ||||
|                 inner.stats.per_address_stats.remove(da); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Send update | ||||
|         self.send_network_update(); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Clean up the public address check tables, removing entries that have timed out | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn public_address_check_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // go through public_address_inconsistencies_table and time out things that have expired | ||||
|         let mut inner = self.inner.lock(); | ||||
|         for (_, pait_v) in &mut inner.public_address_inconsistencies_table { | ||||
|             let mut expired = Vec::new(); | ||||
|             for (addr, exp_ts) in pait_v.iter() { | ||||
|                 if *exp_ts <= cur_ts { | ||||
|                     expired.push(*addr); | ||||
|                 } | ||||
|             } | ||||
|             for exp in expired { | ||||
|                 pait_v.remove(&exp); | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										82
									
								
								veilid-core/src/network_manager/tasks/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										82
									
								
								veilid-core/src/network_manager/tasks/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,82 @@ | ||||
| pub mod public_address_check; | ||||
| pub mod rolling_transfers; | ||||
|  | ||||
| use super::*; | ||||
|  | ||||
| impl NetworkManager { | ||||
|     pub(crate) fn start_tasks(&self) { | ||||
|         // Set rolling transfers tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .rolling_transfers_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .rolling_transfers_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "NetworkManager rolling transfers task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set public address check task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .public_address_check_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .public_address_check_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "public address check task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub async fn tick(&self) -> EyreResult<()> { | ||||
|         let routing_table = self.routing_table(); | ||||
|         let net = self.net(); | ||||
|         let receipt_manager = self.receipt_manager(); | ||||
|  | ||||
|         // Run the rolling transfers task | ||||
|         self.unlocked_inner.rolling_transfers_task.tick().await?; | ||||
|  | ||||
|         // Run the routing table tick | ||||
|         routing_table.tick().await?; | ||||
|  | ||||
|         // Run the low level network tick | ||||
|         net.tick().await?; | ||||
|  | ||||
|         // Run the receipt manager tick | ||||
|         receipt_manager.tick().await?; | ||||
|  | ||||
|         // Purge the client whitelist | ||||
|         self.purge_client_whitelist(); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub(crate) async fn stop_tasks(&self) { | ||||
|         debug!("stopping rolling transfers task"); | ||||
|         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { | ||||
|             warn!("rolling_transfers_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping node info update singlefuture"); | ||||
|         if self | ||||
|             .unlocked_inner | ||||
|             .node_info_update_single_future | ||||
|             .join() | ||||
|             .await | ||||
|             .is_err() | ||||
|         { | ||||
|             error!("node_info_update_single_future not stopped"); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,28 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| impl NetworkManager { | ||||
|     // Clean up the public address check tables, removing entries that have timed out | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn public_address_check_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // go through public_address_inconsistencies_table and time out things that have expired | ||||
|         let mut inner = self.inner.lock(); | ||||
|         for (_, pait_v) in &mut inner.public_address_inconsistencies_table { | ||||
|             let mut expired = Vec::new(); | ||||
|             for (addr, exp_ts) in pait_v.iter() { | ||||
|                 if *exp_ts <= cur_ts { | ||||
|                     expired.push(*addr); | ||||
|                 } | ||||
|             } | ||||
|             for exp in expired { | ||||
|                 pait_v.remove(&exp); | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										52
									
								
								veilid-core/src/network_manager/tasks/rolling_transfers.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								veilid-core/src/network_manager/tasks/rolling_transfers.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,52 @@ | ||||
| use super::super::*; | ||||
|  | ||||
| use crate::xx::*; | ||||
|  | ||||
| impl NetworkManager { | ||||
|     // Compute transfer statistics for the low level network | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn rolling_transfers_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // log_net!("--- network manager rolling_transfers task"); | ||||
|         { | ||||
|             let inner = &mut *self.inner.lock(); | ||||
|  | ||||
|             // Roll the low level network transfer stats for our address | ||||
|             inner | ||||
|                 .stats | ||||
|                 .self_stats | ||||
|                 .transfer_stats_accounting | ||||
|                 .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); | ||||
|  | ||||
|             // Roll all per-address transfers | ||||
|             let mut dead_addrs: HashSet<PerAddressStatsKey> = HashSet::new(); | ||||
|             for (addr, stats) in &mut inner.stats.per_address_stats { | ||||
|                 stats.transfer_stats_accounting.roll_transfers( | ||||
|                     last_ts, | ||||
|                     cur_ts, | ||||
|                     &mut stats.transfer_stats, | ||||
|                 ); | ||||
|  | ||||
|                 // While we're here, lets see if this address has timed out | ||||
|                 if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US { | ||||
|                     // it's dead, put it in the dead list | ||||
|                     dead_addrs.insert(*addr); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // Remove the dead addresses from our tables | ||||
|             for da in &dead_addrs { | ||||
|                 inner.stats.per_address_stats.remove(da); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Send update | ||||
|         self.send_network_update(); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @@ -1,4 +1,5 @@ | ||||
| use super::*; | ||||
| use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     pub(crate) fn debug_info_nodeinfo(&self) -> String { | ||||
|   | ||||
| @@ -30,6 +30,8 @@ pub use routing_table_inner::*; | ||||
| pub use stats_accounting::*; | ||||
|  | ||||
| ////////////////////////////////////////////////////////////////////////// | ||||
| pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; | ||||
| pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; | ||||
|  | ||||
| pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; | ||||
| pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; | ||||
| @@ -66,6 +68,16 @@ pub(super) struct RoutingTableUnlockedInner { | ||||
|     rolling_transfers_task: TickTask<EyreReport>, | ||||
|     /// Backgroup process to purge dead routing table entries when necessary | ||||
|     kick_buckets_task: TickTask<EyreReport>, | ||||
|     /// Background process to get our initial routing table | ||||
|     bootstrap_task: TickTask<EyreReport>, | ||||
|     /// Background process to ensure we have enough nodes in our routing table | ||||
|     peer_minimum_refresh_task: TickTask<EyreReport>, | ||||
|     /// Background process to check nodes to see if they are still alive and for reliability | ||||
|     ping_validator_task: TickTask<EyreReport>, | ||||
|     /// Background process to keep relays up | ||||
|     relay_management_task: TickTask<EyreReport>, | ||||
|     /// Background process to keep private routes up | ||||
|     private_route_management_task: TickTask<EyreReport>, | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| @@ -88,6 +100,11 @@ impl RoutingTable { | ||||
|             kick_queue: Mutex::new(BTreeSet::default()), | ||||
|             rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), | ||||
|             kick_buckets_task: TickTask::new(1), | ||||
|             bootstrap_task: TickTask::new(1), | ||||
|             peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), | ||||
|             ping_validator_task: TickTask::new(1), | ||||
|             relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), | ||||
|             private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), | ||||
|         } | ||||
|     } | ||||
|     pub fn new(network_manager: NetworkManager) -> Self { | ||||
| @@ -99,38 +116,8 @@ impl RoutingTable { | ||||
|             unlocked_inner, | ||||
|         }; | ||||
|  | ||||
|         // Set rolling transfers tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .rolling_transfers_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .rolling_transfers_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "RoutingTable rolling transfers task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         this.start_tasks(); | ||||
|  | ||||
|         // Set kick buckets tick task | ||||
|         { | ||||
|             let this2 = this.clone(); | ||||
|             this.unlocked_inner | ||||
|                 .kick_buckets_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this2 | ||||
|                             .clone() | ||||
|                             .kick_buckets_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "kick buckets task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|         this | ||||
|     } | ||||
|  | ||||
| @@ -140,6 +127,12 @@ impl RoutingTable { | ||||
|     pub fn rpc_processor(&self) -> RPCProcessor { | ||||
|         self.network_manager().rpc_processor() | ||||
|     } | ||||
|     pub fn with_config<F, R>(&self, f: F) -> R | ||||
|     where | ||||
|         F: FnOnce(&VeilidConfigInner) -> R, | ||||
|     { | ||||
|         f(&*self.unlocked_inner.config.get()) | ||||
|     } | ||||
|  | ||||
|     pub fn node_id(&self) -> DHTKey { | ||||
|         self.unlocked_inner.node_id | ||||
| @@ -194,15 +187,8 @@ impl RoutingTable { | ||||
|     pub async fn terminate(&self) { | ||||
|         debug!("starting routing table terminate"); | ||||
|  | ||||
|         // Cancel all tasks being ticked | ||||
|         debug!("stopping rolling transfers task"); | ||||
|         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { | ||||
|             error!("rolling_transfers_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping kick buckets task"); | ||||
|         if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { | ||||
|             error!("kick_buckets_task not stopped: {}", e); | ||||
|         } | ||||
|         // Stop tasks | ||||
|         self.stop_tasks().await; | ||||
|  | ||||
|         // Load bucket entries from table db if possible | ||||
|         debug!("saving routing table entries"); | ||||
| @@ -551,21 +537,6 @@ impl RoutingTable { | ||||
|         ) | ||||
|     } | ||||
|  | ||||
|     /// Ticks about once per second | ||||
|     /// to run tick tasks which may run at slower tick rates as configured | ||||
|     pub async fn tick(&self) -> EyreResult<()> { | ||||
|         // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs | ||||
|         self.unlocked_inner.rolling_transfers_task.tick().await?; | ||||
|  | ||||
|         // Kick buckets task | ||||
|         let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); | ||||
|         if kick_bucket_queue_count > 0 { | ||||
|             self.unlocked_inner.kick_buckets_task.tick().await?; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     ////////////////////////////////////////////////////////////////////// | ||||
|     // Routing Table Health Metrics | ||||
|  | ||||
|   | ||||
| @@ -839,6 +839,7 @@ impl RouteSpecStore { | ||||
|     pub async fn test_route(&self, key: &DHTKey) -> EyreResult<bool> { | ||||
|         let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); | ||||
|  | ||||
|         // Make loopback route to test with | ||||
|         let dest = { | ||||
|             let private_route = self.assemble_private_route(key, None)?; | ||||
|  | ||||
| @@ -848,48 +849,17 @@ impl RouteSpecStore { | ||||
|             let stability = rsd.stability; | ||||
|             let sequencing = rsd.sequencing; | ||||
|  | ||||
|             // Routes with just one hop can be pinged directly | ||||
|             // More than one hop can be pinged across the route with the target being the second to last hop | ||||
|             if rsd.hops.len() == 1 { | ||||
|                 let safety_spec = SafetySpec { | ||||
|                     preferred_route: Some(key.clone()), | ||||
|                     hop_count, | ||||
|                     stability, | ||||
|                     sequencing, | ||||
|                 }; | ||||
|                 let safety_selection = SafetySelection::Safe(safety_spec); | ||||
|             let safety_spec = SafetySpec { | ||||
|                 preferred_route: Some(key.clone()), | ||||
|                 hop_count, | ||||
|                 stability, | ||||
|                 sequencing, | ||||
|             }; | ||||
|             let safety_selection = SafetySelection::Safe(safety_spec); | ||||
|  | ||||
|                 Destination::PrivateRoute { | ||||
|                     private_route, | ||||
|                     safety_selection, | ||||
|                 } | ||||
|             } else { | ||||
|                 // let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone(); | ||||
|                 // let safety_spec = SafetySpec { | ||||
|                 //     preferred_route: Some(key.clone()), | ||||
|                 //     hop_count, | ||||
|                 //     stability, | ||||
|                 //     sequencing, | ||||
|                 // }; | ||||
|                 // let safety_selection = SafetySelection::Safe(safety_spec); | ||||
|  | ||||
|                 // Destination::Direct { | ||||
|                 //     target, | ||||
|                 //     safety_selection, | ||||
|                 // } | ||||
|  | ||||
|                 let safety_spec = SafetySpec { | ||||
|                     preferred_route: Some(key.clone()), | ||||
|                     hop_count, | ||||
|                     stability, | ||||
|                     sequencing, | ||||
|                 }; | ||||
|                 let safety_selection = SafetySelection::Safe(safety_spec); | ||||
|  | ||||
|                 Destination::PrivateRoute { | ||||
|                     private_route, | ||||
|                     safety_selection, | ||||
|                 } | ||||
|             Destination::PrivateRoute { | ||||
|                 private_route, | ||||
|                 safety_selection, | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|   | ||||
							
								
								
									
										347
									
								
								veilid-core/src/routing_table/tasks/bootstrap.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										347
									
								
								veilid-core/src/routing_table/tasks/bootstrap.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,347 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| use futures_util::stream::{FuturesUnordered, StreamExt}; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| pub const BOOTSTRAP_TXT_VERSION: u8 = 0; | ||||
|  | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct BootstrapRecord { | ||||
|     min_version: u8, | ||||
|     max_version: u8, | ||||
|     dial_info_details: Vec<DialInfoDetail>, | ||||
| } | ||||
| pub type BootstrapRecordMap = BTreeMap<DHTKey, BootstrapRecord>; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Bootstrap lookup process | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub(crate) async fn resolve_bootstrap( | ||||
|         &self, | ||||
|         bootstrap: Vec<String>, | ||||
|     ) -> EyreResult<BootstrapRecordMap> { | ||||
|         // Resolve from bootstrap root to bootstrap hostnames | ||||
|         let mut bsnames = Vec::<String>::new(); | ||||
|         for bh in bootstrap { | ||||
|             // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) | ||||
|             let records = intf::txt_lookup(&bh).await?; | ||||
|             for record in records { | ||||
|                 // Split the bootstrap name record by commas | ||||
|                 for rec in record.split(',') { | ||||
|                     let rec = rec.trim(); | ||||
|                     // If the name specified is fully qualified, go with it | ||||
|                     let bsname = if rec.ends_with('.') { | ||||
|                         rec.to_string() | ||||
|                     } | ||||
|                     // If the name is not fully qualified, prepend it to the bootstrap name | ||||
|                     else { | ||||
|                         format!("{}.{}", rec, bh) | ||||
|                     }; | ||||
|  | ||||
|                     // Add to the list of bootstrap name to look up | ||||
|                     bsnames.push(bsname); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Get bootstrap nodes from hostnames concurrently | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         for bsname in bsnames { | ||||
|             unord.push( | ||||
|                 async move { | ||||
|                     // look up boostrap node txt records | ||||
|                     let bsnirecords = match intf::txt_lookup(&bsname).await { | ||||
|                         Err(e) => { | ||||
|                             warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); | ||||
|                             return None; | ||||
|                         } | ||||
|                         Ok(v) => v, | ||||
|                     }; | ||||
|                     // for each record resolve into key/bootstraprecord pairs | ||||
|                     let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); | ||||
|                     for bsnirecord in bsnirecords { | ||||
|                         // Bootstrap TXT Record Format Version 0: | ||||
|                         // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* | ||||
|                         // | ||||
|                         // Split bootstrap node record by commas. Example: | ||||
|                         // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws | ||||
|                         let records: Vec<String> = bsnirecord | ||||
|                             .trim() | ||||
|                             .split(',') | ||||
|                             .map(|x| x.trim().to_owned()) | ||||
|                             .collect(); | ||||
|                         if records.len() < 6 { | ||||
|                             warn!("invalid number of fields in bootstrap txt record"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Bootstrap TXT record version | ||||
|                         let txt_version: u8 = match records[0].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid txt_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         if txt_version != BOOTSTRAP_TXT_VERSION { | ||||
|                             warn!("unsupported bootstrap txt record version"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Min/Max wire protocol version | ||||
|                         let min_version: u8 = match records[1].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid min_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         let max_version: u8 = match records[2].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid max_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Node Id | ||||
|                         let node_id_str = &records[3]; | ||||
|                         let node_id_key = match DHTKey::try_decode(node_id_str) { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                     "Invalid node id in bootstrap node record {}: {}", | ||||
|                                     node_id_str, e | ||||
|                                 ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Hostname | ||||
|                         let hostname_str = &records[4]; | ||||
|  | ||||
|                         // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node | ||||
|                         if self.node_id() == node_id_key { | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Resolve each record and store in node dial infos list | ||||
|                         let mut bootstrap_record = BootstrapRecord { | ||||
|                             min_version, | ||||
|                             max_version, | ||||
|                             dial_info_details: Vec::new(), | ||||
|                         }; | ||||
|                         for rec in &records[5..] { | ||||
|                             let rec = rec.trim(); | ||||
|                             let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { | ||||
|                                 Ok(dis) => dis, | ||||
|                                 Err(e) => { | ||||
|                                     warn!( | ||||
|                                         "Couldn't resolve bootstrap node dial info {}: {}", | ||||
|                                         rec, e | ||||
|                                     ); | ||||
|                                     continue; | ||||
|                                 } | ||||
|                             }; | ||||
|  | ||||
|                             for di in dial_infos { | ||||
|                                 bootstrap_record.dial_info_details.push(DialInfoDetail { | ||||
|                                     dial_info: di, | ||||
|                                     class: DialInfoClass::Direct, | ||||
|                                 }); | ||||
|                             } | ||||
|                         } | ||||
|                         bootstrap_records.push((node_id_key, bootstrap_record)); | ||||
|                     } | ||||
|                     Some(bootstrap_records) | ||||
|                 } | ||||
|                 .instrument(Span::current()), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let mut bsmap = BootstrapRecordMap::new(); | ||||
|         while let Some(bootstrap_records) = unord.next().await { | ||||
|             if let Some(bootstrap_records) = bootstrap_records { | ||||
|                 for (bskey, mut bsrec) in bootstrap_records { | ||||
|                     let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: bsrec.min_version, | ||||
|                         max_version: bsrec.max_version, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }); | ||||
|                     rec.dial_info_details.append(&mut bsrec.dial_info_details); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(bsmap) | ||||
|     } | ||||
|  | ||||
|     // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn direct_bootstrap_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         bootstrap_dialinfos: Vec<DialInfo>, | ||||
|     ) -> EyreResult<()> { | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         let network_manager = self.network_manager(); | ||||
|  | ||||
|         for bootstrap_di in bootstrap_dialinfos { | ||||
|             log_rtab!(debug "direct bootstrap with: {}", bootstrap_di); | ||||
|             let peer_info = network_manager.boot_request(bootstrap_di).await?; | ||||
|  | ||||
|             log_rtab!(debug "  direct bootstrap peerinfo: {:?}", peer_info); | ||||
|  | ||||
|             // Got peer info, let's add it to the routing table | ||||
|             for pi in peer_info { | ||||
|                 let k = pi.node_id.key; | ||||
|                 // Register the node | ||||
|                 if let Some(nr) = self.register_node_with_signed_node_info( | ||||
|                     RoutingDomain::PublicInternet, | ||||
|                     k, | ||||
|                     pi.signed_node_info, | ||||
|                     false, | ||||
|                 ) { | ||||
|                     // Add this our futures to process in parallel | ||||
|                     let routing_table = self.clone(); | ||||
|                     unord.push( | ||||
|                         // lets ask bootstrap to find ourselves now | ||||
|                         async move { routing_table.reverse_find_node(nr, true).await } | ||||
|                             .instrument(Span::current()), | ||||
|                     ); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Wait for all bootstrap operations to complete before we complete the singlefuture | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { | ||||
|         let (bootstrap, bootstrap_nodes) = self.with_config(|c| { | ||||
|             ( | ||||
|                 c.network.bootstrap.clone(), | ||||
|                 c.network.bootstrap_nodes.clone(), | ||||
|             ) | ||||
|         }); | ||||
|  | ||||
|         log_rtab!(debug "--- bootstrap_task"); | ||||
|  | ||||
|         // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism | ||||
|         if !bootstrap.is_empty() && bootstrap_nodes.is_empty() { | ||||
|             let mut bootstrap_dialinfos = Vec::<DialInfo>::new(); | ||||
|             for b in &bootstrap { | ||||
|                 if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { | ||||
|                     for bootstrap_di in bootstrap_di_vec { | ||||
|                         bootstrap_dialinfos.push(bootstrap_di); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             if bootstrap_dialinfos.len() > 0 { | ||||
|                 return self | ||||
|                     .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) | ||||
|                     .await; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) | ||||
|         let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { | ||||
|             let mut bsmap = BootstrapRecordMap::new(); | ||||
|             let mut bootstrap_node_dial_infos = Vec::new(); | ||||
|             for b in bootstrap_nodes { | ||||
|                 let (id_str, di_str) = b | ||||
|                     .split_once('@') | ||||
|                     .ok_or_else(|| eyre!("Invalid node dial info in bootstrap entry"))?; | ||||
|                 let node_id = | ||||
|                     NodeId::from_str(id_str).wrap_err("Invalid node id in bootstrap entry")?; | ||||
|                 let dial_info = | ||||
|                     DialInfo::from_str(di_str).wrap_err("Invalid dial info in bootstrap entry")?; | ||||
|                 bootstrap_node_dial_infos.push((node_id, dial_info)); | ||||
|             } | ||||
|             for (node_id, dial_info) in bootstrap_node_dial_infos { | ||||
|                 bsmap | ||||
|                     .entry(node_id.key) | ||||
|                     .or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: MIN_CRYPTO_VERSION, | ||||
|                         max_version: MAX_CRYPTO_VERSION, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }) | ||||
|                     .dial_info_details | ||||
|                     .push(DialInfoDetail { | ||||
|                         dial_info, | ||||
|                         class: DialInfoClass::Direct, // Bootstraps are always directly reachable | ||||
|                     }); | ||||
|             } | ||||
|             bsmap | ||||
|         } else { | ||||
|             // Resolve bootstrap servers and recurse their TXT entries | ||||
|             self.resolve_bootstrap(bootstrap).await? | ||||
|         }; | ||||
|  | ||||
|         // Map all bootstrap entries to a single key with multiple dialinfo | ||||
|  | ||||
|         // Run all bootstrap operations concurrently | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         for (k, mut v) in bsmap { | ||||
|             // Sort dial info so we get the preferred order correct | ||||
|             v.dial_info_details.sort(); | ||||
|  | ||||
|             log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); | ||||
|  | ||||
|             // Make invalid signed node info (no signature) | ||||
|             if let Some(nr) = self.register_node_with_signed_node_info( | ||||
|                 RoutingDomain::PublicInternet, | ||||
|                 k, | ||||
|                 SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { | ||||
|                     network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable | ||||
|                     outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled | ||||
|                     address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable | ||||
|                     min_version: v.min_version, // Minimum crypto version specified in txt record | ||||
|                     max_version: v.max_version, // Maximum crypto version specified in txt record | ||||
|                     dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list | ||||
|                 })), | ||||
|                 true, | ||||
|             ) { | ||||
|                 // Add this our futures to process in parallel | ||||
|                 let routing_table = self.clone(); | ||||
|                 unord.push( | ||||
|                     async move { | ||||
|                         // Need VALID signed peer info, so ask bootstrap to find_node of itself | ||||
|                         // which will ensure it has the bootstrap's signed peer info as part of the response | ||||
|                         let _ = routing_table.find_target(nr.clone()).await; | ||||
|  | ||||
|                         // Ensure we got the signed peer info | ||||
|                         if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { | ||||
|                             log_rtab!(warn | ||||
|                                 "bootstrap at {:?} did not return valid signed node info", | ||||
|                                 nr | ||||
|                             ); | ||||
|                             // If this node info is invalid, it will time out after being unpingable | ||||
|                         } else { | ||||
|                             // otherwise this bootstrap is valid, lets ask it to find ourselves now | ||||
|                             routing_table.reverse_find_node(nr, true).await | ||||
|                         } | ||||
|                     } | ||||
|                     .instrument(Span::current()), | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Wait for all bootstrap operations to complete before we complete the singlefuture | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										23
									
								
								veilid-core/src/routing_table/tasks/kick_buckets.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								veilid-core/src/routing_table/tasks/kick_buckets.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Kick the queued buckets in the routing table to free dead nodes if necessary | ||||
|     // Attempts to keep the size of the routing table down to the bucket depth | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn kick_buckets_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         let kick_queue: Vec<usize> = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) | ||||
|             .into_iter() | ||||
|             .collect(); | ||||
|         let mut inner = self.inner.write(); | ||||
|         for idx in kick_queue { | ||||
|             inner.kick_bucket(idx) | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										202
									
								
								veilid-core/src/routing_table/tasks/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										202
									
								
								veilid-core/src/routing_table/tasks/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,202 @@ | ||||
| pub mod bootstrap; | ||||
| pub mod kick_buckets; | ||||
| pub mod peer_minimum_refresh; | ||||
| pub mod ping_validator; | ||||
| pub mod private_route_management; | ||||
| pub mod relay_management; | ||||
| pub mod rolling_transfers; | ||||
|  | ||||
| use super::*; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     pub(crate) fn start_tasks(&self) { | ||||
|         // Set rolling transfers tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .rolling_transfers_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .rolling_transfers_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "RoutingTable rolling transfers task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set kick buckets tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .kick_buckets_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .kick_buckets_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "kick buckets task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set bootstrap tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .bootstrap_task | ||||
|                 .set_routine(move |s, _l, _t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .bootstrap_task_routine(s) | ||||
|                             .instrument(trace_span!(parent: None, "bootstrap task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set peer minimum refresh tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .peer_minimum_refresh_task | ||||
|                 .set_routine(move |s, _l, _t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .peer_minimum_refresh_task_routine(s) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "peer minimum refresh task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set ping validator tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .ping_validator_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .ping_validator_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "ping validator task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set relay management tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .relay_management_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .relay_management_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!(parent: None, "relay management task routine")), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         // Set private route management tick task | ||||
|         { | ||||
|             let this = self.clone(); | ||||
|             self.unlocked_inner | ||||
|                 .private_route_management_task | ||||
|                 .set_routine(move |s, l, t| { | ||||
|                     Box::pin( | ||||
|                         this.clone() | ||||
|                             .private_route_management_task_routine(s, l, t) | ||||
|                             .instrument(trace_span!( | ||||
|                                 parent: None, | ||||
|                                 "private route management task routine" | ||||
|                             )), | ||||
|                     ) | ||||
|                 }); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Ticks about once per second | ||||
|     /// to run tick tasks which may run at slower tick rates as configured | ||||
|     pub async fn tick(&self) -> EyreResult<()> { | ||||
|         // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs | ||||
|         self.unlocked_inner.rolling_transfers_task.tick().await?; | ||||
|  | ||||
|         // Kick buckets task | ||||
|         let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); | ||||
|         if kick_bucket_queue_count > 0 { | ||||
|             self.unlocked_inner.kick_buckets_task.tick().await?; | ||||
|         } | ||||
|  | ||||
|         // See how many live PublicInternet entries we have | ||||
|         let live_public_internet_entry_count = self.get_entry_count( | ||||
|             RoutingDomain::PublicInternet.into(), | ||||
|             BucketEntryState::Unreliable, | ||||
|         ); | ||||
|         let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); | ||||
|  | ||||
|         // If none, then add the bootstrap nodes to it | ||||
|         if live_public_internet_entry_count == 0 { | ||||
|             self.unlocked_inner.bootstrap_task.tick().await?; | ||||
|         } | ||||
|         // If we still don't have enough peers, find nodes until we do | ||||
|         else if !self.unlocked_inner.bootstrap_task.is_running() | ||||
|             && live_public_internet_entry_count < min_peer_count | ||||
|         { | ||||
|             self.unlocked_inner.peer_minimum_refresh_task.tick().await?; | ||||
|         } | ||||
|  | ||||
|         // Ping validate some nodes to groom the table | ||||
|         self.unlocked_inner.ping_validator_task.tick().await?; | ||||
|  | ||||
|         // Run the relay management task | ||||
|         self.unlocked_inner.relay_management_task.tick().await?; | ||||
|  | ||||
|         // Run the private route management task | ||||
|         self.unlocked_inner | ||||
|             .private_route_management_task | ||||
|             .tick() | ||||
|             .await?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub(crate) async fn stop_tasks(&self) { | ||||
|         // Cancel all tasks being ticked | ||||
|         debug!("stopping rolling transfers task"); | ||||
|         if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { | ||||
|             error!("rolling_transfers_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping kick buckets task"); | ||||
|         if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { | ||||
|             error!("kick_buckets_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping bootstrap task"); | ||||
|         if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await { | ||||
|             error!("bootstrap_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping peer minimum refresh task"); | ||||
|         if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await { | ||||
|             error!("peer_minimum_refresh_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping ping_validator task"); | ||||
|         if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await { | ||||
|             error!("ping_validator_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping relay management task"); | ||||
|         if let Err(e) = self.unlocked_inner.relay_management_task.stop().await { | ||||
|             warn!("relay_management_task not stopped: {}", e); | ||||
|         } | ||||
|         debug!("stopping private route management task"); | ||||
|         if let Err(e) = self | ||||
|             .unlocked_inner | ||||
|             .private_route_management_task | ||||
|             .stop() | ||||
|             .await | ||||
|         { | ||||
|             warn!("private_route_management_task not stopped: {}", e); | ||||
|         } | ||||
|     } | ||||
| } | ||||
							
								
								
									
										47
									
								
								veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| use futures_util::stream::{FuturesOrdered, StreamExt}; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Ask our remaining peers to give us more peers before we go | ||||
|     // back to the bootstrap servers to keep us from bothering them too much | ||||
|     // This only adds PublicInternet routing domain peers. The discovery | ||||
|     // mechanism for LocalNetwork suffices for locating all the local network | ||||
|     // peers that are available. This, however, may query other LocalNetwork | ||||
|     // nodes for their PublicInternet peers, which is a very fast way to get | ||||
|     // a new node online. | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn peer_minimum_refresh_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|     ) -> EyreResult<()> { | ||||
|         let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); | ||||
|  | ||||
|         // For the PublicInternet routing domain, get list of all peers we know about | ||||
|         // even the unreliable ones, and ask them to find nodes close to our node too | ||||
|         let routing_table = self.clone(); | ||||
|         let noderefs = routing_table.find_fastest_nodes( | ||||
|             min_peer_count, | ||||
|             VecDeque::new(), | ||||
|             |_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| { | ||||
|                 NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) | ||||
|             }, | ||||
|         ); | ||||
|  | ||||
|         let mut ord = FuturesOrdered::new(); | ||||
|         for nr in noderefs { | ||||
|             let routing_table = self.clone(); | ||||
|             ord.push_back( | ||||
|                 async move { routing_table.reverse_find_node(nr, false).await } | ||||
|                     .instrument(Span::current()), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         // do peer minimum search in order from fastest to slowest | ||||
|         while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										142
									
								
								veilid-core/src/routing_table/tasks/ping_validator.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										142
									
								
								veilid-core/src/routing_table/tasks/ping_validator.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,142 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| use futures_util::stream::{FuturesUnordered, StreamExt}; | ||||
| use futures_util::FutureExt; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Ping each node in the routing table if they need to be pinged | ||||
|     // to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     fn ping_validator_public_internet( | ||||
|         &self, | ||||
|         cur_ts: u64, | ||||
|         unord: &mut FuturesUnordered< | ||||
|             SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>, | ||||
|         >, | ||||
|     ) -> EyreResult<()> { | ||||
|         let rpc = self.rpc_processor(); | ||||
|  | ||||
|         // Get all nodes needing pings in the PublicInternet routing domain | ||||
|         let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); | ||||
|  | ||||
|         // Look up any NAT mappings we may need to try to preserve with keepalives | ||||
|         let mut mapped_port_info = self.get_low_level_port_info(); | ||||
|  | ||||
|         // Get the PublicInternet relay if we are using one | ||||
|         let opt_relay_nr = self.relay_node(RoutingDomain::PublicInternet); | ||||
|         let opt_relay_id = opt_relay_nr.map(|nr| nr.node_id()); | ||||
|  | ||||
|         // Get our publicinternet dial info | ||||
|         let dids = self.all_filtered_dial_info_details( | ||||
|             RoutingDomain::PublicInternet.into(), | ||||
|             &DialInfoFilter::all(), | ||||
|         ); | ||||
|  | ||||
|         // For all nodes needing pings, figure out how many and over what protocols | ||||
|         for nr in node_refs { | ||||
|             // If this is a relay, let's check for NAT keepalives | ||||
|             let mut did_pings = false; | ||||
|             if Some(nr.node_id()) == opt_relay_id { | ||||
|                 // Relay nodes get pinged over all protocols we have inbound dialinfo for | ||||
|                 // This is so we can preserve the inbound NAT mappings at our router | ||||
|                 for did in &dids { | ||||
|                     // Do we need to do this ping? | ||||
|                     // Check if we have already pinged over this low-level-protocol/address-type/port combo | ||||
|                     // We want to ensure we do the bare minimum required here | ||||
|                     let pt = did.dial_info.protocol_type(); | ||||
|                     let at = did.dial_info.address_type(); | ||||
|                     let needs_ping = if let Some((llpt, port)) = | ||||
|                         mapped_port_info.protocol_to_port.get(&(pt, at)) | ||||
|                     { | ||||
|                         mapped_port_info | ||||
|                             .low_level_protocol_ports | ||||
|                             .remove(&(*llpt, at, *port)) | ||||
|                     } else { | ||||
|                         false | ||||
|                     }; | ||||
|                     if needs_ping { | ||||
|                         let rpc = rpc.clone(); | ||||
|                         let dif = did.dial_info.make_filter(); | ||||
|                         let nr_filtered = | ||||
|                             nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); | ||||
|                         log_net!("--> Keepalive ping to {:?}", nr_filtered); | ||||
|                         unord.push( | ||||
|                             async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await } | ||||
|                                 .instrument(Span::current()) | ||||
|                                 .boxed(), | ||||
|                         ); | ||||
|                         did_pings = true; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             // Just do a single ping with the best protocol for all the other nodes, | ||||
|             // ensuring that we at least ping a relay with -something- even if we didnt have | ||||
|             // any mapped ports to preserve | ||||
|             if !did_pings { | ||||
|                 let rpc = rpc.clone(); | ||||
|                 unord.push( | ||||
|                     async move { rpc.rpc_call_status(Destination::direct(nr)).await } | ||||
|                         .instrument(Span::current()) | ||||
|                         .boxed(), | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ping each node in the LocalNetwork routing domain if they | ||||
|     // need to be pinged to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     fn ping_validator_local_network( | ||||
|         &self, | ||||
|         cur_ts: u64, | ||||
|         unord: &mut FuturesUnordered< | ||||
|             SendPinBoxFuture<Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError>>, | ||||
|         >, | ||||
|     ) -> EyreResult<()> { | ||||
|         let rpc = self.rpc_processor(); | ||||
|  | ||||
|         // Get all nodes needing pings in the LocalNetwork routing domain | ||||
|         let node_refs = self.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); | ||||
|  | ||||
|         // For all nodes needing pings, figure out how many and over what protocols | ||||
|         for nr in node_refs { | ||||
|             let rpc = rpc.clone(); | ||||
|  | ||||
|             // Just do a single ping with the best protocol for all the nodes | ||||
|             unord.push( | ||||
|                 async move { rpc.rpc_call_status(Destination::direct(nr)).await } | ||||
|                     .instrument(Span::current()) | ||||
|                     .boxed(), | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Ping each node in the routing table if they need to be pinged | ||||
|     // to determine their reliability | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn ping_validator_task_routine( | ||||
|         self, | ||||
|         stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|  | ||||
|         // PublicInternet | ||||
|         self.ping_validator_public_internet(cur_ts, &mut unord)?; | ||||
|  | ||||
|         // LocalNetwork | ||||
|         self.ping_validator_local_network(cur_ts, &mut unord)?; | ||||
|  | ||||
|         // Wait for ping futures to complete in parallel | ||||
|         while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,34 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| use futures_util::stream::{FuturesOrdered, StreamExt}; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Keep private routes assigned and accessible | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn private_route_management_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // Get our node's current node info and network class and do the right thing | ||||
|         let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); | ||||
|         let network_class = self.get_network_class(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Get routing domain editor | ||||
|         let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Do we know our network class yet? | ||||
|         if let Some(network_class) = network_class { | ||||
|  | ||||
|             // see if we have any routes that need testing | ||||
|         } | ||||
|  | ||||
|         // Commit the changes | ||||
|         editor.commit().await; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										83
									
								
								veilid-core/src/routing_table/tasks/relay_management.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								veilid-core/src/routing_table/tasks/relay_management.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,83 @@ | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
|  | ||||
| impl RoutingTable { | ||||
|     // Keep relays assigned and accessible | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn relay_management_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         // Get our node's current node info and network class and do the right thing | ||||
|         let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); | ||||
|         let own_node_info = own_peer_info.signed_node_info.node_info(); | ||||
|         let network_class = self.get_network_class(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Get routing domain editor | ||||
|         let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); | ||||
|  | ||||
|         // Do we know our network class yet? | ||||
|         if let Some(network_class) = network_class { | ||||
|             // If we already have a relay, see if it is dead, or if we don't need it any more | ||||
|             let has_relay = { | ||||
|                 if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) { | ||||
|                     let state = relay_node.state(cur_ts); | ||||
|                     // Relay node is dead or no longer needed | ||||
|                     if matches!(state, BucketEntryState::Dead) { | ||||
|                         info!("Relay node died, dropping relay {}", relay_node); | ||||
|                         editor.clear_relay_node(); | ||||
|                         false | ||||
|                     } else if !own_node_info.requires_relay() { | ||||
|                         info!( | ||||
|                             "Relay node no longer required, dropping relay {}", | ||||
|                             relay_node | ||||
|                         ); | ||||
|                         editor.clear_relay_node(); | ||||
|                         false | ||||
|                     } else { | ||||
|                         true | ||||
|                     } | ||||
|                 } else { | ||||
|                     false | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             // Do we need a relay? | ||||
|             if !has_relay && own_node_info.requires_relay() { | ||||
|                 // Do we want an outbound relay? | ||||
|                 let mut got_outbound_relay = false; | ||||
|                 if network_class.outbound_wants_relay() { | ||||
|                     // The outbound relay is the host of the PWA | ||||
|                     if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { | ||||
|                         // Register new outbound relay | ||||
|                         if let Some(nr) = self.register_node_with_signed_node_info( | ||||
|                             RoutingDomain::PublicInternet, | ||||
|                             outbound_relay_peerinfo.node_id.key, | ||||
|                             outbound_relay_peerinfo.signed_node_info, | ||||
|                             false, | ||||
|                         ) { | ||||
|                             info!("Outbound relay node selected: {}", nr); | ||||
|                             editor.set_relay_node(nr); | ||||
|                             got_outbound_relay = true; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 if !got_outbound_relay { | ||||
|                     // Find a node in our routing table that is an acceptable inbound relay | ||||
|                     if let Some(nr) = self.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) | ||||
|                     { | ||||
|                         info!("Inbound relay node selected: {}", nr); | ||||
|                         editor.set_relay_node(nr); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Commit the changes | ||||
|         editor.commit().await; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @@ -1,10 +1,10 @@ | ||||
| use super::*; | ||||
| use super::super::*; | ||||
| use crate::xx::*; | ||||
| 
 | ||||
| impl RoutingTable { | ||||
|     // Compute transfer statistics to determine how 'fast' a node is
 | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn rolling_transfers_task_routine( | ||||
|     pub(crate) async fn rolling_transfers_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         last_ts: u64, | ||||
| @@ -38,23 +38,4 @@ impl RoutingTable { | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     // Kick the queued buckets in the routing table to free dead nodes if necessary
 | ||||
|     // Attempts to keep the size of the routing table down to the bucket depth
 | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(super) async fn kick_buckets_task_routine( | ||||
|         self, | ||||
|         _stop_token: StopToken, | ||||
|         _last_ts: u64, | ||||
|         cur_ts: u64, | ||||
|     ) -> EyreResult<()> { | ||||
|         let kick_queue: Vec<usize> = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) | ||||
|             .into_iter() | ||||
|             .collect(); | ||||
|         let mut inner = self.inner.write(); | ||||
|         for idx in kick_queue { | ||||
|             inner.kick_bucket(idx) | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @@ -320,6 +320,15 @@ pub struct VeilidStateNetwork { | ||||
|     pub peers: Vec<PeerTableData>, | ||||
| } | ||||
|  | ||||
| #[derive( | ||||
|     Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, | ||||
| )] | ||||
| #[archive_attr(repr(C), derive(CheckBytes))] | ||||
| pub struct VeilidStateRoute { | ||||
|     pub dead_routes: Vec<DHTKey>, | ||||
|     pub dead_remote_routes: Vec<DHTKey>, | ||||
| } | ||||
|  | ||||
| #[derive( | ||||
|     Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, | ||||
| )] | ||||
| @@ -338,6 +347,7 @@ pub enum VeilidUpdate { | ||||
|     Attachment(VeilidStateAttachment), | ||||
|     Network(VeilidStateNetwork), | ||||
|     Config(VeilidStateConfig), | ||||
|     Route(VeilidStateRoute), | ||||
|     Shutdown, | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user