diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 36125858..76c1a435 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -17,23 +17,15 @@ cfg_if! { } } -fn to_io(err: async_tungstenite::tungstenite::Error) -> io::Error { - let kind = match err { - async_tungstenite::tungstenite::Error::ConnectionClosed => io::ErrorKind::ConnectionReset, - async_tungstenite::tungstenite::Error::AlreadyClosed => io::ErrorKind::NotConnected, - async_tungstenite::tungstenite::Error::Io(x) => { - return x; +fn err_to_network_result(err: async_tungstenite::tungstenite::Error) -> NetworkResult { + match err { + async_tungstenite::tungstenite::Error::ConnectionClosed + | async_tungstenite::tungstenite::Error::AlreadyClosed + | async_tungstenite::tungstenite::Error::Io(_) => { + NetworkResult::NoConnection(to_io_error_other(err)) } - async_tungstenite::tungstenite::Error::Tls(_) => io::ErrorKind::InvalidData, - async_tungstenite::tungstenite::Error::Capacity(_) => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::Protocol(_) => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::SendQueueFull(_) => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::Utf8 => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::Url(_) => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::Http(_) => io::ErrorKind::Other, - async_tungstenite::tungstenite::Error::HttpFormat(_) => io::ErrorKind::Other, - }; - io::Error::new(kind, err) + _ => NetworkResult::InvalidMessage(err.to_string()), + } } pub type WebSocketNetworkConnectionAccepted = WebsocketNetworkConnection; @@ -73,11 +65,11 @@ where // #[instrument(level = "trace", err, skip(self))] // pub async fn close(&self) -> io::Result<()> { // // Make an attempt to flush the stream - // self.stream.clone().close().await.map_err(to_io)?; + // self.stream.clone().close().await.map_err(to_io_error_other)?; // // Then forcibly close the socket // self.tcp_stream // .shutdown(Shutdown::Both) - // .map_err(to_io) + // .map_err(to_io_error_other) // } #[instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len()))] @@ -85,13 +77,10 @@ where if message.len() > MAX_MESSAGE_SIZE { bail_io_error_other!("received too large WS message"); } - let out = self - .stream - .clone() - .send(Message::binary(message)) - .await - .map_err(to_io) - .into_network_result()?; + let out = match self.stream.clone().send(Message::binary(message)).await { + Ok(v) => NetworkResult::value(v), + Err(e) => err_to_network_result(e), + }; tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } @@ -112,16 +101,14 @@ where io::ErrorKind::ConnectionReset, "closeframe", )), - Some(Ok(x)) => { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Unexpected WS message type: {:?}", x), - )); - } - Some(Err(e)) => return Err(to_io(e)), + Some(Ok(x)) => NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + format!("Unexpected WS message type: {:?}", x), + )), + Some(Err(e)) => err_to_network_result(e), None => NetworkResult::NoConnection(io::Error::new( io::ErrorKind::ConnectionReset, - "connection ended", + "connection ended normally", )), }; diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index a399b270..c4e37e36 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -289,7 +289,7 @@ impl NetworkManager { ) { // Add this our futures to process in parallel let routing_table = routing_table.clone(); - unord.push(intf::spawn(async move { + unord.push(async move { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response let _ = routing_table.find_target(nr.clone()).await; @@ -305,7 +305,7 @@ impl NetworkManager { // otherwise this bootstrap is valid, lets ask it to find ourselves now routing_table.reverse_find_node(nr, true).await } - })); + }); } } @@ -333,7 +333,7 @@ impl NetworkManager { let node_refs = routing_table.get_nodes_needing_ping(cur_ts, relay_node_id); for nr in node_refs { let rpc = rpc.clone(); - unord.push(intf::spawn(async move { rpc.rpc_call_status(nr).await })); + unord.push(async move { rpc.rpc_call_status(nr).await }); } // Wait for futures to complete @@ -360,9 +360,7 @@ impl NetworkManager { for nr in noderefs { log_net!("--- peer minimum search with {:?}", nr); let routing_table = routing_table.clone(); - unord.push(intf::spawn(async move { - routing_table.reverse_find_node(nr, false).await - })); + unord.push(async move { routing_table.reverse_find_node(nr, false).await }); } while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} @@ -373,7 +371,7 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), err)] pub(super) async fn relay_management_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, _last_ts: u64, cur_ts: u64, ) -> EyreResult<()> { @@ -455,7 +453,7 @@ impl NetworkManager { #[instrument(level = "trace", skip(self), err)] pub(super) async fn rolling_transfers_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, last_ts: u64, cur_ts: u64, ) -> EyreResult<()> { diff --git a/veilid-core/src/xx/network_result.rs b/veilid-core/src/xx/network_result.rs index 8a198e0d..d2eb4233 100644 --- a/veilid-core/src/xx/network_result.rs +++ b/veilid-core/src/xx/network_result.rs @@ -195,7 +195,7 @@ impl From> for Option { // match self { // Self::Timeout => Self::Timeout, // Self::NoConnection(e) => Self::NoConnection(e.clone()), -// Self::InvalidResponse(k, s) => Self::InvalidResponse(k, s.clone()), +// Self::InvalidMessage(s) => Self::InvalidMessage(s.clone()), // Self::Value(t) => Self::Value(t.clone()), // } // }