diff --git a/Cargo.lock b/Cargo.lock index c4eedfc8..84dc16d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3743,6 +3743,7 @@ version = "0.1.0" dependencies = [ "android_logger", "anyhow", + "async-channel", "async-lock", "async-std", "async-tls", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 1bf608ca..fffc956f 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -87,6 +87,7 @@ getrandom = { version = "^0", features = ["js"] } ws_stream_wasm = "^0" async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]} async-lock = "^2" +async-channel = { version = "^1" } # Configuration for WASM32 'web-sys' crate [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index fe20e134..c139ce8c 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -15,7 +15,7 @@ const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; struct ConnectionManagerInner { connection_table: ConnectionTable, connection_processor_jh: Option>, - connection_add_channel_tx: Option>>, + connection_add_channel_tx: Option>>, } impl core::fmt::Debug for ConnectionManagerInner { @@ -69,7 +69,7 @@ impl ConnectionManager { pub async fn startup(&self) { let mut inner = self.arc.inner.lock().await; - let cac = utils::channel::channel(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config + let cac = async_channel::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config inner.connection_add_channel_tx = Some(cac.0); let rx = cac.1.clone(); let this = self.clone(); @@ -90,7 +90,7 @@ impl ConnectionManager { } // Internal routine to register new connection atomically - async fn on_new_connection_internal( + fn on_new_connection_internal( &self, inner: &mut ConnectionManagerInner, conn: NetworkConnection, @@ -103,7 +103,6 @@ impl ConnectionManager { let receiver_loop_future = Self::process_connection(self.clone(), conn.clone()); tx.try_send(receiver_loop_future) - .await .map_err(map_to_string) .map_err(logthru_net!(error "failed to start receiver loop"))?; @@ -117,7 +116,7 @@ impl ConnectionManager { // and spawns a message processing loop for the connection pub async fn on_new_connection(&self, conn: NetworkConnection) -> Result<(), String> { let mut inner = self.arc.inner.lock().await; - self.on_new_connection_internal(&mut *inner, conn).await + self.on_new_connection_internal(&mut *inner, conn) } pub async fn get_or_create_connection( @@ -133,18 +132,21 @@ impl ConnectionManager { None => ConnectionDescriptor::new_no_local(peer_address), }; - // If connection exists, then return it + // If any connection to this remote exists that has the same protocol, return it + // Any connection will do, we don't have to match the local address let mut inner = self.arc.inner.lock().await; - if let Some(conn) = inner.connection_table.get_connection(descriptor) { + if let Some(conn) = inner + .connection_table + .get_last_connection_by_remote(descriptor.remote) + { return Ok(conn); } // If not, attempt new connection let conn = NetworkConnection::connect(local_addr, dial_info).await?; - self.on_new_connection_internal(&mut *inner, conn.clone()) - .await?; + self.on_new_connection_internal(&mut *inner, conn.clone())?; Ok(conn) } @@ -154,6 +156,7 @@ impl ConnectionManager { this: ConnectionManager, conn: NetworkConnection, ) -> SystemPinBoxFuture<()> { + log_net!("Starting process_connection loop for {:?}", conn); let network_manager = this.network_manager(); Box::pin(async move { // @@ -162,7 +165,10 @@ impl ConnectionManager { let res = conn.clone().recv().await; let message = match res { Ok(v) => v, - Err(_) => break, + Err(e) => { + log_net!(error e); + break; + } }; if let Err(e) = network_manager .on_recv_envelope(message.as_slice(), descriptor) @@ -189,7 +195,7 @@ impl ConnectionManager { // Process connection oriented sockets in the background // This never terminates and must have its task cancelled once started // Task cancellation is performed by shutdown() by dropping the join handle - async fn connection_processor(self, rx: utils::channel::Receiver>) { + async fn connection_processor(self, rx: async_channel::Receiver>) { let mut connection_futures: FuturesUnordered> = FuturesUnordered::new(); loop { diff --git a/veilid-core/src/connection_table.rs b/veilid-core/src/connection_table.rs index a2b7d40b..ccc3b341 100644 --- a/veilid-core/src/connection_table.rs +++ b/veilid-core/src/connection_table.rs @@ -1,16 +1,19 @@ use crate::network_connection::*; use crate::xx::*; use crate::*; +use alloc::collections::btree_map::Entry; #[derive(Debug)] pub struct ConnectionTable { - conn_by_addr: BTreeMap, + conn_by_descriptor: BTreeMap, + conns_by_remote: BTreeMap>, } impl ConnectionTable { pub fn new() -> Self { Self { - conn_by_addr: BTreeMap::new(), + conn_by_descriptor: BTreeMap::new(), + conns_by_remote: BTreeMap::new(), } } @@ -21,31 +24,71 @@ impl ConnectionTable { ProtocolType::UDP, "Only connection oriented protocols go in the table!" ); - if self.conn_by_addr.contains_key(&descriptor) { + if self.conn_by_descriptor.contains_key(&descriptor) { return Err(format!( "Connection already added to table: {:?}", descriptor )); } - let res = self.conn_by_addr.insert(descriptor, conn); + let res = self.conn_by_descriptor.insert(descriptor, conn.clone()); assert!(res.is_none()); + + let conns = self.conns_by_remote.entry(descriptor.remote).or_default(); + warn!("add_connection: {:?}", conn); + conns.push(conn); + Ok(()) } pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option { - self.conn_by_addr.get(&descriptor).cloned() + let out = self.conn_by_descriptor.get(&descriptor).cloned(); + warn!("get_connection: {:?} -> {:?}", descriptor, out); + out + } + pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option { + let out = self + .conns_by_remote + .get(&remote) + .map(|v| v[(v.len() - 1)].clone()); + warn!("get_last_connection_by_remote: {:?} -> {:?}", remote, out); + out } pub fn connection_count(&self) -> usize { - self.conn_by_addr.len() + self.conn_by_descriptor.len() } pub fn remove_connection( &mut self, descriptor: ConnectionDescriptor, ) -> Result { - self.conn_by_addr + warn!("remove_connection: {:?}", descriptor); + let out = self + .conn_by_descriptor .remove(&descriptor) - .ok_or_else(|| format!("Connection not in table: {:?}", descriptor)) + .ok_or_else(|| format!("Connection not in table: {:?}", descriptor))?; + + match self.conns_by_remote.entry(descriptor.remote) { + Entry::Vacant(_) => { + panic!("inconsistency in connection table") + } + Entry::Occupied(mut o) => { + let v = o.get_mut(); + + // Remove one matching connection from the list + for (n, elem) in v.iter().enumerate() { + if elem.connection_descriptor() == descriptor { + v.remove(n); + break; + } + } + // No connections left for this remote, remove the entry from conns_by_remote + if v.is_empty() { + o.remove_entry(); + } + } + } + + Ok(out) } } diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 0ae9c3ae..64fb3070 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -14,7 +14,6 @@ use protocol::tcp::RawTcpProtocolHandler; use protocol::udp::RawUdpProtocolHandler; use protocol::ws::WebsocketProtocolHandler; pub use protocol::*; -use utils::async_peek_stream::*; use utils::network_interfaces::*; use async_std::io; @@ -302,6 +301,8 @@ impl Network { } // Handle connection-oriented protocols + + // Try to send to the exact existing connection if one exists if let Some(conn) = self.connection_manager().get_connection(descriptor).await { // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; @@ -355,7 +356,8 @@ impl Network { match self .clone() .send_data_to_existing_connection(descriptor, data) - .await? + .await + .map_err(logthru_net!())? { None => { return Ok(()); @@ -371,7 +373,9 @@ impl Network { .best_dial_info() .ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?; - self.send_data_to_dial_info(dial_info, data).await + self.send_data_to_dial_info(dial_info, data) + .await + .map_err(logthru_net!()) } ///////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/intf/native/network/network_tcp.rs b/veilid-core/src/intf/native/network/network_tcp.rs index ae051864..7edef9f1 100644 --- a/veilid-core/src/intf/native/network/network_tcp.rs +++ b/veilid-core/src/intf/native/network/network_tcp.rs @@ -1,8 +1,6 @@ use super::*; use crate::intf::*; use crate::network_connection::*; -use utils::clone_stream::*; - use async_tls::TlsAcceptor; ///////////////////////////////////////////////////////////////// @@ -135,7 +133,7 @@ impl Network { let addr = match tcp_stream.peer_addr() { Ok(addr) => addr, Err(e) => { - error!("failed to get peer address: {}", e); + log_net!(error "failed to get peer address: {}", e); return; } }; @@ -159,6 +157,7 @@ impl Network { { // If we fail to get a packet within the connection initial timeout // then we punt this connection + log_net!(warn "connection initial timeout from: {:?}", addr); return; } diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index c4af451a..f69be868 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -56,7 +56,7 @@ impl ProtocolNetworkConnection { } } - pub async fn close(&mut self) -> Result<(), String> { + pub async fn close(&self) -> Result<(), String> { match self { Self::Dummy(d) => d.close(), Self::RawTcp(t) => t.close().await, @@ -66,7 +66,7 @@ impl ProtocolNetworkConnection { } } - pub async fn send(&mut self, message: Vec) -> Result<(), String> { + pub async fn send(&self, message: Vec) -> Result<(), String> { match self { Self::Dummy(d) => d.send(message), Self::RawTcp(t) => t.send(message).await, @@ -75,7 +75,7 @@ impl ProtocolNetworkConnection { Self::Wss(w) => w.send(message).await, } } - pub async fn recv(&mut self) -> Result, String> { + pub async fn recv(&self) -> Result, String> { match self { Self::Dummy(d) => d.recv(), Self::RawTcp(t) => t.recv().await, diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index 7d53faf8..4391eb04 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -1,5 +1,4 @@ use super::*; -use crate::intf::native::utils::async_peek_stream::*; use crate::intf::*; use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; @@ -22,37 +21,43 @@ impl RawTcpNetworkConnection { Self { stream } } - pub async fn close(&mut self) -> Result<(), String> { + pub async fn close(&self) -> Result<(), String> { self.stream + .clone() .close() .await .map_err(map_to_string) .map_err(logthru_net!()) } - pub async fn send(&mut self, message: Vec) -> Result<(), String> { + pub async fn send(&self, message: Vec) -> Result<(), String> { + log_net!("sending TCP message of size {}", message.len()); if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large TCP message".to_owned()); } let len = message.len() as u16; let header = [b'V', b'L', len as u8, (len >> 8) as u8]; - self.stream + let mut stream = self.stream.clone(); + + stream .write_all(&header) .await .map_err(map_to_string) .map_err(logthru_net!())?; - self.stream + stream .write_all(&message) .await .map_err(map_to_string) .map_err(logthru_net!()) } - pub async fn recv(&mut self) -> Result, String> { + pub async fn recv(&self) -> Result, String> { let mut header = [0u8; 4]; - self.stream + let mut stream = self.stream.clone(); + + stream .read_exact(&mut header) .await .map_err(|e| format!("TCP recv error: {}", e))?; @@ -65,10 +70,7 @@ impl RawTcpNetworkConnection { } let mut out: Vec = vec![0u8; len]; - self.stream - .read_exact(&mut out) - .await - .map_err(map_to_string)?; + stream.read_exact(&mut out).await.map_err(map_to_string)?; Ok(out) } } @@ -122,6 +124,8 @@ impl RawTcpProtocolHandler { ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)), ); + warn!("on_accept_async from: {}", socket_addr); + Ok(Some(conn)) } diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index 90010271..c505ea32 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -1,5 +1,4 @@ use super::*; -use crate::intf::native::utils::async_peek_stream::*; use crate::intf::*; use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; @@ -19,31 +18,12 @@ pub type WebsocketNetworkConnectionWSS = WebsocketNetworkConnection>; pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection; -struct WebSocketNetworkConnectionInner -where - T: io::Read + io::Write + Send + Unpin + 'static, -{ - ws_stream: WebSocketStream, -} - pub struct WebsocketNetworkConnection where T: io::Read + io::Write + Send + Unpin + 'static, { tls: bool, - inner: Arc>>, -} - -impl Clone for WebsocketNetworkConnection -where - T: io::Read + io::Write + Send + Unpin + 'static, -{ - fn clone(&self) -> Self { - Self { - tls: self.tls, - inner: self.inner.clone(), - } - } + ws_stream: CloneStream>, } impl fmt::Debug for WebsocketNetworkConnection @@ -62,38 +42,28 @@ where pub fn new(tls: bool, ws_stream: WebSocketStream) -> Self { Self { tls, - inner: Arc::new(AsyncMutex::new(WebSocketNetworkConnectionInner { - ws_stream, - })), + ws_stream: CloneStream::new(ws_stream), } } pub async fn close(&self) -> Result<(), String> { - let mut inner = self.inner.lock().await; - inner - .ws_stream - .close(None) - .await - .map_err(map_to_string) - .map_err(logthru_net!(error "failed to close websocket")) + self.ws_stream.clone().close().await.map_err(map_to_string) } pub async fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("received too large WS message".to_owned()); } - let mut inner = self.inner.lock().await; - inner - .ws_stream + self.ws_stream + .clone() .send(Message::binary(message)) .await .map_err(map_to_string) .map_err(logthru_net!(error "failed to send websocket message")) } - pub async fn recv(&self) -> Result, String> { - let mut inner = self.inner.lock().await; - let out = match inner.ws_stream.next().await { + pub async fn recv(&self) -> Result, String> { + let out = match self.ws_stream.clone().next().await { Some(Ok(Message::Binary(v))) => v, Some(Ok(_)) => { return Err("Unexpected WS message type".to_owned()).map_err(logthru_net!(error)); diff --git a/veilid-core/src/intf/native/utils/channel.rs b/veilid-core/src/intf/native/utils/channel.rs deleted file mode 100644 index 4df53fb8..00000000 --- a/veilid-core/src/intf/native/utils/channel.rs +++ /dev/null @@ -1,83 +0,0 @@ -pub use async_std::channel; - -#[derive(Debug)] -pub struct Sender { - imp: channel::Sender, -} - -impl Clone for Sender { - fn clone(&self) -> Self { - Self { - imp: self.imp.clone(), - } - } -} - -#[derive(Debug)] -pub struct Receiver { - imp: channel::Receiver, -} - -impl Clone for Receiver { - fn clone(&self) -> Self { - Self { - imp: self.imp.clone(), - } - } -} - -pub fn channel(cap: usize) -> (Sender, Receiver) { - let imp = channel::bounded(cap); - (Sender { imp: imp.0 }, Receiver { imp: imp.1 }) -} - -pub use channel::SendError; -pub use channel::TrySendError; - -#[allow(dead_code)] -impl Sender { - // NOTE: This needs a timeout or you could block a very long time - // pub async fn send(&self, msg: T) -> Result<(), SendError> { - // self.imp.send(msg).await - // } - pub async fn try_send(&self, msg: T) -> Result<(), TrySendError> { - self.imp.try_send(msg) - } - pub fn capacity(&self) -> usize { - self.imp.capacity().unwrap() - } - pub fn is_empty(&self) -> bool { - self.imp.is_empty() - } - pub fn is_full(&self) -> bool { - self.imp.is_full() - } - pub fn len(&self) -> usize { - self.imp.len() - } -} - -pub use channel::RecvError; -pub use channel::TryRecvError; - -#[allow(dead_code)] -impl Receiver { - pub async fn recv(&self) -> Result { - self.imp.recv().await - } - pub async fn try_recv(&self) -> Result { - self.imp.try_recv() - } - pub fn capacity(&self) -> usize { - self.imp.capacity().unwrap() - } - pub fn is_empty(&self) -> bool { - self.imp.is_empty() - } - pub fn is_full(&self) -> bool { - self.imp.is_full() - } - pub fn len(&self) -> usize { - self.imp.len() - } -} diff --git a/veilid-core/src/intf/native/utils/clone_stream.rs b/veilid-core/src/intf/native/utils/clone_stream.rs deleted file mode 100644 index 84b9e0c5..00000000 --- a/veilid-core/src/intf/native/utils/clone_stream.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::xx::*; -use async_std::io::{Read, Result, Write}; -use core::task::{Context, Poll}; -use std::pin::Pin; - -pub struct CloneStream -where - T: Read + Write + Send + Unpin, -{ - inner: Arc>, -} - -impl Clone for CloneStream -where - T: Read + Write + Send + Unpin, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl CloneStream -where - T: Read + Write + Send + Unpin, -{ - pub fn new(t: T) -> Self { - Self { - inner: Arc::new(Mutex::new(t)), - } - } -} -impl Read for CloneStream -where - T: Read + Write + Send + Unpin, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let mut inner = self.inner.lock(); - Pin::new(&mut *inner).poll_read(cx, buf) - } -} - -impl Write for CloneStream -where - T: Read + Write + Send + Unpin, -{ - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let mut inner = self.inner.lock(); - Pin::new(&mut *inner).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.inner.lock(); - Pin::new(&mut *inner).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.inner.lock(); - Pin::new(&mut *inner).poll_close(cx) - } -} diff --git a/veilid-core/src/intf/native/utils/mod.rs b/veilid-core/src/intf/native/utils/mod.rs index 1958dd2f..781be6b4 100644 --- a/veilid-core/src/intf/native/utils/mod.rs +++ b/veilid-core/src/intf/native/utils/mod.rs @@ -1,8 +1,5 @@ #[cfg(target_os = "android")] pub mod android; -pub mod async_peek_stream; -pub mod channel; -pub mod clone_stream; #[cfg(target_os = "ios")] pub mod ios; pub mod network_interfaces; diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 09421e60..5e5ef819 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -65,6 +65,8 @@ impl Network { } // Handle connection-oriented protocols + + // Try to send to the exact existing connection if one exists if let Some(conn) = self.connection_manager().get_connection(descriptor).await { // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index b047361a..717c3d34 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -46,20 +46,20 @@ impl ProtocolNetworkConnection { } } } - pub async fn close(&mut self) -> Result<(), String> { + pub async fn close(&self) -> Result<(), String> { match self { Self::Dummy(d) => d.close(), Self::Ws(w) => w.close().await, } } - pub async fn send(&mut self, message: Vec) -> Result<(), String> { + pub async fn send(&self, message: Vec) -> Result<(), String> { match self { Self::Dummy(d) => d.send(message), Self::Ws(w) => w.send(message).await, } } - pub async fn recv(&mut self) -> Result, String> { + pub async fn recv(&self) -> Result, String> { match self { Self::Dummy(d) => d.recv(), Self::Ws(w) => w.recv().await, diff --git a/veilid-core/src/intf/wasm/utils/channel.rs b/veilid-core/src/intf/wasm/utils/channel.rs deleted file mode 100644 index 91eeb732..00000000 --- a/veilid-core/src/intf/wasm/utils/channel.rs +++ /dev/null @@ -1,154 +0,0 @@ -use crate::xx::*; -use alloc::collections::VecDeque; -use core::fmt; - -#[derive(Debug)] -pub struct Channel { - items: VecDeque, - cap: usize, - eventual: Eventual, -} - -#[derive(Debug)] -pub struct Sender { - imp: Arc>>, -} - -impl Clone for Sender { - fn clone(&self) -> Self { - Self { - imp: self.imp.clone(), - } - } -} - -#[derive(Debug)] -pub struct Receiver { - imp: Arc>>, -} - -impl Clone for Receiver { - fn clone(&self) -> Self { - Self { - imp: self.imp.clone(), - } - } -} - -pub fn channel(cap: usize) -> (Sender, Receiver) { - let imp = Channel { - items: VecDeque::with_capacity(cap), - cap, - eventual: Eventual::new(), - }; - - let imparc = Arc::new(Mutex::new(imp)); - ( - Sender { - imp: imparc.clone(), - }, - Receiver { - imp: imparc.clone(), - }, - ) -} - -#[derive(Debug, PartialEq, Eq)] -pub enum TrySendError { - Full(T), - Disconnected(T), -} - -impl fmt::Display for TrySendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TrySendError::Full(_) => { - write!(f, "Full") - } - TrySendError::Disconnected(_) => { - write!(f, "Disconnected") - } - } - } -} - -impl Sender { - // NOTE: This needs a timeout or you could block a very long time - // pub async fn send(&self, msg: T) -> Result<(), SendError> { - // xxx - // } - - pub async fn try_send(&self, msg: T) -> Result<(), TrySendError> { - let eventual = { - let mut inner = self.imp.lock(); - if inner.items.len() == inner.cap { - return Err(TrySendError::Full(msg)); - } - let empty = inner.items.is_empty(); - inner.items.push_back(msg); - if empty { - Some(inner.eventual.clone()) - } else { - None - } - }; - if let Some(e) = eventual { - e.resolve().await; - } - Ok(()) - } - pub fn capacity(&self) -> usize { - self.imp.lock().cap - } - pub fn is_empty(&self) -> bool { - self.imp.lock().items.is_empty() - } - pub fn is_full(&self) -> bool { - let inner = self.imp.lock(); - inner.items.len() == inner.cap - } - pub fn len(&self) -> usize { - self.imp.lock().items.len() - } -} - -#[derive(Debug, PartialEq, Eq)] -pub struct RecvError; - -#[derive(Debug, PartialEq, Eq)] -pub enum TryRecvError { - Empty, - Disconnected, -} - -impl Receiver { - pub async fn recv(&self) -> Result { - let eventual = { - let inner = self.imp.lock(); - inner.eventual.clone() - }; - while self.is_empty() { - eventual.instance_clone(true).await; - } - Ok(self.imp.lock().items.pop_front().unwrap()) - } - pub async fn try_recv(&self) -> Result { - if self.is_empty() { - return Err(TryRecvError::Empty); - } - Ok(self.imp.lock().items.pop_front().unwrap()) - } - pub fn capacity(&self) -> usize { - self.imp.lock().cap - } - pub fn is_empty(&self) -> bool { - self.imp.lock().items.is_empty() - } - pub fn is_full(&self) -> bool { - let inner = self.imp.lock(); - inner.items.len() == inner.cap - } - pub fn len(&self) -> usize { - self.imp.lock().items.len() - } -} diff --git a/veilid-core/src/intf/wasm/utils/mod.rs b/veilid-core/src/intf/wasm/utils/mod.rs index 05859c41..cf34813b 100644 --- a/veilid-core/src/intf/wasm/utils/mod.rs +++ b/veilid-core/src/intf/wasm/utils/mod.rs @@ -1,7 +1,5 @@ #![cfg(target_arch = "wasm32")] -pub mod channel; - use crate::xx::*; use core::sync::atomic::{AtomicI8, Ordering}; use js_sys::{global, Reflect}; diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 88476cd1..035bf2b2 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -1,6 +1,5 @@ #![deny(clippy::all)] #![deny(unused_must_use)] -#![cfg_attr(target_arch = "wasm32", no_std)] #[macro_use] extern crate alloc; diff --git a/veilid-core/src/network_connection.rs b/veilid-core/src/network_connection.rs index 927d6d55..5241934e 100644 --- a/veilid-core/src/network_connection.rs +++ b/veilid-core/src/network_connection.rs @@ -5,41 +5,34 @@ use crate::*; /////////////////////////////////////////////////////////// // Accept -cfg_if! { - if #[cfg(not(target_arch = "wasm32"))] { - use async_std::net::*; - use utils::async_peek_stream::*; +pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { + fn on_accept( + &self, + stream: AsyncPeekStream, + peer_addr: SocketAddr, + ) -> SystemPinBoxFuture, String>>; +} - pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { - fn on_accept( - &self, - stream: AsyncPeekStream, - peer_addr: SocketAddr, - ) -> SystemPinBoxFuture, String>>; - } +pub trait ProtocolAcceptHandlerClone { + fn clone_box(&self) -> Box; +} - pub trait ProtocolAcceptHandlerClone { - fn clone_box(&self) -> Box; - } - - impl ProtocolAcceptHandlerClone for T - where - T: 'static + ProtocolAcceptHandler + Clone, - { - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } - } - impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } - } - - pub type NewProtocolAcceptHandler = - dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; +impl ProtocolAcceptHandlerClone for T +where + T: 'static + ProtocolAcceptHandler + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) } } +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +pub type NewProtocolAcceptHandler = + dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; /////////////////////////////////////////////////////////// // Dummy protocol network connection for testing @@ -64,7 +57,6 @@ impl DummyNetworkConnection { #[derive(Debug)] struct NetworkConnectionInner { - protocol_connection: ProtocolNetworkConnection, last_message_sent_time: Option, last_message_recv_time: Option, } @@ -73,7 +65,8 @@ struct NetworkConnectionInner { struct NetworkConnectionArc { descriptor: ConnectionDescriptor, established_time: u64, - inner: AsyncMutex, + protocol_connection: ProtocolNetworkConnection, + inner: Mutex, } #[derive(Clone, Debug)] @@ -89,9 +82,8 @@ impl PartialEq for NetworkConnection { impl Eq for NetworkConnection {} impl NetworkConnection { - fn new_inner(protocol_connection: ProtocolNetworkConnection) -> NetworkConnectionInner { + fn new_inner() -> NetworkConnectionInner { NetworkConnectionInner { - protocol_connection, last_message_sent_time: None, last_message_recv_time: None, } @@ -103,7 +95,8 @@ impl NetworkConnection { NetworkConnectionArc { descriptor, established_time: intf::get_timestamp(), - inner: AsyncMutex::new(Self::new_inner(protocol_connection)), + protocol_connection, + inner: Mutex::new(Self::new_inner()), } } @@ -135,23 +128,24 @@ impl NetworkConnection { } pub async fn close(&self) -> Result<(), String> { - let mut inner = self.arc.inner.lock().await; - inner.protocol_connection.close().await + self.arc.protocol_connection.close().await } pub async fn send(&self, message: Vec) -> Result<(), String> { - let mut inner = self.arc.inner.lock().await; - let out = inner.protocol_connection.send(message).await; + let ts = intf::get_timestamp(); + let out = self.arc.protocol_connection.send(message).await; if out.is_ok() { - inner.last_message_sent_time = Some(intf::get_timestamp()); + let mut inner = self.arc.inner.lock(); + inner.last_message_sent_time.max_assign(Some(ts)); } out } pub async fn recv(&self) -> Result, String> { - let mut inner = self.arc.inner.lock().await; - let out = inner.protocol_connection.recv().await; + let ts = intf::get_timestamp(); + let out = self.arc.protocol_connection.recv().await; if out.is_ok() { - inner.last_message_recv_time = Some(intf::get_timestamp()); + let mut inner = self.arc.inner.lock(); + inner.last_message_recv_time.max_assign(Some(ts)); } out } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 6efaeb9c..9b83c70b 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -377,6 +377,7 @@ impl NetworkManager { node_ref: NodeRef, body: B, ) -> Result<(), String> { + log_net!("sending envelope to {:?}", node_ref); // Get node's min/max version and see if we can send to it // and if so, get the max version we can use let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version()) @@ -388,7 +389,8 @@ impl NetworkManager { node_ref.node_id(), node_min, node_max - )); + )) + .map_err(logthru_rpc!(warn)); } cmp::min(node_max, MAX_VERSION) } else { @@ -396,7 +398,9 @@ impl NetworkManager { }; // Build the envelope to send - let out = self.build_envelope(node_ref.node_id(), version, body)?; + let out = self + .build_envelope(node_ref.node_id(), version, body) + .map_err(logthru_rpc!(error))?; // Send via relay if we have to self.net().send_data(node_ref, out).await @@ -433,6 +437,11 @@ impl NetworkManager { data: &[u8], descriptor: ConnectionDescriptor, ) -> Result { + log_net!( + "envelope of {} bytes received from {:?}", + data.len(), + descriptor + ); // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { self.process_receipt(data).await?; @@ -530,7 +539,6 @@ impl NetworkManager { // Pass message to RPC system rpc.enqueue_message(envelope, body, source_noderef) - .await .map_err(|e| format!("enqueing rpc message failed: {}", e))?; // Inform caller that we dealt with the envelope locally diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index f5f67ead..c2d1db52 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -6,7 +6,6 @@ pub use debug::*; pub use private_route::*; use crate::dht::*; -use crate::intf::utils::channel::*; use crate::intf::*; use crate::xx::*; use crate::*; @@ -145,7 +144,7 @@ pub struct RPCProcessorInner { routing_table: RoutingTable, node_id: key::DHTKey, node_id_secret: key::DHTKeySecret, - send_channel: Option>, + send_channel: Option>, timeout: u64, max_route_hop_count: usize, waiting_rpc_table: BTreeMap>, @@ -394,9 +393,10 @@ impl RPCProcessor { let (op_id, wants_answer, is_ping) = { let operation = message .get_root::() - .map_err(map_error_internal!("invalid operation"))?; + .map_err(map_error_internal!("invalid operation")) + .map_err(logthru_rpc!(error))?; let op_id = operation.get_op_id(); - let wants_answer = self.wants_answer(&operation)?; + let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?; let is_ping = operation.get_detail().has_info_q(); (op_id, wants_answer, is_ping) @@ -490,7 +490,8 @@ impl RPCProcessor { // Verify hop count isn't larger than out maximum routed hop count if hopcount > self.inner.lock().max_route_hop_count { - return Err(rpc_error_internal("hop count too long for route")); + return Err(rpc_error_internal("hop count too long for route")) + .map_err(logthru_rpc!(warn)); } // calculate actual timeout // timeout is number of hops times the timeout per hop @@ -1245,7 +1246,7 @@ impl RPCProcessor { } } - async fn rpc_worker(self, receiver: Receiver) { + async fn rpc_worker(self, receiver: async_channel::Receiver) { while let Ok(msg) = receiver.recv().await { let _ = self .process_rpc_message(msg) @@ -1284,7 +1285,7 @@ impl RPCProcessor { } inner.timeout = timeout; inner.max_route_hop_count = max_route_hop_count; - let channel = channel(queue_size as usize); + let channel = async_channel::bounded(queue_size as usize); inner.send_channel = Some(channel.0.clone()); // spin up N workers @@ -1303,7 +1304,7 @@ impl RPCProcessor { *self.inner.lock() = Self::new_inner(self.network_manager()); } - pub async fn enqueue_message( + pub fn enqueue_message( &self, envelope: envelope::Envelope, body: Vec, @@ -1324,7 +1325,6 @@ impl RPCProcessor { }; send_channel .try_send(msg) - .await .map_err(|e| format!("failed to enqueue received RPC message: {:?}", e))?; Ok(()) } diff --git a/veilid-core/src/tests/native/test_async_peek_stream.rs b/veilid-core/src/tests/native/test_async_peek_stream.rs index 7efe7bb1..ad76b295 100644 --- a/veilid-core/src/tests/native/test_async_peek_stream.rs +++ b/veilid-core/src/tests/native/test_async_peek_stream.rs @@ -1,4 +1,3 @@ -use crate::intf::utils::async_peek_stream::*; use crate::xx::*; use async_std::io; use async_std::net::{TcpListener, TcpStream}; diff --git a/veilid-core/src/intf/native/utils/async_peek_stream.rs b/veilid-core/src/xx/async_peek_stream.rs similarity index 96% rename from veilid-core/src/intf/native/utils/async_peek_stream.rs rename to veilid-core/src/xx/async_peek_stream.rs index ecb3fb8a..c64928f7 100644 --- a/veilid-core/src/intf/native/utils/async_peek_stream.rs +++ b/veilid-core/src/xx/async_peek_stream.rs @@ -1,7 +1,10 @@ use crate::xx::*; -use async_std::io::{Read, ReadExt, Result, Write}; use core::pin::Pin; use core::task::{Context, Poll}; +use futures_util::AsyncRead as Read; +use futures_util::AsyncReadExt; +use futures_util::AsyncWrite as Write; +use std::io::Result; //////// /// @@ -168,4 +171,4 @@ impl Write for AsyncPeekStream { } } -impl std::marker::Unpin for AsyncPeekStream {} +impl core::marker::Unpin for AsyncPeekStream {} diff --git a/veilid-core/src/xx/clone_stream.rs b/veilid-core/src/xx/clone_stream.rs new file mode 100644 index 00000000..3790966c --- /dev/null +++ b/veilid-core/src/xx/clone_stream.rs @@ -0,0 +1,111 @@ +use crate::xx::*; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_util::AsyncRead as Read; +use futures_util::AsyncWrite as Write; +use futures_util::Sink; +use futures_util::Stream; +use std::io; + +pub struct CloneStream +where + T: Unpin, +{ + inner: Arc>, +} + +impl Clone for CloneStream +where + T: Unpin, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl CloneStream +where + T: Unpin, +{ + pub fn new(t: T) -> Self { + Self { + inner: Arc::new(Mutex::new(t)), + } + } +} + +impl Read for CloneStream +where + T: Read + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_read(cx, buf) + } +} + +impl Write for CloneStream +where + T: Write + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_close(cx) + } +} + +impl Stream for CloneStream +where + T: Stream + Unpin, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_next(cx) + } +} + +impl Sink for CloneStream +where + T: Sink + Unpin, +{ + type Error = T::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_ready(cx) + } + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).start_send(item) + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_flush(cx) + } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + Pin::new(&mut *inner).poll_close(cx) + } +} diff --git a/veilid-core/src/xx/log_thru.rs b/veilid-core/src/xx/log_thru.rs index 42ee7de8..5d0cc1b1 100644 --- a/veilid-core/src/xx/log_thru.rs +++ b/veilid-core/src/xx/log_thru.rs @@ -36,6 +36,14 @@ macro_rules! log_net { (error $fmt:literal, $($arg:expr),+) => { error!(target:"net", $fmt, $($arg),+); }; + (warn $text:expr) => {warn!( + target: "net", + "{}", + $text, + )}; + (warn $fmt:literal, $($arg:expr),+) => { + warn!(target:"net", $fmt, $($arg),+); + }; ($text:expr) => {trace!( target: "net", "{}", @@ -56,6 +64,14 @@ macro_rules! log_rpc { (error $fmt:literal, $($arg:expr),+) => { error!(target:"rpc", $fmt, $($arg),+); }; + (warn $text:expr) => { warn!( + target: "rpc", + "{}", + $text, + )}; + (warn $fmt:literal, $($arg:expr),+) => { + warn!(target:"rpc", $fmt, $($arg),+); + }; ($text:expr) => {trace!( target: "rpc", "{}", @@ -76,6 +92,14 @@ macro_rules! log_rtab { (error $fmt:literal, $($arg:expr),+) => { error!(target:"rtab", $fmt, $($arg),+); }; + (warn $text:expr) => { warn!( + target: "rtab", + "{}", + $text, + )}; + (warn $fmt:literal, $($arg:expr),+) => { + warn!(target:"rtab", $fmt, $($arg),+); + }; ($text:expr) => {trace!( target: "rtab", "{}", @@ -153,6 +177,33 @@ macro_rules! logthru { ); e__ }); + // warn + (warn $target:literal) => (|e__| { + warn!( + target: $target, + "[{}]", + e__, + ); + e__ + }); + (warn $target:literal, $text:literal) => (|e__| { + warn!( + target: $target, + "[{}] {}", + e__, + $text + ); + e__ + }); + (warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { + warn!( + target: $target, + concat!("[{}] ", $fmt), + e__, + $($arg),+ + ); + e__ + }); // debug (debug $target:literal) => (|e__| { debug!( diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index 12ffa506..a8410f1a 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -1,4 +1,6 @@ // mod bump_port; +mod async_peek_stream; +mod clone_stream; mod eventual; mod eventual_base; mod eventual_value; @@ -68,6 +70,7 @@ cfg_if! { pub use async_std::pin::Pin; pub use async_std::sync::Mutex as AsyncMutex; pub use async_std::sync::MutexGuard as AsyncMutexGuard; + pub use async_std::channel as async_channel; pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture = PinBox + Send + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; @@ -75,6 +78,8 @@ cfg_if! { } // pub use bump_port::*; +pub use async_peek_stream::*; +pub use clone_stream::*; pub use eventual::*; pub use eventual_base::{EventualCommon, EventualResolvedFuture}; pub use eventual_value::*;