diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 03b8343c..834612e2 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -92,6 +92,9 @@ impl veilid_client::Server for VeilidClientImpl { VeilidUpdate::Config(config) => { self.comproc.update_config(config); } + VeilidUpdate::Route(route) => { + self.comproc.update_route(route); + } VeilidUpdate::Shutdown => self.comproc.update_shutdown(), } diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index e2457d77..81e5e65a 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -404,6 +404,9 @@ reply - reply to an AppCall not handled directly by the server pub fn update_config(&mut self, config: veilid_core::VeilidStateConfig) { self.inner_mut().ui.set_config(config.config) } + pub fn update_route(&mut self, route: veilid_core::VeilidStateRoute) { + //self.inner_mut().ui.set_config(config.config) + } pub fn update_log(&mut self, log: veilid_core::VeilidLog) { self.inner().ui.add_node_event(format!( diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 6c87aaf1..ba3adccf 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -23,7 +23,7 @@ pub use network_connection::*; use connection_handle::*; use connection_limits::*; use crypto::*; -use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt}; +use futures_util::stream::{FuturesUnordered, StreamExt}; use hashlink::LruCache; use intf::*; #[cfg(not(target_arch = "wasm32"))] @@ -37,8 +37,6 @@ use xx::*; //////////////////////////////////////////////////////////////////////////////////////// -pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; -pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes @@ -48,15 +46,6 @@ pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: u64 = 300_000_000u64; // 5 minutes pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: u64 = 3600_000_000u64; // 60 minutes pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; -pub const BOOTSTRAP_TXT_VERSION: u8 = 0; - -#[derive(Clone, Debug)] -pub struct BootstrapRecord { - min_version: u8, - max_version: u8, - dial_info_details: Vec, -} -pub type BootstrapRecordMap = BTreeMap; #[derive(Copy, Clone, Debug, Default)] pub struct ProtocolConfig { @@ -166,11 +155,6 @@ struct NetworkManagerUnlockedInner { update_callback: RwLock>, // Background processes rolling_transfers_task: TickTask, - relay_management_task: TickTask, - private_route_management_task: TickTask, - bootstrap_task: TickTask, - peer_minimum_refresh_task: TickTask, - ping_validator_task: TickTask, public_address_check_task: TickTask, node_info_update_single_future: MustJoinSingleFuture<()>, } @@ -197,7 +181,6 @@ impl NetworkManager { block_store: BlockStore, crypto: Crypto, ) -> NetworkManagerUnlockedInner { - let min_peer_refresh_time_ms = config.get().network.dht.min_peer_refresh_time_ms; NetworkManagerUnlockedInner { config, protected_store, @@ -208,11 +191,6 @@ impl NetworkManager { components: RwLock::new(None), update_callback: RwLock::new(None), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), - relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), - private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), - bootstrap_task: TickTask::new(1), - peer_minimum_refresh_task: TickTask::new_ms(min_peer_refresh_time_ms), - ping_validator_task: TickTask::new(1), public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), node_info_update_single_future: MustJoinSingleFuture::new(), } @@ -235,116 +213,9 @@ impl NetworkManager { crypto, )), }; - // Set rolling transfers tick task - { - let this2 = this.clone(); - this.unlocked_inner - .rolling_transfers_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .rolling_transfers_task_routine(s, l, t) - .instrument(trace_span!( - parent: None, - "NetworkManager rolling transfers task routine" - )), - ) - }); - } - // Set relay management tick task - { - let this2 = this.clone(); - this.unlocked_inner - .relay_management_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .relay_management_task_routine(s, l, t) - .instrument(trace_span!(parent: None, "relay management task routine")), - ) - }); - } - // Set private route management tick task - { - let this2 = this.clone(); - this.unlocked_inner - .private_route_management_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .private_route_management_task_routine(s, l, t) - .instrument(trace_span!( - parent: None, - "private route management task routine" - )), - ) - }); - } - // Set bootstrap tick task - { - let this2 = this.clone(); - this.unlocked_inner - .bootstrap_task - .set_routine(move |s, _l, _t| { - Box::pin( - this2 - .clone() - .bootstrap_task_routine(s) - .instrument(trace_span!(parent: None, "bootstrap task routine")), - ) - }); - } - // Set peer minimum refresh tick task - { - let this2 = this.clone(); - this.unlocked_inner - .peer_minimum_refresh_task - .set_routine(move |s, _l, _t| { - Box::pin( - this2 - .clone() - .peer_minimum_refresh_task_routine(s) - .instrument(trace_span!( - parent: None, - "peer minimum refresh task routine" - )), - ) - }); - } - // Set ping validator tick task - { - let this2 = this.clone(); - this.unlocked_inner - .ping_validator_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .ping_validator_task_routine(s, l, t) - .instrument(trace_span!(parent: None, "ping validator task routine")), - ) - }); - } - // Set public address check task - { - let this2 = this.clone(); - this.unlocked_inner - .public_address_check_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .public_address_check_task_routine(s, l, t) - .instrument(trace_span!( - parent: None, - "public address check task routine" - )), - ) - }); - } + + this.start_tasks(); + this } pub fn config(&self) -> VeilidConfig { @@ -492,36 +363,7 @@ impl NetworkManager { debug!("starting network manager shutdown"); // Cancel all tasks - debug!("stopping rolling transfers task"); - if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { - warn!("rolling_transfers_task not stopped: {}", e); - } - debug!("stopping relay management task"); - if let Err(e) = self.unlocked_inner.relay_management_task.stop().await { - warn!("relay_management_task not stopped: {}", e); - } - debug!("stopping bootstrap task"); - if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await { - error!("bootstrap_task not stopped: {}", e); - } - debug!("stopping peer minimum refresh task"); - if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await { - error!("peer_minimum_refresh_task not stopped: {}", e); - } - debug!("stopping ping_validator task"); - if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await { - error!("ping_validator_task not stopped: {}", e); - } - debug!("stopping node info update singlefuture"); - if self - .unlocked_inner - .node_info_update_single_future - .join() - .await - .is_err() - { - error!("node_info_update_single_future not stopped"); - } + self.stop_tasks().await; // Shutdown network components if they started up debug!("shutting down network components"); @@ -597,53 +439,6 @@ impl NetworkManager { net.needs_restart() } - pub async fn tick(&self) -> EyreResult<()> { - let routing_table = self.routing_table(); - let net = self.net(); - let receipt_manager = self.receipt_manager(); - - // Run the rolling transfers task - self.unlocked_inner.rolling_transfers_task.tick().await?; - - // Run the relay management task - self.unlocked_inner.relay_management_task.tick().await?; - - // See how many live PublicInternet entries we have - let live_public_internet_entry_count = routing_table.get_entry_count( - RoutingDomain::PublicInternet.into(), - BucketEntryState::Unreliable, - ); - let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); - - // If none, then add the bootstrap nodes to it - if live_public_internet_entry_count == 0 { - self.unlocked_inner.bootstrap_task.tick().await?; - } - // If we still don't have enough peers, find nodes until we do - else if !self.unlocked_inner.bootstrap_task.is_running() - && live_public_internet_entry_count < min_peer_count - { - self.unlocked_inner.peer_minimum_refresh_task.tick().await?; - } - - // Ping validate some nodes to groom the table - self.unlocked_inner.ping_validator_task.tick().await?; - - // Run the routing table tick - routing_table.tick().await?; - - // Run the low level network tick - net.tick().await?; - - // Run the receipt manager tick - receipt_manager.tick().await?; - - // Purge the client whitelist - self.purge_client_whitelist(); - - Ok(()) - } - /// Get our node's capabilities in the PublicInternet routing domain fn generate_public_internet_node_status(&self) -> PublicInternetNodeStatus { let own_peer_info = self diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs deleted file mode 100644 index 3786410a..00000000 --- a/veilid-core/src/network_manager/tasks.rs +++ /dev/null @@ -1,699 +0,0 @@ -use super::*; - -use crate::crypto::*; -use crate::xx::*; -use futures_util::FutureExt; -use stop_token::future::FutureExt as StopFutureExt; - -impl NetworkManager { - // Bootstrap lookup process - #[instrument(level = "trace", skip(self), ret, err)] - pub(super) async fn resolve_bootstrap( - &self, - bootstrap: Vec, - ) -> EyreResult { - // Resolve from bootstrap root to bootstrap hostnames - let mut bsnames = Vec::::new(); - for bh in bootstrap { - // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) - let records = intf::txt_lookup(&bh).await?; - for record in records { - // Split the bootstrap name record by commas - for rec in record.split(',') { - let rec = rec.trim(); - // If the name specified is fully qualified, go with it - let bsname = if rec.ends_with('.') { - rec.to_string() - } - // If the name is not fully qualified, prepend it to the bootstrap name - else { - format!("{}.{}", rec, bh) - }; - - // Add to the list of bootstrap name to look up - bsnames.push(bsname); - } - } - } - - // Get bootstrap nodes from hostnames concurrently - let mut unord = FuturesUnordered::new(); - for bsname in bsnames { - unord.push( - async move { - // look up boostrap node txt records - let bsnirecords = match intf::txt_lookup(&bsname).await { - Err(e) => { - warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); - return None; - } - Ok(v) => v, - }; - // for each record resolve into key/bootstraprecord pairs - let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); - for bsnirecord in bsnirecords { - // Bootstrap TXT Record Format Version 0: - // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* - // - // Split bootstrap node record by commas. Example: - // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws - let records: Vec = bsnirecord - .trim() - .split(',') - .map(|x| x.trim().to_owned()) - .collect(); - if records.len() < 6 { - warn!("invalid number of fields in bootstrap txt record"); - continue; - } - - // Bootstrap TXT record version - let txt_version: u8 = match records[0].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid txt_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - if txt_version != BOOTSTRAP_TXT_VERSION { - warn!("unsupported bootstrap txt record version"); - continue; - } - - // Min/Max wire protocol version - let min_version: u8 = match records[1].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid min_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - let max_version: u8 = match records[2].parse::() { - Ok(v) => v, - Err(e) => { - warn!( - "invalid max_version specified in bootstrap node txt record: {}", - e - ); - continue; - } - }; - - // Node Id - let node_id_str = &records[3]; - let node_id_key = match DHTKey::try_decode(node_id_str) { - Ok(v) => v, - Err(e) => { - warn!( - "Invalid node id in bootstrap node record {}: {}", - node_id_str, e - ); - continue; - } - }; - - // Hostname - let hostname_str = &records[4]; - - // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node - if self.routing_table().node_id() == node_id_key { - continue; - } - - // Resolve each record and store in node dial infos list - let mut bootstrap_record = BootstrapRecord { - min_version, - max_version, - dial_info_details: Vec::new(), - }; - for rec in &records[5..] { - let rec = rec.trim(); - let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { - Ok(dis) => dis, - Err(e) => { - warn!( - "Couldn't resolve bootstrap node dial info {}: {}", - rec, e - ); - continue; - } - }; - - for di in dial_infos { - bootstrap_record.dial_info_details.push(DialInfoDetail { - dial_info: di, - class: DialInfoClass::Direct, - }); - } - } - bootstrap_records.push((node_id_key, bootstrap_record)); - } - Some(bootstrap_records) - } - .instrument(Span::current()), - ); - } - - let mut bsmap = BootstrapRecordMap::new(); - while let Some(bootstrap_records) = unord.next().await { - if let Some(bootstrap_records) = bootstrap_records { - for (bskey, mut bsrec) in bootstrap_records { - let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { - min_version: bsrec.min_version, - max_version: bsrec.max_version, - dial_info_details: Vec::new(), - }); - rec.dial_info_details.append(&mut bsrec.dial_info_details); - } - } - } - - Ok(bsmap) - } - - // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn direct_bootstrap_task_routine( - self, - stop_token: StopToken, - bootstrap_dialinfos: Vec, - ) -> EyreResult<()> { - let mut unord = FuturesUnordered::new(); - let routing_table = self.routing_table(); - - for bootstrap_di in bootstrap_dialinfos { - log_net!(debug "direct bootstrap with: {}", bootstrap_di); - - let peer_info = self.boot_request(bootstrap_di).await?; - - log_net!(debug " direct bootstrap peerinfo: {:?}", peer_info); - - // Got peer info, let's add it to the routing table - for pi in peer_info { - let k = pi.node_id.key; - // Register the node - if let Some(nr) = routing_table.register_node_with_signed_node_info( - RoutingDomain::PublicInternet, - k, - pi.signed_node_info, - false, - ) { - // Add this our futures to process in parallel - let routing_table = routing_table.clone(); - unord.push( - // lets ask bootstrap to find ourselves now - async move { routing_table.reverse_find_node(nr, true).await } - .instrument(Span::current()), - ); - } - } - } - - // Wait for all bootstrap operations to complete before we complete the singlefuture - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} - - Ok(()) - } - - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { - let (bootstrap, bootstrap_nodes) = { - let c = self.unlocked_inner.config.get(); - ( - c.network.bootstrap.clone(), - c.network.bootstrap_nodes.clone(), - ) - }; - let routing_table = self.routing_table(); - - log_net!(debug "--- bootstrap_task"); - - // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism - if !bootstrap.is_empty() && bootstrap_nodes.is_empty() { - let mut bootstrap_dialinfos = Vec::::new(); - for b in &bootstrap { - if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { - for bootstrap_di in bootstrap_di_vec { - bootstrap_dialinfos.push(bootstrap_di); - } - } - } - if bootstrap_dialinfos.len() > 0 { - return self - .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) - .await; - } - } - - // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) - let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { - let mut bsmap = BootstrapRecordMap::new(); - let mut bootstrap_node_dial_infos = Vec::new(); - for b in bootstrap_nodes { - let (id_str, di_str) = b - .split_once('@') - .ok_or_else(|| eyre!("Invalid node dial info in bootstrap entry"))?; - let node_id = - NodeId::from_str(id_str).wrap_err("Invalid node id in bootstrap entry")?; - let dial_info = - DialInfo::from_str(di_str).wrap_err("Invalid dial info in bootstrap entry")?; - bootstrap_node_dial_infos.push((node_id, dial_info)); - } - for (node_id, dial_info) in bootstrap_node_dial_infos { - bsmap - .entry(node_id.key) - .or_insert_with(|| BootstrapRecord { - min_version: MIN_CRYPTO_VERSION, - max_version: MAX_CRYPTO_VERSION, - dial_info_details: Vec::new(), - }) - .dial_info_details - .push(DialInfoDetail { - dial_info, - class: DialInfoClass::Direct, // Bootstraps are always directly reachable - }); - } - bsmap - } else { - // Resolve bootstrap servers and recurse their TXT entries - self.resolve_bootstrap(bootstrap).await? - }; - - // Map all bootstrap entries to a single key with multiple dialinfo - - // Run all bootstrap operations concurrently - let mut unord = FuturesUnordered::new(); - for (k, mut v) in bsmap { - // Sort dial info so we get the preferred order correct - v.dial_info_details.sort(); - - log_net!("--- bootstrapping {} with {:?}", k.encode(), &v); - - // Make invalid signed node info (no signature) - if let Some(nr) = routing_table.register_node_with_signed_node_info( - RoutingDomain::PublicInternet, - k, - SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { - network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable - outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled - address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable - min_version: v.min_version, // Minimum crypto version specified in txt record - max_version: v.max_version, // Maximum crypto version specified in txt record - dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list - })), - true, - ) { - // Add this our futures to process in parallel - let routing_table = routing_table.clone(); - unord.push( - async move { - // Need VALID signed peer info, so ask bootstrap to find_node of itself - // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_target(nr.clone()).await; - - // Ensure we got the signed peer info - if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { - log_net!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable - } else { - // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table.reverse_find_node(nr, true).await - } - } - .instrument(Span::current()), - ); - } - } - - // Wait for all bootstrap operations to complete before we complete the singlefuture - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} - Ok(()) - } - - // Ping each node in the routing table if they need to be pinged - // to determine their reliability - #[instrument(level = "trace", skip(self), err)] - fn ping_validator_public_internet( - &self, - cur_ts: u64, - unord: &mut FuturesUnordered< - SendPinBoxFuture>>, RPCError>>, - >, - ) -> EyreResult<()> { - let rpc = self.rpc_processor(); - let routing_table = self.routing_table(); - - // Get all nodes needing pings in the PublicInternet routing domain - let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); - - // Look up any NAT mappings we may need to try to preserve with keepalives - let mut mapped_port_info = routing_table.get_low_level_port_info(); - - // Get the PublicInternet relay if we are using one - let opt_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); - let opt_relay_id = opt_relay_nr.map(|nr| nr.node_id()); - - // Get our publicinternet dial info - let dids = routing_table.all_filtered_dial_info_details( - RoutingDomain::PublicInternet.into(), - &DialInfoFilter::all(), - ); - - // For all nodes needing pings, figure out how many and over what protocols - for nr in node_refs { - // If this is a relay, let's check for NAT keepalives - let mut did_pings = false; - if Some(nr.node_id()) == opt_relay_id { - // Relay nodes get pinged over all protocols we have inbound dialinfo for - // This is so we can preserve the inbound NAT mappings at our router - for did in &dids { - // Do we need to do this ping? - // Check if we have already pinged over this low-level-protocol/address-type/port combo - // We want to ensure we do the bare minimum required here - let pt = did.dial_info.protocol_type(); - let at = did.dial_info.address_type(); - let needs_ping = if let Some((llpt, port)) = - mapped_port_info.protocol_to_port.get(&(pt, at)) - { - mapped_port_info - .low_level_protocol_ports - .remove(&(*llpt, at, *port)) - } else { - false - }; - if needs_ping { - let rpc = rpc.clone(); - let dif = did.dial_info.make_filter(); - let nr_filtered = - nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); - log_net!("--> Keepalive ping to {:?}", nr_filtered); - unord.push( - async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await } - .instrument(Span::current()) - .boxed(), - ); - did_pings = true; - } - } - } - // Just do a single ping with the best protocol for all the other nodes, - // ensuring that we at least ping a relay with -something- even if we didnt have - // any mapped ports to preserve - if !did_pings { - let rpc = rpc.clone(); - unord.push( - async move { rpc.rpc_call_status(Destination::direct(nr)).await } - .instrument(Span::current()) - .boxed(), - ); - } - } - - Ok(()) - } - - // Ping each node in the LocalNetwork routing domain if they - // need to be pinged to determine their reliability - #[instrument(level = "trace", skip(self), err)] - fn ping_validator_local_network( - &self, - cur_ts: u64, - unord: &mut FuturesUnordered< - SendPinBoxFuture>>, RPCError>>, - >, - ) -> EyreResult<()> { - let rpc = self.rpc_processor(); - let routing_table = self.routing_table(); - - // Get all nodes needing pings in the LocalNetwork routing domain - let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); - - // For all nodes needing pings, figure out how many and over what protocols - for nr in node_refs { - let rpc = rpc.clone(); - - // Just do a single ping with the best protocol for all the nodes - unord.push( - async move { rpc.rpc_call_status(Destination::direct(nr)).await } - .instrument(Span::current()) - .boxed(), - ); - } - - Ok(()) - } - - // Ping each node in the routing table if they need to be pinged - // to determine their reliability - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn ping_validator_task_routine( - self, - stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - let mut unord = FuturesUnordered::new(); - - // PublicInternet - self.ping_validator_public_internet(cur_ts, &mut unord)?; - - // LocalNetwork - self.ping_validator_local_network(cur_ts, &mut unord)?; - - // Wait for ping futures to complete in parallel - while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} - - Ok(()) - } - - // Ask our remaining peers to give us more peers before we go - // back to the bootstrap servers to keep us from bothering them too much - // This only adds PublicInternet routing domain peers. The discovery - // mechanism for LocalNetwork suffices for locating all the local network - // peers that are available. This, however, may query other LocalNetwork - // nodes for their PublicInternet peers, which is a very fast way to get - // a new node online. - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn peer_minimum_refresh_task_routine( - self, - stop_token: StopToken, - ) -> EyreResult<()> { - let routing_table = self.routing_table(); - let mut ord = FuturesOrdered::new(); - let min_peer_count = { - let c = self.unlocked_inner.config.get(); - c.network.dht.min_peer_count as usize - }; - - // For the PublicInternet routing domain, get list of all peers we know about - // even the unreliable ones, and ask them to find nodes close to our node too - let noderefs = routing_table.find_fastest_nodes( - min_peer_count, - VecDeque::new(), - |_rti, k: DHTKey, v: Option>| { - NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) - }, - ); - for nr in noderefs { - let routing_table = routing_table.clone(); - ord.push_back( - async move { routing_table.reverse_find_node(nr, false).await } - .instrument(Span::current()), - ); - } - - // do peer minimum search in order from fastest to slowest - while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {} - - Ok(()) - } - - // Keep relays assigned and accessible - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn relay_management_task_routine( - self, - _stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - // Get our node's current node info and network class and do the right thing - let routing_table = self.routing_table(); - let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); - let own_node_info = own_peer_info.signed_node_info.node_info(); - let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); - - // Get routing domain editor - let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); - - // Do we know our network class yet? - if let Some(network_class) = network_class { - // If we already have a relay, see if it is dead, or if we don't need it any more - let has_relay = { - if let Some(relay_node) = routing_table.relay_node(RoutingDomain::PublicInternet) { - let state = relay_node.state(cur_ts); - // Relay node is dead or no longer needed - if matches!(state, BucketEntryState::Dead) { - info!("Relay node died, dropping relay {}", relay_node); - editor.clear_relay_node(); - false - } else if !own_node_info.requires_relay() { - info!( - "Relay node no longer required, dropping relay {}", - relay_node - ); - editor.clear_relay_node(); - false - } else { - true - } - } else { - false - } - }; - - // Do we need a relay? - if !has_relay && own_node_info.requires_relay() { - // Do we want an outbound relay? - let mut got_outbound_relay = false; - if network_class.outbound_wants_relay() { - // The outbound relay is the host of the PWA - if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { - // Register new outbound relay - if let Some(nr) = routing_table.register_node_with_signed_node_info( - RoutingDomain::PublicInternet, - outbound_relay_peerinfo.node_id.key, - outbound_relay_peerinfo.signed_node_info, - false, - ) { - info!("Outbound relay node selected: {}", nr); - editor.set_relay_node(nr); - got_outbound_relay = true; - } - } - } - if !got_outbound_relay { - // Find a node in our routing table that is an acceptable inbound relay - if let Some(nr) = - routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) - { - info!("Inbound relay node selected: {}", nr); - editor.set_relay_node(nr); - } - } - } - } - - // Commit the changes - editor.commit().await; - - Ok(()) - } - - // Keep private routes assigned and accessible - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn private_route_management_task_routine( - self, - _stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - // Get our node's current node info and network class and do the right thing - let routing_table = self.routing_table(); - let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet); - let network_class = routing_table.get_network_class(RoutingDomain::PublicInternet); - - // Get routing domain editor - let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); - - // Do we know our network class yet? - if let Some(network_class) = network_class { - - // see if we have any routes that need testing - } - - // Commit the changes - editor.commit().await; - - Ok(()) - } - - // Compute transfer statistics for the low level network - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn rolling_transfers_task_routine( - self, - _stop_token: StopToken, - last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - // log_net!("--- network manager rolling_transfers task"); - { - let inner = &mut *self.inner.lock(); - - // Roll the low level network transfer stats for our address - inner - .stats - .self_stats - .transfer_stats_accounting - .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); - - // Roll all per-address transfers - let mut dead_addrs: HashSet = HashSet::new(); - for (addr, stats) in &mut inner.stats.per_address_stats { - stats.transfer_stats_accounting.roll_transfers( - last_ts, - cur_ts, - &mut stats.transfer_stats, - ); - - // While we're here, lets see if this address has timed out - if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US { - // it's dead, put it in the dead list - dead_addrs.insert(*addr); - } - } - - // Remove the dead addresses from our tables - for da in &dead_addrs { - inner.stats.per_address_stats.remove(da); - } - } - - // Send update - self.send_network_update(); - - Ok(()) - } - - // Clean up the public address check tables, removing entries that have timed out - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn public_address_check_task_routine( - self, - stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - // go through public_address_inconsistencies_table and time out things that have expired - let mut inner = self.inner.lock(); - for (_, pait_v) in &mut inner.public_address_inconsistencies_table { - let mut expired = Vec::new(); - for (addr, exp_ts) in pait_v.iter() { - if *exp_ts <= cur_ts { - expired.push(*addr); - } - } - for exp in expired { - pait_v.remove(&exp); - } - } - Ok(()) - } -} diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs new file mode 100644 index 00000000..108afd2d --- /dev/null +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -0,0 +1,82 @@ +pub mod public_address_check; +pub mod rolling_transfers; + +use super::*; + +impl NetworkManager { + pub(crate) fn start_tasks(&self) { + // Set rolling transfers tick task + { + let this = self.clone(); + self.unlocked_inner + .rolling_transfers_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .rolling_transfers_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "NetworkManager rolling transfers task routine" + )), + ) + }); + } + + // Set public address check task + { + let this = self.clone(); + self.unlocked_inner + .public_address_check_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .public_address_check_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "public address check task routine" + )), + ) + }); + } + } + + pub async fn tick(&self) -> EyreResult<()> { + let routing_table = self.routing_table(); + let net = self.net(); + let receipt_manager = self.receipt_manager(); + + // Run the rolling transfers task + self.unlocked_inner.rolling_transfers_task.tick().await?; + + // Run the routing table tick + routing_table.tick().await?; + + // Run the low level network tick + net.tick().await?; + + // Run the receipt manager tick + receipt_manager.tick().await?; + + // Purge the client whitelist + self.purge_client_whitelist(); + + Ok(()) + } + + pub(crate) async fn stop_tasks(&self) { + debug!("stopping rolling transfers task"); + if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { + warn!("rolling_transfers_task not stopped: {}", e); + } + debug!("stopping node info update singlefuture"); + if self + .unlocked_inner + .node_info_update_single_future + .join() + .await + .is_err() + { + error!("node_info_update_single_future not stopped"); + } + } +} diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs new file mode 100644 index 00000000..92303369 --- /dev/null +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -0,0 +1,28 @@ +use super::super::*; +use crate::xx::*; + +impl NetworkManager { + // Clean up the public address check tables, removing entries that have timed out + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn public_address_check_task_routine( + self, + stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // go through public_address_inconsistencies_table and time out things that have expired + let mut inner = self.inner.lock(); + for (_, pait_v) in &mut inner.public_address_inconsistencies_table { + let mut expired = Vec::new(); + for (addr, exp_ts) in pait_v.iter() { + if *exp_ts <= cur_ts { + expired.push(*addr); + } + } + for exp in expired { + pait_v.remove(&exp); + } + } + Ok(()) + } +} diff --git a/veilid-core/src/network_manager/tasks/rolling_transfers.rs b/veilid-core/src/network_manager/tasks/rolling_transfers.rs new file mode 100644 index 00000000..4007a4a9 --- /dev/null +++ b/veilid-core/src/network_manager/tasks/rolling_transfers.rs @@ -0,0 +1,52 @@ +use super::super::*; + +use crate::xx::*; + +impl NetworkManager { + // Compute transfer statistics for the low level network + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn rolling_transfers_task_routine( + self, + _stop_token: StopToken, + last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // log_net!("--- network manager rolling_transfers task"); + { + let inner = &mut *self.inner.lock(); + + // Roll the low level network transfer stats for our address + inner + .stats + .self_stats + .transfer_stats_accounting + .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); + + // Roll all per-address transfers + let mut dead_addrs: HashSet = HashSet::new(); + for (addr, stats) in &mut inner.stats.per_address_stats { + stats.transfer_stats_accounting.roll_transfers( + last_ts, + cur_ts, + &mut stats.transfer_stats, + ); + + // While we're here, lets see if this address has timed out + if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US { + // it's dead, put it in the dead list + dead_addrs.insert(*addr); + } + } + + // Remove the dead addresses from our tables + for da in &dead_addrs { + inner.stats.per_address_stats.remove(da); + } + } + + // Send update + self.send_network_update(); + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 3170fae6..fcac7958 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -1,4 +1,5 @@ use super::*; +use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION; impl RoutingTable { pub(crate) fn debug_info_nodeinfo(&self) -> String { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 89370fc4..1e8e5ea5 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -30,6 +30,8 @@ pub use routing_table_inner::*; pub use stats_accounting::*; ////////////////////////////////////////////////////////////////////////// +pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; +pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; @@ -66,6 +68,16 @@ pub(super) struct RoutingTableUnlockedInner { rolling_transfers_task: TickTask, /// Backgroup process to purge dead routing table entries when necessary kick_buckets_task: TickTask, + /// Background process to get our initial routing table + bootstrap_task: TickTask, + /// Background process to ensure we have enough nodes in our routing table + peer_minimum_refresh_task: TickTask, + /// Background process to check nodes to see if they are still alive and for reliability + ping_validator_task: TickTask, + /// Background process to keep relays up + relay_management_task: TickTask, + /// Background process to keep private routes up + private_route_management_task: TickTask, } #[derive(Clone)] @@ -88,6 +100,11 @@ impl RoutingTable { kick_queue: Mutex::new(BTreeSet::default()), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), kick_buckets_task: TickTask::new(1), + bootstrap_task: TickTask::new(1), + peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), + ping_validator_task: TickTask::new(1), + relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), + private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), } } pub fn new(network_manager: NetworkManager) -> Self { @@ -99,38 +116,8 @@ impl RoutingTable { unlocked_inner, }; - // Set rolling transfers tick task - { - let this2 = this.clone(); - this.unlocked_inner - .rolling_transfers_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .rolling_transfers_task_routine(s, l, t) - .instrument(trace_span!( - parent: None, - "RoutingTable rolling transfers task routine" - )), - ) - }); - } + this.start_tasks(); - // Set kick buckets tick task - { - let this2 = this.clone(); - this.unlocked_inner - .kick_buckets_task - .set_routine(move |s, l, t| { - Box::pin( - this2 - .clone() - .kick_buckets_task_routine(s, l, t) - .instrument(trace_span!(parent: None, "kick buckets task routine")), - ) - }); - } this } @@ -140,6 +127,12 @@ impl RoutingTable { pub fn rpc_processor(&self) -> RPCProcessor { self.network_manager().rpc_processor() } + pub fn with_config(&self, f: F) -> R + where + F: FnOnce(&VeilidConfigInner) -> R, + { + f(&*self.unlocked_inner.config.get()) + } pub fn node_id(&self) -> DHTKey { self.unlocked_inner.node_id @@ -194,15 +187,8 @@ impl RoutingTable { pub async fn terminate(&self) { debug!("starting routing table terminate"); - // Cancel all tasks being ticked - debug!("stopping rolling transfers task"); - if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { - error!("rolling_transfers_task not stopped: {}", e); - } - debug!("stopping kick buckets task"); - if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { - error!("kick_buckets_task not stopped: {}", e); - } + // Stop tasks + self.stop_tasks().await; // Load bucket entries from table db if possible debug!("saving routing table entries"); @@ -551,21 +537,6 @@ impl RoutingTable { ) } - /// Ticks about once per second - /// to run tick tasks which may run at slower tick rates as configured - pub async fn tick(&self) -> EyreResult<()> { - // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs - self.unlocked_inner.rolling_transfers_task.tick().await?; - - // Kick buckets task - let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); - if kick_bucket_queue_count > 0 { - self.unlocked_inner.kick_buckets_task.tick().await?; - } - - Ok(()) - } - ////////////////////////////////////////////////////////////////////// // Routing Table Health Metrics diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index b1ebab55..5723fbcc 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -839,6 +839,7 @@ impl RouteSpecStore { pub async fn test_route(&self, key: &DHTKey) -> EyreResult { let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); + // Make loopback route to test with let dest = { let private_route = self.assemble_private_route(key, None)?; @@ -848,48 +849,17 @@ impl RouteSpecStore { let stability = rsd.stability; let sequencing = rsd.sequencing; - // Routes with just one hop can be pinged directly - // More than one hop can be pinged across the route with the target being the second to last hop - if rsd.hops.len() == 1 { - let safety_spec = SafetySpec { - preferred_route: Some(key.clone()), - hop_count, - stability, - sequencing, - }; - let safety_selection = SafetySelection::Safe(safety_spec); + let safety_spec = SafetySpec { + preferred_route: Some(key.clone()), + hop_count, + stability, + sequencing, + }; + let safety_selection = SafetySelection::Safe(safety_spec); - Destination::PrivateRoute { - private_route, - safety_selection, - } - } else { - // let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone(); - // let safety_spec = SafetySpec { - // preferred_route: Some(key.clone()), - // hop_count, - // stability, - // sequencing, - // }; - // let safety_selection = SafetySelection::Safe(safety_spec); - - // Destination::Direct { - // target, - // safety_selection, - // } - - let safety_spec = SafetySpec { - preferred_route: Some(key.clone()), - hop_count, - stability, - sequencing, - }; - let safety_selection = SafetySelection::Safe(safety_spec); - - Destination::PrivateRoute { - private_route, - safety_selection, - } + Destination::PrivateRoute { + private_route, + safety_selection, } }; diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs new file mode 100644 index 00000000..d7d3da7f --- /dev/null +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -0,0 +1,347 @@ +use super::super::*; +use crate::xx::*; + +use futures_util::stream::{FuturesUnordered, StreamExt}; +use stop_token::future::FutureExt as StopFutureExt; + +pub const BOOTSTRAP_TXT_VERSION: u8 = 0; + +#[derive(Clone, Debug)] +pub struct BootstrapRecord { + min_version: u8, + max_version: u8, + dial_info_details: Vec, +} +pub type BootstrapRecordMap = BTreeMap; + +impl RoutingTable { + // Bootstrap lookup process + #[instrument(level = "trace", skip(self), ret, err)] + pub(crate) async fn resolve_bootstrap( + &self, + bootstrap: Vec, + ) -> EyreResult { + // Resolve from bootstrap root to bootstrap hostnames + let mut bsnames = Vec::::new(); + for bh in bootstrap { + // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) + let records = intf::txt_lookup(&bh).await?; + for record in records { + // Split the bootstrap name record by commas + for rec in record.split(',') { + let rec = rec.trim(); + // If the name specified is fully qualified, go with it + let bsname = if rec.ends_with('.') { + rec.to_string() + } + // If the name is not fully qualified, prepend it to the bootstrap name + else { + format!("{}.{}", rec, bh) + }; + + // Add to the list of bootstrap name to look up + bsnames.push(bsname); + } + } + } + + // Get bootstrap nodes from hostnames concurrently + let mut unord = FuturesUnordered::new(); + for bsname in bsnames { + unord.push( + async move { + // look up boostrap node txt records + let bsnirecords = match intf::txt_lookup(&bsname).await { + Err(e) => { + warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); + return None; + } + Ok(v) => v, + }; + // for each record resolve into key/bootstraprecord pairs + let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); + for bsnirecord in bsnirecords { + // Bootstrap TXT Record Format Version 0: + // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* + // + // Split bootstrap node record by commas. Example: + // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws + let records: Vec = bsnirecord + .trim() + .split(',') + .map(|x| x.trim().to_owned()) + .collect(); + if records.len() < 6 { + warn!("invalid number of fields in bootstrap txt record"); + continue; + } + + // Bootstrap TXT record version + let txt_version: u8 = match records[0].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid txt_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + if txt_version != BOOTSTRAP_TXT_VERSION { + warn!("unsupported bootstrap txt record version"); + continue; + } + + // Min/Max wire protocol version + let min_version: u8 = match records[1].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid min_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + let max_version: u8 = match records[2].parse::() { + Ok(v) => v, + Err(e) => { + warn!( + "invalid max_version specified in bootstrap node txt record: {}", + e + ); + continue; + } + }; + + // Node Id + let node_id_str = &records[3]; + let node_id_key = match DHTKey::try_decode(node_id_str) { + Ok(v) => v, + Err(e) => { + warn!( + "Invalid node id in bootstrap node record {}: {}", + node_id_str, e + ); + continue; + } + }; + + // Hostname + let hostname_str = &records[4]; + + // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node + if self.node_id() == node_id_key { + continue; + } + + // Resolve each record and store in node dial infos list + let mut bootstrap_record = BootstrapRecord { + min_version, + max_version, + dial_info_details: Vec::new(), + }; + for rec in &records[5..] { + let rec = rec.trim(); + let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { + Ok(dis) => dis, + Err(e) => { + warn!( + "Couldn't resolve bootstrap node dial info {}: {}", + rec, e + ); + continue; + } + }; + + for di in dial_infos { + bootstrap_record.dial_info_details.push(DialInfoDetail { + dial_info: di, + class: DialInfoClass::Direct, + }); + } + } + bootstrap_records.push((node_id_key, bootstrap_record)); + } + Some(bootstrap_records) + } + .instrument(Span::current()), + ); + } + + let mut bsmap = BootstrapRecordMap::new(); + while let Some(bootstrap_records) = unord.next().await { + if let Some(bootstrap_records) = bootstrap_records { + for (bskey, mut bsrec) in bootstrap_records { + let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { + min_version: bsrec.min_version, + max_version: bsrec.max_version, + dial_info_details: Vec::new(), + }); + rec.dial_info_details.append(&mut bsrec.dial_info_details); + } + } + } + + Ok(bsmap) + } + + // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn direct_bootstrap_task_routine( + self, + stop_token: StopToken, + bootstrap_dialinfos: Vec, + ) -> EyreResult<()> { + let mut unord = FuturesUnordered::new(); + let network_manager = self.network_manager(); + + for bootstrap_di in bootstrap_dialinfos { + log_rtab!(debug "direct bootstrap with: {}", bootstrap_di); + let peer_info = network_manager.boot_request(bootstrap_di).await?; + + log_rtab!(debug " direct bootstrap peerinfo: {:?}", peer_info); + + // Got peer info, let's add it to the routing table + for pi in peer_info { + let k = pi.node_id.key; + // Register the node + if let Some(nr) = self.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, + k, + pi.signed_node_info, + false, + ) { + // Add this our futures to process in parallel + let routing_table = self.clone(); + unord.push( + // lets ask bootstrap to find ourselves now + async move { routing_table.reverse_find_node(nr, true).await } + .instrument(Span::current()), + ); + } + } + } + + // Wait for all bootstrap operations to complete before we complete the singlefuture + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + Ok(()) + } + + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { + let (bootstrap, bootstrap_nodes) = self.with_config(|c| { + ( + c.network.bootstrap.clone(), + c.network.bootstrap_nodes.clone(), + ) + }); + + log_rtab!(debug "--- bootstrap_task"); + + // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism + if !bootstrap.is_empty() && bootstrap_nodes.is_empty() { + let mut bootstrap_dialinfos = Vec::::new(); + for b in &bootstrap { + if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { + for bootstrap_di in bootstrap_di_vec { + bootstrap_dialinfos.push(bootstrap_di); + } + } + } + if bootstrap_dialinfos.len() > 0 { + return self + .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) + .await; + } + } + + // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) + let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { + let mut bsmap = BootstrapRecordMap::new(); + let mut bootstrap_node_dial_infos = Vec::new(); + for b in bootstrap_nodes { + let (id_str, di_str) = b + .split_once('@') + .ok_or_else(|| eyre!("Invalid node dial info in bootstrap entry"))?; + let node_id = + NodeId::from_str(id_str).wrap_err("Invalid node id in bootstrap entry")?; + let dial_info = + DialInfo::from_str(di_str).wrap_err("Invalid dial info in bootstrap entry")?; + bootstrap_node_dial_infos.push((node_id, dial_info)); + } + for (node_id, dial_info) in bootstrap_node_dial_infos { + bsmap + .entry(node_id.key) + .or_insert_with(|| BootstrapRecord { + min_version: MIN_CRYPTO_VERSION, + max_version: MAX_CRYPTO_VERSION, + dial_info_details: Vec::new(), + }) + .dial_info_details + .push(DialInfoDetail { + dial_info, + class: DialInfoClass::Direct, // Bootstraps are always directly reachable + }); + } + bsmap + } else { + // Resolve bootstrap servers and recurse their TXT entries + self.resolve_bootstrap(bootstrap).await? + }; + + // Map all bootstrap entries to a single key with multiple dialinfo + + // Run all bootstrap operations concurrently + let mut unord = FuturesUnordered::new(); + for (k, mut v) in bsmap { + // Sort dial info so we get the preferred order correct + v.dial_info_details.sort(); + + log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); + + // Make invalid signed node info (no signature) + if let Some(nr) = self.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, + k, + SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { + network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable + outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled + address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable + min_version: v.min_version, // Minimum crypto version specified in txt record + max_version: v.max_version, // Maximum crypto version specified in txt record + dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list + })), + true, + ) { + // Add this our futures to process in parallel + let routing_table = self.clone(); + unord.push( + async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = routing_table.find_target(nr.clone()).await; + + // Ensure we got the signed peer info + if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { + log_rtab!(warn + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // If this node info is invalid, it will time out after being unpingable + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + routing_table.reverse_find_node(nr, true).await + } + } + .instrument(Span::current()), + ); + } + } + + // Wait for all bootstrap operations to complete before we complete the singlefuture + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/kick_buckets.rs b/veilid-core/src/routing_table/tasks/kick_buckets.rs new file mode 100644 index 00000000..730bad1d --- /dev/null +++ b/veilid-core/src/routing_table/tasks/kick_buckets.rs @@ -0,0 +1,23 @@ +use super::super::*; +use crate::xx::*; + +impl RoutingTable { + // Kick the queued buckets in the routing table to free dead nodes if necessary + // Attempts to keep the size of the routing table down to the bucket depth + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn kick_buckets_task_routine( + self, + _stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + let kick_queue: Vec = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) + .into_iter() + .collect(); + let mut inner = self.inner.write(); + for idx in kick_queue { + inner.kick_bucket(idx) + } + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs new file mode 100644 index 00000000..d2992626 --- /dev/null +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -0,0 +1,202 @@ +pub mod bootstrap; +pub mod kick_buckets; +pub mod peer_minimum_refresh; +pub mod ping_validator; +pub mod private_route_management; +pub mod relay_management; +pub mod rolling_transfers; + +use super::*; + +impl RoutingTable { + pub(crate) fn start_tasks(&self) { + // Set rolling transfers tick task + { + let this = self.clone(); + self.unlocked_inner + .rolling_transfers_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .rolling_transfers_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "RoutingTable rolling transfers task routine" + )), + ) + }); + } + + // Set kick buckets tick task + { + let this = self.clone(); + self.unlocked_inner + .kick_buckets_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .kick_buckets_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "kick buckets task routine")), + ) + }); + } + + // Set bootstrap tick task + { + let this = self.clone(); + self.unlocked_inner + .bootstrap_task + .set_routine(move |s, _l, _t| { + Box::pin( + this.clone() + .bootstrap_task_routine(s) + .instrument(trace_span!(parent: None, "bootstrap task routine")), + ) + }); + } + + // Set peer minimum refresh tick task + { + let this = self.clone(); + self.unlocked_inner + .peer_minimum_refresh_task + .set_routine(move |s, _l, _t| { + Box::pin( + this.clone() + .peer_minimum_refresh_task_routine(s) + .instrument(trace_span!( + parent: None, + "peer minimum refresh task routine" + )), + ) + }); + } + + // Set ping validator tick task + { + let this = self.clone(); + self.unlocked_inner + .ping_validator_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .ping_validator_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "ping validator task routine")), + ) + }); + } + + // Set relay management tick task + { + let this = self.clone(); + self.unlocked_inner + .relay_management_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .relay_management_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "relay management task routine")), + ) + }); + } + + // Set private route management tick task + { + let this = self.clone(); + self.unlocked_inner + .private_route_management_task + .set_routine(move |s, l, t| { + Box::pin( + this.clone() + .private_route_management_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "private route management task routine" + )), + ) + }); + } + } + + /// Ticks about once per second + /// to run tick tasks which may run at slower tick rates as configured + pub async fn tick(&self) -> EyreResult<()> { + // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs + self.unlocked_inner.rolling_transfers_task.tick().await?; + + // Kick buckets task + let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); + if kick_bucket_queue_count > 0 { + self.unlocked_inner.kick_buckets_task.tick().await?; + } + + // See how many live PublicInternet entries we have + let live_public_internet_entry_count = self.get_entry_count( + RoutingDomain::PublicInternet.into(), + BucketEntryState::Unreliable, + ); + let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); + + // If none, then add the bootstrap nodes to it + if live_public_internet_entry_count == 0 { + self.unlocked_inner.bootstrap_task.tick().await?; + } + // If we still don't have enough peers, find nodes until we do + else if !self.unlocked_inner.bootstrap_task.is_running() + && live_public_internet_entry_count < min_peer_count + { + self.unlocked_inner.peer_minimum_refresh_task.tick().await?; + } + + // Ping validate some nodes to groom the table + self.unlocked_inner.ping_validator_task.tick().await?; + + // Run the relay management task + self.unlocked_inner.relay_management_task.tick().await?; + + // Run the private route management task + self.unlocked_inner + .private_route_management_task + .tick() + .await?; + + Ok(()) + } + + pub(crate) async fn stop_tasks(&self) { + // Cancel all tasks being ticked + debug!("stopping rolling transfers task"); + if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { + error!("rolling_transfers_task not stopped: {}", e); + } + debug!("stopping kick buckets task"); + if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { + error!("kick_buckets_task not stopped: {}", e); + } + debug!("stopping bootstrap task"); + if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await { + error!("bootstrap_task not stopped: {}", e); + } + debug!("stopping peer minimum refresh task"); + if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await { + error!("peer_minimum_refresh_task not stopped: {}", e); + } + debug!("stopping ping_validator task"); + if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await { + error!("ping_validator_task not stopped: {}", e); + } + debug!("stopping relay management task"); + if let Err(e) = self.unlocked_inner.relay_management_task.stop().await { + warn!("relay_management_task not stopped: {}", e); + } + debug!("stopping private route management task"); + if let Err(e) = self + .unlocked_inner + .private_route_management_task + .stop() + .await + { + warn!("private_route_management_task not stopped: {}", e); + } + } +} diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs new file mode 100644 index 00000000..7733755c --- /dev/null +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -0,0 +1,47 @@ +use super::super::*; +use crate::xx::*; + +use futures_util::stream::{FuturesOrdered, StreamExt}; +use stop_token::future::FutureExt as StopFutureExt; + +impl RoutingTable { + // Ask our remaining peers to give us more peers before we go + // back to the bootstrap servers to keep us from bothering them too much + // This only adds PublicInternet routing domain peers. The discovery + // mechanism for LocalNetwork suffices for locating all the local network + // peers that are available. This, however, may query other LocalNetwork + // nodes for their PublicInternet peers, which is a very fast way to get + // a new node online. + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn peer_minimum_refresh_task_routine( + self, + stop_token: StopToken, + ) -> EyreResult<()> { + let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); + + // For the PublicInternet routing domain, get list of all peers we know about + // even the unreliable ones, and ask them to find nodes close to our node too + let routing_table = self.clone(); + let noderefs = routing_table.find_fastest_nodes( + min_peer_count, + VecDeque::new(), + |_rti, k: DHTKey, v: Option>| { + NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None) + }, + ); + + let mut ord = FuturesOrdered::new(); + for nr in noderefs { + let routing_table = self.clone(); + ord.push_back( + async move { routing_table.reverse_find_node(nr, false).await } + .instrument(Span::current()), + ); + } + + // do peer minimum search in order from fastest to slowest + while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {} + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs new file mode 100644 index 00000000..976fb79b --- /dev/null +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -0,0 +1,142 @@ +use super::super::*; +use crate::xx::*; + +use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::FutureExt; +use stop_token::future::FutureExt as StopFutureExt; + +impl RoutingTable { + // Ping each node in the routing table if they need to be pinged + // to determine their reliability + #[instrument(level = "trace", skip(self), err)] + fn ping_validator_public_internet( + &self, + cur_ts: u64, + unord: &mut FuturesUnordered< + SendPinBoxFuture>>, RPCError>>, + >, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + + // Get all nodes needing pings in the PublicInternet routing domain + let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); + + // Look up any NAT mappings we may need to try to preserve with keepalives + let mut mapped_port_info = self.get_low_level_port_info(); + + // Get the PublicInternet relay if we are using one + let opt_relay_nr = self.relay_node(RoutingDomain::PublicInternet); + let opt_relay_id = opt_relay_nr.map(|nr| nr.node_id()); + + // Get our publicinternet dial info + let dids = self.all_filtered_dial_info_details( + RoutingDomain::PublicInternet.into(), + &DialInfoFilter::all(), + ); + + // For all nodes needing pings, figure out how many and over what protocols + for nr in node_refs { + // If this is a relay, let's check for NAT keepalives + let mut did_pings = false; + if Some(nr.node_id()) == opt_relay_id { + // Relay nodes get pinged over all protocols we have inbound dialinfo for + // This is so we can preserve the inbound NAT mappings at our router + for did in &dids { + // Do we need to do this ping? + // Check if we have already pinged over this low-level-protocol/address-type/port combo + // We want to ensure we do the bare minimum required here + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + let needs_ping = if let Some((llpt, port)) = + mapped_port_info.protocol_to_port.get(&(pt, at)) + { + mapped_port_info + .low_level_protocol_ports + .remove(&(*llpt, at, *port)) + } else { + false + }; + if needs_ping { + let rpc = rpc.clone(); + let dif = did.dial_info.make_filter(); + let nr_filtered = + nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); + log_net!("--> Keepalive ping to {:?}", nr_filtered); + unord.push( + async move { rpc.rpc_call_status(Destination::direct(nr_filtered)).await } + .instrument(Span::current()) + .boxed(), + ); + did_pings = true; + } + } + } + // Just do a single ping with the best protocol for all the other nodes, + // ensuring that we at least ping a relay with -something- even if we didnt have + // any mapped ports to preserve + if !did_pings { + let rpc = rpc.clone(); + unord.push( + async move { rpc.rpc_call_status(Destination::direct(nr)).await } + .instrument(Span::current()) + .boxed(), + ); + } + } + + Ok(()) + } + + // Ping each node in the LocalNetwork routing domain if they + // need to be pinged to determine their reliability + #[instrument(level = "trace", skip(self), err)] + fn ping_validator_local_network( + &self, + cur_ts: u64, + unord: &mut FuturesUnordered< + SendPinBoxFuture>>, RPCError>>, + >, + ) -> EyreResult<()> { + let rpc = self.rpc_processor(); + + // Get all nodes needing pings in the LocalNetwork routing domain + let node_refs = self.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); + + // For all nodes needing pings, figure out how many and over what protocols + for nr in node_refs { + let rpc = rpc.clone(); + + // Just do a single ping with the best protocol for all the nodes + unord.push( + async move { rpc.rpc_call_status(Destination::direct(nr)).await } + .instrument(Span::current()) + .boxed(), + ); + } + + Ok(()) + } + + // Ping each node in the routing table if they need to be pinged + // to determine their reliability + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn ping_validator_task_routine( + self, + stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + let mut unord = FuturesUnordered::new(); + + // PublicInternet + self.ping_validator_public_internet(cur_ts, &mut unord)?; + + // LocalNetwork + self.ping_validator_local_network(cur_ts, &mut unord)?; + + // Wait for ping futures to complete in parallel + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs new file mode 100644 index 00000000..e1770a1e --- /dev/null +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -0,0 +1,34 @@ +use super::super::*; +use crate::xx::*; + +use futures_util::stream::{FuturesOrdered, StreamExt}; +use stop_token::future::FutureExt as StopFutureExt; + +impl RoutingTable { + // Keep private routes assigned and accessible + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn private_route_management_task_routine( + self, + _stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // Get our node's current node info and network class and do the right thing + let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); + let network_class = self.get_network_class(RoutingDomain::PublicInternet); + + // Get routing domain editor + let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); + + // Do we know our network class yet? + if let Some(network_class) = network_class { + + // see if we have any routes that need testing + } + + // Commit the changes + editor.commit().await; + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs new file mode 100644 index 00000000..85f4fd8a --- /dev/null +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -0,0 +1,83 @@ +use super::super::*; +use crate::xx::*; + +impl RoutingTable { + // Keep relays assigned and accessible + #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn relay_management_task_routine( + self, + _stop_token: StopToken, + _last_ts: u64, + cur_ts: u64, + ) -> EyreResult<()> { + // Get our node's current node info and network class and do the right thing + let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); + let own_node_info = own_peer_info.signed_node_info.node_info(); + let network_class = self.get_network_class(RoutingDomain::PublicInternet); + + // Get routing domain editor + let mut editor = self.edit_routing_domain(RoutingDomain::PublicInternet); + + // Do we know our network class yet? + if let Some(network_class) = network_class { + // If we already have a relay, see if it is dead, or if we don't need it any more + let has_relay = { + if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) { + let state = relay_node.state(cur_ts); + // Relay node is dead or no longer needed + if matches!(state, BucketEntryState::Dead) { + info!("Relay node died, dropping relay {}", relay_node); + editor.clear_relay_node(); + false + } else if !own_node_info.requires_relay() { + info!( + "Relay node no longer required, dropping relay {}", + relay_node + ); + editor.clear_relay_node(); + false + } else { + true + } + } else { + false + } + }; + + // Do we need a relay? + if !has_relay && own_node_info.requires_relay() { + // Do we want an outbound relay? + let mut got_outbound_relay = false; + if network_class.outbound_wants_relay() { + // The outbound relay is the host of the PWA + if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { + // Register new outbound relay + if let Some(nr) = self.register_node_with_signed_node_info( + RoutingDomain::PublicInternet, + outbound_relay_peerinfo.node_id.key, + outbound_relay_peerinfo.signed_node_info, + false, + ) { + info!("Outbound relay node selected: {}", nr); + editor.set_relay_node(nr); + got_outbound_relay = true; + } + } + } + if !got_outbound_relay { + // Find a node in our routing table that is an acceptable inbound relay + if let Some(nr) = self.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) + { + info!("Inbound relay node selected: {}", nr); + editor.set_relay_node(nr); + } + } + } + } + + // Commit the changes + editor.commit().await; + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks/rolling_transfers.rs similarity index 60% rename from veilid-core/src/routing_table/tasks.rs rename to veilid-core/src/routing_table/tasks/rolling_transfers.rs index 52bf843c..b97b8afc 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks/rolling_transfers.rs @@ -1,10 +1,10 @@ -use super::*; +use super::super::*; use crate::xx::*; impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is #[instrument(level = "trace", skip(self), err)] - pub(super) async fn rolling_transfers_task_routine( + pub(crate) async fn rolling_transfers_task_routine( self, _stop_token: StopToken, last_ts: u64, @@ -38,23 +38,4 @@ impl RoutingTable { Ok(()) } - - // Kick the queued buckets in the routing table to free dead nodes if necessary - // Attempts to keep the size of the routing table down to the bucket depth - #[instrument(level = "trace", skip(self), err)] - pub(super) async fn kick_buckets_task_routine( - self, - _stop_token: StopToken, - _last_ts: u64, - cur_ts: u64, - ) -> EyreResult<()> { - let kick_queue: Vec = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) - .into_iter() - .collect(); - let mut inner = self.inner.write(); - for idx in kick_queue { - inner.kick_bucket(idx) - } - Ok(()) - } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index b56b0277..c8988a4a 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -320,6 +320,15 @@ pub struct VeilidStateNetwork { pub peers: Vec, } +#[derive( + Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, +)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct VeilidStateRoute { + pub dead_routes: Vec, + pub dead_remote_routes: Vec, +} + #[derive( Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, )] @@ -338,6 +347,7 @@ pub enum VeilidUpdate { Attachment(VeilidStateAttachment), Network(VeilidStateNetwork), Config(VeilidStateConfig), + Route(VeilidStateRoute), Shutdown, }