From 0a01c0d23e5761439d2ffdd82d16b15e4595b223 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 2 Oct 2022 18:47:36 -0400 Subject: [PATCH] debugging, add async_tag_lock --- .../src/network_manager/connection_handle.rs | 18 +- .../src/network_manager/connection_manager.rs | 36 +-- .../src/network_manager/connection_table.rs | 10 +- veilid-core/src/network_manager/mod.rs | 152 ++++++----- .../src/network_manager/native/network_tcp.rs | 44 ++- .../native/protocol/sockets.rs | 32 ++- .../network_manager/native/protocol/tcp.rs | 21 +- .../src/network_manager/native/protocol/ws.rs | 15 +- .../network_manager/native/start_protocols.rs | 6 +- .../src/network_manager/network_connection.rs | 31 ++- veilid-core/src/network_manager/tasks.rs | 252 ++++++++++-------- .../src/network_manager/wasm/protocol/ws.rs | 2 +- veilid-core/src/rpc_processor/mod.rs | 14 +- veilid-core/src/tests/common/mod.rs | 1 + .../src/tests/common/test_async_tag_lock.rs | 159 +++++++++++ veilid-core/src/tests/native/mod.rs | 14 + veilid-core/src/veilid_api/mod.rs | 11 + veilid-core/src/xx/async_tag_lock.rs | 126 +++++++++ veilid-core/src/xx/mod.rs | 5 + veilid-core/tests/node.rs | 7 + veilid-core/tests/web.rs | 7 + 21 files changed, 690 insertions(+), 273 deletions(-) create mode 100644 veilid-core/src/tests/common/test_async_tag_lock.rs create mode 100644 veilid-core/src/xx/async_tag_lock.rs diff --git a/veilid-core/src/network_manager/connection_handle.rs b/veilid-core/src/network_manager/connection_handle.rs index aa37070a..9eeb1b63 100644 --- a/veilid-core/src/network_manager/connection_handle.rs +++ b/veilid-core/src/network_manager/connection_handle.rs @@ -4,7 +4,7 @@ use super::*; pub struct ConnectionHandle { id: u64, descriptor: ConnectionDescriptor, - channel: flume::Sender>, + channel: flume::Sender<(Option, Vec)>, } #[derive(Debug)] @@ -17,7 +17,7 @@ impl ConnectionHandle { pub(super) fn new( id: u64, descriptor: ConnectionDescriptor, - channel: flume::Sender>, + channel: flume::Sender<(Option, Vec)>, ) -> Self { Self { id, @@ -34,16 +34,22 @@ impl ConnectionHandle { self.descriptor.clone() } + #[instrument(level="trace", skip(self, message), fields(message.len = message.len()))] pub fn send(&self, message: Vec) -> ConnectionHandleSendResult { - match self.channel.send(message) { + match self.channel.send((Span::current().id(), message)) { Ok(()) => ConnectionHandleSendResult::Sent, - Err(e) => ConnectionHandleSendResult::NotSent(e.0), + Err(e) => ConnectionHandleSendResult::NotSent(e.0 .1), } } + #[instrument(level="trace", skip(self, message), fields(message.len = message.len()))] pub async fn send_async(&self, message: Vec) -> ConnectionHandleSendResult { - match self.channel.send_async(message).await { + match self + .channel + .send_async((Span::current().id(), message)) + .await + { Ok(()) => ConnectionHandleSendResult::Sent, - Err(e) => ConnectionHandleSendResult::NotSent(e.0), + Err(e) => ConnectionHandleSendResult::NotSent(e.0 .1), } } } diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 7686425f..2bceb171 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -140,6 +140,7 @@ impl ConnectionManager { // Internal routine to register new connection atomically. // Registers connection in the connection table for later access // and spawns a message processing loop for the connection + #[instrument(level = "trace", skip(self, inner), ret, err)] fn on_new_protocol_network_connection( &self, inner: &mut ConnectionManagerInner, @@ -195,6 +196,7 @@ impl ConnectionManager { } // Returns a network connection if one already is established + #[instrument(level = "trace", skip(self), ret)] pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { self.arc .connection_table @@ -234,40 +236,43 @@ impl ConnectionManager { did_kill } - // Called when we want to create a new connection or get the current one that already exists - // This will kill off any connections that are in conflict with the new connection to be made - // in order to make room for the new connection in the system's connection table - // This routine needs to be atomic, or connections may exist in the table that are not established + /// Locak remote address + // async fn lock_remote_address(&self, remote_addr: SocketAddr) -> { + + // } + + /// Called when we want to create a new connection or get the current one that already exists + /// This will kill off any connections that are in conflict with the new connection to be made + /// in order to make room for the new connection in the system's connection table + /// This routine needs to be atomic, or connections may exist in the table that are not established + #[instrument(level = "trace", skip(self), ret, err)] pub async fn get_or_create_connection( &self, local_addr: Option, dial_info: DialInfo, ) -> EyreResult> { - log_net!( + warn!( "== get_or_create_connection local_addr={:?} dial_info={:?}", local_addr.green(), dial_info.green() ); + // Make a connection descriptor for this dialinfo + let peer_address = dial_info.to_peer_address(); + + // Async lock on the remote address for atomicity + //let _lock_guard = self.lock_remote_address(peer_address.to_socket_addr()); + // Kill off any possibly conflicting connections let did_kill = self.kill_off_colliding_connections(&dial_info).await; let mut retry_count = if did_kill { 2 } else { 0 }; - // Make a connection descriptor for this dialinfo - let peer_address = dial_info.to_peer_address(); - let descriptor = match local_addr { - Some(la) => { - ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la)) - } - None => ConnectionDescriptor::new_no_local(peer_address), - }; - // If any connection to this remote exists that has the same protocol, return it // Any connection will do, we don't have to match the local address if let Some(conn) = self .arc .connection_table - .get_last_connection_by_remote(descriptor.remote()) + .get_last_connection_by_remote(peer_address) { log_net!( "== Returning existing connection local_addr={:?} peer_address={:?}", @@ -378,6 +383,7 @@ impl ConnectionManager { // Callback from network connection receive loop when it exits // cleans up the entry in the connection table + #[instrument(level = "trace", skip(self))] pub(super) async fn report_connection_finished(&self, connection_id: u64) { // Get channel sender let sender = { diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 3ed4d486..c16214f3 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -72,6 +72,7 @@ impl ConnectionTable { } } + #[instrument(level = "trace", skip(self))] pub async fn join(&self) { let mut unord = { let mut inner = self.inner.lock(); @@ -90,6 +91,7 @@ impl ConnectionTable { while unord.next().await.is_some() {} } + #[instrument(level = "trace", skip(self), ret, err)] pub fn add_connection( &self, network_connection: NetworkConnection, @@ -156,6 +158,7 @@ impl ConnectionTable { Ok(out_conn) } + #[instrument(level = "trace", skip(self), ret)] pub fn get_connection_by_id(&self, id: NetworkConnectionId) -> Option { let mut inner = self.inner.lock(); let protocol_index = *inner.protocol_index_by_id.get(&id)?; @@ -163,6 +166,7 @@ impl ConnectionTable { Some(out.get_handle()) } + #[instrument(level = "trace", skip(self), ret)] pub fn get_connection_by_descriptor( &self, descriptor: ConnectionDescriptor, @@ -175,6 +179,7 @@ impl ConnectionTable { Some(out.get_handle()) } + #[instrument(level = "trace", skip(self), ret)] pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option { let mut inner = self.inner.lock(); @@ -184,7 +189,8 @@ impl ConnectionTable { Some(out.get_handle()) } - pub fn _get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec { + #[instrument(level = "trace", skip(self), ret)] + pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec { let inner = self.inner.lock(); inner .ids_by_remote @@ -219,6 +225,7 @@ impl ConnectionTable { inner.conn_by_id.iter().fold(0, |acc, c| acc + c.len()) } + #[instrument(level = "trace", skip(inner), ret)] fn remove_connection_records( inner: &mut ConnectionTableInner, id: NetworkConnectionId, @@ -251,6 +258,7 @@ impl ConnectionTable { conn } + #[instrument(level = "trace", skip(self), ret)] pub fn remove_connection_by_id(&self, id: NetworkConnectionId) -> Option { let mut inner = self.inner.lock(); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 3fe83c63..9b2d9f18 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1321,88 +1321,93 @@ impl NetworkManager { data: Vec, ) -> SendPinBoxFuture>> { let this = self.clone(); - Box::pin(async move { - // info!("{}", format!("send_data to: {:?}", node_ref).red()); + Box::pin( + async move { + // info!("{}", format!("send_data to: {:?}", node_ref).red()); - // First try to send data to the last socket we've seen this peer on - let data = if let Some(connection_descriptor) = node_ref.last_connection() { - // info!( - // "{}", - // format!("last_connection to: {:?}", connection_descriptor).red() - // ); + // First try to send data to the last socket we've seen this peer on + let data = if let Some(connection_descriptor) = node_ref.last_connection() { + // info!( + // "{}", + // format!("last_connection to: {:?}", connection_descriptor).red() + // ); - match this - .net() - .send_data_to_existing_connection(connection_descriptor, data) - .await? - { - None => { - // info!( - // "{}", - // format!("sent to existing connection: {:?}", connection_descriptor) - // .red() - // ); + match this + .net() + .send_data_to_existing_connection(connection_descriptor, data) + .await? + { + None => { + // info!( + // "{}", + // format!("sent to existing connection: {:?}", connection_descriptor) + // .red() + // ); - // Update timestamp for this last connection since we just sent to it + // Update timestamp for this last connection since we just sent to it + node_ref + .set_last_connection(connection_descriptor, intf::get_timestamp()); + + return Ok(NetworkResult::value(SendDataKind::Existing( + connection_descriptor, + ))); + } + Some(d) => d, + } + } else { + data + }; + + // info!("{}", "no existing connection".red()); + + // If we don't have last_connection, try to reach out to the peer via its dial info + let contact_method = this.get_contact_method(node_ref.clone()); + log_net!( + "send_data via {:?} to dialinfo {:?}", + contact_method, + node_ref + ); + match contact_method { + ContactMethod::OutboundRelay(relay_nr) + | ContactMethod::InboundRelay(relay_nr) => { + network_result_try!(this.send_data(relay_nr, data).await?); + Ok(NetworkResult::value(SendDataKind::Indirect)) + } + ContactMethod::Direct(dial_info) => { + let connection_descriptor = network_result_try!( + this.net().send_data_to_dial_info(dial_info, data).await? + ); + // If we connected to this node directly, save off the last connection so we can use it again node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); - return Ok(NetworkResult::value(SendDataKind::Existing( + Ok(NetworkResult::value(SendDataKind::Direct( connection_descriptor, - ))); + ))) } - Some(d) => d, + ContactMethod::SignalReverse(relay_nr, target_node_ref) => { + let connection_descriptor = network_result_try!( + this.do_reverse_connect(relay_nr, target_node_ref, data) + .await? + ); + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) + } + ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { + let connection_descriptor = network_result_try!( + this.do_hole_punch(relay_nr, target_node_ref, data).await? + ); + Ok(NetworkResult::value(SendDataKind::Direct( + connection_descriptor, + ))) + } + ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other( + "Can't send to this node", + )), } - } else { - data - }; - - // info!("{}", "no existing connection".red()); - - // If we don't have last_connection, try to reach out to the peer via its dial info - let contact_method = this.get_contact_method(node_ref.clone()); - log_net!( - "send_data via {:?} to dialinfo {:?}", - contact_method, - node_ref - ); - match contact_method { - ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { - network_result_try!(this.send_data(relay_nr, data).await?); - Ok(NetworkResult::value(SendDataKind::Indirect)) - } - ContactMethod::Direct(dial_info) => { - let connection_descriptor = network_result_try!( - this.net().send_data_to_dial_info(dial_info, data).await? - ); - // If we connected to this node directly, save off the last connection so we can use it again - node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); - - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - ContactMethod::SignalReverse(relay_nr, target_node_ref) => { - let connection_descriptor = network_result_try!( - this.do_reverse_connect(relay_nr, target_node_ref, data) - .await? - ); - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { - let connection_descriptor = network_result_try!( - this.do_hole_punch(relay_nr, target_node_ref, data).await? - ); - Ok(NetworkResult::value(SendDataKind::Direct( - connection_descriptor, - ))) - } - ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other( - "Can't send to this node", - )), } - }) + .instrument(trace_span!("send_data")), + ) } // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) @@ -1466,6 +1471,7 @@ impl NetworkManager { // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler + #[instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len()))] async fn on_recv_envelope( &self, data: &[u8], diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index ca2e3792..806abc2c 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -42,7 +42,8 @@ impl Network { &self, tls_acceptor: &TlsAcceptor, stream: AsyncPeekStream, - addr: SocketAddr, + peer_addr: SocketAddr, + local_addr: SocketAddr, protocol_handlers: &[Box], tls_connection_initial_timeout_ms: u32, ) -> EyreResult> { @@ -65,18 +66,20 @@ impl Network { .wrap_err("tls initial timeout")? .wrap_err("failed to peek tls stream")?; - self.try_handlers(ps, addr, protocol_handlers).await + self.try_handlers(ps, peer_addr, local_addr, protocol_handlers) + .await } async fn try_handlers( &self, stream: AsyncPeekStream, - addr: SocketAddr, + peer_addr: SocketAddr, + local_addr: SocketAddr, protocol_accept_handlers: &[Box], ) -> EyreResult> { for ah in protocol_accept_handlers.iter() { if let Some(nc) = ah - .on_accept(stream.clone(), addr) + .on_accept(stream.clone(), peer_addr, local_addr) .await .wrap_err("io error")? { @@ -105,21 +108,35 @@ impl Network { } }; + // XXX + warn!( + "DEBUGACCEPT: local={} remote={}", + tcp_stream.local_addr().unwrap(), + tcp_stream.peer_addr().unwrap(), + ); + let listener_state = listener_state.clone(); let connection_manager = connection_manager.clone(); // Limit the number of connections from the same IP address // and the number of total connections - let addr = match tcp_stream.peer_addr() { + let peer_addr = match tcp_stream.peer_addr() { Ok(addr) => addr, Err(e) => { log_net!(debug "failed to get peer address: {}", e); return; } }; + let local_addr = match tcp_stream.local_addr() { + Ok(addr) => addr, + Err(e) => { + log_net!(debug "failed to get local address: {}", e); + return; + } + }; // XXX limiting here instead for connection table? may be faster and avoids tls negotiation - log_net!("TCP connection from: {}", addr); + log_net!("TCP connection from: {}", peer_addr); // Create a stream we can peek on #[cfg(feature = "rt-tokio")] @@ -139,7 +156,7 @@ impl Network { { // If we fail to get a packet within the connection initial timeout // then we punt this connection - log_net!("connection initial timeout from: {:?}", addr); + log_net!("connection initial timeout from: {:?}", peer_addr); return; } @@ -152,29 +169,30 @@ impl Network { self.try_tls_handlers( ls.tls_acceptor.as_ref().unwrap(), ps, - addr, + peer_addr, + local_addr, &ls.tls_protocol_handlers, tls_connection_initial_timeout_ms, ) .await } else { - self.try_handlers(ps, addr, &ls.protocol_accept_handlers) + self.try_handlers(ps, peer_addr, local_addr, &ls.protocol_accept_handlers) .await }; let conn = match conn { Ok(Some(c)) => { - log_net!("protocol handler found for {:?}: {:?}", addr, c); + log_net!("protocol handler found for {:?}: {:?}", peer_addr, c); c } Ok(None) => { // No protocol handlers matched? drop it. - log_net!(debug "no protocol handler for connection from {:?}", addr); + log_net!(debug "no protocol handler for connection from {:?}", peer_addr); return; } Err(e) => { // Failed to negotiate connection? drop it. - log_net!(debug "failed to negotiate connection from {:?}: {}", addr, e); + log_net!(debug "failed to negotiate connection from {:?}: {}", peer_addr, e); return; } }; @@ -311,7 +329,6 @@ impl Network { .push(new_protocol_accept_handler( self.network_manager().config(), true, - addr, )); } else { ls.write() @@ -319,7 +336,6 @@ impl Network { .push(new_protocol_accept_handler( self.network_manager().config(), false, - addr, )); } diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index dd23bf45..0650d686 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -34,6 +34,7 @@ cfg_if! { } } +#[instrument(level = "trace", ret, err)] pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result { let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; if domain == Domain::IPV6 { @@ -49,6 +50,7 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result { Ok(socket) } +#[instrument(level = "trace", ret, err)] pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result { let domain = Domain::for_address(local_address); let socket = new_unbound_shared_udp_socket(domain)?; @@ -60,6 +62,7 @@ pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result io::Result { let domain = Domain::for_address(local_address); let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; @@ -93,6 +96,7 @@ pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result io::Result { let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { @@ -114,6 +118,7 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result { Ok(socket) } +#[instrument(level = "trace", ret, err)] pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result { let domain = Domain::for_address(local_address); let socket = new_unbound_shared_tcp_socket(domain)?; @@ -125,6 +130,7 @@ pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result io::Result { let domain = Domain::for_address(local_address); @@ -166,6 +172,7 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result Ok(()), @@ -184,7 +194,27 @@ pub async fn nonblocking_connect( Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => Ok(()), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Ok(()), Err(e) => Err(e), - }?; + } + .map_err(|e| { + // XXX + warn!( + "DEBUGCONNECT XXXFAILXXX: bind={} local={} remote={}\nbacktrace={:?}", + bind_local_addr, + socket.local_addr().unwrap().as_socket().unwrap(), + addr, + backtrace::Backtrace::new(), + ); + e + })?; + + // XXX + warn!( + "DEBUGCONNECT: bind={} local={} remote={}\nbacktrace={:?}", + bind_local_addr, + socket.local_addr().unwrap().as_socket().unwrap(), + addr, + backtrace::Backtrace::new(), + ); let async_stream = Async::new(std::net::TcpStream::from(socket))?; diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index d0f843e0..91167b69 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -99,30 +99,20 @@ impl RawTcpNetworkConnection { /////////////////////////////////////////////////////////// /// -struct RawTcpProtocolHandlerInner { - local_address: SocketAddr, -} - #[derive(Clone)] pub struct RawTcpProtocolHandler where Self: ProtocolAcceptHandler, { connection_initial_timeout_ms: u32, - inner: Arc>, } impl RawTcpProtocolHandler { - fn new_inner(local_address: SocketAddr) -> RawTcpProtocolHandlerInner { - RawTcpProtocolHandlerInner { local_address } - } - - pub fn new(config: VeilidConfig, local_address: SocketAddr) -> Self { + pub fn new(config: VeilidConfig) -> Self { let c = config.get(); let connection_initial_timeout_ms = c.network.connection_initial_timeout_ms; Self { connection_initial_timeout_ms, - inner: Arc::new(Mutex::new(Self::new_inner(local_address))), } } @@ -131,6 +121,7 @@ impl RawTcpProtocolHandler { self, ps: AsyncPeekStream, socket_addr: SocketAddr, + local_addr: SocketAddr, ) -> io::Result> { log_net!("TCP: on_accept_async: enter"); let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN]; @@ -147,9 +138,8 @@ impl RawTcpProtocolHandler { SocketAddress::from_socket_addr(socket_addr), ProtocolType::TCP, ); - let local_address = self.inner.lock().local_address; let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( - ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), + ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), ps, )); @@ -158,7 +148,7 @@ impl RawTcpProtocolHandler { Ok(Some(conn)) } - #[instrument(level = "trace", err)] + #[instrument(level = "trace", ret, err)] pub async fn connect( local_address: Option, socket_addr: SocketAddr, @@ -202,7 +192,8 @@ impl ProtocolAcceptHandler for RawTcpProtocolHandler { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> SendPinBoxFuture>> { - Box::pin(self.clone().on_accept_async(stream, peer_addr)) + Box::pin(self.clone().on_accept_async(stream, peer_addr, local_addr)) } } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index a0b48b42..520cd344 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -129,7 +129,6 @@ where /// struct WebsocketProtocolHandlerArc { tls: bool, - local_address: SocketAddr, request_path: Vec, connection_initial_timeout_ms: u32, } @@ -142,7 +141,7 @@ where arc: Arc, } impl WebsocketProtocolHandler { - pub fn new(config: VeilidConfig, tls: bool, local_address: SocketAddr) -> Self { + pub fn new(config: VeilidConfig, tls: bool) -> Self { let c = config.get(); let path = if tls { format!("GET /{}", c.network.protocol.ws.path.trim_end_matches('/')) @@ -158,7 +157,6 @@ impl WebsocketProtocolHandler { Self { arc: Arc::new(WebsocketProtocolHandlerArc { tls, - local_address, request_path: path.as_bytes().to_vec(), connection_initial_timeout_ms, }), @@ -170,6 +168,7 @@ impl WebsocketProtocolHandler { self, ps: AsyncPeekStream, socket_addr: SocketAddr, + local_addr: SocketAddr, ) -> io::Result> { log_net!("WS: on_accept_async: enter"); let request_path_len = self.arc.request_path.len() + 2; @@ -209,10 +208,7 @@ impl WebsocketProtocolHandler { PeerAddress::new(SocketAddress::from_socket_addr(socket_addr), protocol_type); let conn = ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new( - ConnectionDescriptor::new( - peer_addr, - SocketAddress::from_socket_addr(self.arc.local_address), - ), + ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), ws_stream, )); @@ -221,7 +217,7 @@ impl WebsocketProtocolHandler { Ok(Some(conn)) } - #[instrument(level = "trace", err)] + #[instrument(level = "trace", ret, err)] pub async fn connect( local_address: Option, dial_info: &DialInfo, @@ -296,7 +292,8 @@ impl ProtocolAcceptHandler for WebsocketProtocolHandler { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> SendPinBoxFuture>> { - Box::pin(self.clone().on_accept_async(stream, peer_addr)) + Box::pin(self.clone().on_accept_async(stream, peer_addr, local_addr)) } } diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index bf862fa4..36f285e4 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -387,7 +387,7 @@ impl Network { ip_addrs, ws_port, false, - Box::new(|c, t, a| Box::new(WebsocketProtocolHandler::new(c, t, a))), + Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), ) .await?; trace!("WS: listener started on {:#?}", socket_addresses); @@ -496,7 +496,7 @@ impl Network { ip_addrs, wss_port, true, - Box::new(|c, t, a| Box::new(WebsocketProtocolHandler::new(c, t, a))), + Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))), ) .await?; trace!("WSS: listener started on {:#?}", socket_addresses); @@ -590,7 +590,7 @@ impl Network { ip_addrs, tcp_port, false, - Box::new(move |c, _, a| Box::new(RawTcpProtocolHandler::new(c, a))), + Box::new(move |c, _| Box::new(RawTcpProtocolHandler::new(c))), ) .await?; trace!("TCP: listener started on {:#?}", socket_addresses); diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 98d007e2..b9658b4a 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -16,6 +16,7 @@ cfg_if::cfg_if! { &self, stream: AsyncPeekStream, peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> SendPinBoxFuture>>; } @@ -38,7 +39,7 @@ cfg_if::cfg_if! { } pub type NewProtocolAcceptHandler = - dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; + dyn Fn(VeilidConfig, bool) -> Box + Send; } } /////////////////////////////////////////////////////////// @@ -91,7 +92,7 @@ pub struct NetworkConnection { processor: Option>, established_time: u64, stats: Arc>, - sender: flume::Sender>, + sender: flume::Sender<(Option, Vec)>, stop_source: Option, } @@ -120,9 +121,6 @@ impl NetworkConnection { protocol_connection: ProtocolNetworkConnection, connection_id: NetworkConnectionId, ) -> Self { - // Get timeout - let network_manager = connection_manager.network_manager(); - // Get descriptor let descriptor = protocol_connection.descriptor(); @@ -181,6 +179,7 @@ impl NetworkConnection { } } + #[instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret, err)] async fn send_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -194,6 +193,8 @@ impl NetworkConnection { Ok(NetworkResult::Value(out)) } + + #[instrument(level="trace", skip(stats), fields(ret.len), err)] async fn recv_internal( protocol_connection: &ProtocolNetworkConnection, stats: Arc>, @@ -204,6 +205,8 @@ impl NetworkConnection { let mut stats = stats.lock(); stats.last_message_recv_time.max_assign(Some(ts)); + tracing::Span::current().record("ret.len", out.len()); + Ok(NetworkResult::Value(out)) } @@ -223,7 +226,7 @@ impl NetworkConnection { manager_stop_token: StopToken, connection_id: NetworkConnectionId, descriptor: ConnectionDescriptor, - receiver: flume::Receiver>, + receiver: flume::Receiver<(Option, Vec)>, protocol_connection: ProtocolNetworkConnection, stats: Arc>, ) -> SendPinBoxFuture<()> { @@ -249,7 +252,7 @@ impl NetworkConnection { }; let timer = MutableFuture::new(new_timer()); - unord.push(system_boxed(timer.clone())); + unord.push(system_boxed(timer.clone().instrument(Span::current()))); loop { // Add another message sender future if necessary @@ -257,13 +260,17 @@ impl NetworkConnection { need_sender = false; let sender_fut = receiver.recv_async().then(|res| async { match res { - Ok(message) => { + Ok((span_id, message)) => { + + let recv_span = span!(parent: None, Level::TRACE, "process_connection recv"); + recv_span.follows_from(span_id); + // send the packet if let Err(e) = Self::send_internal( &protocol_connection, stats.clone(), message, - ) + ).instrument(recv_span) .await { // Sending the packet along can fail, if so, this connection is dead @@ -280,7 +287,7 @@ impl NetworkConnection { } } }); - unord.push(system_boxed(sender_fut)); + unord.push(system_boxed(sender_fut.instrument(Span::current()))); } // Add another message receiver future if necessary @@ -314,7 +321,7 @@ impl NetworkConnection { } }); - unord.push(system_boxed(receiver_fut)); + unord.push(system_boxed(receiver_fut.instrument(Span::current()))); } // Process futures @@ -358,7 +365,7 @@ impl NetworkConnection { connection_manager .report_connection_finished(connection_id) .await; - }) + }.instrument(trace_span!("process_connection"))) } } diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index bc571210..30014fca 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -39,119 +39,125 @@ impl NetworkManager { // Get bootstrap nodes from hostnames concurrently let mut unord = FuturesUnordered::new(); for bsname in bsnames { - unord.push(async move { - // look up boostrap node txt records - let bsnirecords = match intf::txt_lookup(&bsname).await { - Err(e) => { - warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); - return None; - } - Ok(v) => v, - }; - // for each record resolve into key/bootstraprecord pairs - let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); - for bsnirecord in bsnirecords { - // Bootstrap TXT Record Format Version 0: - // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* - // - // Split bootstrap node record by commas. Example: - // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws - let records: Vec = bsnirecord - .trim() - .split(',') - .map(|x| x.trim().to_owned()) - .collect(); - if records.len() < 6 { - warn!("invalid number of fields in bootstrap txt record"); - continue; - } - - // Bootstrap TXT record version - let txt_version: u8 = match records[0].parse::() { - Ok(v) => v, + unord.push( + async move { + // look up boostrap node txt records + let bsnirecords = match intf::txt_lookup(&bsname).await { Err(e) => { - warn!( + warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); + return None; + } + Ok(v) => v, + }; + // for each record resolve into key/bootstraprecord pairs + let mut bootstrap_records: Vec<(DHTKey, BootstrapRecord)> = Vec::new(); + for bsnirecord in bsnirecords { + // Bootstrap TXT Record Format Version 0: + // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* + // + // Split bootstrap node record by commas. Example: + // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws + let records: Vec = bsnirecord + .trim() + .split(',') + .map(|x| x.trim().to_owned()) + .collect(); + if records.len() < 6 { + warn!("invalid number of fields in bootstrap txt record"); + continue; + } + + // Bootstrap TXT record version + let txt_version: u8 = match records[0].parse::() { + Ok(v) => v, + Err(e) => { + warn!( "invalid txt_version specified in bootstrap node txt record: {}", e ); + continue; + } + }; + if txt_version != BOOTSTRAP_TXT_VERSION { + warn!("unsupported bootstrap txt record version"); continue; } - }; - if txt_version != BOOTSTRAP_TXT_VERSION { - warn!("unsupported bootstrap txt record version"); - continue; - } - // Min/Max wire protocol version - let min_version: u8 = match records[1].parse::() { - Ok(v) => v, - Err(e) => { - warn!( + // Min/Max wire protocol version + let min_version: u8 = match records[1].parse::() { + Ok(v) => v, + Err(e) => { + warn!( "invalid min_version specified in bootstrap node txt record: {}", e ); - continue; - } - }; - let max_version: u8 = match records[2].parse::() { - Ok(v) => v, - Err(e) => { - warn!( + continue; + } + }; + let max_version: u8 = match records[2].parse::() { + Ok(v) => v, + Err(e) => { + warn!( "invalid max_version specified in bootstrap node txt record: {}", e ); - continue; - } - }; - - // Node Id - let node_id_str = &records[3]; - let node_id_key = match DHTKey::try_decode(node_id_str) { - Ok(v) => v, - Err(e) => { - warn!( - "Invalid node id in bootstrap node record {}: {}", - node_id_str, e - ); - continue; - } - }; - - // Hostname - let hostname_str = &records[4]; - - // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node - if self.routing_table().node_id() == node_id_key { - continue; - } - - // Resolve each record and store in node dial infos list - let mut bootstrap_record = BootstrapRecord { - min_version, - max_version, - dial_info_details: Vec::new(), - }; - for rec in &records[5..] { - let rec = rec.trim(); - let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { - Ok(dis) => dis, - Err(e) => { - warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e); continue; } }; - for di in dial_infos { - bootstrap_record.dial_info_details.push(DialInfoDetail { - dial_info: di, - class: DialInfoClass::Direct, - }); + // Node Id + let node_id_str = &records[3]; + let node_id_key = match DHTKey::try_decode(node_id_str) { + Ok(v) => v, + Err(e) => { + warn!( + "Invalid node id in bootstrap node record {}: {}", + node_id_str, e + ); + continue; + } + }; + + // Hostname + let hostname_str = &records[4]; + + // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node + if self.routing_table().node_id() == node_id_key { + continue; } + + // Resolve each record and store in node dial infos list + let mut bootstrap_record = BootstrapRecord { + min_version, + max_version, + dial_info_details: Vec::new(), + }; + for rec in &records[5..] { + let rec = rec.trim(); + let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { + Ok(dis) => dis, + Err(e) => { + warn!( + "Couldn't resolve bootstrap node dial info {}: {}", + rec, e + ); + continue; + } + }; + + for di in dial_infos { + bootstrap_record.dial_info_details.push(DialInfoDetail { + dial_info: di, + class: DialInfoClass::Direct, + }); + } + } + bootstrap_records.push((node_id_key, bootstrap_record)); } - bootstrap_records.push((node_id_key, bootstrap_record)); + Some(bootstrap_records) } - Some(bootstrap_records) - }); + .instrument(Span::current()), + ); } let mut bsmap = BootstrapRecordMap::new(); @@ -172,6 +178,7 @@ impl NetworkManager { } // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM + #[instrument(level = "trace", skip(self), err)] pub(super) async fn direct_bootstrap_task_routine( self, stop_token: StopToken, @@ -201,7 +208,8 @@ impl NetworkManager { let routing_table = routing_table.clone(); unord.push( // lets ask bootstrap to find ourselves now - async move { routing_table.reverse_find_node(nr, true).await }, + async move { routing_table.reverse_find_node(nr, true).await } + .instrument(Span::current()), ); } } @@ -300,23 +308,26 @@ impl NetworkManager { ) { // Add this our futures to process in parallel let routing_table = routing_table.clone(); - unord.push(async move { - // Need VALID signed peer info, so ask bootstrap to find_node of itself - // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_target(nr.clone()).await; + unord.push( + async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = routing_table.find_target(nr.clone()).await; - // Ensure we got the signed peer info - if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { - log_net!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable - } else { - // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table.reverse_find_node(nr, true).await + // Ensure we got the signed peer info + if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { + log_net!(warn + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // If this node info is invalid, it will time out after being unpingable + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + routing_table.reverse_find_node(nr, true).await + } } - }); + .instrument(Span::current()), + ); } } @@ -382,7 +393,11 @@ impl NetworkManager { let nr_filtered = nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif)); log_net!("--> Keepalive ping to {:?}", nr_filtered); - unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed()); + unord.push( + async move { rpc.rpc_call_status(nr_filtered).await } + .instrument(Span::current()) + .boxed(), + ); did_pings = true; } } @@ -392,7 +407,11 @@ impl NetworkManager { // any mapped ports to preserve if !did_pings { let rpc = rpc.clone(); - unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); + unord.push( + async move { rpc.rpc_call_status(nr).await } + .instrument(Span::current()) + .boxed(), + ); } } @@ -420,7 +439,11 @@ impl NetworkManager { let rpc = rpc.clone(); // Just do a single ping with the best protocol for all the nodes - unord.push(async move { rpc.rpc_call_status(nr).await }.boxed()); + unord.push( + async move { rpc.rpc_call_status(nr).await } + .instrument(Span::current()) + .boxed(), + ); } Ok(()) @@ -479,7 +502,10 @@ impl NetworkManager { ); for nr in noderefs { let routing_table = routing_table.clone(); - ord.push_back(async move { routing_table.reverse_find_node(nr, false).await }); + ord.push_back( + async move { routing_table.reverse_find_node(nr, false).await } + .instrument(Span::current()), + ); } // do peer minimum search in order from fastest to slowest diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 1c9df3c3..1b48c59d 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -106,7 +106,7 @@ impl WebsocketNetworkConnection { pub struct WebsocketProtocolHandler {} impl WebsocketProtocolHandler { - #[instrument(level = "trace", err)] + #[instrument(level = "trace", ret, err)] pub async fn connect( dial_info: &DialInfo, timeout_ms: u32, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6a6ac7ad..63a36b35 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -147,8 +147,6 @@ pub struct RPCProcessorInner { } pub struct RPCProcessorUnlockedInner { - node_id: DHTKey, - node_id_secret: DHTKeySecret, timeout: u64, queue_size: u32, concurrency: u32, @@ -183,8 +181,6 @@ impl RPCProcessor { ) -> RPCProcessorUnlockedInner { // make local copy of node id for easy access let c = config.get(); - let node_id = c.network.node_id; - let node_id_secret = c.network.node_id_secret; // set up channel let mut concurrency = c.network.rpc.concurrency; @@ -209,8 +205,6 @@ impl RPCProcessor { let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; RPCProcessorUnlockedInner { - node_id, - node_id_secret, timeout, queue_size, concurrency, @@ -947,16 +941,16 @@ impl RPCProcessor { stop_token: StopToken, receiver: flume::Receiver<(Option, RPCMessageEncoded)>, ) { - while let Ok(Ok((_span_id, msg))) = + while let Ok(Ok((span_id, msg))) = receiver.recv_async().timeout_at(stop_token.clone()).await { let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker"); + //let rpc_worker_span = span!(Level::TRACE, "rpc_worker"); // fixme: causes crashes? "Missing otel data span extensions"?? - //rpc_worker_span.follows_from(span_id); - let _enter = rpc_worker_span.enter(); - + rpc_worker_span.follows_from(span_id); let _ = self .process_rpc_message(msg) + .instrument(rpc_worker_span) .await .map_err(logthru_rpc!("couldn't process rpc message")); } diff --git a/veilid-core/src/tests/common/mod.rs b/veilid-core/src/tests/common/mod.rs index eaae93db..537a41a5 100644 --- a/veilid-core/src/tests/common/mod.rs +++ b/veilid-core/src/tests/common/mod.rs @@ -1,3 +1,4 @@ +pub mod test_async_tag_lock; pub mod test_host_interface; pub mod test_protected_store; pub mod test_table_store; diff --git a/veilid-core/src/tests/common/test_async_tag_lock.rs b/veilid-core/src/tests/common/test_async_tag_lock.rs new file mode 100644 index 00000000..f932a457 --- /dev/null +++ b/veilid-core/src/tests/common/test_async_tag_lock.rs @@ -0,0 +1,159 @@ +use crate::xx::*; +use crate::*; + +pub async fn test_simple_no_contention() { + info!("test_simple_no_contention"); + + let table = AsyncTagLockTable::new(); + + let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); + let a2 = SocketAddr::new("6.9.6.9".parse().unwrap(), 6969); + + { + let g1 = table.lock_tag(a1).await; + let g2 = table.lock_tag(a2).await; + drop(g2); + drop(g1); + } + + { + let g1 = table.lock_tag(a1).await; + let g2 = table.lock_tag(a2).await; + drop(g1); + drop(g2); + } + + assert_eq!(table.len(), 0); +} + +pub async fn test_simple_single_contention() { + info!("test_simple_single_contention"); + + let table = AsyncTagLockTable::new(); + + let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); + + let g1 = table.lock_tag(a1).await; + + println!("locked"); + let t1 = intf::spawn(async move { + // move the guard into the task + let _g1_take = g1; + // hold the guard for a bit + println!("waiting"); + intf::sleep(1000).await; + // release the guard + println!("released"); + }); + + // wait to lock again, will contend until spawned task exits + let _g1_b = table.lock_tag(a1).await; + println!("locked"); + + // Ensure task is joined + t1.await; + + assert_eq!(table.len(), 1); +} + +pub async fn test_simple_double_contention() { + info!("test_simple_double_contention"); + + let table = AsyncTagLockTable::new(); + + let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); + let a2 = SocketAddr::new("6.9.6.9".parse().unwrap(), 6969); + + let g1 = table.lock_tag(a1).await; + let g2 = table.lock_tag(a2).await; + + println!("locked"); + let t1 = intf::spawn(async move { + // move the guard into the task + let _g1_take = g1; + // hold the guard for a bit + println!("waiting"); + intf::sleep(1000).await; + // release the guard + println!("released"); + }); + let t2 = intf::spawn(async move { + // move the guard into the task + let _g2_take = g2; + // hold the guard for a bit + println!("waiting"); + intf::sleep(500).await; + // release the guard + println!("released"); + }); + + // wait to lock again, will contend until spawned task exits + let _g1_b = table.lock_tag(a1).await; + // wait to lock again, should complete immediately + let _g2_b = table.lock_tag(a2).await; + + println!("locked"); + + // Ensure tasks are joined + t1.await; + t2.await; + + assert_eq!(table.len(), 2); +} + +pub async fn test_parallel_single_contention() { + info!("test_parallel_single_contention"); + + let table = AsyncTagLockTable::new(); + + let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); + + let table1 = table.clone(); + let t1 = intf::spawn(async move { + // lock the tag + let _g = table1.lock_tag(a1).await; + println!("locked t1"); + // hold the guard for a bit + println!("waiting t1"); + intf::sleep(500).await; + // release the guard + println!("released t1"); + }); + + let table2 = table.clone(); + let t2 = intf::spawn(async move { + // lock the tag + let _g = table2.lock_tag(a1).await; + println!("locked t2"); + // hold the guard for a bit + println!("waiting t2"); + intf::sleep(500).await; + // release the guard + println!("released t2"); + }); + + let table3 = table.clone(); + let t3 = intf::spawn(async move { + // lock the tag + let _g = table3.lock_tag(a1).await; + println!("locked t3"); + // hold the guard for a bit + println!("waiting t3"); + intf::sleep(500).await; + // release the guard + println!("released t3"); + }); + + // Ensure tasks are joined + t1.await; + t2.await; + t3.await; + + assert_eq!(table.len(), 0); +} + +pub async fn test_all() { + test_simple_no_contention().await; + test_simple_single_contention().await; + test_parallel_single_contention().await; +} diff --git a/veilid-core/src/tests/native/mod.rs b/veilid-core/src/tests/native/mod.rs index e5ea0e2a..da86b323 100644 --- a/veilid-core/src/tests/native/mod.rs +++ b/veilid-core/src/tests/native/mod.rs @@ -70,6 +70,8 @@ pub fn run_all_tests() { exec_test_crypto(); info!("TEST: exec_test_envelope_receipt"); exec_test_envelope_receipt(); + info!("TEST: exec_test_async_tag_lock"); + exec_test_async_tag_lock(); info!("Finished unit tests"); } @@ -135,6 +137,11 @@ fn exec_test_envelope_receipt() { test_envelope_receipt::test_all().await; }) } +fn exec_test_async_tag_lock() { + block_on(async { + test_async_tag_lock::test_all().await; + }) +} /////////////////////////////////////////////////////////////////////////// cfg_if! { if #[cfg(test)] { @@ -223,5 +230,12 @@ cfg_if! { setup(); exec_test_envelope_receipt(); } + + #[test] + #[serial] + fn run_test_async_tag_lock() { + setup(); + exec_test_async_tag_lock(); + } } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 0934ff5c..f330ef44 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1636,6 +1636,13 @@ impl PeerAddress { } } +/// Represents the 5-tuple of an established connection +/// Not used to specify connections to create, that is reserved for DialInfo +/// +/// ConnectionDescriptors should never be from unspecified local addresses for connection oriented protocols +/// If the medium does not allow local addresses, None should have been used or 'new_no_local' +/// If we are specifying only a port, then the socket's 'local_address()' should have been used, since an +/// established connection is always from a real address to another real address. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct ConnectionDescriptor { remote: PeerAddress, @@ -1644,6 +1651,10 @@ pub struct ConnectionDescriptor { impl ConnectionDescriptor { pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { + assert!( + !remote.protocol_type().is_connection_oriented() || !local.address().is_unspecified() + ); + Self { remote, local: Some(local), diff --git a/veilid-core/src/xx/async_tag_lock.rs b/veilid-core/src/xx/async_tag_lock.rs new file mode 100644 index 00000000..9e865935 --- /dev/null +++ b/veilid-core/src/xx/async_tag_lock.rs @@ -0,0 +1,126 @@ +use super::*; +use core::hash::Hash; + +pub struct AsyncTagLockGuard +where + T: Hash + Eq + Clone, +{ + table: AsyncTagLockTable, + tag: T, + _guard: AsyncMutexGuardArc<()>, +} + +impl Drop for AsyncTagLockGuard +where + T: Hash + Eq + Clone, +{ + fn drop(&mut self) { + let mut inner = self.table.inner.lock(); + // Inform the table we're dropping this guard + let waiters = { + // Get the table entry, it must exist since we have a guard locked + let entry = inner.table.get_mut(&self.tag).unwrap(); + // Decrement the number of waiters + entry.waiters -= 1; + // Return the number of waiters left + entry.waiters + }; + // If there are no waiters left, we remove the tag from the table + if waiters == 0 { + inner.table.remove(&self.tag).unwrap(); + } + // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire + } +} + +#[derive(Clone)] +struct AsyncTagLockTableEntry { + mutex: Arc>, + waiters: usize, +} + +struct AsyncTagLockTableInner +where + T: Hash + Eq + Clone, +{ + table: HashMap, +} + +#[derive(Clone)] +pub struct AsyncTagLockTable +where + T: Hash + Eq + Clone, +{ + inner: Arc>>, +} + +impl fmt::Debug for AsyncTagLockTable +where + T: Hash + Eq + Clone, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AsyncTagLockTable").finish() + } +} + +impl AsyncTagLockTable +where + T: Hash + Eq + Clone, +{ + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(AsyncTagLockTableInner { + table: HashMap::new(), + })), + } + } + + pub fn len(&self) -> usize { + let inner = self.inner.lock(); + inner.table.len() + } + + pub async fn lock_tag(&self, tag: T) -> AsyncTagLockGuard { + // Get or create a tag lock entry + let mutex = { + let mut inner = self.inner.lock(); + + // See if this tag is in the table + // and if not, add a new mutex for this tag + let entry = inner + .table + .entry(tag.clone()) + .or_insert_with(|| AsyncTagLockTableEntry { + mutex: Arc::new(AsyncMutex::new(())), + waiters: 0, + }); + + // Increment the number of waiters + entry.waiters += 1; + + // Return the mutex associated with the tag + entry.mutex.clone() + + // Drop the table guard + }; + + // Lock the tag lock + let guard; + cfg_if! { + if #[cfg(feature="rt-tokio")] { + // tokio version + guard = mutex.lock_owned().await; + } else { + // async_std and wasm async_mutex version + guard = mutex.lock_arc().await; + } + } + + // Return the locked guard + AsyncTagLockGuard { + table: self.clone(), + tag, + _guard: guard, + } + } +} diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index 0f72f7b6..d9f48ffd 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -1,5 +1,6 @@ // mod bump_port; mod async_peek_stream; +mod async_tag_lock; mod clone_stream; mod eventual; mod eventual_base; @@ -68,6 +69,7 @@ cfg_if! { pub use core::ops::{FnOnce, FnMut, Fn}; pub use async_lock::Mutex as AsyncMutex; pub use async_lock::MutexGuard as AsyncMutexGuard; + pub use async_lock::MutexGuardArc as AsyncMutexGuardArc; pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub use async_executors::JoinHandle as LowLevelJoinHandle; } else { @@ -98,10 +100,12 @@ cfg_if! { if #[cfg(feature="rt-async-std")] { pub use async_std::sync::Mutex as AsyncMutex; pub use async_std::sync::MutexGuard as AsyncMutexGuard; + pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc; pub use async_std::task::JoinHandle as LowLevelJoinHandle; } else if #[cfg(feature="rt-tokio")] { pub use tokio::sync::Mutex as AsyncMutex; pub use tokio::sync::MutexGuard as AsyncMutexGuard; + pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc; pub use tokio::task::JoinHandle as LowLevelJoinHandle; } else { #[compile_error("must use an executor")] @@ -113,6 +117,7 @@ cfg_if! { // pub use bump_port::*; pub use async_peek_stream::*; +pub use async_tag_lock::*; pub use clone_stream::*; pub use eventual::*; pub use eventual_base::{EventualCommon, EventualResolvedFuture}; diff --git a/veilid-core/tests/node.rs b/veilid-core/tests/node.rs index 547456c3..da301eeb 100644 --- a/veilid-core/tests/node.rs +++ b/veilid-core/tests/node.rs @@ -74,3 +74,10 @@ async fn run_test_envelope_receipt() { test_envelope_receipt::test_all().await; } + +#[wasm_bindgen_test] +async fn run_test_async_tag_lock() { + setup(); + + test_async_tag_lock::test_all().await; +} diff --git a/veilid-core/tests/web.rs b/veilid-core/tests/web.rs index bc93ae4c..a5ac14b7 100644 --- a/veilid-core/tests/web.rs +++ b/veilid-core/tests/web.rs @@ -78,3 +78,10 @@ async fn run_test_envelope_receipt() { test_envelope_receipt::test_all().await; } + +#[wasm_bindgen_test] +async fn run_test_async_tag_lock() { + setup(); + + test_async_tag_lock::test_all().await; +}