diff --git a/Cargo.lock b/Cargo.lock index 1668dc55..ab494019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,24 +875,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ebf286c900a6d5867aeff75cfee3192857bb7f24b547d4f0df2ed6baa812c90" dependencies = [ "backtrace", - "color-spantrace", "eyre", "indenter", "once_cell", "owo-colors", - "tracing-error", -] - -[[package]] -name = "color-spantrace" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba75b3d9449ecdccb27ecbc479fdc0b87fa2dd43d2f8298f9bf0e59aacc8dce" -dependencies = [ - "once_cell", - "owo-colors", - "tracing-core", - "tracing-error", ] [[package]] diff --git a/veilid-core/src/dht/crypto.rs b/veilid-core/src/dht/crypto.rs index dd681f5e..7680a47b 100644 --- a/veilid-core/src/dht/crypto.rs +++ b/veilid-core/src/dht/crypto.rs @@ -64,7 +64,7 @@ struct CryptoInner { node_id: DHTKey, node_id_secret: DHTKeySecret, dh_cache: DHCache, - flush_future: Option>, + flush_future: Option>, } #[derive(Clone)] diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 2963d4db..c2314761 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -53,7 +53,7 @@ pub async fn sleep(millis: u32) { pub fn system_boxed<'a, Out>( future: impl Future + Send + 'a, -) -> SystemPinBoxFutureLifetime<'a, Out> { +) -> SendPinBoxFutureLifetime<'a, Out> { Box::pin(future) } @@ -117,7 +117,7 @@ where } } -pub fn interval(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> +pub fn interval(freq_ms: u32, callback: F) -> SendPinBoxFuture<()> where F: Fn() -> FUT + Send + Sync + 'static, FUT: Future + Send, diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 34cbd6e6..ed4633db 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -290,22 +290,27 @@ impl NetworkInterface { } } -#[derive(PartialEq, Eq, Clone)] -pub struct NetworkInterfaces { +pub struct NetworkInterfacesInner { valid: bool, interfaces: BTreeMap, interface_address_cache: Vec, } +#[derive(Clone)] +pub struct NetworkInterfaces { + inner: Arc>, +} + impl fmt::Debug for NetworkInterfaces { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let inner = self.inner.lock(); f.debug_struct("NetworkInterfaces") - .field("valid", &self.valid) - .field("interfaces", &self.interfaces) + .field("valid", &inner.valid) + .field("interfaces", &inner.interfaces) .finish()?; if f.alternate() { writeln!(f)?; - writeln!(f, "// best_addresses: {:?}", self.best_addresses())?; + writeln!(f, "// best_addresses: {:?}", inner.interface_address_cache)?; } Ok(()) } @@ -315,54 +320,78 @@ impl fmt::Debug for NetworkInterfaces { impl NetworkInterfaces { pub fn new() -> Self { Self { - valid: false, - interfaces: BTreeMap::new(), - interface_address_cache: Vec::new(), + inner: Arc::new(Mutex::new(NetworkInterfacesInner { + valid: false, + interfaces: BTreeMap::new(), + interface_address_cache: Vec::new(), + })), } } + pub fn is_valid(&self) -> bool { - self.valid + let inner = self.inner.lock(); + inner.valid } - pub fn clear(&mut self) { - self.interfaces.clear(); - self.interface_address_cache.clear(); - self.valid = false; + pub fn clear(&self) { + let mut inner = self.inner.lock(); + + inner.interfaces.clear(); + inner.interface_address_cache.clear(); + inner.valid = false; } // returns Ok(false) if refresh had no changes, Ok(true) if changes were present - pub async fn refresh(&mut self) -> EyreResult { - self.valid = false; - let last_interfaces = core::mem::take(&mut self.interfaces); + pub async fn refresh(&self) -> EyreResult { + let mut last_interfaces = { + let mut last_interfaces = BTreeMap::::new(); + let mut platform_support = PlatformSupport::new()?; + platform_support + .get_interfaces(&mut last_interfaces) + .await?; + last_interfaces + }; - let mut platform_support = PlatformSupport::new()?; - platform_support - .get_interfaces(&mut self.interfaces) - .await?; + let mut inner = self.inner.lock(); + core::mem::swap(&mut inner.interfaces, &mut last_interfaces); + inner.valid = true; - self.valid = true; - - let changed = last_interfaces != self.interfaces; + let changed = last_interfaces != inner.interfaces; if changed { - self.cache_best_addresses(); + Self::cache_best_addresses(&mut *inner); - //trace!("NetworkInterfaces refreshed: {:#?}?", self); trace!( "NetworkInterfaces refreshed: {:#?}?", - self.interface_address_cache + inner.interface_address_cache ); } Ok(changed) } - pub fn len(&self) -> usize { - self.interfaces.len() - } - pub fn iter(&self) -> std::collections::btree_map::Iter { - self.interfaces.iter() + pub fn with_interfaces(&self, f: F) -> R + where + F: FnOnce(&BTreeMap) -> R, + { + let inner = self.inner.lock(); + f(&inner.interfaces) } - fn cache_best_addresses(&mut self) { + pub fn best_addresses(&self) -> Vec { + let inner = self.inner.lock(); + inner.interface_address_cache.clone() + } + + pub fn with_best_addresses(&self, f: F) -> R + where + F: FnOnce(&[IpAddr]) -> R, + { + let inner = self.inner.lock(); + f(&inner.interface_address_cache) + } + + ///////////////////////////////////////////// + + fn cache_best_addresses(inner: &mut NetworkInterfacesInner) { // Reduce interfaces to their best routable ip addresses let mut intf_addrs = Vec::new(); - for intf in self.interfaces.values() { + for intf in inner.interfaces.values() { if !intf.is_running() || !intf.has_default_route() || intf.is_loopback() { continue; } @@ -378,17 +407,6 @@ impl NetworkInterfaces { intf_addrs.sort(); // Now export just the addresses - self.interface_address_cache = intf_addrs.iter().map(|x| x.if_addr().ip()).collect() - } - - pub fn best_addresses(&self) -> Vec { - self.interface_address_cache.clone() - } - - pub fn with_best_addresses(&self, f: F) -> R - where - F: FnOnce(&[IpAddr]) -> R, - { - f(&self.interface_address_cache) + inner.interface_address_cache = intf_addrs.iter().map(|x| x.if_addr().ip()).collect() } } diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index 021c94a5..f30f8a81 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -77,7 +77,7 @@ pub async fn sleep(millis: u32) { pub fn system_boxed<'a, Out>( future: impl Future + Send + 'a, -) -> SystemPinBoxFutureLifetime<'a, Out> { +) -> SendPinBoxFutureLifetime<'a, Out> { Box::pin(future) } @@ -118,7 +118,7 @@ where } -pub fn interval(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> +pub fn interval(freq_ms: u32, callback: F) -> SendPinBoxFuture<()> where F: Fn() -> FUT + Send + Sync + 'static, FUT: Future + Send, diff --git a/veilid-core/src/network_manager/connection_handle.rs b/veilid-core/src/network_manager/connection_handle.rs index 1b085a89..228f44a0 100644 --- a/veilid-core/src/network_manager/connection_handle.rs +++ b/veilid-core/src/network_manager/connection_handle.rs @@ -6,6 +6,12 @@ pub struct ConnectionHandle { channel: flume::Sender>, } +#[derive(Debug)] +pub enum ConnectionHandleSendResult { + Sent, + NotSent(Vec), +} + impl ConnectionHandle { pub(super) fn new(descriptor: ConnectionDescriptor, channel: flume::Sender>) -> Self { Self { @@ -18,16 +24,17 @@ impl ConnectionHandle { self.descriptor.clone() } - pub fn send(&self, message: Vec) -> EyreResult<()> { - self.channel - .send(message) - .wrap_err("failed to send to connection") + pub fn send(&self, message: Vec) -> ConnectionHandleSendResult { + match self.channel.send(message) { + Ok(()) => ConnectionHandleSendResult::Sent, + Err(e) => ConnectionHandleSendResult::NotSent(e.0), + } } - pub async fn send_async(&self, message: Vec) -> EyreResult<()> { - self.channel - .send_async(message) - .await - .wrap_err("failed to send_async to connection") + pub async fn send_async(&self, message: Vec) -> ConnectionHandleSendResult { + match self.channel.send_async(message).await { + Ok(()) => ConnectionHandleSendResult::Sent, + Err(e) => ConnectionHandleSendResult::NotSent(e.0), + } } } diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 227012d7..2abf2d09 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -142,7 +142,7 @@ impl ConnectionManager { &self, inner: &mut ConnectionManagerInner, conn: ProtocolNetworkConnection, - ) -> EyreResult { + ) -> EyreResult> { log_net!("on_new_protocol_network_connection: {:?}", conn); // Wrap with NetworkConnection object to start the connection processing loop @@ -155,7 +155,7 @@ impl ConnectionManager { let handle = conn.get_handle(); // Add to the connection table inner.connection_table.add_connection(conn)?; - Ok(handle) + Ok(NetworkResult::Value(handle)) } // Called when we want to create a new connection or get the current one that already exists @@ -165,7 +165,7 @@ impl ConnectionManager { &self, local_addr: Option, dial_info: DialInfo, - ) -> EyreResult { + ) -> EyreResult> { let killed = { let mut inner = self.arc.inner.lock(); let inner = match &mut *inner { @@ -202,7 +202,7 @@ impl ConnectionManager { peer_address.green() ); - return Ok(conn); + return Ok(NetworkResult::Value(conn)); } // Drop any other protocols connections to this remote that have the same local addr @@ -254,20 +254,33 @@ impl ConnectionManager { k.await; } + // Get connection timeout + let timeout_ms = { + let config = self.network_manager().config(); + let c = config.get(); + c.network.connection_initial_timeout_ms + }; + // Attempt new connection - let conn = loop { - match ProtocolNetworkConnection::connect(local_addr, &dial_info).await { - Ok(v) => break Ok(v), + let conn = network_result_try!(loop { + let result_net_res = + ProtocolNetworkConnection::connect(local_addr, &dial_info, timeout_ms).await; + match result_net_res { + Ok(net_res) => { + if net_res.is_value() || retry_count == 0 { + break net_res; + } + } Err(e) => { if retry_count == 0 { - break Err(e); + return Err(e).wrap_err("failed to connect"); } - log_net!(debug "get_or_create_connection retries left: {}", retry_count); - retry_count -= 1; - intf::sleep(500).await; } - } - }?; + }; + log_net!(debug "get_or_create_connection retries left: {}", retry_count); + retry_count -= 1; + intf::sleep(500).await; + }); // Add to the connection table let mut inner = self.arc.inner.lock(); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 839d8395..8ba0f07c 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -712,9 +712,20 @@ impl NetworkManager { // should not be subject to our ability to decode it // Send receipt directly - self.net() + match self + .net() .send_data_unbound_to_dial_info(dial_info, rcpt_data) - .await + .await? + { + NetworkResult::Timeout => { + log_net!(debug "Timeout sending out of band receipt"); + } + NetworkResult::NoConnection(e) => { + log_net!(debug "No connection sending out of band receipt: {}", e); + } + NetworkResult::Value(()) => {} + }; + Ok(()) } // Figure out how to reach a node @@ -987,7 +998,7 @@ impl NetworkManager { &self, node_ref: NodeRef, data: Vec, - ) -> SystemPinBoxFuture> { + ) -> SendPinBoxFuture> { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on @@ -1084,8 +1095,8 @@ impl NetworkManager { .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) .await? { - TimeoutOr::Timeout => return Ok(Vec::new()), - TimeoutOr::Value(v) => v, + NetworkResult::Value(v) => v, + _ => return Ok(Vec::new()), }; let bootstrap_peerinfo: Vec = diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index c8bebf33..84887884 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -43,7 +43,6 @@ struct NetworkInner { tcp_port: u16, ws_port: u16, wss_port: u16, - interfaces: NetworkInterfaces, // udp bound_first_udp: BTreeMap>, inbound_udp_protocol_handlers: BTreeMap, @@ -60,8 +59,11 @@ struct NetworkUnlockedInner { routing_table: RoutingTable, network_manager: NetworkManager, connection_manager: ConnectionManager, + // Network + interfaces: NetworkInterfaces, // Background processes update_network_class_task: TickTask, + network_interfaces_task: TickTask, } #[derive(Clone)] @@ -85,7 +87,6 @@ impl Network { tcp_port: 0u16, ws_port: 0u16, wss_port: 0u16, - interfaces: NetworkInterfaces::new(), bound_first_udp: BTreeMap::new(), inbound_udp_protocol_handlers: BTreeMap::new(), outbound_udpv4_protocol_handler: None, @@ -105,7 +106,9 @@ impl Network { network_manager, routing_table, connection_manager, + interfaces: NetworkInterfaces::new(), update_network_class_task: TickTask::new(1), + network_interfaces_task: TickTask::new(5), } } @@ -133,6 +136,15 @@ impl Network { Box::pin(this2.clone().update_network_class_task_routine(s, l, t)) }); } + // Set network interfaces tick task + { + let this2 = this.clone(); + this.unlocked_inner + .network_interfaces_task + .set_routine(move |s, l, t| { + Box::pin(this2.clone().network_interfaces_task_routine(s, l, t)) + }); + } this } @@ -219,11 +231,11 @@ impl Network { inner.join_handles.push(jh); } - fn translate_unspecified_address(inner: &NetworkInner, from: &SocketAddr) -> Vec { + fn translate_unspecified_address(&self, from: &SocketAddr) -> Vec { if !from.ip().is_unspecified() { vec![*from] } else { - inner + self.unlocked_inner .interfaces .best_addresses() .iter() @@ -259,19 +271,17 @@ impl Network { where F: FnOnce(&[IpAddr]) -> R, { - let inner = self.inner.lock(); - inner.interfaces.with_best_addresses(f) + self.unlocked_inner.interfaces.with_best_addresses(f) } // See if our interface addresses have changed, if so we need to punt the network // and redo all our addresses. This is overkill, but anything more accurate // would require inspection of routing tables that we dont want to bother with - pub async fn check_interface_addresses(&self) -> EyreResult { - let mut inner = self.inner.lock(); - if !inner.interfaces.refresh().await? { + async fn check_interface_addresses(&self) -> EyreResult { + if !self.unlocked_inner.interfaces.refresh().await? { return Ok(false); } - inner.network_needs_restart = true; + self.inner.lock().network_needs_restart = true; Ok(true) } @@ -286,8 +296,13 @@ impl Network { &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); @@ -296,19 +311,28 @@ impl Network { .wrap_err("create socket failure")?; h.send_message(data, peer_socket_addr) .await + .map(NetworkResult::Value) .wrap_err("send message failure")?; } ProtocolType::TCP => { let peer_socket_addr = dial_info.to_socket_addr(); - let pnc = RawTcpProtocolHandler::connect(None, peer_socket_addr) - .await - .wrap_err("connect failure")?; + let pnc = network_result_try!(RawTcpProtocolHandler::connect( + None, + peer_socket_addr, + connect_timeout_ms + ) + .await + .wrap_err("connect failure")?); pnc.send(data).await.wrap_err("send failure")?; } ProtocolType::WS | ProtocolType::WSS => { - let pnc = WebsocketProtocolHandler::connect(None, &dial_info) - .await - .wrap_err("connect failure")?; + let pnc = network_result_try!(WebsocketProtocolHandler::connect( + None, + &dial_info, + connect_timeout_ms + ) + .await + .wrap_err("connect failure")?); pnc.send(data).await.wrap_err("send failure")?; } } @@ -316,7 +340,7 @@ impl Network { self.network_manager() .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - Ok(()) + Ok(NetworkResult::Value(())) } // Send data to a dial info, unbound, using a new connection from a random port @@ -324,14 +348,19 @@ impl Network { // This creates a short-lived connection in the case of connection-oriented protocols // for the purpose of sending this one message. // This bypasses the connection table as it is not a 'node to node' connection. - #[instrument(level="trace", err, skip(self, data), fields(ret.timeout_or, data.len = data.len()))] + #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_recv_data_unbound_to_dial_info( &self, dial_info: DialInfo, data: Vec, timeout_ms: u32, - ) -> EyreResult>> { + ) -> EyreResult>> { let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); @@ -346,18 +375,11 @@ impl Network { // receive single response let mut out = vec![0u8; MAX_MESSAGE_SIZE]; - let timeout_or_ret = timeout(timeout_ms, h.recv_message(&mut out)) - .await - .into_timeout_or() - .into_result() + let (recv_len, recv_addr) = + network_result_try!(timeout(timeout_ms, h.recv_message(&mut out)) + .await + .into_network_result()) .wrap_err("recv_message failure")?; - let (recv_len, recv_addr) = match timeout_or_ret { - TimeoutOr::Value(v) => v, - TimeoutOr::Timeout => { - tracing::Span::current().record("ret.timeout_or", &"Timeout".to_owned()); - return Ok(TimeoutOr::Timeout); - } - }; let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); self.network_manager() @@ -368,48 +390,37 @@ impl Network { bail!("wrong address"); } out.resize(recv_len, 0u8); - Ok(TimeoutOr::Value(out)) + Ok(NetworkResult::Value(out)) } ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => { - let pnc = match dial_info.protocol_type() { + let pnc = network_result_try!(match dial_info.protocol_type() { ProtocolType::UDP => unreachable!(), ProtocolType::TCP => { let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::connect(None, peer_socket_addr) + RawTcpProtocolHandler::connect(None, peer_socket_addr, connect_timeout_ms) .await .wrap_err("connect failure")? } ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::connect(None, &dial_info) + WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms) .await .wrap_err("connect failure")? } - }; + }); pnc.send(data).await.wrap_err("send failure")?; self.network_manager() .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - let out = timeout(timeout_ms, pnc.recv()) + let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) .await - .into_timeout_or() - .into_result() - .wrap_err("recv failure")?; + .into_network_result()) + .wrap_err("recv failure")?); - tracing::Span::current().record( - "ret.timeout_or", - &match out { - TimeoutOr::>::Value(ref v) => format!("Value(len={})", v.len()), - TimeoutOr::>::Timeout => "Timeout".to_owned(), - }, - ); + self.network_manager() + .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); - if let TimeoutOr::Value(out) = &out { - self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); - } - - Ok(out) + Ok(NetworkResult::Value(out)) } } } @@ -449,21 +460,27 @@ impl Network { // Try to send to the exact existing connection if one exists if let Some(conn) = self.connection_manager().get_connection(descriptor).await { // connection exists, send over it - conn.send_async(data) - .await - .wrap_err("sending data to existing connection")?; + match conn.send_async(data).await { + ConnectionHandleSendResult::Sent => { + // Network accounting + self.network_manager().stats_packet_sent( + descriptor.remote().to_socket_addr().ip(), + data_len as u64, + ); - // Network accounting - self.network_manager() - .stats_packet_sent(descriptor.remote().to_socket_addr().ip(), data_len as u64); - - // Data was consumed - Ok(None) - } else { - // Connection or didn't exist - // Pass the data back out so we don't own it any more - Ok(Some(data)) + // Data was consumed + return Ok(None); + } + ConnectionHandleSendResult::NotSent(data) => { + // Couldn't send + // Pass the data back out so we don't own it any more + return Ok(Some(data)); + } + } } + // Connection didn't exist + // Pass the data back out so we don't own it any more + Ok(Some(data)) } // Send data directly to a dial info, possibly without knowing which node it is going to @@ -472,40 +489,42 @@ impl Network { &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { let data_len = data.len(); - // Handle connectionless protocol if dial_info.protocol_type() == ProtocolType::UDP { + // Handle connectionless protocol let peer_socket_addr = dial_info.to_socket_addr(); - if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { - let res = ph - .send_message(data, peer_socket_addr) - .await - .wrap_err("failed to send data to dial info"); - if res.is_ok() { - // Network accounting - self.network_manager() - .stats_packet_sent(peer_socket_addr.ip(), data_len as u64); - } - return res; + let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { + Some(ph) => ph, + None => bail!("no appropriate UDP protocol handler for dial_info"), + }; + network_result_try!(ph + .send_message(data, peer_socket_addr) + .await + .into_network_result() + .wrap_err("failed to send data to dial info")?); + } else { + // Handle connection-oriented protocols + let local_addr = self.get_preferred_local_address(&dial_info); + let conn = network_result_try!( + self.connection_manager() + .get_or_create_connection(Some(local_addr), dial_info.clone()) + .await? + ); + + if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { + return Ok(NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + "failed to send", + ))); } - bail!("no appropriate UDP protocol handler for dial_info"); } - // Handle connection-oriented protocols - let local_addr = self.get_preferred_local_address(&dial_info); - let conn = self - .connection_manager() - .get_or_create_connection(Some(local_addr), dial_info.clone()) - .await?; + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - let res = conn.send_async(data).await; - if res.is_ok() { - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - } - res + Ok(NetworkResult::Value(())) } ///////////////////////////////////////////////////////////////// @@ -517,15 +536,13 @@ impl Network { #[instrument(level = "debug", err, skip_all)] pub async fn startup(&self) -> EyreResult<()> { // initialize interfaces - let mut interfaces = NetworkInterfaces::new(); - interfaces.refresh().await?; + self.unlocked_inner.interfaces.refresh().await?; let protocol_config = { let mut inner = self.inner.lock(); // Create stop source inner.stop_source = Some(StopSource::new()); - inner.interfaces = interfaces; // get protocol config let protocol_config = { @@ -666,6 +683,19 @@ impl Network { ////////////////////////////////////////// + #[instrument(level = "trace", skip(self), err)] + pub async fn network_interfaces_task_routine( + self, + stop_token: StopToken, + _l: u64, + _t: u64, + ) -> EyreResult<()> { + if self.check_interface_addresses().await? { + info!("interface addresses changed, restarting network"); + } + Ok(()) + } + pub async fn tick(&self) -> EyreResult<()> { let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); let routing_table = self.routing_table(); @@ -680,6 +710,12 @@ impl Network { } } + // If we aren't resetting the network already, + // check our network interfaces to see if they have changed + if !self.needs_restart() { + self.unlocked_inner.network_interfaces_task.tick().await?; + } + Ok(()) } } diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 1fbd0e6b..de0a3d36 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -288,7 +288,7 @@ impl Network { for ip_addr in ip_addrs { let addr = SocketAddr::new(ip_addr, port); - let idi_addrs = Self::translate_unspecified_address(&*(self.inner.lock()), &addr); + let idi_addrs = self.translate_unspecified_address(&addr); // see if we've already bound to this already // if not, spawn a listener diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 1627afb0..b00bf643 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -214,7 +214,7 @@ impl Network { .inbound_udp_protocol_handlers .contains_key(&addr) { - let idi_addrs = Self::translate_unspecified_address(&*self.inner.lock(), &addr); + let idi_addrs = self.translate_unspecified_address(&addr); self.clone().create_udp_inbound_socket(addr).await?; diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index f1aef2f7..4c2dab23 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -22,16 +22,22 @@ impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, dial_info: &DialInfo, - ) -> io::Result { + timeout_ms: u32, + ) -> io::Result> { match dial_info.protocol_type() { ProtocolType::UDP => { panic!("Should not connect to UDP dialinfo"); } ProtocolType::TCP => { - tcp::RawTcpProtocolHandler::connect(local_address, dial_info.to_socket_addr()).await + tcp::RawTcpProtocolHandler::connect( + local_address, + dial_info.to_socket_addr(), + timeout_ms, + ) + .await } ProtocolType::WS | ProtocolType::WSS => { - ws::WebsocketProtocolHandler::connect(local_address, dial_info).await + ws::WebsocketProtocolHandler::connect(local_address, dial_info, timeout_ms).await } } } @@ -46,7 +52,7 @@ impl ProtocolNetworkConnection { } } - // pub async fn close(&self) -> io::Result<()> { + // pub async fn close(&self) -> io::Result> { // match self { // Self::Dummy(d) => d.close(), // Self::RawTcp(t) => t.close().await, @@ -56,7 +62,7 @@ impl ProtocolNetworkConnection { // } // } - pub async fn send(&self, message: Vec) -> io::Result<()> { + pub async fn send(&self, message: Vec) -> io::Result> { match self { Self::Dummy(d) => d.send(message), Self::RawTcp(t) => t.send(message).await, @@ -65,7 +71,7 @@ impl ProtocolNetworkConnection { Self::Wss(w) => w.send(message).await, } } - pub async fn recv(&self) -> io::Result> { + pub async fn recv(&self) -> io::Result>> { match self { Self::Dummy(d) => d.recv(), Self::RawTcp(t) => t.recv().await, diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 6c4b3cc5..942b1f35 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -166,7 +166,11 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result io::Result { +pub async fn nonblocking_connect( + socket: Socket, + addr: SocketAddr, + timeout_ms: u32, +) -> io::Result> { // Set for non blocking connect socket.set_nonblocking(true)?; @@ -185,9 +189,10 @@ pub async fn nonblocking_connect(socket: Socket, addr: SocketAddr) -> io::Result let async_stream = Async::new(std::net::TcpStream::from(socket))?; // The stream becomes writable when connected - intf::timeout(2000, async_stream.writable()) + intf::timeout(timeout_ms, async_stream.writable()) .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::TimedOut, e))??; + .into_timeout_or() + .into_result()?; // Check low level error let async_stream = match async_stream.get_ref().take_error()? { @@ -198,9 +203,9 @@ pub async fn nonblocking_connect(socket: Socket, addr: SocketAddr) -> io::Result // Convert back to inner and then return async version cfg_if! { if #[cfg(feature="rt-async-std")] { - Ok(TcpStream::from(async_stream.into_inner()?)) + Ok(TimeoutOr::Value(TcpStream::from(async_stream.into_inner()?))) } else if #[cfg(feature="rt-tokio")] { - Ok(TcpStream::from_std(async_stream.into_inner()?)?) + Ok(TimeoutOr::Value(TcpStream::from_std(async_stream.into_inner()?)?)) } } } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 959b5c34..fcce1743 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -23,7 +23,7 @@ impl RawTcpNetworkConnection { } // #[instrument(level = "trace", err, skip(self))] - // pub async fn close(&mut self) -> io::Result<()> { + // pub async fn close(&mut self) -> io::Result> { // // Make an attempt to flush the stream // self.stream.clone().close().await?; // // Then shut down the write side of the socket to effect a clean close @@ -40,7 +40,10 @@ impl RawTcpNetworkConnection { // } // } - async fn send_internal(stream: &mut AsyncPeekStream, message: Vec) -> io::Result<()> { + async fn send_internal( + stream: &mut AsyncPeekStream, + message: Vec, + ) -> io::Result> { log_net!("sending TCP message of size {}", message.len()); if message.len() > MAX_MESSAGE_SIZE { bail_io_error_other!("sending too large TCP message"); @@ -48,20 +51,29 @@ impl RawTcpNetworkConnection { let len = message.len() as u16; let header = [b'V', b'L', len as u8, (len >> 8) as u8]; - stream.write_all(&header).await?; - stream.write_all(&message).await + stream.write_all(&header).await.into_network_result()?; + stream.write_all(&message).await.into_network_result() } - #[instrument(level="trace", err, skip(self, message), fields(message.len = message.len()))] - pub async fn send(&self, message: Vec) -> io::Result<()> { + #[instrument(level="trace", err, skip(self, message), fields(network_result, message.len = message.len()))] + pub async fn send(&self, message: Vec) -> io::Result> { let mut stream = self.stream.clone(); - Self::send_internal(&mut stream, message).await + let out = Self::send_internal(&mut stream, message).await?; + tracing::Span::current().record( + "network_result", + &match &out { + NetworkResult::Timeout => "Timeout".to_owned(), + NetworkResult::NoConnection(e) => format!("No connection: {}", e), + NetworkResult::Value(()) => "Value(())".to_owned(), + }, + ); + Ok(out) } - async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result> { + async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result>> { let mut header = [0u8; 4]; - stream.read_exact(&mut header).await?; + stream.read_exact(&mut header).await.into_network_result()?; if header[0] != b'V' || header[1] != b'L' { bail_io_error_other!("received invalid TCP frame header"); @@ -72,16 +84,23 @@ impl RawTcpNetworkConnection { } let mut out: Vec = vec![0u8; len]; - stream.read_exact(&mut out).await?; + stream.read_exact(&mut out).await.into_network_result()?; - Ok(out) + Ok(NetworkResult::Value(out)) } - #[instrument(level="trace", err, skip(self), fields(ret.len))] - pub async fn recv(&self) -> io::Result> { + #[instrument(level = "trace", err, skip(self), fields(network_result))] + pub async fn recv(&self) -> io::Result>> { let mut stream = self.stream.clone(); let out = Self::recv_internal(&mut stream).await?; - tracing::Span::current().record("ret.len", &out.len()); + tracing::Span::current().record( + "network_result", + &match &out { + NetworkResult::Timeout => "Timeout".to_owned(), + NetworkResult::NoConnection(e) => format!("No connection: {}", e), + NetworkResult::Value(v) => format!("Value(len={})", v.len()), + }, + ); Ok(out) } } @@ -142,7 +161,8 @@ impl RawTcpProtocolHandler { pub async fn connect( local_address: Option, socket_addr: SocketAddr, - ) -> io::Result { + timeout_ms: u32, + ) -> io::Result> { // Make a shared socket let socket = match local_address { Some(a) => new_bound_shared_tcp_socket(a)?, @@ -150,7 +170,9 @@ impl RawTcpProtocolHandler { }; // Non-blocking connect to remote address - let ts = nonblocking_connect(socket, socket_addr).await?; + let ts = network_result_try!(nonblocking_connect(socket, socket_addr, timeout_ms) + .await + .folded()?); // See what local address we ended up with and turn this into a stream let actual_local_address = ts.local_addr()?; @@ -170,77 +192,8 @@ impl RawTcpProtocolHandler { ps, )); - Ok(conn) + Ok(NetworkResult::Value(conn)) } - - // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] - // pub async fn send_unbound_message(socket_addr: SocketAddr, data: Vec) -> io::Result<()> { - // if data.len() > MAX_MESSAGE_SIZE { - // bail_io_error_other!("sending too large unbound TCP message"); - // } - // // Make a shared socket - // let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; - - // // Non-blocking connect to remote address - // let ts = nonblocking_connect(socket, socket_addr).await?; - - // // See what local address we ended up with and turn this into a stream - // // let actual_local_address = ts - // // .local_addr() - // // .map_err(map_to_string) - // // .map_err(logthru_net!("could not get local address from TCP stream"))?; - - // #[cfg(feature = "rt-tokio")] - // let ts = ts.compat(); - // let mut ps = AsyncPeekStream::new(ts); - - // // Send directly from the raw network connection - // // this builds the connection and tears it down immediately after the send - // RawTcpNetworkConnection::send_internal(&mut ps, data).await - // } - - // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.timeout_or))] - // pub async fn send_recv_unbound_message( - // socket_addr: SocketAddr, - // data: Vec, - // timeout_ms: u32, - // ) -> io::Result>> { - // if data.len() > MAX_MESSAGE_SIZE { - // bail_io_error_other!("sending too large unbound TCP message"); - // } - - // // Make a shared socket - // let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; - - // // Non-blocking connect to remote address - // let ts = nonblocking_connect(socket, socket_addr).await?; - - // // See what local address we ended up with and turn this into a stream - // // let actual_local_address = ts - // // .local_addr() - // // .map_err(map_to_string) - // // .map_err(logthru_net!("could not get local address from TCP stream"))?; - // #[cfg(feature = "rt-tokio")] - // let ts = ts.compat(); - // let mut ps = AsyncPeekStream::new(ts); - - // // Send directly from the raw network connection - // // this builds the connection and tears it down immediately after the send - // RawTcpNetworkConnection::send_internal(&mut ps, data).await?; - // let out = timeout(timeout_ms, RawTcpNetworkConnection::recv_internal(&mut ps)) - // .await - // .into_timeout_or() - // .into_result()?; - - // tracing::Span::current().record( - // "ret.timeout_or", - // &match out { - // TimeoutOr::>::Value(ref v) => format!("Value(len={})", v.len()), - // TimeoutOr::>::Timeout => "Timeout".to_owned(), - // }, - // ); - // Ok(out) - // } } impl ProtocolAcceptHandler for RawTcpProtocolHandler { @@ -248,7 +201,7 @@ impl ProtocolAcceptHandler for RawTcpProtocolHandler { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, - ) -> SystemPinBoxFuture>> { + ) -> SendPinBoxFuture>> { Box::pin(self.clone().on_accept_async(stream, peer_addr)) } } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 3ea3ce17..95804099 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -69,56 +69,4 @@ impl RawUdpProtocolHandler { let socket = UdpSocket::bind(local_socket_addr).await?; Ok(RawUdpProtocolHandler::new(Arc::new(socket))) } - - // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.timeout_or))] - // pub async fn send_recv_unbound_message( - // socket_addr: SocketAddr, - // data: Vec, - // timeout_ms: u32, - // ) -> io::Result>> { - // if data.len() > MAX_MESSAGE_SIZE { - // bail_io_error_other!("sending too large unbound UDP message"); - // } - - // // get local wildcard address for bind - // let local_socket_addr = match socket_addr { - // SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - // SocketAddr::V6(_) => { - // SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) - // } - // }; - - // // get unspecified bound socket - // let socket = UdpSocket::bind(local_socket_addr).await?; - // let len = socket.send_to(&data, socket_addr).await?; - // if len != data.len() { - // bail_io_error_other!("UDP partial unbound send"); - // } - - // // receive single response - // let mut out = vec![0u8; MAX_MESSAGE_SIZE]; - // let timeout_or_ret = timeout(timeout_ms, socket.recv_from(&mut out)) - // .await - // .into_timeout_or() - // .into_result()?; - // let (len, from_addr) = match timeout_or_ret { - // TimeoutOr::Value(v) => v, - // TimeoutOr::Timeout => { - // tracing::Span::current().record("ret.timeout_or", &"Timeout".to_owned()); - // return Ok(TimeoutOr::Timeout); - // } - // }; - - // // if the from address is not the same as the one we sent to, then drop this - // if from_addr != socket_addr { - // bail_io_error_other!(format!( - // "Unbound response received from wrong address: addr={}", - // from_addr, - // )); - // } - // out.resize(len, 0u8); - - // tracing::Span::current().record("ret.timeout_or", &format!("Value(len={})", out.len())); - // Ok(TimeoutOr::Value(out)) - // } } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 283309ea..4c1d2385 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -80,33 +80,45 @@ where // .map_err(to_io) // } - #[instrument(level = "trace", err, skip(self, message), fields(message.len = message.len()))] - pub async fn send(&self, message: Vec) -> io::Result<()> { + #[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))] + pub async fn send(&self, message: Vec) -> io::Result> { if message.len() > MAX_MESSAGE_SIZE { bail_io_error_other!("received too large WS message"); } - self.stream + let out = self + .stream .clone() .send(Message::binary(message)) .await .map_err(to_io) + .into_network_result()?; + tracing::Span::current().record( + "network_result", + &match &out { + NetworkResult::Timeout => "Timeout".to_owned(), + NetworkResult::NoConnection(e) => format!("No connection: {}", e), + NetworkResult::Value(()) => "Value(())".to_owned(), + }, + ); + Ok(out) } - #[instrument(level = "trace", err, skip(self), fields(ret.len))] - pub async fn recv(&self) -> io::Result> { + #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))] + pub async fn recv(&self) -> io::Result>> { let out = match self.stream.clone().next().await { Some(Ok(Message::Binary(v))) => { if v.len() > MAX_MESSAGE_SIZE { return Err(io::Error::new( - io::ErrorKind::ConnectionReset, + io::ErrorKind::InvalidData, "too large ws message", )); } - v - } - Some(Ok(Message::Close(_))) => { - return Err(io::Error::new(io::ErrorKind::ConnectionReset, "closeframe")) + NetworkResult::Value(v) } + Some(Ok(Message::Close(_))) => NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + "closeframe", + )), Some(Ok(x)) => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -114,15 +126,20 @@ where )); } Some(Err(e)) => return Err(to_io(e)), - None => { - return Err(io::Error::new( - io::ErrorKind::ConnectionReset, - "connection ended", - )) - } + None => NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + "connection ended", + )), }; - tracing::Span::current().record("ret.len", &out.len()); + tracing::Span::current().record( + "network_result", + &match &out { + NetworkResult::Timeout => "Timeout".to_owned(), + NetworkResult::NoConnection(e) => format!("No connection: {}", e), + NetworkResult::Value(v) => format!("Value(len={})", v.len()), + }, + ); Ok(out) } } @@ -227,7 +244,8 @@ impl WebsocketProtocolHandler { pub async fn connect( local_address: Option, dial_info: &DialInfo, - ) -> io::Result { + timeout_ms: u32, + ) -> io::Result> { // Split dial info up let (tls, scheme) = match dial_info { DialInfo::WS(_) => (false, "ws"), @@ -253,7 +271,10 @@ impl WebsocketProtocolHandler { }; // Non-blocking connect to remote address - let tcp_stream = nonblocking_connect(socket, remote_socket_addr).await?; + let tcp_stream = + network_result_try!(nonblocking_connect(socket, remote_socket_addr, timeout_ms) + .await + .folded()?); // See what local address we ended up with let actual_local_addr = tcp_stream.local_addr()?; @@ -274,16 +295,16 @@ impl WebsocketProtocolHandler { .await .map_err(to_io_error_other)?; - Ok(ProtocolNetworkConnection::Wss( + Ok(NetworkResult::Value(ProtocolNetworkConnection::Wss( WebsocketNetworkConnection::new(descriptor, ws_stream), - )) + ))) } else { let (ws_stream, _response) = client_async(request, tcp_stream) .await .map_err(to_io_error_other)?; - Ok(ProtocolNetworkConnection::Ws( + Ok(NetworkResult::Value(ProtocolNetworkConnection::Ws( WebsocketNetworkConnection::new(descriptor, ws_stream), - )) + ))) } } } @@ -293,7 +314,7 @@ impl ProtocolAcceptHandler for WebsocketProtocolHandler { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, - ) -> SystemPinBoxFuture>> { + ) -> SendPinBoxFuture>> { Box::pin(self.clone().on_accept_async(stream, peer_addr)) } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 50b36d8c..9adb29ba 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -16,7 +16,7 @@ cfg_if::cfg_if! { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, - ) -> SystemPinBoxFuture>>; + ) -> SendPinBoxFuture>>; } pub trait ProtocolAcceptHandlerClone { @@ -56,11 +56,11 @@ impl DummyNetworkConnection { // pub fn close(&self) -> io::Result<()> { // Ok(()) // } - pub fn send(&self, _message: Vec) -> io::Result<()> { - Ok(()) + pub fn send(&self, _message: Vec) -> io::Result> { + Ok(NetworkResult::Value(())) } - pub fn recv(&self) -> io::Result> { - Ok(Vec::new()) + pub fn recv(&self) -> io::Result>> { + Ok(NetworkResult::Value(Vec::new())) } } @@ -179,26 +179,26 @@ impl NetworkConnection { protocol_connection: &ProtocolNetworkConnection, stats: Arc>, message: Vec, - ) -> io::Result<()> { + ) -> io::Result> { let ts = intf::get_timestamp(); - let out = protocol_connection.send(message).await; - if out.is_ok() { - let mut stats = stats.lock(); - stats.last_message_sent_time.max_assign(Some(ts)); - } - out + let out = network_result_try!(protocol_connection.send(message).await?); + + let mut stats = stats.lock(); + stats.last_message_sent_time.max_assign(Some(ts)); + + Ok(NetworkResult::Value(out)) } async fn recv_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, - ) -> io::Result> { + ) -> io::Result>> { let ts = intf::get_timestamp(); - let out = protocol_connection.recv().await; - if out.is_ok() { - let mut stats = stats.lock(); - stats.last_message_recv_time.max_assign(Some(ts)); - } - out + let out = network_result_try!(protocol_connection.recv().await?); + + let mut stats = stats.lock(); + stats.last_message_recv_time.max_assign(Some(ts)); + + Ok(NetworkResult::Value(out)) } pub fn stats(&self) -> NetworkConnectionStats { @@ -220,7 +220,7 @@ impl NetworkConnection { protocol_connection: ProtocolNetworkConnection, connection_inactivity_timeout_ms: u32, stats: Arc>, - ) -> SystemPinBoxFuture<()> { + ) -> SendPinBoxFuture<()> { Box::pin(async move { log_net!( "== Starting process_connection loop for {:?}", @@ -283,7 +283,7 @@ impl NetworkConnection { let receiver_fut = Self::recv_internal(&protocol_connection, stats.clone()) .then(|res| async { match res { - Ok(message) => { + Ok(NetworkResult::Value(message)) => { // Pass received messages up to the network manager for processing if let Err(e) = network_manager .on_recv_envelope(message.as_slice(), descriptor) @@ -295,9 +295,19 @@ impl NetworkConnection { RecvLoopAction::Recv } } + Ok(NetworkResult::Timeout) => { + // Connection unable to receive, closed + log_net!(debug "Timeout"); + RecvLoopAction::Finish + } + Ok(NetworkResult::NoConnection(e)) => { + // Connection unable to receive, closed + log_net!(debug "No connection: {}", e); + RecvLoopAction::Finish + } Err(e) => { // Connection unable to receive, closed - log_net!(debug e); + log_net!(error e); RecvLoopAction::Finish } } diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 4c8a9184..9da7863a 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -56,8 +56,12 @@ impl Network { &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { let data_len = data.len(); + let timeout_ms = { + let c = self.config().get(); + c.network.connection_initial_timeout_ms; + }; match dial_info.protocol_type() { ProtocolType::UDP => { @@ -67,7 +71,7 @@ impl Network { bail!("no support for TCP protocol") } ProtocolType::WS | ProtocolType::WSS => { - let pnc = WebsocketProtocolHandler::connect(None, &dial_info) + let pnc = WebsocketProtocolHandler::connect(None, &dial_info, timeout_ms) .await .wrap_err("connect failure")?; pnc.send(data).await.wrap_err("send failure")?; @@ -92,8 +96,13 @@ impl Network { dial_info: DialInfo, data: Vec, timeout_ms: u32, - ) -> EyreResult>> { + ) -> EyreResult>> { let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config().get(); + c.network.connection_initial_timeout_ms; + }; + match dial_info.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") @@ -104,14 +113,9 @@ impl Network { ProtocolType::WS | ProtocolType::WSS => { let pnc = match dial_info.protocol_type() { ProtocolType::UDP => unreachable!(), - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::connect(None, peer_socket_addr) - .await - .wrap_err("connect failure")? - } + ProtocolType::TCP => unreachable!(), ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::connect(None, &dial_info) + WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms) .await .wrap_err("connect failure")? } @@ -187,7 +191,7 @@ impl Network { &self, dial_info: DialInfo, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { let data_len = data.len(); if dial_info.protocol_type() == ProtocolType::UDP { bail!("no support for UDP protocol"); diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index 28f2648a..2c16ce2c 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -17,7 +17,7 @@ impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, dial_info: &DialInfo, - ) -> io::Result { + ) -> io::Result> { match dial_info.protocol_type() { ProtocolType::UDP => { panic!("UDP dial info is not supported on WASM targets"); @@ -38,20 +38,20 @@ impl ProtocolNetworkConnection { } } - // pub async fn close(&self) -> io::Result<()> { + // pub async fn close(&self) -> io::Result> { // match self { // Self::Dummy(d) => d.close(), // Self::Ws(w) => w.close().await, // } // } - pub async fn send(&self, message: Vec) -> io::Result<()> { + pub async fn send(&self, message: Vec) -> io::Result> { match self { Self::Dummy(d) => d.send(message), Self::Ws(w) => w.send(message).await, } } - pub async fn recv(&self) -> io::Result> { + pub async fn recv(&self) -> io::Result>> { match self { Self::Dummy(d) => d.recv(), Self::Ws(w) => w.recv().await, diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index d046b7f4..9e174f07 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -22,7 +22,7 @@ pub trait ReceiptCallback: Send + 'static { receipt: Receipt, returns_so_far: u32, expected_returns: u32, - ) -> SystemPinBoxFuture<()>; + ) -> SendPinBoxFuture<()>; } impl ReceiptCallback for T where @@ -35,7 +35,7 @@ where receipt: Receipt, returns_so_far: u32, expected_returns: u32, - ) -> SystemPinBoxFuture<()> { + ) -> SendPinBoxFuture<()> { Box::pin(self(event, receipt, returns_so_far, expected_returns)) } } @@ -189,7 +189,7 @@ impl ReceiptManager { fn perform_callback( evt: ReceiptEvent, record_mut: &mut ReceiptRecord, - ) -> Option> { + ) -> Option> { match &mut record_mut.receipt_callback { ReceiptRecordCallbackType::Normal(callback) => Some(callback.call( evt, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 25e8e1c0..afa1eaeb 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -269,8 +269,8 @@ impl RPCProcessor { } // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference - // Note: This routine can possible be recursive, hence the SystemPinBoxFuture async form - pub fn resolve_node(&self, node_id: DHTKey) -> SystemPinBoxFuture> { + // Note: This routine can possible be recursive, hence the SendPinBoxFuture async form + pub fn resolve_node(&self, node_id: DHTKey) -> SendPinBoxFuture> { let this = self.clone(); Box::pin(async move { let routing_table = this.routing_table(); diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 472c2a1c..071e96f3 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -3,6 +3,7 @@ use super::*; impl RPCProcessor { // Send FindNodeQ RPC request, receive FindNodeA answer // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_find_node( self, dest: Destination, diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 5893f940..e33f83c1 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -3,6 +3,7 @@ use super::*; impl RPCProcessor { // Sends a our node info to another node // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_node_info_update( self, dest: Destination, diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 0a962081..009bd3b2 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -3,6 +3,7 @@ use super::*; impl RPCProcessor { // Sends a unidirectional in-band return receipt // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self, receipt), ret, err)] pub async fn rpc_call_return_receipt>( self, dest: Destination, diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 8c7362fc..880e9901 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -3,6 +3,7 @@ use super::*; impl RPCProcessor { // Sends a unidirectional signal to a node // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_signal( self, dest: Destination, diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 7a2385ef..3ac9adcb 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -3,6 +3,7 @@ use super::*; impl RPCProcessor { // Send StatusQ RPC request, receive StatusA answer // Can be sent via relays, but not via routes + #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_status(self, peer: NodeRef) -> Result, RPCError> { let node_status = self.network_manager().generate_node_status(); let status_q = RPCOperationStatusQ { node_status }; diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 02e64de3..cca74db8 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -2,6 +2,7 @@ use super::*; impl RPCProcessor { // Can only be sent directly, not via relays or routes + #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_validate_dial_info( self, peer: NodeRef, diff --git a/veilid-core/src/tests/common/test_host_interface.rs b/veilid-core/src/tests/common/test_host_interface.rs index 0585e559..6bad2e12 100644 --- a/veilid-core/src/tests/common/test_host_interface.rs +++ b/veilid-core/src/tests/common/test_host_interface.rs @@ -463,7 +463,7 @@ cfg_if! { pub async fn test_network_interfaces() { info!("testing network interfaces"); let t1 = intf::get_timestamp(); - let mut interfaces = intf::utils::network_interfaces::NetworkInterfaces::new(); + let interfaces = intf::utils::network_interfaces::NetworkInterfaces::new(); let count = 100; for x in 0..count { info!("loop {}", x); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 78dd3447..f4b3595b 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -8,7 +8,7 @@ pub use serialize_helpers::*; use crate::*; pub use crate::xx::{ - IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, SystemPinBoxFuture, + IpAddr, Ipv4Addr, Ipv6Addr, SendPinBoxFuture, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, }; pub use alloc::string::ToString; @@ -1593,7 +1593,7 @@ pub struct PeerStats { } pub type ValueChangeCallback = - Arc) -> SystemPinBoxFuture<()> + Send + Sync + 'static>; + Arc) -> SendPinBoxFuture<()> + Send + Sync + 'static>; ///////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index a461c74b..d85d7aa1 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -11,7 +11,7 @@ mod log_thru; mod must_join_handle; mod must_join_single_future; mod mutable_future; -// mod single_future; +mod network_result; mod single_shot_eventual; mod split_url; mod tick_task; @@ -69,8 +69,6 @@ cfg_if! { pub use async_lock::Mutex as AsyncMutex; pub use async_lock::MutexGuard as AsyncMutexGuard; pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; - pub type SystemPinBoxFuture = PinBox + Send + 'static>; - pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; pub use async_executors::JoinHandle as LowLevelJoinHandle; } else { pub use std::string::String; @@ -108,8 +106,6 @@ cfg_if! { } } pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; - pub type SystemPinBoxFuture = PinBox + Send + 'static>; - pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; } } @@ -125,7 +121,7 @@ pub use ip_extra::*; pub use must_join_handle::*; pub use must_join_single_future::*; pub use mutable_future::*; -// pub use single_future::*; +pub use network_result::*; pub use single_shot_eventual::*; pub use tick_task::*; pub use timeout_or::*; diff --git a/veilid-core/src/xx/network_result.rs b/veilid-core/src/xx/network_result.rs new file mode 100644 index 00000000..737c0229 --- /dev/null +++ b/veilid-core/src/xx/network_result.rs @@ -0,0 +1,214 @@ +use super::*; +use core::fmt::{Debug, Display}; +use core::result::Result; +use std::error::Error; +use std::io; + +////////////////////////////////////////////////////////////////// +// Non-fallible network results conversions + +pub trait NetworkResultExt { + fn into_network_result(self) -> NetworkResult; +} + +impl NetworkResultExt for Result { + fn into_network_result(self) -> NetworkResult { + self.ok() + .map(|v| NetworkResult::::Value(v)) + .unwrap_or(NetworkResult::::Timeout) + } +} + +pub trait IoNetworkResultExt { + fn into_network_result(self) -> io::Result>; +} + +impl IoNetworkResultExt for io::Result { + fn into_network_result(self) -> io::Result> { + match self { + Ok(v) => Ok(NetworkResult::Value(v)), + #[cfg(feature = "io_error_more")] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset + | io::ErrorKind::HostUnreachable + | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + #[cfg(not(feature = "io_error_more"))] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + } + } +} + +pub trait NetworkResultResultExt { + fn into_result_network_result(self) -> Result, E>; +} + +impl NetworkResultResultExt for NetworkResult> { + fn into_result_network_result(self) -> Result, E> { + match self { + NetworkResult::Timeout => Ok(NetworkResult::::Timeout), + NetworkResult::NoConnection(e) => Ok(NetworkResult::::NoConnection(e)), + NetworkResult::Value(Ok(v)) => Ok(NetworkResult::::Value(v)), + NetworkResult::Value(Err(e)) => Err(e), + } + } +} + +pub trait FoldedNetworkResultExt { + fn folded(self) -> io::Result>; +} + +impl FoldedNetworkResultExt for io::Result> { + fn folded(self) -> io::Result> { + match self { + Ok(TimeoutOr::Timeout) => Ok(NetworkResult::Timeout), + Ok(TimeoutOr::Value(v)) => Ok(NetworkResult::Value(v)), + #[cfg(feature = "io_error_more")] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset + | io::ErrorKind::HostUnreachable + | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + #[cfg(not(feature = "io_error_more"))] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + } + } +} + +impl FoldedNetworkResultExt for io::Result> { + fn folded(self) -> io::Result> { + match self { + Ok(v) => Ok(v), + #[cfg(feature = "io_error_more")] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset + | io::ErrorKind::HostUnreachable + | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + #[cfg(not(feature = "io_error_more"))] + Err(e) => match e.kind() { + io::ErrorKind::TimedOut => Ok(NetworkResult::Timeout), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + _ => Err(e), + }, + } + } +} + +////////////////////////////////////////////////////////////////// +// Non-fallible network result + +pub enum NetworkResult { + Timeout, + NoConnection(io::Error), + Value(T), +} + +impl NetworkResult { + pub fn timeout() -> Self { + Self::Timeout + } + pub fn no_connection(e: io::Error) -> Self { + Self::NoConnection(e) + } + pub fn value(value: T) -> Self { + Self::Value(value) + } + + pub fn is_timeout(&self) -> bool { + matches!(self, Self::Timeout) + } + pub fn is_no_connection(&self) -> bool { + matches!(self, Self::NoConnection(_)) + } + pub fn is_value(&self) -> bool { + matches!(self, Self::Value(_)) + } + + pub fn into_result(self) -> Result { + match self { + Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")), + Self::NoConnection(e) => Err(e), + Self::Value(v) => Ok(v), + } + } +} + +impl From> for Option { + fn from(t: NetworkResult) -> Self { + match t { + NetworkResult::Value(v) => Some(v), + _ => None, + } + } +} + +// impl Clone for NetworkResult { +// fn clone(&self) -> Self { +// match self { +// Self::Timeout => Self::Timeout, +// Self::NoConnection(e) => Self::NoConnection(e.clone()), +// Self::Value(t) => Self::Value(t.clone()), +// } +// } +// } + +impl Debug for NetworkResult { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Timeout => write!(f, "Timeout"), + Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(), + Self::Value(v) => f.debug_tuple("Value").field(v).finish(), + } + } +} +impl Display for NetworkResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Timeout => write!(f, ""), + Self::NoConnection(e) => write!(f, "No connection: {}", e.kind()), + Self::Value(v) => write!(f, "{}", v), + } + } +} +impl Error for NetworkResult {} + +////////////////////////////////////////////////////////////////// +// Non-fallible network result macros + +#[macro_export] +macro_rules! network_result_try { + ($r: expr) => { + match $r { + NetworkResult::Timeout => return Ok(NetworkResult::Timeout), + NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)), + NetworkResult::Value(v) => v, + } + }; +} diff --git a/veilid-core/src/xx/timeout_or.rs b/veilid-core/src/xx/timeout_or.rs index 25efa3cc..10f42cd1 100644 --- a/veilid-core/src/xx/timeout_or.rs +++ b/veilid-core/src/xx/timeout_or.rs @@ -40,7 +40,9 @@ pub trait TimeoutOrExt { impl TimeoutOrExt for Result { fn into_timeout_or(self) -> TimeoutOr { - self.ok().map(|v| TimeoutOr::::Value(v)).unwrap_or(TimeoutOr::::Timeout) + self.ok() + .map(|v| TimeoutOr::::Value(v)) + .unwrap_or(TimeoutOr::::Timeout) } } @@ -62,17 +64,16 @@ pub trait TimeoutOrResultExt { fn into_result(self) -> Result, E>; } -impl TimeoutOrResultExt for TimeoutOr> { +impl TimeoutOrResultExt for TimeoutOr> { fn into_result(self) -> Result, E> { match self { - TimeoutOr::>::Timeout => Ok(TimeoutOr::::Timeout), - TimeoutOr::>::Value(Ok(v)) => Ok(TimeoutOr::::Value(v)), - TimeoutOr::>::Value(Err(e)) => Err(e), + TimeoutOr::>::Timeout => Ok(TimeoutOr::::Timeout), + TimeoutOr::>::Value(Ok(v)) => Ok(TimeoutOr::::Value(v)), + TimeoutOr::>::Value(Err(e)) => Err(e), } } } - ////////////////////////////////////////////////////////////////// // Non-fallible timeout @@ -139,3 +140,16 @@ impl Display for TimeoutOr { } } impl Error for TimeoutOr {} + +////////////////////////////////////////////////////////////////// +// Non-fallible timeoue macros + +#[macro_export] +macro_rules! timeout_or_try { + ($r: expr) => { + match $r { + TimeoutOr::Timeout => return Ok(TimeoutOr::Timeout), + TimeoutOr::Value(v) => v, + } + }; +} diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index b93f2afd..5f8783b9 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -22,6 +22,7 @@ tracing = { version = "^0", features = ["log", "attributes"] } tracing-subscriber = { version = "^0", features = ["env-filter"] } tracing-appender = "^0" tracing-opentelemetry = "^0" +# Buggy: tracing-error = "^0" opentelemetry = { version = "^0" } opentelemetry-otlp = { version = "^0" } opentelemetry-semantic-conventions = "^0" @@ -30,7 +31,7 @@ tokio = { version = "^1", features = ["full"], optional = true } tokio-stream = { version = "^0", features = ["net"], optional = true } tokio-util = { version = "^0", features = ["compat"], optional = true} async-tungstenite = { version = "^0", features = ["async-tls"] } -color-eyre = "^0.6" +color-eyre = { version = "^0", default-features = false } clap = "^3" directories = "^4" capnp = "^0" diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index aa8b2962..d451b940 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -31,6 +31,11 @@ impl VeilidLogs { let mut layers = Vec::new(); let mut filters = BTreeMap::new(); + // Error layer + // XXX: Spantrace capture causes rwlock deadlocks/crashes + // XXX: + //layers.push(tracing_error::ErrorLayer::default().boxed()); + // Terminal logger if settingsr.logging.terminal.enabled { let filter = veilid_core::VeilidLayerFilter::new(