From 55a44e0c8f49ec23304b5bfb8a597251b7aeb2a1 Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 3 Jan 2022 16:29:04 -0500 Subject: [PATCH] more refactor --- veilid-core/src/connection_manager.rs | 69 ++++++++-- veilid-core/src/dht/envelope.rs | 1 + veilid-core/src/dht/receipt.rs | 1 + veilid-core/src/intf/native/network/mod.rs | 122 +++++------------- .../src/intf/native/network/network_tcp.rs | 12 +- .../src/intf/native/network/network_udp.rs | 30 ++++- .../src/intf/native/network/protocol/mod.rs | 42 +++--- .../src/intf/native/network/protocol/tcp.rs | 32 +---- .../src/intf/native/network/protocol/udp.rs | 59 +++------ .../src/intf/native/network/protocol/ws.rs | 27 ++-- .../intf/native/network/start_protocols.rs | 4 +- veilid-core/src/intf/native/system.rs | 2 + .../native/utils/network_interfaces/mod.rs | 1 + .../native/utils/network_interfaces/tools.rs | 2 + .../src/intf/wasm/network/protocol/mod.rs | 42 +++--- .../src/intf/wasm/network/protocol/ws.rs | 17 +++ veilid-core/src/routing_table/mod.rs | 1 + .../src/tests/common/test_connection_table.rs | 36 +++--- 18 files changed, 261 insertions(+), 239 deletions(-) diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index 81492ab8..e98f5bea 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -8,9 +8,8 @@ use futures_util::stream::{FuturesUnordered, StreamExt}; const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; -type ProtocolConnectHandler = fn(Option, DialInfo) -> Result; - -type ProtocolConnectorMap = BTreeMap; +/////////////////////////////////////////////////////////// +// Accept cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { @@ -44,10 +43,36 @@ cfg_if! { } pub type NewProtocolAcceptHandler = - dyn Fn(ConnectionManager, bool, SocketAddr) -> Box + Send; + dyn Fn(VeilidConfig, bool, SocketAddr) -> Box + Send; } } +/////////////////////////////////////////////////////////// +// Dummy network connection for testing + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DummyNetworkConnection { + descriptor: ConnectionDescriptor, +} + +impl DummyNetworkConnection { + pub fn new(descriptor: ConnectionDescriptor) -> Self { + Self { descriptor } + } + pub fn connection_descriptor(&self) -> ConnectionDescriptor { + self.descriptor.clone() + } + pub async fn send(&self, _message: Vec) -> Result<(), String> { + Ok(()) + } + pub async fn recv(&self) -> Result, String> { + Ok(Vec::new()) + } +} + +/////////////////////////////////////////////////////////// +// Connection manager + pub struct ConnectionManagerInner { network_manager: NetworkManager, connection_table: ConnectionTable, @@ -94,10 +119,6 @@ impl ConnectionManager { self.inner.lock().network_manager.clone() } - pub fn config(&self) -> VeilidConfig { - self.network_manager().config() - } - pub async fn startup(&self) { let cac = utils::channel::channel(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config self.inner.lock().connection_add_channel_tx = Some(cac.0); @@ -138,6 +159,38 @@ impl ConnectionManager { .map_err(logthru_net!(error "failed to start receiver loop")) } + pub async fn get_or_create_connection( + &self, + local_addr: Option, + dial_info: DialInfo, + ) -> Result { + let peer_address = dial_info.to_peer_address(); + let descriptor = match local_addr { + Some(la) => { + ConnectionDescriptor::new(peer_address, SocketAddress::from_socket_addr(la)) + } + None => ConnectionDescriptor::new_no_local(peer_address), + }; + + // If connection exists, then return it + if let Some(conn) = self + .inner + .lock() + .connection_table + .get_connection(&descriptor) + .map(|e| e.conn) + { + return Ok(conn); + } + + // If not, attempt new connection + let conn = NetworkConnection::connect(local_addr, dial_info).await?; + + self.on_new_connection(conn.clone()).await?; + + Ok(conn) + } + // Connection receiver loop fn process_connection( this: ConnectionManager, diff --git a/veilid-core/src/dht/envelope.rs b/veilid-core/src/dht/envelope.rs index 578bda86..01f84cab 100644 --- a/veilid-core/src/dht/envelope.rs +++ b/veilid-core/src/dht/envelope.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] #![allow(clippy::absurd_extreme_comparisons)] use super::crypto::*; use super::key::*; diff --git a/veilid-core/src/dht/receipt.rs b/veilid-core/src/dht/receipt.rs index ae0d2196..53b5c48c 100644 --- a/veilid-core/src/dht/receipt.rs +++ b/veilid-core/src/dht/receipt.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] #![allow(clippy::absurd_extreme_comparisons)] use super::envelope::{MAX_VERSION, MIN_VERSION}; use super::key::*; diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index ec1a0c8b..bdb8f450 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -261,13 +261,13 @@ impl Network { match &dial_info { DialInfo::UDP(_) => { let peer_socket_addr = dial_info.to_socket_addr(); - RawUdpProtocolHandler::send_unbound_message(data, peer_socket_addr) + RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data) .await .map_err(logthru_net!()) } DialInfo::TCP(_) => { let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::send_unbound_message(data, peer_socket_addr) + RawTcpProtocolHandler::send_unbound_message(peer_socket_addr, data) .await .map_err(logthru_net!()) } @@ -278,78 +278,40 @@ impl Network { } } - // Initiate a new low-level protocol connection to a node - pub async fn connect_to_dial_info( - &self, - local_addr: Option, - dial_info: &DialInfo, - ) -> Result { - let connection_manager = self.connection_manager(); - let peer_socket_addr = dial_info.to_socket_addr(); - - Ok(match &dial_info { - DialInfo::UDP(_) => { - panic!("Do not attempt to connect to UDP dial info") - } - DialInfo::TCP(_) => { - let local_addr = - self.get_preferred_local_address(self.inner.lock().tcp_port, &peer_socket_addr); - RawTcpProtocolHandler::connect(connection_manager, local_addr, dial_info) - .await - .map_err(logthru_net!())? - } - DialInfo::WS(_) => { - let local_addr = - self.get_preferred_local_address(self.inner.lock().ws_port, &peer_socket_addr); - WebsocketProtocolHandler::connect(connection_manager, local_addr, dial_info) - .await - .map_err(logthru_net!(error))? - } - DialInfo::WSS(_) => { - let local_addr = - self.get_preferred_local_address(self.inner.lock().wss_port, &peer_socket_addr); - WebsocketProtocolHandler::connect(connection_manager, local_addr, dial_info) - .await - .map_err(logthru_net!(error))? - } - }) - } - async fn send_data_to_existing_connection( &self, descriptor: &ConnectionDescriptor, data: Vec, ) -> Result>, String> { - match descriptor.protocol_type() { - ProtocolType::UDP => { - // send over the best udp socket we have bound since UDP is not connection oriented - let peer_socket_addr = descriptor.remote.to_socket_addr(); - if let Some(ph) = self.find_best_udp_protocol_handler( - &peer_socket_addr, - &descriptor.local.map(|sa| sa.to_socket_addr()), - ) { - ph.clone() - .send_message(data, peer_socket_addr) - .await - .map_err(logthru_net!())?; - // Data was consumed - return Ok(None); - } - } - ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => { - // find an existing connection in the connection table if one exists - if let Some(conn) = self.connection_manager().get_connection(descriptor) { - // connection exists, send over it - conn.send(data).await.map_err(logthru_net!())?; - - // Data was consumed - return Ok(None); - } + // Handle connectionless protocol + if descriptor.protocol_type() == ProtocolType::UDP { + // send over the best udp socket we have bound since UDP is not connection oriented + let peer_socket_addr = descriptor.remote.to_socket_addr(); + if let Some(ph) = self.find_best_udp_protocol_handler( + &peer_socket_addr, + &descriptor.local.map(|sa| sa.to_socket_addr()), + ) { + ph.clone() + .send_message(data, peer_socket_addr) + .await + .map_err(logthru_net!())?; + // Data was consumed + return Ok(None); } } - // connection or local socket didn't exist, we'll need to use dialinfo to create one - // Pass the data back out so we don't own it any more - Ok(Some(data)) + + // Handle connection-oriented protocols + if let Some(conn) = self.connection_manager().get_connection(descriptor) { + // connection exists, send over it + conn.send(data).await.map_err(logthru_net!())?; + + // Data was consumed + Ok(None) + } else { + // Connection or didn't exist + // Pass the data back out so we don't own it any more + Ok(Some(data)) + } } // Send data directly to a dial info, possibly without knowing which node it is going to @@ -372,7 +334,11 @@ impl Network { } // Handle connection-oriented protocols - let conn = self.connect_to_dial_info(dial_info).await?; + let local_addr = self.get_preferred_local_address(dial_info); + let conn = self + .connection_manager() + .get_or_create_connection(Some(local_addr), dial_info.clone()) + .await?; conn.send(data).await.map_err(logthru_net!(error)) } @@ -405,25 +371,7 @@ impl Network { .ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?; // Handle connectionless protocol - if dial_info.protocol_type() == ProtocolType::UDP { - let peer_socket_addr = dial_info.to_socket_addr(); - if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { - return ph - .send_message(data, peer_socket_addr) - .await - .map_err(logthru_net!()); - } - return Err("no appropriate UDP protocol handler for dial_info".to_owned()) - .map_err(logthru_net!(error)); - } - - // Handle connection-oriented protocols - let local_addr = self.get_preferred_local_address(&dial_info); - let conn = self - .connection_manager() - .get_or_create_connection(dial_info, Some(local_addr)); xxx implement this and pass thru to NetworkConnection::connect - - conn.send(data).await.map_err(logthru_net!(error)) + self.send_data_to_dial_info(&dial_info, data).await } ///////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/intf/native/network/network_tcp.rs b/veilid-core/src/intf/native/network/network_tcp.rs index 90b90afc..b30d547a 100644 --- a/veilid-core/src/intf/native/network/network_tcp.rs +++ b/veilid-core/src/intf/native/network/network_tcp.rs @@ -134,8 +134,8 @@ impl Network { // and the number of total connections let addr = match tcp_stream.peer_addr() { Ok(addr) => addr, - Err(err) => { - error!("failed to get peer address: {}", err); + Err(e) => { + error!("failed to get peer address: {}", e); return; } }; @@ -191,7 +191,9 @@ impl Network { }; // Register the new connection in the connection manager - connection_manager.on_new_connection(conn).await; + if let Err(e) = connection_manager.on_new_connection(conn).await { + error!("failed to register new connection: {}", e); + } }) .await; trace!("exited incoming loop for {}", addr); @@ -248,7 +250,7 @@ impl Network { ls.write() .tls_protocol_handlers .push(new_protocol_accept_handler( - self.connection_manager(), + self.network_manager().config(), true, addr, )); @@ -256,7 +258,7 @@ impl Network { ls.write() .protocol_handlers .push(new_protocol_accept_handler( - self.connection_manager(), + self.network_manager().config(), false, addr, )); diff --git a/veilid-core/src/intf/native/network/network_udp.rs b/veilid-core/src/intf/native/network/network_udp.rs index affed8cf..cb54d06b 100644 --- a/veilid-core/src/intf/native/network/network_udp.rs +++ b/veilid-core/src/intf/native/network/network_udp.rs @@ -21,6 +21,7 @@ impl Network { //////////////////////////////////////////////////////////// // Run thread task to process stream of messages let this = self.clone(); + let jh = spawn(async move { trace!("UDP listener task spawned"); @@ -41,8 +42,26 @@ impl Network { // Spawn a local async task for each socket let mut protocol_handlers_unordered = stream::FuturesUnordered::new(); + let network_manager = this.network_manager(); + for ph in protocol_handlers { - let jh = spawn_local(ph.clone().receive_loop()); + let network_manager = network_manager.clone(); + let jh = spawn_local(async move { + let mut data = vec![0u8; 65536]; + + while let Ok((size, descriptor)) = ph.recv_message(&mut data).await { + // XXX: Limit the number of packets from the same IP address? + log_net!("UDP packet: {:?}", descriptor); + + if let Err(e) = network_manager + .on_recv_envelope(&data[..size], &descriptor) + .await + { + log_net!(error "failed to process received udp envelope: {}", e); + } + } + }); + protocol_handlers_unordered.push(jh); } // Now we wait for any join handle to exit, @@ -83,8 +102,7 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let udpv4_handler = - RawUdpProtocolHandler::new(inner.network_manager.clone(), socket_arc); + let udpv4_handler = RawUdpProtocolHandler::new(socket_arc); inner.outbound_udpv4_protocol_handler = Some(udpv4_handler); } @@ -98,8 +116,7 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let udpv6_handler = - RawUdpProtocolHandler::new(inner.network_manager.clone(), socket_arc); + let udpv6_handler = RawUdpProtocolHandler::new(socket_arc); inner.outbound_udpv6_protocol_handler = Some(udpv6_handler); } @@ -119,8 +136,7 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let protocol_handler = - RawUdpProtocolHandler::new(self.inner.lock().network_manager.clone(), socket_arc); + let protocol_handler = RawUdpProtocolHandler::new(socket_arc); // Create message_handler records self.inner diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index 7fc2eb74..22e689a4 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -3,28 +3,11 @@ pub mod udp; pub mod wrtc; pub mod ws; +use crate::connection_manager::*; use crate::xx::*; use crate::*; use socket2::{Domain, Protocol, Socket, Type}; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DummyNetworkConnection {} - -impl DummyNetworkConnection { - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - ConnectionDescriptor::new_no_local(PeerAddress::new( - SocketAddress::default(), - ProtocolType::UDP, - )) - } - pub async fn send(&self, _message: Vec) -> Result<(), String> { - Ok(()) - } - pub async fn recv(&self) -> Result, String> { - Ok(Vec::new()) - } -} - #[derive(Clone, Debug, PartialEq, Eq)] pub enum NetworkConnection { Dummy(DummyNetworkConnection), @@ -52,6 +35,29 @@ impl NetworkConnection { } } } + pub async fn send_unbound_message( + &self, + dial_info: &DialInfo, + data: Vec, + ) -> Result<(), String> { + match dial_info.protocol_type() { + ProtocolType::UDP => { + let peer_socket_addr = dial_info.to_socket_addr(); + udp::RawUdpProtocolHandler::send_unbound_message(peer_socket_addr, data) + .await + .map_err(logthru_net!()) + } + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + tcp::RawTcpProtocolHandler::send_unbound_message(peer_socket_addr, data) + .await + .map_err(logthru_net!()) + } + ProtocolType::WS | ProtocolType::WSS => { + ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await + } + } + } pub fn connection_descriptor(&self) -> ConnectionDescriptor { match self { diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index 1b8acd4d..9121deeb 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -1,5 +1,4 @@ use super::*; -use crate::connection_manager::*; use crate::intf::native::utils::async_peek_stream::*; use crate::intf::*; use crate::network_manager::MAX_MESSAGE_SIZE; @@ -104,7 +103,6 @@ impl RawTcpNetworkConnection { /// struct RawTcpProtocolHandlerInner { - connection_manager: ConnectionManager, local_address: SocketAddr, } @@ -117,22 +115,13 @@ where } impl RawTcpProtocolHandler { - fn new_inner( - connection_manager: ConnectionManager, - local_address: SocketAddr, - ) -> RawTcpProtocolHandlerInner { - RawTcpProtocolHandlerInner { - connection_manager, - local_address, - } + fn new_inner(local_address: SocketAddr) -> RawTcpProtocolHandlerInner { + RawTcpProtocolHandlerInner { local_address } } - pub fn new(connection_manager: ConnectionManager, local_address: SocketAddr) -> Self { + pub fn new(local_address: SocketAddr) -> Self { Self { - inner: Arc::new(Mutex::new(Self::new_inner( - connection_manager, - local_address, - ))), + inner: Arc::new(Mutex::new(Self::new_inner(local_address))), } } @@ -153,10 +142,7 @@ impl RawTcpProtocolHandler { SocketAddress::from_socket_addr(socket_addr), ProtocolType::TCP, ); - let (network_manager, local_address) = { - let inner = self.inner.lock(); - (inner.connection_manager.clone(), inner.local_address) - }; + let local_address = self.inner.lock().local_address; let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new( stream, ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), @@ -194,12 +180,8 @@ impl RawTcpProtocolHandler { .map_err(map_to_string) .map_err(logthru_net!("could not get local address from TCP stream"))?; let ps = AsyncPeekStream::new(ts); - let peer_addr = PeerAddress::new( - SocketAddress::from_socket_addr(remote_socket_addr), - ProtocolType::TCP, - ); - // Wrap the stream in a network connection and register it + // Wrap the stream in a network connection and return it let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new( ps, ConnectionDescriptor { @@ -211,8 +193,8 @@ impl RawTcpProtocolHandler { } pub async fn send_unbound_message( - data: Vec, socket_addr: SocketAddr, + data: Vec, ) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { return Err("sending too large unbound TCP message".to_owned()); diff --git a/veilid-core/src/intf/native/network/protocol/udp.rs b/veilid-core/src/intf/native/network/protocol/udp.rs index d49d7fcf..aaf44695 100644 --- a/veilid-core/src/intf/native/network/protocol/udp.rs +++ b/veilid-core/src/intf/native/network/protocol/udp.rs @@ -1,10 +1,9 @@ use crate::intf::*; -use crate::network_manager::{NetworkManager, MAX_MESSAGE_SIZE}; +use crate::network_manager::MAX_MESSAGE_SIZE; use crate::*; use async_std::net::*; struct RawUdpProtocolHandlerInner { - network_manager: NetworkManager, socket: Arc, } @@ -14,64 +13,44 @@ pub struct RawUdpProtocolHandler { } impl RawUdpProtocolHandler { - fn new_inner( - network_manager: NetworkManager, - socket: Arc, - ) -> RawUdpProtocolHandlerInner { - RawUdpProtocolHandlerInner { - network_manager, - socket, - } + fn new_inner(socket: Arc) -> RawUdpProtocolHandlerInner { + RawUdpProtocolHandlerInner { socket } } - pub fn new(network_manager: NetworkManager, socket: Arc) -> Self { + pub fn new(socket: Arc) -> Self { Self { - inner: Arc::new(Mutex::new(Self::new_inner(network_manager, socket))), + inner: Arc::new(Mutex::new(Self::new_inner(socket))), } } - pub async fn receive_loop(self) { - let mut data = vec![0u8; 65536]; + pub async fn recv_message( + &self, + data: &mut [u8], + ) -> Result<(usize, ConnectionDescriptor), String> { let socket = self.inner.lock().socket.clone(); - while let Ok((size, socket_addr)) = socket.recv_from(&mut data).await { - // XXX: Limit the number of packets from the same IP address? - trace!("UDP packet from: {}", socket_addr); + let (size, remote_addr) = socket.recv_from(data).await.map_err(map_to_string)?; - let _processed = self.clone().on_message(&data[..size], socket_addr).await; - } - } - - pub async fn on_message(&self, data: &[u8], remote_addr: SocketAddr) -> Result { - if data.len() > MAX_MESSAGE_SIZE { + if size > MAX_MESSAGE_SIZE { return Err("received too large UDP message".to_owned()); } trace!( "receiving UDP message of length {} from {}", - data.len(), + size, remote_addr ); // Process envelope - let (network_manager, socket) = { - let inner = self.inner.lock(); - (inner.network_manager.clone(), inner.socket.clone()) - }; - let peer_addr = PeerAddress::new( SocketAddress::from_socket_addr(remote_addr), ProtocolType::UDP, ); - let local_socket_addr = socket.local_addr().map_err(|e| format!("{}", e))?; - network_manager - .on_recv_envelope( - data, - &ConnectionDescriptor::new( - peer_addr, - SocketAddress::from_socket_addr(local_socket_addr), - ), - ) - .await + let local_socket_addr = socket.local_addr().map_err(map_to_string)?; + let descriptor = ConnectionDescriptor::new( + peer_addr, + SocketAddress::from_socket_addr(local_socket_addr), + ); + Ok((size, descriptor)) } pub async fn send_message(&self, data: Vec, socket_addr: SocketAddr) -> Result<(), String> { @@ -100,8 +79,8 @@ impl RawUdpProtocolHandler { } pub async fn send_unbound_message( - data: Vec, socket_addr: SocketAddr, + data: Vec, ) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { return Err("sending too large unbound UDP message".to_owned()) diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index eec0157c..c339c297 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -1,5 +1,4 @@ use super::*; -use crate::connection_manager::*; use crate::intf::native::utils::async_peek_stream::*; use crate::intf::*; use crate::network_manager::MAX_MESSAGE_SIZE; @@ -133,7 +132,6 @@ where /// struct WebsocketProtocolHandlerInner { tls: bool, - connection_manager: ConnectionManager, local_address: SocketAddr, request_path: Vec, connection_initial_timeout: u64, @@ -147,12 +145,7 @@ where inner: Arc, } impl WebsocketProtocolHandler { - pub fn new( - connection_manager: ConnectionManager, - tls: bool, - local_address: SocketAddr, - ) -> Self { - let config = connection_manager.config(); + pub fn new(config: VeilidConfig, tls: bool, local_address: SocketAddr) -> Self { let c = config.get(); let path = if tls { format!("GET {}", c.network.protocol.ws.path.trim_end_matches('/')) @@ -167,7 +160,6 @@ impl WebsocketProtocolHandler { let inner = WebsocketProtocolHandlerInner { tls, - connection_manager, local_address, request_path: path.as_bytes().to_vec(), connection_initial_timeout, @@ -313,6 +305,23 @@ impl WebsocketProtocolHandler { ))) } } + + pub async fn send_unbound_message(dial_info: &DialInfo, data: Vec) -> Result<(), String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound WS message".to_owned()); + } + trace!( + "sending unbound websocket message of length {} to {}", + data.len(), + dial_info, + ); + + let conn = Self::connect(None, dial_info.clone()) + .await + .map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?; + + conn.send(data).await + } } impl ProtocolAcceptHandler for WebsocketProtocolHandler { diff --git a/veilid-core/src/intf/native/network/start_protocols.rs b/veilid-core/src/intf/native/network/start_protocols.rs index 74995dac..3ad2648d 100644 --- a/veilid-core/src/intf/native/network/start_protocols.rs +++ b/veilid-core/src/intf/native/network/start_protocols.rs @@ -160,7 +160,7 @@ impl Network { .start_tcp_listener( listen_address.clone(), true, - Box::new(|n, t, a| Box::new(WebsocketProtocolHandler::new(n, t, a))), + Box::new(|c, t, a| Box::new(WebsocketProtocolHandler::new(c, t, a))), ) .await?; trace!("WSS: listener started"); @@ -222,7 +222,7 @@ impl Network { .start_tcp_listener( listen_address.clone(), false, - Box::new(|n, _, a| Box::new(RawTcpProtocolHandler::new(n, a))), + Box::new(|_, _, a| Box::new(RawTcpProtocolHandler::new(a))), ) .await?; trace!("TCP: listener started"); diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 068438e3..6ea0b3ae 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use crate::xx::*; pub use async_executors::JoinHandle; use async_executors::{AsyncStd, LocalSpawnHandleExt, SpawnHandleExt}; diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 17a2636f..88b35167 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -24,6 +24,7 @@ pub enum IfAddr { V6(Ifv6Addr), } +#[allow(dead_code)] impl IfAddr { pub fn ip(&self) -> IpAddr { match *self { diff --git a/veilid-core/src/intf/native/utils/network_interfaces/tools.rs b/veilid-core/src/intf/native/utils/network_interfaces/tools.rs index 3c5dff68..a10a8ac4 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/tools.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/tools.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + pub fn convert_to_unsigned_4(x: [i8; 4]) -> [u8; 4] { let mut out: [u8; 4] = [0u8; 4]; for i in 0..4 { diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index 65632246..8f9ee3f3 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -1,27 +1,10 @@ pub mod wrtc; pub mod ws; +use crate::connection_manager::*; use crate::veilid_api::ProtocolType; use crate::xx::*; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DummyNetworkConnection {} - -impl DummyNetworkConnection { - pub fn connection_descriptor(&self) -> ConnectionDescriptor { - ConnectionDescriptor::new_no_local(PeerAddress::new( - SocketAddress::default(), - ProtocolType::UDP, - )) - } - pub async fn send(&self, _message: Vec) -> Result<(), String> { - Ok(()) - } - pub async fn recv(&self) -> Result, String> { - Ok(Vec::new()) - } -} - #[derive(Clone, Debug, PartialEq, Eq)] pub enum NetworkConnection { Dummy(DummyNetworkConnection), @@ -36,7 +19,7 @@ impl NetworkConnection { ) -> Result { match dial_info.protocol_type() { ProtocolType::UDP => { - panic!("Should not connect to UDP dialinfo"); + panic!("UDP dial info is not support on WASM targets"); } ProtocolType::TCP => { panic!("TCP dial info is not support on WASM targets"); @@ -46,13 +29,32 @@ impl NetworkConnection { } } } - + + pub async fn send_unbound_message( + &self, + dial_info: &DialInfo, + data: Vec, + ) -> Result<(), String> { + match dial_info.protocol_type() { + ProtocolType::UDP => { + panic!("UDP dial info is not support on WASM targets"); + } + ProtocolType::TCP => { + panic!("TCP dial info is not support on WASM targets"); + } + ProtocolType::WS | ProtocolType::WSS => { + ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await + } + } + } + pub async fn send(&self, message: Vec) -> Result<(), String> { match self { Self::Dummy(d) => d.send(message).await, Self::WS(w) => w.send(message).await, } } + pub async fn recv(&self) -> Result, String> { match self { Self::Dummy(d) => d.recv().await, diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 66afda52..782a4828 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -125,4 +125,21 @@ impl WebsocketProtocolHandler { Ok(NetworkConnection::WS(WebsocketNetworkConnection::new(tls, connection_descriptor, wsio))) } + + pub async fn send_unbound_message(dial_info: &DialInfo, data: Vec) -> Result<(), String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound WS message".to_owned()); + } + trace!( + "sending unbound websocket message of length {} to {}", + data.len(), + dial_info, + ); + + let conn = Self::connect(None, dial_info.clone()) + .await + .map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?; + + conn.send(data).await + } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 7125b14a..c5582a2e 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -245,6 +245,7 @@ impl RoutingTable { .instance_empty(); inst.await; } + fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { let mut new_eventual = Eventual::new(); core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual); diff --git a/veilid-core/src/tests/common/test_connection_table.rs b/veilid-core/src/tests/common/test_connection_table.rs index c774213f..242726e3 100644 --- a/veilid-core/src/tests/common/test_connection_table.rs +++ b/veilid-core/src/tests/common/test_connection_table.rs @@ -1,3 +1,4 @@ +use crate::connection_manager::*; use crate::connection_table::*; use crate::intf::*; use crate::xx::*; @@ -6,18 +7,11 @@ use crate::*; pub async fn test_add_get_remove() { let mut table = ConnectionTable::new(); - let c1 = NetworkConnection::Dummy(DummyNetworkConnection {}); - let c2 = NetworkConnection::Dummy(DummyNetworkConnection {}); - let c3 = NetworkConnection::Dummy(DummyNetworkConnection {}); - let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( SocketAddress::new(Address::IPV4(Ipv4Addr::new(127, 0, 0, 1)), 8080), ProtocolType::TCP, )); - let a2 = ConnectionDescriptor::new_no_local(PeerAddress::new( - SocketAddress::new(Address::IPV4(Ipv4Addr::new(127, 0, 0, 1)), 8080), - ProtocolType::TCP, - )); + let a2 = a1.clone(); let a3 = ConnectionDescriptor::new( PeerAddress::new( SocketAddress::new(Address::IPV6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8090), @@ -55,13 +49,19 @@ pub async fn test_add_get_remove() { ))), ); - assert_eq!(a1, a2); - assert_ne!(a3, a4); - assert_ne!(a4, a5); + let c1 = NetworkConnection::Dummy(DummyNetworkConnection::new(a1.clone())); + let c2 = NetworkConnection::Dummy(DummyNetworkConnection::new(a2.clone())); + let c3 = NetworkConnection::Dummy(DummyNetworkConnection::new(a3.clone())); + let c4 = NetworkConnection::Dummy(DummyNetworkConnection::new(a4.clone())); + let c5 = NetworkConnection::Dummy(DummyNetworkConnection::new(a5)); + + assert_eq!(a1, c2.connection_descriptor()); + assert_ne!(a3, c4.connection_descriptor()); + assert_ne!(a4, c5.connection_descriptor()); assert_eq!(table.connection_count(), 0); assert_eq!(table.get_connection(&a1), None); - let entry1 = table.add_connection(a1.clone(), c1.clone()).unwrap(); + let entry1 = table.add_connection(c1.clone()).unwrap(); assert_eq!(table.connection_count(), 1); assert_err!(table.remove_connection(&a3)); @@ -70,8 +70,8 @@ pub async fn test_add_get_remove() { assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.connection_count(), 1); - assert_err!(table.add_connection(a1.clone(), c1.clone())); - assert_err!(table.add_connection(a1.clone(), c2.clone())); + assert_err!(table.add_connection(c1.clone())); + assert_err!(table.add_connection(c2.clone())); assert_eq!(table.connection_count(), 1); assert_eq!(table.get_connection(&a1), Some(entry1.clone())); assert_eq!(table.get_connection(&a1), Some(entry1.clone())); @@ -83,10 +83,10 @@ pub async fn test_add_get_remove() { assert_eq!(table.get_connection(&a2), None); assert_eq!(table.get_connection(&a1), None); assert_eq!(table.connection_count(), 0); - let entry2 = table.add_connection(a1, c1.clone()).unwrap(); - assert_err!(table.add_connection(a2.clone(), c1)); - let entry3 = table.add_connection(a3.clone(), c2).unwrap(); - let entry4 = table.add_connection(a4.clone(), c3).unwrap(); + let entry2 = table.add_connection(c1).unwrap(); + assert_err!(table.add_connection(c2)); + let entry3 = table.add_connection(c3).unwrap(); + let entry4 = table.add_connection(c4).unwrap(); assert_eq!(table.connection_count(), 3); assert_eq!(table.remove_connection(&a2), Ok(entry2)); assert_eq!(table.remove_connection(&a3), Ok(entry3));