use crate::*; use connection_manager::*; use dht::*; use hashlink::LruCache; use intf::*; use receipt_manager::*; use routing_table::*; use rpc_processor::*; use xx::*; //////////////////////////////////////////////////////////////////////////////////////// pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1; pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE; pub const IPADDR_TABLE_SIZE: usize = 1024; pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes #[derive(Copy, Clone, Debug, Default)] pub struct ProtocolConfig { pub outbound: ProtocolSet, pub inbound: ProtocolSet, } // Things we get when we start up and go away when we shut down // Routing table is not in here because we want it to survive a network shutdown/startup restart #[derive(Clone)] struct NetworkComponents { net: Network, connection_manager: ConnectionManager, rpc_processor: RPCProcessor, receipt_manager: ReceiptManager, } // Statistics per address #[derive(Clone, Default)] pub struct PerAddressStats { last_seen_ts: u64, transfer_stats_accounting: TransferStatsAccounting, transfer_stats: TransferStatsDownUp, } #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct PerAddressStatsKey(IpAddr); impl Default for PerAddressStatsKey { fn default() -> Self { Self(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) } } // Statistics about the low-level network #[derive(Clone)] pub struct NetworkManagerStats { self_stats: PerAddressStats, per_address_stats: LruCache, } impl Default for NetworkManagerStats { fn default() -> Self { Self { self_stats: PerAddressStats::default(), per_address_stats: LruCache::new(IPADDR_TABLE_SIZE), } } } struct ClientWhitelistEntry { last_seen: u64, } // Mechanism required to contact another node enum ContactMethod { Unreachable, // Node is not reachable by any means Direct(DialInfo), // Contact the node directly SignalReverse(NodeRef), // Request via signal the node connect back directly SignalHolePunch(NodeRef), // Request via signal the node negotiate a hole punch InboundRelay(NodeRef), // Must use an inbound relay to reach the node OutboundRelay(NodeRef), // Must use outbound relay to reach the node } // The mutable state of the network manager struct NetworkManagerInner { routing_table: Option, components: Option, network_class: Option, stats: NetworkManagerStats, client_whitelist: LruCache, relay_node: Option, } struct NetworkManagerUnlockedInner { // Background processes rolling_transfers_task: TickTask, relay_management_task: TickTask, } #[derive(Clone)] pub struct NetworkManager { config: VeilidConfig, table_store: TableStore, crypto: Crypto, inner: Arc>, unlocked_inner: Arc, } impl NetworkManager { fn new_inner() -> NetworkManagerInner { NetworkManagerInner { routing_table: None, components: None, network_class: None, stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), relay_node: None, } } fn new_unlocked_inner(_config: VeilidConfig) -> NetworkManagerUnlockedInner { //let c = config.get(); NetworkManagerUnlockedInner { rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), } } pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self { let this = Self { config: config.clone(), table_store, crypto, inner: Arc::new(Mutex::new(Self::new_inner())), unlocked_inner: Arc::new(Self::new_unlocked_inner(config)), }; // Set rolling transfers tick task { let this2 = this.clone(); this.unlocked_inner .rolling_transfers_task .set_routine(move |l, t| { Box::pin(this2.clone().rolling_transfers_task_routine(l, t)) }); } // Set relay management tick task { let this2 = this.clone(); this.unlocked_inner .relay_management_task .set_routine(move |l, t| { Box::pin(this2.clone().relay_management_task_routine(l, t)) }); } this } pub fn config(&self) -> VeilidConfig { self.config.clone() } pub fn table_store(&self) -> TableStore { self.table_store.clone() } pub fn crypto(&self) -> Crypto { self.crypto.clone() } pub fn routing_table(&self) -> RoutingTable { self.inner.lock().routing_table.as_ref().unwrap().clone() } pub fn net(&self) -> Network { self.inner.lock().components.as_ref().unwrap().net.clone() } pub fn rpc_processor(&self) -> RPCProcessor { self.inner .lock() .components .as_ref() .unwrap() .rpc_processor .clone() } pub fn receipt_manager(&self) -> ReceiptManager { self.inner .lock() .components .as_ref() .unwrap() .receipt_manager .clone() } pub fn connection_manager(&self) -> ConnectionManager { self.inner .lock() .components .as_ref() .unwrap() .connection_manager .clone() } pub fn relay_node(&self) -> Option { self.inner.lock().relay_node.clone() } pub async fn init(&self) -> Result<(), String> { let routing_table = RoutingTable::new(self.clone()); routing_table.init().await?; self.inner.lock().routing_table = Some(routing_table.clone()); Ok(()) } pub async fn terminate(&self) { let routing_table = { let mut inner = self.inner.lock(); inner.routing_table.take() }; if let Some(routing_table) = routing_table { routing_table.terminate().await; } } pub async fn internal_startup(&self) -> Result<(), String> { trace!("NetworkManager::internal_startup begin"); if self.inner.lock().components.is_some() { debug!("NetworkManager::internal_startup already started"); return Ok(()); } // Create network components let net = Network::new(self.clone()); let connection_manager = ConnectionManager::new(self.clone()); let rpc_processor = RPCProcessor::new(self.clone()); let receipt_manager = ReceiptManager::new(self.clone()); self.inner.lock().components = Some(NetworkComponents { net: net.clone(), connection_manager: connection_manager.clone(), rpc_processor: rpc_processor.clone(), receipt_manager: receipt_manager.clone(), }); // Start network components rpc_processor.startup().await?; receipt_manager.startup().await?; net.startup().await?; connection_manager.startup().await; trace!("NetworkManager::internal_startup end"); Ok(()) } pub async fn startup(&self) -> Result<(), String> { if let Err(e) = self.internal_startup().await { self.shutdown().await; return Err(e); } Ok(()) } pub async fn shutdown(&self) { trace!("NetworkManager::shutdown begin"); // Shutdown network components if they started up let components = self.inner.lock().components.clone(); if let Some(components) = components { components.connection_manager.shutdown().await; components.net.shutdown().await; components.receipt_manager.shutdown().await; components.rpc_processor.shutdown().await; } // reset the state let mut inner = self.inner.lock(); inner.components = None; inner.network_class = None; trace!("NetworkManager::shutdown end"); } pub fn update_client_whitelist(&self, client: key::DHTKey) { let mut inner = self.inner.lock(); match inner.client_whitelist.entry(client) { hashlink::lru_cache::Entry::Occupied(mut entry) => { entry.get_mut().last_seen = intf::get_timestamp() } hashlink::lru_cache::Entry::Vacant(entry) => { entry.insert(ClientWhitelistEntry { last_seen: intf::get_timestamp(), }); } } } pub fn check_client_whitelist(&self, client: key::DHTKey) -> bool { let mut inner = self.inner.lock(); match inner.client_whitelist.entry(client) { hashlink::lru_cache::Entry::Occupied(mut entry) => { entry.get_mut().last_seen = intf::get_timestamp(); true } hashlink::lru_cache::Entry::Vacant(_) => false, } } pub fn purge_client_whitelist(&self) { let timeout_ms = self.config.get().network.client_whitelist_timeout_ms; let mut inner = self.inner.lock(); let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64); // Remove clients from the whitelist that haven't been since since our whitelist timeout while inner .client_whitelist .peek_lru() .map(|v| v.1.last_seen < cutoff_timestamp) .unwrap_or_default() { inner.client_whitelist.remove_lru(); } } pub async fn tick(&self) -> Result<(), String> { let (routing_table, net, receipt_manager) = { let inner = self.inner.lock(); let components = inner.components.as_ref().unwrap(); ( inner.routing_table.as_ref().unwrap().clone(), components.net.clone(), components.receipt_manager.clone(), ) }; // If the network needs to be reset, do it // if things can't restart, then we fail out of the attachment manager if net.needs_restart() { net.shutdown().await; net.startup().await?; } // Run the routing table tick routing_table.tick().await?; // Run the low level network tick net.tick().await?; // Run the receipt manager tick receipt_manager.tick().await?; // Purge the client whitelist self.purge_client_whitelist(); Ok(()) } // Return what network class we are in pub fn get_network_class(&self) -> Option { if let Some(components) = &self.inner.lock().components { components.net.get_network_class() } else { None } } // Get our node's capabilities pub fn generate_node_status(&self) -> NodeStatus { let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); let will_route = network_class.can_inbound_relay(); // xxx: eventually this may have more criteria added let will_tunnel = network_class.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point let will_signal = network_class.can_signal(); let will_relay = network_class.can_inbound_relay(); let will_validate_dial_info = network_class.can_validate_dial_info(); NodeStatus { will_route, will_tunnel, will_signal, will_relay, will_validate_dial_info, } } // Return what protocols we have enabled pub fn get_protocol_config(&self) -> Option { if let Some(components) = &self.inner.lock().components { components.net.get_protocol_config() } else { None } } // Generates an out-of-band receipt pub fn generate_receipt>( &self, expiration_us: u64, expected_returns: u32, extra_data: D, callback: impl ReceiptCallback, ) -> Result, String> { let receipt_manager = self.receipt_manager(); let routing_table = self.routing_table(); // Generate receipt and serialized form to return let nonce = Crypto::get_random_nonce(); let receipt = Receipt::try_new(0, nonce, routing_table.node_id(), extra_data)?; let out = receipt .to_signed_data(&routing_table.node_id_secret()) .map_err(|_| "failed to generate signed receipt".to_owned())?; // Record the receipt for later let exp_ts = intf::get_timestamp() + expiration_us; receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback); Ok(out) } pub fn generate_single_shot_receipt>( &self, expiration_us: u64, extra_data: D, ) -> Result<(Vec, EventualValueCloneFuture), String> { let receipt_manager = self.receipt_manager(); let routing_table = self.routing_table(); // Generate receipt and serialized form to return let nonce = Crypto::get_random_nonce(); let receipt = Receipt::try_new(0, nonce, routing_table.node_id(), extra_data)?; let out = receipt .to_signed_data(&routing_table.node_id_secret()) .map_err(|_| "failed to generate signed receipt".to_owned())?; // Record the receipt for later let exp_ts = intf::get_timestamp() + expiration_us; let eventual = SingleShotEventual::new(ReceiptEvent::Cancelled); let instance = eventual.instance(); receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual); Ok((out, instance)) } // Process a received out-of-band receipt pub async fn handle_out_of_band_receipt>( &self, receipt_data: R, descriptor: ConnectionDescriptor, ) -> Result<(), String> { let routing_table = self.routing_table(); let receipt_manager = self.receipt_manager(); let ts = intf::get_timestamp(); let receipt = Receipt::from_signed_data(receipt_data.as_ref()) .map_err(|_| "failed to parse signed receipt".to_owned())?; // Cache the receipt information in the routing table let source_noderef = routing_table .register_node_with_existing_connection(receipt.get_sender_id(), descriptor, ts) .map_err(|e| format!("node id registration from receipt failed: {}", e))?; receipt_manager .handle_receipt(source_noderef, receipt) .await } // Process a received in-band receipt pub async fn handle_in_band_receipt>( &self, receipt_data: R, inbound_nr: NodeRef, ) -> Result<(), String> { let receipt_manager = self.receipt_manager(); let receipt = Receipt::from_signed_data(receipt_data.as_ref()) .map_err(|_| "failed to parse signed receipt".to_owned())?; receipt_manager.handle_receipt(inbound_nr, receipt).await } // Process a received signal pub async fn handle_signal(&self, signal_info: SignalInfo) -> Result<(), String> { match signal_info { SignalInfo::ReverseConnect { receipt, peer_info } => { let routing_table = self.routing_table(); let rpc = self.rpc_processor(); // Add the peer info to our routing table let peer_nr = routing_table .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; // Make a reverse connection to the peer and send the receipt to it rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) .await .map_err(map_to_string)?; } SignalInfo::HolePunch { receipt, peer_info } => { let routing_table = self.routing_table(); // Add the peer info to our routing table let peer_nr = routing_table .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; // Get the udp direct dialinfo for the hole punch let hole_punch_dial_info = if let Some(hpdi) = peer_nr .node_info() .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) { hpdi } else { return Err("No hole punch capable dialinfo found for node".to_owned()); }; // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole self.net() .send_data_to_dial_info(hole_punch_dial_info.clone(), Vec::new()) .await?; // XXX: do we need a delay here? or another hole punch packet? // Return the receipt over the direct channel since we want to use exactly the same dial info self.send_direct_receipt(hole_punch_dial_info, receipt, false) .await .map_err(map_to_string)?; } } Ok(()) } // Builds an envelope for sending over the network fn build_envelope>( &self, dest_node_id: key::DHTKey, version: u8, body: B, ) -> Result, String> { // DH to get encryption key let routing_table = self.routing_table(); let node_id = routing_table.node_id(); let node_id_secret = routing_table.node_id_secret(); // Get timestamp, nonce let ts = intf::get_timestamp(); let nonce = Crypto::get_random_nonce(); // Encode envelope let envelope = Envelope::new(version, ts, nonce, node_id, dest_node_id); envelope .to_encrypted_data(self.crypto.clone(), body.as_ref(), &node_id_secret) .map_err(|_| "envelope failed to encode".to_owned()) } // Called by the RPC handler when we want to issue an RPC request or response // node_ref is the direct destination to which the envelope will be sent // If 'node_id' is specified, it can be different than node_ref.node_id() // which will cause the envelope to be relayed pub async fn send_envelope>( &self, node_ref: NodeRef, node_id: Option, body: B, ) -> Result<(), String> { if let Some(node_id) = node_id { log_net!("sending envelope to {:?} via {:?}", node_id, node_ref); } else { log_net!("sending envelope to {:?}", node_ref); } // Get node's min/max version and see if we can send to it // and if so, get the max version we can use let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version()) { #[allow(clippy::absurd_extreme_comparisons)] if node_min > MAX_VERSION || node_max < MIN_VERSION { return Err(format!( "can't talk to this node {} because version is unsupported: ({},{})", node_ref.node_id(), node_min, node_max )) .map_err(logthru_rpc!(warn)); } cmp::min(node_max, MAX_VERSION) } else { MAX_VERSION }; // Build the envelope to send let out = self .build_envelope(node_id.unwrap_or_else(|| node_ref.node_id()), version, body) .map_err(logthru_rpc!(error))?; // Send the envelope via whatever means necessary self.send_data(node_ref, out).await } // Called by the RPC handler when we want to issue an direct receipt pub async fn send_direct_receipt>( &self, dial_info: DialInfo, rcpt_data: B, alternate_port: bool, ) -> Result<(), String> { // Validate receipt before we send it, otherwise this may be arbitrary data! let _ = Receipt::from_signed_data(rcpt_data.as_ref()) .map_err(|_| "failed to validate direct receipt".to_owned())?; // Send receipt directly if alternate_port { self.net() .send_data_unbound_to_dial_info(dial_info, rcpt_data.as_ref().to_vec()) .await } else { self.net() .send_data_to_dial_info(dial_info, rcpt_data.as_ref().to_vec()) .await } } // Figure out how to reach a node fn get_contact_method(&self, node_ref: &NodeRef) -> Result { // Get our network class and protocol config let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); let our_protocol_config = self.get_protocol_config().unwrap(); // See if this is a local node reachable directly let local_node_info = node_ref.local_node_info(); if let Some(local_direct_dial_info) = local_node_info .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)) { return Ok(ContactMethod::Direct(local_direct_dial_info)); } // Get the best matching direct dial info if we have it let target_node_info = node_ref.node_info(); let opt_direct_dial_info = target_node_info .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)); // Can the target node do inbound? if target_node_info.network_class.inbound_capable() { // Do we need to signal before going inbound? if target_node_info.network_class.inbound_requires_signal() { // Get the target's inbound relay, it must have one or it is not reachable if let Some(target_rpi) = target_node_info.relay_peer_info { // Can we reach the inbound relay? if target_rpi .node_info .first_filtered_dial_info(|di| { our_protocol_config.outbound.filter_dial_info(di) }) .is_some() { let target_inbound_relay_nr = self.routing_table().register_node_with_node_info( target_rpi.node_id.key, target_rpi.node_info, )?; // Can we receive anything inbound ever? if our_network_class.inbound_capable() { // Can we receive a direct reverse connection? if !our_network_class.inbound_requires_signal() { return Ok(ContactMethod::SignalReverse(target_inbound_relay_nr)); } // Can we hole-punch? else if our_protocol_config.inbound.udp && target_node_info.outbound_protocols.udp { return Ok(ContactMethod::SignalHolePunch(target_inbound_relay_nr)); } // Otherwise we have to inbound relay } return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); } } } // Go direct without signaling else { // If we have direct dial info we can use, do it if let Some(ddi) = opt_direct_dial_info { return Ok(ContactMethod::Direct(ddi)); } } } else { // If the other node is not inbound capable at all, it is using a full relay if let Some(target_rpi) = target_node_info.relay_peer_info { // Can we reach the full relay? if target_rpi .node_info .first_filtered_dial_info(|di| { our_protocol_config.outbound.filter_dial_info(di) }) .is_some() { let target_inbound_relay_nr = self.routing_table().register_node_with_node_info( target_rpi.node_id.key, target_rpi.node_info, )?; return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); } } } // If we can't reach the node by other means, try our outbound relay if we have one if let Some(relay_node) = self.relay_node() { return Ok(ContactMethod::OutboundRelay(relay_node)); } // Otherwise, we can't reach this node Ok(ContactMethod::Unreachable) } // Send a reverse connection signal and wait for the return receipt over it // Then send the data across the new connection pub async fn do_reverse_connect( &self, relay_nr: NodeRef, target_nr: NodeRef, data: Vec, ) -> Result<(), String> { // Build a return receipt for the signal let receipt_timeout = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); let (receipt, eventual_value) = self .generate_single_shot_receipt(receipt_timeout, []) .map_err(map_to_string)?; // Get our peer info let peer_info = self.routing_table().get_own_peer_info(); // Issue the signal let rpc = self.rpc_processor(); rpc.rpc_call_signal( Destination::Relay(relay_nr.clone(), target_nr.node_id()), None, SignalInfo::ReverseConnect { receipt, peer_info }, ) .await .map_err(logthru_net!("failed to send signal to {:?}", relay_nr)) .map_err(map_to_string)?; // Wait for the return receipt let inbound_nr = match eventual_value.await { ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { return Err(format!( "reverse connect receipt expired from {:?}", target_nr )); } ReceiptEvent::Cancelled => { return Err(format!( "reverse connect receipt cancelled from {:?}", target_nr )); } }; // We expect the inbound noderef to be the same as the target noderef // if they aren't the same, we should error on this and figure out what then hell is up if target_nr != inbound_nr { error!("unexpected noderef mismatch on reverse connect"); } // And now use the existing connection to send over if let Some(descriptor) = inbound_nr.last_connection().await { match self .net() .send_data_to_existing_connection(descriptor, data) .await .map_err(logthru_net!())? { None => Ok(()), Some(_) => Err("unable to send over reverse connection".to_owned()), } } else { Err("no reverse connection available".to_owned()) } } // Send a hole punch signal and do a negotiating ping and wait for the return receipt // Then send the data across the new connection pub async fn do_hole_punch( &self, relay_nr: NodeRef, target_nr: NodeRef, data: Vec, ) -> Result<(), String> { // Build a return receipt for the signal let receipt_timeout = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); let (receipt, eventual_value) = self .generate_single_shot_receipt(receipt_timeout, []) .map_err(map_to_string)?; // Get our peer info let peer_info = self.routing_table().get_own_peer_info(); // Get the udp direct dialinfo for the hole punch let hole_punch_dial_info = if let Some(hpdi) = target_nr .node_info() .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) { hpdi } else { return Err("No hole punch capable dialinfo found for node".to_owned()); }; // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole self.net() .send_data_to_dial_info(hole_punch_dial_info, Vec::new()) .await?; // Issue the signal let rpc = self.rpc_processor(); rpc.rpc_call_signal( Destination::Relay(relay_nr.clone(), target_nr.node_id()), None, SignalInfo::HolePunch { receipt, peer_info }, ) .await .map_err(logthru_net!("failed to send signal to {:?}", relay_nr)) .map_err(map_to_string)?; // Wait for the return receipt let inbound_nr = match eventual_value.await { ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { return Err(format!( "reverse connect receipt expired from {:?}", target_nr )); } ReceiptEvent::Cancelled => { return Err(format!( "reverse connect receipt cancelled from {:?}", target_nr )); } }; // We expect the inbound noderef to be the same as the target noderef // if they aren't the same, we should error on this and figure out what then hell is up if target_nr != inbound_nr { error!("unexpected noderef mismatch on reverse connect"); } // And now use the existing connection to send over if let Some(descriptor) = inbound_nr.last_connection().await { match self .net() .send_data_to_existing_connection(descriptor, data) .await .map_err(logthru_net!())? { None => Ok(()), Some(_) => Err("unable to send over reverse connection".to_owned()), } } else { Err("no reverse connection available".to_owned()) } } // Send raw data to a node // // We may not have dial info for a node, but have an existing connection for it // because an inbound connection happened first, and no FindNodeQ has happened to that // node yet to discover its dial info. The existing connection should be tried first // in this case. // // Sending to a node requires determining a NetworkClass compatible mechanism // pub fn send_data( &self, node_ref: NodeRef, data: Vec, ) -> SystemPinBoxFuture> { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on let data = if let Some(descriptor) = node_ref.last_connection().await { match this .net() .send_data_to_existing_connection(descriptor, data) .await .map_err(logthru_net!())? { None => { return Ok(()); } Some(d) => d, } } else { data }; // If we don't have last_connection, try to reach out to the peer via its dial info match this.get_contact_method(&node_ref).map_err(logthru_net!())? { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { this.send_data(relay_nr, data).await } ContactMethod::Direct(dial_info) => { this.net().send_data_to_dial_info(dial_info, data).await } ContactMethod::SignalReverse(relay_nr) => { this.do_reverse_connect(relay_nr, node_ref, data).await } ContactMethod::SignalHolePunch(relay_nr) => { this.do_hole_punch(relay_nr, node_ref, data).await } ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()), } .map_err(logthru_net!()) }) } // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler pub async fn on_recv_envelope( &self, data: &[u8], descriptor: ConnectionDescriptor, ) -> Result { log_net!( "envelope of {} bytes received from {:?}", data.len(), descriptor ); // Network accounting self.stats_packet_rcvd(descriptor.remote.to_socket_addr().ip(), data.len() as u64); // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { self.handle_out_of_band_receipt(data, descriptor).await?; return Ok(true); } // Decode envelope header (may fail signature validation) let envelope = Envelope::from_signed_data(data).map_err(|_| "envelope failed to decode".to_owned())?; // Get routing table and rpc processor let (routing_table, rpc) = { let inner = self.inner.lock(); ( inner.routing_table.as_ref().unwrap().clone(), inner.components.as_ref().unwrap().rpc_processor.clone(), ) }; // Get timestamp range let (tsbehind, tsahead) = { let c = self.config.get(); ( c.network.rpc.max_timestamp_behind_ms.map(ms_to_us), c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us), ) }; // Validate timestamp isn't too old let ts = intf::get_timestamp(); let ets = envelope.get_timestamp(); if let Some(tsbehind) = tsbehind { if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) { return Err(format!( "envelope time was too far in the past: {}ms ", timestamp_to_secs(ts - ets) * 1000f64 )); } } if let Some(tsahead) = tsahead { if tsahead > 0 && (ts < ets && ets - ts > tsahead) { return Err(format!( "envelope time was too far in the future: {}ms", timestamp_to_secs(ets - ts) * 1000f64 )); } } // Peek at header and see if we need to relay this // If the recipient id is not our node id, then it needs relaying let sender_id = envelope.get_sender_id(); let recipient_id = envelope.get_recipient_id(); if recipient_id != routing_table.node_id() { // See if the source node is allowed to resolve nodes // This is a costly operation, so only outbound-relay permitted // nodes are allowed to do this, for example PWA users let relay_nr = if self.check_client_whitelist(sender_id) { // Full relay allowed, do a full resolve_node rpc.resolve_node(recipient_id).await.map_err(|e| { format!( "failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}", e ) }).map_err(logthru_net!())? } else { // If this is not a node in the client whitelist, only allow inbound relay // which only performs a lightweight lookup before passing the packet back out // See if we have the node in our routing table // We should, because relays are chosen by nodes that have established connectivity and // should be mutually in each others routing tables. The node needing the relay will be // pinging this node regularly to keep itself in the routing table routing_table.lookup_node_ref(recipient_id).ok_or_else(|| { format!( "Inbound relay asked for recipient not in routing table: {}", recipient_id ) })? }; // Relay the packet to the desired destination self.send_data(relay_nr, data.to_vec()) .await .map_err(|e| format!("failed to forward envelope: {}", e))?; // Inform caller that we dealt with the envelope, but did not process it locally return Ok(false); } // DH to get decryption key (cached) let node_id_secret = routing_table.node_id_secret(); // Decrypt the envelope body // xxx: punish nodes that send messages that fail to decrypt eventually let body = envelope .decrypt_body(self.crypto(), data, &node_id_secret) .map_err(|_| "failed to decrypt envelope body".to_owned())?; // Cache the envelope information in the routing table let source_noderef = routing_table .register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts) .map_err(|e| format!("node id registration failed: {}", e))?; source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version())); // xxx: deal with spoofing and flooding here? // Pass message to RPC system rpc.enqueue_message(envelope, body, source_noderef) .map_err(|e| format!("enqueing rpc message failed: {}", e))?; // Inform caller that we dealt with the envelope locally Ok(true) } // Keep relays assigned and accessible async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { log_net!("--- network manager relay_management task"); // Get our node's current network class and do the right thing let network_class = self.get_network_class(); // Do we know our network class yet? if let Some(network_class) = network_class { let routing_table = self.routing_table(); // If we already have a relay, see if it is dead, or if we don't need it any more { let mut inner = self.inner.lock(); if let Some(relay_node) = inner.relay_node.clone() { let state = relay_node.operate(|e| e.state(cur_ts)); if matches!(state, BucketEntryState::Dead) || !network_class.needs_relay() { // Relay node is dead or no longer needed inner.relay_node = None; } } } // Do we need an outbound relay? if network_class.outbound_wants_relay() { // The outbound relay is the host of the PWA if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { let mut inner = self.inner.lock(); // Register new outbound relay let nr = routing_table.register_node_with_node_info( outbound_relay_peerinfo.node_id.key, outbound_relay_peerinfo.node_info, )?; inner.relay_node = Some(nr); } } else if network_class.needs_relay() { // Find a node in our routing table that is an acceptable inbound relay if let Some(nr) = routing_table.find_inbound_relay(cur_ts) { let mut inner = self.inner.lock(); inner.relay_node = Some(nr); } } } else { // If we don't know our network class, we do nothing here and wait until we do } Ok(()) } // Compute transfer statistics for the low level network async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { log_net!("--- network manager rolling_transfers task"); let inner = &mut *self.inner.lock(); // Roll the low level network transfer stats for our address inner .stats .self_stats .transfer_stats_accounting .roll_transfers(last_ts, cur_ts, &mut inner.stats.self_stats.transfer_stats); // Roll all per-address transfers let mut dead_addrs: HashSet = HashSet::new(); for (addr, stats) in &mut inner.stats.per_address_stats { stats.transfer_stats_accounting.roll_transfers( last_ts, cur_ts, &mut stats.transfer_stats, ); // While we're here, lets see if this address has timed out if cur_ts - stats.last_seen_ts >= IPADDR_MAX_INACTIVE_DURATION_US { // it's dead, put it in the dead list dead_addrs.insert(*addr); } } // Remove the dead addresses from our tables for da in &dead_addrs { inner.stats.per_address_stats.remove(da); } Ok(()) } // Callbacks from low level network for statistics gathering pub fn stats_packet_sent(&self, addr: IpAddr, bytes: u64) { let inner = &mut *self.inner.lock(); inner .stats .self_stats .transfer_stats_accounting .add_up(bytes); inner .stats .per_address_stats .entry(PerAddressStatsKey(addr)) .or_insert(PerAddressStats::default()) .transfer_stats_accounting .add_up(bytes); } pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: u64) { let inner = &mut *self.inner.lock(); inner .stats .self_stats .transfer_stats_accounting .add_down(bytes); inner .stats .per_address_stats .entry(PerAddressStatsKey(addr)) .or_insert(PerAddressStats::default()) .transfer_stats_accounting .add_down(bytes); } }