diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index fdd32942..227012d7 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -256,7 +256,7 @@ impl ConnectionManager { // Attempt new connection let conn = loop { - match ProtocolNetworkConnection::connect(local_addr, dial_info.clone()).await { + match ProtocolNetworkConnection::connect(local_addr, &dial_info).await { Ok(v) => break Ok(v), Err(e) => { if retry_count == 0 { diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 9b2db725..839d8395 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1079,10 +1079,14 @@ impl NetworkManager { }; // Send boot magic to requested peer address let data = BOOT_MAGIC.to_vec(); - let out_data: Vec = self + let out_data: Vec = match self .net() .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) - .await?; + .await? + { + TimeoutOr::Timeout => return Ok(Vec::new()), + TimeoutOr::Value(v) => v, + }; let bootstrap_peerinfo: Vec = deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?) diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 15376ea8..c8bebf33 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -288,26 +288,35 @@ impl Network { data: Vec, ) -> EyreResult<()> { let data_len = data.len(); - let res = match dial_info.protocol_type() { + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); - RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data).await + let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) + .await + .wrap_err("create socket failure")?; + h.send_message(data, peer_socket_addr) + .await + .wrap_err("send message failure")?; } ProtocolType::TCP => { let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::send_unbound_message(peer_socket_addr, data).await + let pnc = RawTcpProtocolHandler::connect(None, peer_socket_addr) + .await + .wrap_err("connect failure")?; + pnc.send(data).await.wrap_err("send failure")?; } ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data).await + let pnc = WebsocketProtocolHandler::connect(None, &dial_info) + .await + .wrap_err("connect failure")?; + pnc.send(data).await.wrap_err("send failure")?; } } - .wrap_err("low level network error"); - if res.is_ok() { - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - } - res + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + + Ok(()) } // Send data to a dial info, unbound, using a new connection from a random port @@ -315,43 +324,94 @@ impl Network { // This creates a short-lived connection in the case of connection-oriented protocols // for the purpose of sending this one message. // This bypasses the connection table as it is not a 'node to node' connection. - #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len(), ret.len))] + #[instrument(level="trace", err, skip(self, data), fields(ret.timeout_or, data.len = data.len()))] pub async fn send_recv_data_unbound_to_dial_info( &self, dial_info: DialInfo, data: Vec, timeout_ms: u32, - ) -> EyreResult> { + ) -> EyreResult>> { let data_len = data.len(); - let out = match dial_info.protocol_type() { + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); - RawUdpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms) - .await? - } - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms) - .await? - } - ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_recv_unbound_message( - dial_info.clone(), - data, - timeout_ms, - ) - .await? - } - }; + let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) + .await + .wrap_err("create socket failure")?; + h.send_message(data, peer_socket_addr) + .await + .wrap_err("send message failure")?; + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + // receive single response + let mut out = vec![0u8; MAX_MESSAGE_SIZE]; + let timeout_or_ret = timeout(timeout_ms, h.recv_message(&mut out)) + .await + .into_timeout_or() + .into_result() + .wrap_err("recv_message failure")?; + let (recv_len, recv_addr) = match timeout_or_ret { + TimeoutOr::Value(v) => v, + TimeoutOr::Timeout => { + tracing::Span::current().record("ret.timeout_or", &"Timeout".to_owned()); + return Ok(TimeoutOr::Timeout); + } + }; - tracing::Span::current().record("ret.len", &out.len()); - Ok(out) + let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); + self.network_manager() + .stats_packet_rcvd(recv_socket_addr.ip(), recv_len as u64); + + // if the from address is not the same as the one we sent to, then drop this + if recv_socket_addr != peer_socket_addr { + bail!("wrong address"); + } + out.resize(recv_len, 0u8); + Ok(TimeoutOr::Value(out)) + } + ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => { + let pnc = match dial_info.protocol_type() { + ProtocolType::UDP => unreachable!(), + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + RawTcpProtocolHandler::connect(None, peer_socket_addr) + .await + .wrap_err("connect failure")? + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::connect(None, &dial_info) + .await + .wrap_err("connect failure")? + } + }; + + pnc.send(data).await.wrap_err("send failure")?; + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + + let out = timeout(timeout_ms, pnc.recv()) + .await + .into_timeout_or() + .into_result() + .wrap_err("recv failure")?; + + tracing::Span::current().record( + "ret.timeout_or", + &match out { + TimeoutOr::>::Value(ref v) => format!("Value(len={})", v.len()), + TimeoutOr::>::Timeout => "Timeout".to_owned(), + }, + ); + + if let TimeoutOr::Value(out) = &out { + self.network_manager() + .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + } + + Ok(out) + } + } } #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index a40c03ca..f1aef2f7 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -21,14 +21,14 @@ pub enum ProtocolNetworkConnection { impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, - dial_info: DialInfo, + dial_info: &DialInfo, ) -> io::Result { match dial_info.protocol_type() { ProtocolType::UDP => { panic!("Should not connect to UDP dialinfo"); } ProtocolType::TCP => { - tcp::RawTcpProtocolHandler::connect(local_address, dial_info).await + tcp::RawTcpProtocolHandler::connect(local_address, dial_info.to_socket_addr()).await } ProtocolType::WS | ProtocolType::WSS => { ws::WebsocketProtocolHandler::connect(local_address, dial_info).await @@ -36,53 +36,6 @@ impl ProtocolNetworkConnection { } } - pub async fn send_unbound_message(dial_info: DialInfo, data: Vec) -> io::Result<()> { - match dial_info.protocol_type() { - ProtocolType::UDP => { - let peer_socket_addr = dial_info.to_socket_addr(); - udp::RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data).await - } - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - tcp::RawTcpProtocolHandler::send_unbound_message(peer_socket_addr, data).await - } - ProtocolType::WS | ProtocolType::WSS => { - ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await - } - } - } - - pub async fn send_recv_unbound_message( - dial_info: DialInfo, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - match dial_info.protocol_type() { - ProtocolType::UDP => { - let peer_socket_addr = dial_info.to_socket_addr(); - udp::RawUdpProtocolHandler::send_recv_unbound_message( - peer_socket_addr, - data, - timeout_ms, - ) - .await - } - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - tcp::RawTcpProtocolHandler::send_recv_unbound_message( - peer_socket_addr, - data, - timeout_ms, - ) - .await - } - ProtocolType::WS | ProtocolType::WSS => { - ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms) - .await - } - } - } - pub fn descriptor(&self) -> ConnectionDescriptor { match self { Self::Dummy(d) => d.descriptor(), diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 0a997b3b..959b5c34 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -58,7 +58,7 @@ impl RawTcpNetworkConnection { Self::send_internal(&mut stream, message).await } - pub async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result> { + async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result> { let mut header = [0u8; 4]; stream.read_exact(&mut header).await?; @@ -141,21 +141,16 @@ impl RawTcpProtocolHandler { #[instrument(level = "trace", err)] pub async fn connect( local_address: Option, - dial_info: DialInfo, + socket_addr: SocketAddr, ) -> io::Result { - // Get remote socket address to connect to - let remote_socket_addr = dial_info.to_socket_addr(); - // Make a shared socket let socket = match local_address { Some(a) => new_bound_shared_tcp_socket(a)?, - None => { - new_unbound_shared_tcp_socket(socket2::Domain::for_address(remote_socket_addr))? - } + None => new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?, }; // Non-blocking connect to remote address - let ts = nonblocking_connect(socket, remote_socket_addr).await?; + let ts = nonblocking_connect(socket, socket_addr).await?; // See what local address we ended up with and turn this into a stream let actual_local_address = ts.local_addr()?; @@ -166,7 +161,10 @@ impl RawTcpProtocolHandler { // Wrap the stream in a network connection and return it let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( ConnectionDescriptor::new( - dial_info.to_peer_address(), + PeerAddress::new( + SocketAddress::from_socket_addr(socket_addr), + ProtocolType::TCP, + ), SocketAddress::from_socket_addr(actual_local_address), ), ps, @@ -175,79 +173,74 @@ impl RawTcpProtocolHandler { Ok(conn) } - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] - pub async fn send_unbound_message(socket_addr: SocketAddr, data: Vec) -> io::Result<()> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound TCP message"); - } - trace!( - "sending unbound message of length {} to {}", - data.len(), - socket_addr - ); + // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] + // pub async fn send_unbound_message(socket_addr: SocketAddr, data: Vec) -> io::Result<()> { + // if data.len() > MAX_MESSAGE_SIZE { + // bail_io_error_other!("sending too large unbound TCP message"); + // } + // // Make a shared socket + // let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; - // Make a shared socket - let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; + // // Non-blocking connect to remote address + // let ts = nonblocking_connect(socket, socket_addr).await?; - // Non-blocking connect to remote address - let ts = nonblocking_connect(socket, socket_addr).await?; + // // See what local address we ended up with and turn this into a stream + // // let actual_local_address = ts + // // .local_addr() + // // .map_err(map_to_string) + // // .map_err(logthru_net!("could not get local address from TCP stream"))?; - // See what local address we ended up with and turn this into a stream - // let actual_local_address = ts - // .local_addr() - // .map_err(map_to_string) - // .map_err(logthru_net!("could not get local address from TCP stream"))?; + // #[cfg(feature = "rt-tokio")] + // let ts = ts.compat(); + // let mut ps = AsyncPeekStream::new(ts); - #[cfg(feature = "rt-tokio")] - let ts = ts.compat(); - let mut ps = AsyncPeekStream::new(ts); + // // Send directly from the raw network connection + // // this builds the connection and tears it down immediately after the send + // RawTcpNetworkConnection::send_internal(&mut ps, data).await + // } - // Send directly from the raw network connection - // this builds the connection and tears it down immediately after the send - RawTcpNetworkConnection::send_internal(&mut ps, data).await - } + // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.timeout_or))] + // pub async fn send_recv_unbound_message( + // socket_addr: SocketAddr, + // data: Vec, + // timeout_ms: u32, + // ) -> io::Result>> { + // if data.len() > MAX_MESSAGE_SIZE { + // bail_io_error_other!("sending too large unbound TCP message"); + // } - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] - pub async fn send_recv_unbound_message( - socket_addr: SocketAddr, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound TCP message"); - } - trace!( - "sending unbound message of length {} to {}", - data.len(), - socket_addr - ); + // // Make a shared socket + // let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; - // Make a shared socket - let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; + // // Non-blocking connect to remote address + // let ts = nonblocking_connect(socket, socket_addr).await?; - // Non-blocking connect to remote address - let ts = nonblocking_connect(socket, socket_addr).await?; + // // See what local address we ended up with and turn this into a stream + // // let actual_local_address = ts + // // .local_addr() + // // .map_err(map_to_string) + // // .map_err(logthru_net!("could not get local address from TCP stream"))?; + // #[cfg(feature = "rt-tokio")] + // let ts = ts.compat(); + // let mut ps = AsyncPeekStream::new(ts); - // See what local address we ended up with and turn this into a stream - // let actual_local_address = ts - // .local_addr() - // .map_err(map_to_string) - // .map_err(logthru_net!("could not get local address from TCP stream"))?; - #[cfg(feature = "rt-tokio")] - let ts = ts.compat(); - let mut ps = AsyncPeekStream::new(ts); + // // Send directly from the raw network connection + // // this builds the connection and tears it down immediately after the send + // RawTcpNetworkConnection::send_internal(&mut ps, data).await?; + // let out = timeout(timeout_ms, RawTcpNetworkConnection::recv_internal(&mut ps)) + // .await + // .into_timeout_or() + // .into_result()?; - // Send directly from the raw network connection - // this builds the connection and tears it down immediately after the send - RawTcpNetworkConnection::send_internal(&mut ps, data).await?; - - let out = timeout(timeout_ms, RawTcpNetworkConnection::recv_internal(&mut ps)) - .await - .map_err(|e| e.to_io())??; - - tracing::Span::current().record("ret.len", &out.len()); - Ok(out) - } + // tracing::Span::current().record( + // "ret.timeout_or", + // &match out { + // TimeoutOr::>::Value(ref v) => format!("Value(len={})", v.len()), + // TimeoutOr::>::Timeout => "Timeout".to_owned(), + // }, + // ); + // Ok(out) + // } } impl ProtocolAcceptHandler for RawTcpProtocolHandler { diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 2c71294c..3ea3ce17 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -60,68 +60,65 @@ impl RawUdpProtocolHandler { Ok(()) } - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] - pub async fn send_unbound_message(socket_addr: SocketAddr, data: Vec) -> io::Result<()> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound UDP message"); - } - + #[instrument(level = "trace", err)] + pub async fn new_unspecified_bound_handler( + socket_addr: &SocketAddr, + ) -> io::Result { // get local wildcard address for bind - let local_socket_addr = match socket_addr { - SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - SocketAddr::V6(_) => { - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) - } - }; + let local_socket_addr = compatible_unspecified_socket_addr(&socket_addr); let socket = UdpSocket::bind(local_socket_addr).await?; - let len = socket.send_to(&data, socket_addr).await?; - if len != data.len() { - bail_io_error_other!("UDP partial unbound send") - } - - Ok(()) + Ok(RawUdpProtocolHandler::new(Arc::new(socket))) } - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] - pub async fn send_recv_unbound_message( - socket_addr: SocketAddr, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound UDP message"); - } + // #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.timeout_or))] + // pub async fn send_recv_unbound_message( + // socket_addr: SocketAddr, + // data: Vec, + // timeout_ms: u32, + // ) -> io::Result>> { + // if data.len() > MAX_MESSAGE_SIZE { + // bail_io_error_other!("sending too large unbound UDP message"); + // } - // get local wildcard address for bind - let local_socket_addr = match socket_addr { - SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - SocketAddr::V6(_) => { - SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) - } - }; + // // get local wildcard address for bind + // let local_socket_addr = match socket_addr { + // SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + // SocketAddr::V6(_) => { + // SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) + // } + // }; - // get unspecified bound socket - let socket = UdpSocket::bind(local_socket_addr).await?; - let len = socket.send_to(&data, socket_addr).await?; - if len != data.len() { - bail_io_error_other!("UDP partial unbound send"); - } + // // get unspecified bound socket + // let socket = UdpSocket::bind(local_socket_addr).await?; + // let len = socket.send_to(&data, socket_addr).await?; + // if len != data.len() { + // bail_io_error_other!("UDP partial unbound send"); + // } - // receive single response - let mut out = vec![0u8; MAX_MESSAGE_SIZE]; - let (len, from_addr) = timeout(timeout_ms, socket.recv_from(&mut out)) - .await - .map_err(|e| e.to_io())??; + // // receive single response + // let mut out = vec![0u8; MAX_MESSAGE_SIZE]; + // let timeout_or_ret = timeout(timeout_ms, socket.recv_from(&mut out)) + // .await + // .into_timeout_or() + // .into_result()?; + // let (len, from_addr) = match timeout_or_ret { + // TimeoutOr::Value(v) => v, + // TimeoutOr::Timeout => { + // tracing::Span::current().record("ret.timeout_or", &"Timeout".to_owned()); + // return Ok(TimeoutOr::Timeout); + // } + // }; - // if the from address is not the same as the one we sent to, then drop this - if from_addr != socket_addr { - bail_io_error_other!(format!( - "Unbound response received from wrong address: addr={}", - from_addr, - )); - } - out.resize(len, 0u8); - tracing::Span::current().record("ret.len", &len); - Ok(out) - } + // // if the from address is not the same as the one we sent to, then drop this + // if from_addr != socket_addr { + // bail_io_error_other!(format!( + // "Unbound response received from wrong address: addr={}", + // from_addr, + // )); + // } + // out.resize(len, 0u8); + + // tracing::Span::current().record("ret.timeout_or", &format!("Value(len={})", out.len())); + // Ok(TimeoutOr::Value(out)) + // } } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index b10bf53e..283309ea 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -223,12 +223,13 @@ impl WebsocketProtocolHandler { Ok(Some(conn)) } - async fn connect_internal( + #[instrument(level = "trace", err)] + pub async fn connect( local_address: Option, - dial_info: DialInfo, + dial_info: &DialInfo, ) -> io::Result { // Split dial info up - let (tls, scheme) = match &dial_info { + let (tls, scheme) = match dial_info { DialInfo::WS(_) => (false, "ws"), DialInfo::WSS(_) => (true, "wss"), _ => panic!("invalid dialinfo for WS/WSS protocol"), @@ -285,46 +286,6 @@ impl WebsocketProtocolHandler { )) } } - - #[instrument(level = "trace", err)] - pub async fn connect( - local_address: Option, - dial_info: DialInfo, - ) -> io::Result { - Self::connect_internal(local_address, dial_info).await - } - - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] - pub async fn send_unbound_message(dial_info: DialInfo, data: Vec) -> io::Result<()> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound WS message"); - } - - let protconn = Self::connect_internal(None, dial_info.clone()).await?; - - protconn.send(data).await - } - - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] - pub async fn send_recv_unbound_message( - dial_info: DialInfo, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound WS message"); - } - - let protconn = Self::connect_internal(None, dial_info.clone()).await?; - - protconn.send(data).await?; - let out = timeout(timeout_ms, protconn.recv()) - .await - .map_err(|e| e.to_io())??; - - tracing::Span::current().record("ret.len", &out.len()); - Ok(out) - } } impl ProtocolAcceptHandler for WebsocketProtocolHandler { diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 0e51fe89..4c8a9184 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -59,7 +59,7 @@ impl Network { ) -> EyreResult<()> { let data_len = data.len(); - let res = match dial_info.protocol_type() { + match dial_info.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") } @@ -67,17 +67,18 @@ impl Network { bail!("no support for TCP protocol") } ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data) + let pnc = WebsocketProtocolHandler::connect(None, &dial_info) .await - .wrap_err("failed to send unbound message") + .wrap_err("connect failure")?; + pnc.send(data).await.wrap_err("send failure")?; } }; - if res.is_ok() { - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - } - res + + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + + Ok(()) } // Send data to a dial info, unbound, using a new connection from a random port @@ -91,9 +92,9 @@ impl Network { dial_info: DialInfo, data: Vec, timeout_ms: u32, - ) -> EyreResult> { + ) -> EyreResult>> { let data_len = data.len(); - let out = match dial_info.protocol_type() { + match dial_info.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") } @@ -101,23 +102,47 @@ impl Network { bail!("no support for TCP protocol") } ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::send_recv_unbound_message( - dial_info.clone(), - data, - timeout_ms, - ) - .await? + let pnc = match dial_info.protocol_type() { + ProtocolType::UDP => unreachable!(), + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + RawTcpProtocolHandler::connect(None, peer_socket_addr) + .await + .wrap_err("connect failure")? + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::connect(None, &dial_info) + .await + .wrap_err("connect failure")? + } + }; + + pnc.send(data).await.wrap_err("send failure")?; + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + + let out = timeout(timeout_ms, pnc.recv()) + .await + .into_timeout_or() + .into_result() + .wrap_err("recv failure")?; + + tracing::Span::current().record( + "ret.timeout_or", + &match out { + TimeoutOr::>::Value(ref v) => format!("Value(len={})", v.len()), + TimeoutOr::>::Timeout => "Timeout".to_owned(), + }, + ); + + if let TimeoutOr::Value(out) = &out { + self.network_manager() + .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + } + + Ok(out) } - }; - - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); - self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); - - tracing::Span::current().record("ret.len", &out.len()); - Ok(out) + } } #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index 92734266..28f2648a 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -16,7 +16,7 @@ pub enum ProtocolNetworkConnection { impl ProtocolNetworkConnection { pub async fn connect( local_address: Option, - dial_info: DialInfo, + dial_info: &DialInfo, ) -> io::Result { match dial_info.protocol_type() { ProtocolType::UDP => { @@ -31,42 +31,6 @@ impl ProtocolNetworkConnection { } } - pub async fn send_unbound_message( - dial_info: DialInfo, - data: Vec, - ) -> io::Result<()> { - match dial_info.protocol_type() { - ProtocolType::UDP => { - panic!("UDP dial info is not supported on WASM targets"); - } - ProtocolType::TCP => { - panic!("TCP dial info is not supported on WASM targets"); - } - ProtocolType::WS | ProtocolType::WSS => { - ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await - } - } - } - - pub async fn send_recv_unbound_message( - dial_info: DialInfo, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - match dial_info.protocol_type() { - ProtocolType::UDP => { - panic!("UDP dial info is not supported on WASM targets"); - } - ProtocolType::TCP => { - panic!("TCP dial info is not supported on WASM targets"); - } - ProtocolType::WS | ProtocolType::WSS => { - ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms) - .await - } - } - } - pub fn descriptor(&self) -> ConnectionDescriptor { match self { Self::Dummy(d) => d.descriptor(), diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index bf47db12..2b7bfdc1 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -85,12 +85,12 @@ impl WebsocketProtocolHandler { #[instrument(level = "trace", err)] pub async fn connect( local_address: Option, - dial_info: DialInfo, + dial_info: &DialInfo, ) -> io::Result { assert!(local_address.is_none()); // Split dial info up - let (_tls, scheme) = match &dial_info { + let (_tls, scheme) = match dial_info { DialInfo::WS(_) => (false, "ws"), DialInfo::WSS(_) => (true, "wss"), _ => panic!("invalid dialinfo for WS/WSS protocol"), @@ -105,45 +105,10 @@ impl WebsocketProtocolHandler { let (wsmeta, wsio) = fut.await.map_err(to_io)?; // Make our connection descriptor - Ok(ProtocolNetworkConnection::Ws( - WebsocketNetworkConnection::new( - ConnectionDescriptor::new_no_local(dial_info.to_peer_address()), - wsmeta, - wsio, - ), + Ok(WebsocketNetworkConnection::new( + ConnectionDescriptor::new_no_local(dial_info.to_peer_address()), + wsmeta, + wsio, )) } - - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] - pub async fn send_unbound_message(dial_info: DialInfo, data: Vec) -> io::Result<()> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound WS message"); - } - - // Make the real connection - let conn = Self::connect(None, dial_info).await?; - - conn.send(data).await - } - - #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] - pub async fn send_recv_unbound_message( - dial_info: DialInfo, - data: Vec, - timeout_ms: u32, - ) -> io::Result> { - if data.len() > MAX_MESSAGE_SIZE { - bail_io_error_other!("sending too large unbound WS message"); - } - - let conn = Self::connect(None, dial_info.clone()).await?; - - conn.send(data).await?; - let out = timeout(timeout_ms, conn.recv()) - .await - .map_err(|e| e.to_io())??; - - tracing::Span::current().record("ret.len", &out.len()); - Ok(out) - } } diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index 4c17f08b..fab020ab 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -207,13 +207,7 @@ impl RoutingTable { let mut unord = FuturesUnordered::new(); for bootstrap_di in bootstrap_dialinfos { - let peer_info = match network_manager.boot_request(bootstrap_di).await { - Ok(v) => v, - Err(e) => { - error!("BOOT request failed: {}", e); - continue; - } - }; + let peer_info = network_manager.boot_request(bootstrap_di).await?; // Got peer info, let's add it to the routing table for pi in peer_info { diff --git a/veilid-core/src/xx/timeout_or.rs b/veilid-core/src/xx/timeout_or.rs index b433824d..25efa3cc 100644 --- a/veilid-core/src/xx/timeout_or.rs +++ b/veilid-core/src/xx/timeout_or.rs @@ -32,6 +32,49 @@ cfg_if! { } ////////////////////////////////////////////////////////////////// +// Non-fallible timeout conversions + +pub trait TimeoutOrExt { + fn into_timeout_or(self) -> TimeoutOr; +} + +impl TimeoutOrExt for Result { + fn into_timeout_or(self) -> TimeoutOr { + self.ok().map(|v| TimeoutOr::::Value(v)).unwrap_or(TimeoutOr::::Timeout) + } +} + +pub trait IoTimeoutOrExt { + fn into_timeout_or(self) -> io::Result>; +} + +impl IoTimeoutOrExt for io::Result { + fn into_timeout_or(self) -> io::Result> { + match self { + Ok(v) => Ok(TimeoutOr::::Value(v)), + Err(e) if e.kind() == io::ErrorKind::TimedOut => Ok(TimeoutOr::::Timeout), + Err(e) => Err(e), + } + } +} + +pub trait TimeoutOrResultExt { + fn into_result(self) -> Result, E>; +} + +impl TimeoutOrResultExt for TimeoutOr> { + fn into_result(self) -> Result, E> { + match self { + TimeoutOr::>::Timeout => Ok(TimeoutOr::::Timeout), + TimeoutOr::>::Value(Ok(v)) => Ok(TimeoutOr::::Value(v)), + TimeoutOr::>::Value(Err(e)) => Err(e), + } + } +} + + +////////////////////////////////////////////////////////////////// +// Non-fallible timeout pub enum TimeoutOr { Timeout, diff --git a/veilid-core/src/xx/tools.rs b/veilid-core/src/xx/tools.rs index 2ca3b11c..347de4f1 100644 --- a/veilid-core/src/xx/tools.rs +++ b/veilid-core/src/xx/tools.rs @@ -146,6 +146,13 @@ where } } +pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAddr { + match socket_addr { + SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0), + } +} + pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult> { // If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If the address is specified, only use the specified port and fail otherwise