From 7ed6b44d21334ead8a4672a01020bf28e8666f0e Mon Sep 17 00:00:00 2001 From: John Smith Date: Tue, 4 Oct 2022 13:09:03 -0400 Subject: [PATCH] better race condition handling --- .../src/network_manager/connection_manager.rs | 27 +++++----- veilid-core/src/network_manager/mod.rs | 53 ++++++++++++++++--- veilid-core/src/network_manager/native/mod.rs | 12 +++-- .../src/network_manager/native/network_udp.rs | 2 +- .../native/protocol/sockets.rs | 29 ++-------- veilid-core/src/routing_table/mod.rs | 17 +++++- veilid-core/src/veilid_api/debug.rs | 3 ++ veilid-core/src/veilid_api/routing_context.rs | 5 +- veilid-core/src/xx/network_result.rs | 28 +++++++++- 9 files changed, 120 insertions(+), 56 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 7e2923b7..f535ea5d 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -289,28 +289,29 @@ impl ConnectionManager { .await; match result_net_res { Ok(net_res) => { - if net_res.is_value() || retry_count == 0 { - break net_res; - } - } - Err(e) => { - if retry_count == 0 { - // Try one last time to return a connection from the table, in case - // an 'accept' happened at literally the same time as our connect + // If the connection 'already exists', then try one last time to return a connection from the table, in case + // an 'accept' happened at literally the same time as our connect + if net_res.is_already_exists() { if let Some(conn) = self .arc .connection_table .get_last_connection_by_remote(peer_address) { log_net!( - "== Returning existing connection in race local_addr={:?} peer_address={:?}", - local_addr.green(), - peer_address.green() - ); + "== Returning existing connection in race local_addr={:?} peer_address={:?}", + local_addr.green(), + peer_address.green() + ); return Ok(NetworkResult::Value(conn)); } - + } + if net_res.is_value() || retry_count == 0 { + break net_res; + } + } + Err(e) => { + if retry_count == 0 { return Err(e).wrap_err("failed to connect"); } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 1a132f38..80301d6c 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -223,7 +223,15 @@ impl NetworkManager { this.unlocked_inner .rolling_transfers_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().rolling_transfers_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .rolling_transfers_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "NetworkManager rolling transfers task routine" + )), + ) }); } // Set relay management tick task @@ -232,7 +240,12 @@ impl NetworkManager { this.unlocked_inner .relay_management_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().relay_management_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .relay_management_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "relay management task routine")), + ) }); } // Set bootstrap tick task @@ -240,7 +253,14 @@ impl NetworkManager { let this2 = this.clone(); this.unlocked_inner .bootstrap_task - .set_routine(move |s, _l, _t| Box::pin(this2.clone().bootstrap_task_routine(s))); + .set_routine(move |s, _l, _t| { + Box::pin( + this2 + .clone() + .bootstrap_task_routine(s) + .instrument(trace_span!(parent: None, "bootstrap task routine")), + ) + }); } // Set peer minimum refresh tick task { @@ -248,7 +268,15 @@ impl NetworkManager { this.unlocked_inner .peer_minimum_refresh_task .set_routine(move |s, _l, _t| { - Box::pin(this2.clone().peer_minimum_refresh_task_routine(s)) + Box::pin( + this2 + .clone() + .peer_minimum_refresh_task_routine(s) + .instrument(trace_span!( + parent: None, + "peer minimum refresh task routine" + )), + ) }); } // Set ping validator tick task @@ -257,7 +285,12 @@ impl NetworkManager { this.unlocked_inner .ping_validator_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().ping_validator_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .ping_validator_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "ping validator task routine")), + ) }); } // Set public address check task @@ -266,7 +299,15 @@ impl NetworkManager { this.unlocked_inner .public_address_check_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().public_address_check_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .public_address_check_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "public address check task routine" + )), + ) }); } this diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 868561fa..1a58218f 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -462,11 +462,13 @@ impl Network { // receive single response let mut out = vec![0u8; MAX_MESSAGE_SIZE]; - let (recv_len, recv_addr) = - network_result_try!(timeout(timeout_ms, h.recv_message(&mut out)) - .await - .into_network_result()) - .wrap_err("recv_message failure")?; + let (recv_len, recv_addr) = network_result_try!(timeout( + timeout_ms, + h.recv_message(&mut out).instrument(Span::current()) + ) + .await + .into_network_result()) + .wrap_err("recv_message failure")?; let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); self.network_manager() diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 66c491bb..0d906f16 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -54,7 +54,7 @@ impl Network { loop { match ph - .recv_message(&mut data) + .recv_message(&mut data).instrument(Span::current()) .timeout_at(stop_token.clone()) .await { diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 1c7b9d26..c8918e33 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -172,7 +172,8 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result Ok(()), @@ -194,28 +192,7 @@ pub async fn nonblocking_connect( Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => Ok(()), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Ok(()), Err(e) => Err(e), - } - .map_err(|e| { - // XXX - // warn!( - // "DEBUGCONNECT XXXFAILXXX: bind={} local={} remote={}\nbacktrace={:?}", - // bind_local_addr, - // socket.local_addr().unwrap().as_socket().unwrap(), - // addr, - // backtrace::Backtrace::new(), - // ); - e - })?; - - // XXX - // warn!( - // "DEBUGCONNECT: bind={} local={} remote={}\nbacktrace={:?}", - // bind_local_addr, - // socket.local_addr().unwrap().as_socket().unwrap(), - // addr, - // backtrace::Backtrace::new(), - // ); - + }?; let async_stream = Async::new(std::net::TcpStream::from(socket))?; // The stream becomes writable when connected diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 6a9cb6a8..cc831641 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -125,7 +125,15 @@ impl RoutingTable { this.unlocked_inner .rolling_transfers_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().rolling_transfers_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .rolling_transfers_task_routine(s, l, t) + .instrument(trace_span!( + parent: None, + "RoutingTable rolling transfers task routine" + )), + ) }); } @@ -135,7 +143,12 @@ impl RoutingTable { this.unlocked_inner .kick_buckets_task .set_routine(move |s, l, t| { - Box::pin(this2.clone().kick_buckets_task_routine(s, l, t)) + Box::pin( + this2 + .clone() + .kick_buckets_task_routine(s, l, t) + .instrument(trace_span!(parent: None, "kick buckets task routine")), + ) }); } this diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 39b128ab..16b7b6b7 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -406,6 +406,9 @@ impl VeilidAPI { NetworkResult::NoConnection(e) => { return Ok(format!("NoConnection({})", e)); } + NetworkResult::AlreadyExists(e) => { + return Ok(format!("AlreadyExists({})", e)); + } NetworkResult::InvalidMessage(e) => { return Ok(format!("InvalidMessage({})", e)); } diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 5cfac250..b095fd3a 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -130,11 +130,12 @@ impl RoutingContext { let answer = match rpc_processor.rpc_call_app_call(dest, request).await { Ok(NetworkResult::Value(v)) => v, Ok(NetworkResult::Timeout) => return Err(VeilidAPIError::Timeout), - Ok(NetworkResult::NoConnection(e)) => { + Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { return Err(VeilidAPIError::NoConnection { message: e.to_string(), }) } + Ok(NetworkResult::InvalidMessage(message)) => { return Err(VeilidAPIError::Generic { message }) } @@ -159,7 +160,7 @@ impl RoutingContext { match rpc_processor.rpc_call_app_message(dest, message).await { Ok(NetworkResult::Value(())) => {} Ok(NetworkResult::Timeout) => return Err(VeilidAPIError::Timeout), - Ok(NetworkResult::NoConnection(e)) => { + Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { return Err(VeilidAPIError::NoConnection { message: e.to_string(), }) diff --git a/veilid-core/src/xx/network_result.rs b/veilid-core/src/xx/network_result.rs index 579a2a80..e4c72ba1 100644 --- a/veilid-core/src/xx/network_result.rs +++ b/veilid-core/src/xx/network_result.rs @@ -35,6 +35,7 @@ impl IoNetworkResultExt for io::Result { | io::ErrorKind::ConnectionReset | io::ErrorKind::HostUnreachable | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), }, #[cfg(not(feature = "io_error_more"))] @@ -50,6 +51,7 @@ impl IoNetworkResultExt for io::Result { io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), } } @@ -66,6 +68,7 @@ impl NetworkResultResultExt for NetworkResult> { match self { NetworkResult::Timeout => Ok(NetworkResult::::Timeout), NetworkResult::NoConnection(e) => Ok(NetworkResult::::NoConnection(e)), + NetworkResult::AlreadyExists(e) => Ok(NetworkResult::::AlreadyExists(e)), NetworkResult::InvalidMessage(s) => Ok(NetworkResult::::InvalidMessage(s)), NetworkResult::Value(Ok(v)) => Ok(NetworkResult::::Value(v)), NetworkResult::Value(Err(e)) => Err(e), @@ -90,6 +93,7 @@ impl FoldedNetworkResultExt for io::Result> { | io::ErrorKind::ConnectionReset | io::ErrorKind::HostUnreachable | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), }, #[cfg(not(feature = "io_error_more"))] @@ -105,6 +109,7 @@ impl FoldedNetworkResultExt for io::Result> { io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), } } @@ -124,6 +129,7 @@ impl FoldedNetworkResultExt for io::Result> { | io::ErrorKind::ConnectionReset | io::ErrorKind::HostUnreachable | io::ErrorKind::NetworkUnreachable => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), }, #[cfg(not(feature = "io_error_more"))] @@ -139,6 +145,7 @@ impl FoldedNetworkResultExt for io::Result> { io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionReset => Ok(NetworkResult::NoConnection(e)), + io::ErrorKind::AddrNotAvailable => Ok(NetworkResult::AlreadyExists(e)), _ => Err(e), } } @@ -153,6 +160,7 @@ impl FoldedNetworkResultExt for io::Result> { pub enum NetworkResult { Timeout, NoConnection(io::Error), + AlreadyExists(io::Error), InvalidMessage(String), Value(T), } @@ -170,7 +178,9 @@ impl NetworkResult { pub fn invalid_message(s: S) -> Self { Self::InvalidMessage(s.to_string()) } - + pub fn already_exists(e: io::Error) -> Self { + Self::AlreadyExists(e) + } pub fn value(value: T) -> Self { Self::Value(value) } @@ -181,6 +191,9 @@ impl NetworkResult { pub fn is_no_connection(&self) -> bool { matches!(self, Self::NoConnection(_)) } + pub fn is_already_exists(&self) -> bool { + matches!(self, Self::AlreadyExists(_)) + } pub fn is_value(&self) -> bool { matches!(self, Self::Value(_)) } @@ -188,6 +201,7 @@ impl NetworkResult { match self { Self::Timeout => NetworkResult::::Timeout, Self::NoConnection(e) => NetworkResult::::NoConnection(e), + Self::AlreadyExists(e) => NetworkResult::::AlreadyExists(e), Self::InvalidMessage(s) => NetworkResult::::InvalidMessage(s), Self::Value(v) => NetworkResult::::Value(f(v)), } @@ -196,6 +210,7 @@ impl NetworkResult { match self { Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")), Self::NoConnection(e) => Err(e), + Self::AlreadyExists(e) => Err(e), Self::InvalidMessage(s) => Err(io::Error::new( io::ErrorKind::InvalidData, format!("Invalid message: {}", s), @@ -230,6 +245,7 @@ impl Debug for NetworkResult { match self { Self::Timeout => write!(f, "Timeout"), Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(), + Self::AlreadyExists(e) => f.debug_tuple("AlreadyExists").field(e).finish(), Self::InvalidMessage(s) => f.debug_tuple("InvalidMessage").field(s).finish(), Self::Value(v) => f.debug_tuple("Value").field(v).finish(), } @@ -241,6 +257,7 @@ impl Display for NetworkResult { match self { Self::Timeout => write!(f, "Timeout"), Self::NoConnection(e) => write!(f, "NoConnection({})", e.kind()), + Self::AlreadyExists(e) => write!(f, "AlreadyExists({})", e.kind()), Self::InvalidMessage(s) => write!(f, "InvalidMessage({})", s), Self::Value(_) => write!(f, "Value"), } @@ -258,6 +275,7 @@ macro_rules! network_result_try { match $r { NetworkResult::Timeout => return Ok(NetworkResult::Timeout), NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)), + NetworkResult::AlreadyExists(e) => return Ok(NetworkResult::AlreadyExists(e)), NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)), NetworkResult::Value(v) => v, } @@ -272,6 +290,10 @@ macro_rules! network_result_try { $f; return Ok(NetworkResult::NoConnection(e)); } + NetworkResult::AlreadyExists(e) => { + $f; + return Ok(NetworkResult::AlreadyExists(e)); + } NetworkResult::InvalidMessage(s) => { $f; return Ok(NetworkResult::InvalidMessage(s)); @@ -293,6 +315,10 @@ macro_rules! network_result_value_or_log { log_net!($level "{}({}) at {}@{}:{}", "No connection".green(), e.to_string(), file!(), line!(), column!()); $f } + NetworkResult::AlreadyExists(e) => { + log_net!($level "{}({}) at {}@{}:{}", "Already exists".green(), e.to_string(), file!(), line!(), column!()); + $f + } NetworkResult::InvalidMessage(s) => { log_net!($level "{}({}) at {}@{}:{}", "Invalid message".green(), s, file!(), line!(), column!()); $f