From 971fa94751cbc00348e7349adf3b7ad467b09555 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 17 Dec 2021 19:18:25 -0500 Subject: [PATCH] more log refactor --- veilid-core/src/intf/native/network/mod.rs | 52 ++++------ .../src/intf/native/network/protocol/tcp.rs | 2 +- .../src/intf/native/network/protocol/udp.rs | 24 +++-- .../src/intf/native/network/protocol/ws.rs | 33 ++++--- .../network/public_dialinfo_discovery.rs | 44 ++++----- veilid-core/src/intf/wasm/network/mod.rs | 50 ++++++---- .../src/intf/wasm/network/protocol/mod.rs | 4 +- .../src/intf/wasm/network/protocol/ws.rs | 40 +++++--- veilid-core/src/intf/wasm/system.rs | 10 +- veilid-core/src/intf/wasm/utils/channel.rs | 14 +++ veilid-core/src/routing_table/bucket.rs | 4 +- veilid-core/src/routing_table/find_nodes.rs | 4 +- veilid-core/src/routing_table/mod.rs | 89 ++++++----------- veilid-core/src/rpc_processor/mod.rs | 99 ++++++++++++------- veilid-core/src/veilid_api/mod.rs | 2 +- veilid-core/src/xx/log_thru.rs | 46 ++++----- veilid-core/src/xx/split_url.rs | 3 + 17 files changed, 283 insertions(+), 237 deletions(-) diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 63d8b155..3bc9d8b3 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -663,14 +663,15 @@ impl Network { let peer_socket_addr = descriptor .remote .to_socket_addr() - .map_err(|_| "unable to get socket address".to_owned())?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &descriptor.local) { ph.clone() .send_message(data, peer_socket_addr) .await - .map_err(|_| "unable to send udp message".to_owned())?; + .map_err(logthru_net!())?; // Data was consumed return Ok(None); } @@ -683,11 +684,7 @@ impl Network { .get_connection(descriptor) { // connection exists, send over it - entry - .conn - .send(data) - .await - .map_err(|_| "failed to send tcp or ws message".to_owned())?; + entry.conn.send(data).await.map_err(logthru_net!())?; // Data was consumed return Ok(None); @@ -706,24 +703,22 @@ impl Network { ) -> Result<(), String> { match &dial_info { DialInfo::UDP(_) => { - let peer_socket_addr = dial_info - .to_socket_addr() - .map_err(|_| "failed to resolve dial info for UDP dial info".to_owned())?; + let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?; RawUdpProtocolHandler::send_unbound_message(data, peer_socket_addr) .await - .map_err(|_| "failed to send unbound message to UDP dial info".to_owned()) + .map_err(logthru_net!()) } DialInfo::TCP(_) => { - let peer_socket_addr = dial_info - .to_socket_addr() - .map_err(|_| "failed to resolve dial info for TCP dial info".to_owned())?; + let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?; RawTcpProtocolHandler::send_unbound_message(data, peer_socket_addr) .await - .map_err(|_| "failed to connect to TCP dial info".to_owned()) + .map_err(logthru_net!()) } - DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()), - DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()), + DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()) + .map_err(logthru_net!(error)), + DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()) + .map_err(logthru_net!(error)), } } @@ -736,38 +731,33 @@ impl Network { let conn = match &dial_info { DialInfo::UDP(_) => { - let peer_socket_addr = dial_info - .to_socket_addr() - .map_err(|_| "failed to resolve dial info for UDP dial info".to_owned())?; + let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?; if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { return ph .send_message(data, peer_socket_addr) .await - .map_err(|_| "failed to send message to UDP dial info".to_owned()); + .map_err(logthru_net!()); } else { - return Err("no appropriate UDP protocol handler for dial_info".to_owned()); + return Err("no appropriate UDP protocol handler for dial_info".to_owned()) + .map_err(logthru_net!(error)); } } DialInfo::TCP(_) => { - let peer_socket_addr = dial_info - .to_socket_addr() - .map_err(|_| "failed to resolve dial info for TCP dial info".to_owned())?; + let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?; let some_local_addr = self.find_best_tcp_local_address(&peer_socket_addr); RawTcpProtocolHandler::connect(network_manager, some_local_addr, peer_socket_addr) .await - .map_err(|_| "failed to connect to TCP dial info".to_owned())? + .map_err(logthru_net!())? } DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) .await - .map_err(|_| "failed to connect to WS dial info".to_owned())?, + .map_err(logthru_net!(error))?, DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) .await - .map_err(|_| "failed to connect to WSS dial info".to_owned())?, + .map_err(logthru_net!(error))?, }; - conn.send(data) - .await - .map_err(|_| "failed to send data to dial info".to_owned()) + conn.send(data).await.map_err(logthru_net!(error)) } pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index d19eb5e9..95ba2af8 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -210,7 +210,7 @@ impl RawTcpProtocolHandler { socket .connect(&remote_socket2_addr) .map_err(map_to_string) - .map_err(logthru_net!("addr={}", remote_socket_addr))?; + .map_err(logthru_net!(error "addr={}", remote_socket_addr))?; let std_stream: std::net::TcpStream = socket.into(); let ts = TcpStream::from(std_stream); diff --git a/veilid-core/src/intf/native/network/protocol/udp.rs b/veilid-core/src/intf/native/network/protocol/udp.rs index 0b5e2af0..9929bc3a 100644 --- a/veilid-core/src/intf/native/network/protocol/udp.rs +++ b/veilid-core/src/intf/native/network/protocol/udp.rs @@ -63,10 +63,10 @@ impl RawUdpProtocolHandler { pub async fn send_message(&self, data: Vec, socket_addr: SocketAddr) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { - return Err("sending too large UDP message".to_owned()); + return Err("sending too large UDP message".to_owned()).map_err(logthru_net!(error)); } - trace!( + log_net!( "sending UDP message of length {} to {}", data.len(), socket_addr @@ -76,9 +76,11 @@ impl RawUdpProtocolHandler { let len = socket .send_to(&data, socket_addr) .await - .map_err(|e| format!("{}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error "failed udp send: addr={}", socket_addr))?; + if len != data.len() { - Err("UDP partial send".to_owned()) + Err("UDP partial send".to_owned()).map_err(logthru_net!(error)) } else { Ok(()) } @@ -89,10 +91,10 @@ impl RawUdpProtocolHandler { socket_addr: SocketAddr, ) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { - return Err("sending too large unbound UDP message".to_owned()); + return Err("sending too large unbound UDP message".to_owned()) + .map_err(logthru_net!(error)); } - xxx continue here - trace!( + log_net!( "sending unbound message of length {} to {}", data.len(), socket_addr @@ -107,13 +109,15 @@ impl RawUdpProtocolHandler { }; let socket = UdpSocket::bind(local_socket_addr) .await - .map_err(|e| format!("{}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error "failed to bind unbound udp socket"))?; let len = socket .send_to(&data, socket_addr) .await - .map_err(|e| format!("{}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error "failed unbound udp send: addr={}", socket_addr))?; if len != data.len() { - Err("UDP partial unbound send".to_owned()) + Err("UDP partial unbound send".to_owned()).map_err(logthru_net!(error)) } else { Ok(()) } diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index ff3dcb08..2dcd3e8c 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -101,6 +101,7 @@ where .send(Message::binary(message)) .await .map_err(map_to_string) + .map_err(logthru_net!(error "failed to send websocket message")) }) } pub fn recv(&self) -> SystemPinBoxFuture, String>> { @@ -112,17 +113,18 @@ where let out = match inner.ws_stream.next().await { Some(Ok(Message::Binary(v))) => v, Some(Ok(_)) => { - return Err("Unexpected WS message type".to_owned()); + return Err("Unexpected WS message type".to_owned()) + .map_err(logthru_net!(error)); } Some(Err(e)) => { - return Err(e.to_string()); + return Err(e.to_string()).map_err(logthru_net!(error)); } None => { - return Err("WS stream closed".to_owned()); + return Err("WS stream closed".to_owned()).map_err(logthru_net!()); } }; if out.len() > MAX_MESSAGE_SIZE { - Err("sending too large WS message".to_owned()) + Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)) } else { Ok(out) } @@ -189,7 +191,10 @@ impl WebsocketProtocolHandler { { Ok(_) => (), Err(e) => { - return Err(format!("failed to peek stream: {:?}", e)); + if e.kind() == io::ErrorKind::TimedOut { + return Err(e).map_err(map_to_string).map_err(logthru_net!()); + } + return Err(e).map_err(map_to_string).map_err(logthru_net!(error)); } } // Check for websocket path @@ -261,13 +266,16 @@ impl WebsocketProtocolHandler { let tcp_stream = TcpStream::connect(format!("{}:{}", &domain, &port)) .await - .map_err(|e| format!("failed to connect tcp stream: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let local_addr = tcp_stream .local_addr() - .map_err(|e| format!("can't get local address for tcp stream: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let peer_socket_addr = tcp_stream .peer_addr() - .map_err(|e| format!("can't get peer address for tcp stream: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let peer_addr = PeerAddress::new( Address::from_socket_addr(peer_socket_addr), peer_socket_addr.port(), @@ -279,10 +287,12 @@ impl WebsocketProtocolHandler { let tls_stream = connector .connect(domain, tcp_stream) .await - .map_err(|e| format!("can't connect tls: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let (ws_stream, _response) = client_async(request, tls_stream) .await - .map_err(|e| format!("wss negotation failed: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let conn = NetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream)); network_manager .on_new_connection( @@ -294,7 +304,8 @@ impl WebsocketProtocolHandler { } else { let (ws_stream, _response) = client_async(request, tcp_stream) .await - .map_err(|e| format!("ws negotiate failed: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let conn = NetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream)); network_manager .on_new_connection( diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index 9e651ff9..d9de185b 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -12,14 +12,14 @@ impl Network { async fn request_public_address(&self, node_ref: NodeRef) -> Option { let routing_table = self.routing_table(); let rpc = routing_table.rpc_processor(); - let info_answer = match rpc.rpc_call_info(node_ref.clone()).await { - Err(e) => { - trace!("failed to get info answer from {:?}: {:?}", node_ref, e); - return None; - } - Ok(ia) => ia, - }; - info_answer.sender_info.socket_address + rpc.rpc_call_info(node_ref.clone()) + .await + .map_err(logthru_net!( + "failed to get info answer from {:?}", + node_ref + )) + .map(|info_answer| info_answer.sender_info.socket_address) + .unwrap_or(None) } // find fast peers with a particular address type, and ask them to tell us what our external address is @@ -31,7 +31,7 @@ impl Network { let routing_table = self.routing_table(); let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); if peers.is_empty() { - trace!("no peers of type '{:?}'", protocol_address_type); + log_net!("no peers of type '{:?}'", protocol_address_type); return None; } for peer in peers { @@ -44,7 +44,7 @@ impl Network { return Some((sa, peer)); } } - trace!("no peers responded with an external address"); + log_net!("no peers responded with an external address"); None } @@ -78,19 +78,13 @@ impl Network { ) -> bool { let routing_table = self.routing_table(); let rpc = routing_table.rpc_processor(); - match rpc - .rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect, alternate_port) + rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect, alternate_port) .await - { - Err(e) => { - error!( - "failed to send validate_dial_info to {:?}: {:?}", - node_ref, e - ); - false - } - Ok(val) => val, - } + .map_err(logthru_net!( + "failed to send validate_dial_info to {:?}", + node_ref + )) + .unwrap_or(false) } async fn try_port_mapping>( @@ -103,7 +97,7 @@ impl Network { } pub async fn update_udpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { - trace!("looking for udpv4 public dial info"); + log_net!("looking for udpv4 public dial info"); let routing_table = self.routing_table(); let mut retry_count = { @@ -148,7 +142,7 @@ impl Network { break; } else { // UDP firewall? - warn!("UDP static public dial info not reachable. UDP firewall may be blocking inbound to {:?} for {:?}",external1_dial_info, node_b); + log_net!("UDP static public dial info not reachable. UDP firewall may be blocking inbound to {:?} for {:?}",external1_dial_info, node_b); } } else { // There is -some NAT- @@ -260,7 +254,7 @@ impl Network { } pub async fn update_tcpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { - trace!("looking for tcpv4 public dial info"); + log_net!("looking for tcpv4 public dial info"); // xxx //Err("unimplemented".to_owned()) Ok(()) diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 90011f3e..6fb958de 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -45,8 +45,12 @@ impl Network { data: Vec, ) -> Result>, String> { match descriptor.protocol_type() { - ProtocolType::UDP => return Err("no support for udp protocol".to_owned()), - ProtocolType::TCP => return Err("no support for tcp protocol".to_owned()), + ProtocolType::UDP => { + return Err("no support for udp protocol".to_owned()).map_err(logthru_net!(error)) + } + ProtocolType::TCP => { + return Err("no support for tcp protocol".to_owned()).map_err(logthru_net!(error)) + } ProtocolType::WS | ProtocolType::WSS => { // find an existing connection in the connection table if one exists let network_manager = self.inner.lock().network_manager.clone(); @@ -55,11 +59,7 @@ impl Network { .get_connection(&descriptor) { // connection exists, send over it - entry - .conn - .send(data) - .await - .map_err(|_| "failed to send ws message".to_owned())?; + entry.conn.send(data).await.map_err(logthru_net!())?; // Data was consumed return Ok(None); } @@ -78,10 +78,16 @@ impl Network { let network_manager = self.inner.lock().network_manager.clone(); match &dial_info { - DialInfo::UDP(_) => return Err("no support for UDP protocol".to_owned()), - DialInfo::TCP(_) => return Err("no support for TCP protocol".to_owned()), - DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()), - DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()), + DialInfo::UDP(_) => { + return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error)) + } + DialInfo::TCP(_) => { + return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error)) + } + DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()) + .map_err(logthru_net!(error)), + DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()) + .map_err(logthru_net!(error)), } } pub async fn send_data_to_dial_info( @@ -92,19 +98,21 @@ impl Network { let network_manager = self.inner.lock().network_manager.clone(); let conn = match &dial_info { - DialInfo::UDP(_) => return Err("no support for UDP protocol".to_owned()), - DialInfo::TCP(_) => return Err("no support for TCP protocol".to_owned()), + DialInfo::UDP(_) => { + return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error)) + } + DialInfo::TCP(_) => { + return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error)) + } DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) .await - .map_err(|_| "failed to connect to WS dial info".to_owned())?, + .map_err(logthru_net!(error))?, DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) .await - .map_err(|_| "failed to connect to WSS dial info".to_owned())?, + .map_err(logthru_net!(error))?, }; - conn.send(data) - .await - .map_err(|_| "failed to send data to dial info".to_owned()) + conn.send(data).await.map_err(logthru_net!(error)) } pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { @@ -129,9 +137,13 @@ impl Network { // If that fails, try to make a connection or reach out to the peer via its dial info if let Some(di) = dial_info { - self.clone().send_data_to_dial_info(&di, di_data).await + self.clone() + .send_data_to_dial_info(&di, di_data) + .await + .map_err(logthru_net!(error)) } else { Err("couldn't send data, no dial info or peer address".to_owned()) + .map_err(logthru_net!(error)) } } diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index 7cd6a03f..890b8d9b 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -11,10 +11,10 @@ impl DummyNetworkConnection { pub fn protocol_type(&self) -> ProtocolType { ProtocolType::UDP } - pub fn send(&self, _message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, _message: Vec) -> SystemPinBoxFuture> { Box::pin(async { Ok(()) }) } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { Box::pin(async { Ok(Vec::new()) }) } } diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 536d7330..9147df7b 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -56,28 +56,31 @@ impl WebsocketNetworkConnection { let inner = self.inner.clone(); Box::pin(async move { if message.len() > MAX_MESSAGE_SIZE { - return Err("sending too large WS message".to_owned()); + return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)); } - inner.lock().ws.send_with_u8_array(&message).map_err(map_to_string) + inner + .lock() + .ws + .send_with_u8_array(&message) + .map_err(|_| "failed to send to websocket".to_owned()) + .map_err(logthru_net!(error)) }) } - pub fn recv(&self) -> SystemPinBoxFuture, String)>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { let inner = self.inner.clone(); Box::pin(async move { let out = match inner.lock().ws_stream.next().await { Some(WsMessage::Binary(v)) => v, Some(_) => { - return Err("Unexpected WS message type".to_owned()); - } - Some(Err(e)) => { - return Err(|e| e.to_string()); + return Err("Unexpected WS message type".to_owned()) + .map_err(logthru_net!(error)); } None => { - return Err("WS stream closed".to_owned()); + return Err("WS stream closed".to_owned()).map_err(logthru_net!(error)); } }; if out.len() > MAX_MESSAGE_SIZE { - Err("sending too large WS message".to_owned()) + Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)) } else { Ok(out) } @@ -99,15 +102,22 @@ impl WebsocketProtocolHandler { let (tls, host, port, protocol_type) = match dial_info { DialInfo::WS(ws) => (false, ws.host.clone(), ws.port, ProtocolType::WS), DialInfo::WSS(wss) => (true, wss.host.clone(), wss.port, ProtocolType::WSS), - _ => return Err("wrong protocol for WebsocketProtocolHandler".to_owned()), + _ => { + return Err("wrong protocol for WebsocketProtocolHandler".to_owned()) + .map_err(logthru_net!(error)) + } }; - let peer_addr = PeerAddress::new(Address::from_str(&host)?, port, protocol_type); + let peer_addr = PeerAddress::new( + Address::from_str(&host).map_err(logthru_net!(error))?, + port, + protocol_type, + ); - let (ws, wsio) = match WsMeta::connect(url, None).await { - Ok(conn) => conn, - Err(e) => return Err(format!("couldn't connect to WS url: {}", e)), - }; + let (ws, wsio) = WsMeta::connect(url, None) + .await + .map_err(map_to_string) + .map_err(logthru_net!(error))?; let conn = NetworkConnection::WS(WebsocketNetworkConnection::new(tls, ws, wsio)); network_manager diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index 41547b7f..f0cbda61 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -29,11 +29,11 @@ pub fn get_timestamp() -> u64 { } pub fn get_timestamp_string() -> String { - let date = Date::now(); - let hours = Date::get_utc_hours(date); - let minutes = Date::get_utc_minutes(date); - let seconds = Date::get_utc_seconds(date); - let milliseconds = Date::get_utc_milliseconds(date); + let date = Date::new_0(); + let hours = Date::get_utc_hours(&date); + let minutes = Date::get_utc_minutes(&date); + let seconds = Date::get_utc_seconds(&date); + let milliseconds = Date::get_utc_milliseconds(&date); format!( "{:02}:{:02}:{:02}.{}", hours, minutes, seconds, milliseconds diff --git a/veilid-core/src/intf/wasm/utils/channel.rs b/veilid-core/src/intf/wasm/utils/channel.rs index b6fc605e..91eeb732 100644 --- a/veilid-core/src/intf/wasm/utils/channel.rs +++ b/veilid-core/src/intf/wasm/utils/channel.rs @@ -1,5 +1,6 @@ use crate::xx::*; use alloc::collections::VecDeque; +use core::fmt; #[derive(Debug)] pub struct Channel { @@ -58,6 +59,19 @@ pub enum TrySendError { Disconnected(T), } +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TrySendError::Full(_) => { + write!(f, "Full") + } + TrySendError::Disconnected(_) => { + write!(f, "Disconnected") + } + } + } +} + impl Sender { // NOTE: This needs a timeout or you could block a very long time // pub async fn send(&self, msg: T) -> Result<(), SendError> { diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 023c9d8c..ae6cec19 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -28,7 +28,7 @@ impl Bucket { } pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef { - info!("Node added: {}", node_id.encode()); + log_rtab!("Node added: {}", node_id.encode()); // Add new entry self.entries.insert(node_id, BucketEntry::new()); @@ -42,7 +42,7 @@ impl Bucket { } pub(super) fn remove_entry(&mut self, node_id: &DHTKey) { - info!("Node removed: {}", node_id); + log_rtab!("Node removed: {}", node_id); // Remove the entry self.entries.remove(node_id); diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index c97d5633..34626cfc 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -215,7 +215,7 @@ impl RoutingTable { // transform, transform, ); - trace!(">> find_fastest_nodes: node count = {}", out.len()); + log_rtab!(">> find_fastest_nodes: node count = {}", out.len()); out } @@ -272,7 +272,7 @@ impl RoutingTable { // transform, transform, ); - trace!(">> find_closest_nodes: node count = {}", out.len()); + log_rtab!(">> find_closest_nodes: node count = {}", out.len()); out } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 371a4493..e0831219 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -393,7 +393,7 @@ impl RoutingTable { if let Some(dead_node_ids) = bucket.kick(bucket_depth) { // Remove counts inner.bucket_entry_count -= dead_node_ids.len(); - debug!("Routing table now has {} nodes", inner.bucket_entry_count); + log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); // Now purge the routing table inner vectors //let filter = |k: &DHTKey| dead_node_ids.contains(k); @@ -431,7 +431,7 @@ impl RoutingTable { pub fn create_node_ref(&self, node_id: DHTKey) -> Result { // Ensure someone isn't trying register this node itself if node_id == self.node_id() { - return Err("can't register own node".to_owned()); + return Err("can't register own node".to_owned()).map_err(logthru_rtab!(error)); } // Insert into bucket, possibly evicting the newest bucket member @@ -448,7 +448,7 @@ impl RoutingTable { // Update count inner.bucket_entry_count += 1; - debug!("Routing table now has {} nodes", inner.bucket_entry_count); + log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); nr }; @@ -480,13 +480,7 @@ impl RoutingTable { node_id: DHTKey, dial_infos: &[DialInfo], ) -> Result { - let nr = match self.create_node_ref(node_id) { - Err(e) => { - return Err(format!("Couldn't create node reference: {}", e)); - } - Ok(v) => v, - }; - + let nr = self.create_node_ref(node_id)?; nr.operate(move |e| -> Result<(), String> { for di in dial_infos { e.add_dial_info(di.clone())?; @@ -505,13 +499,7 @@ impl RoutingTable { descriptor: ConnectionDescriptor, timestamp: u64, ) -> Result { - let nr = match self.create_node_ref(node_id) { - Err(e) => { - return Err(format!("Couldn't create node reference: {}", e)); - } - Ok(v) => v, - }; - + let nr = self.create_node_ref(node_id)?; nr.operate(move |e| { // set the most recent node address for connection finding and udp replies e.set_last_connection(descriptor, timestamp); @@ -535,7 +523,7 @@ impl RoutingTable { let node_id = self.node_id(); let rpc_processor = self.rpc_processor(); - let res = match rpc_processor + let res = rpc_processor .rpc_call_find_node( Destination::Direct(node_ref.clone()), node_id, @@ -543,13 +531,9 @@ impl RoutingTable { RespondTo::Sender, ) .await - { - Ok(v) => v, - Err(e) => { - return Err(format!("couldn't contact node at {:?}: {}", &node_ref, e)); - } - }; - trace!( + .map_err(map_to_string) + .map_err(logthru_rtab!())?; + log_rtab!( "find_self for at {:?} answered in {}ms", &node_ref, timestamp_to_secs(res.latency) * 1000.0f64 @@ -564,15 +548,14 @@ impl RoutingTable { } // register the node if it's new - let nr = match self.register_node_with_dial_info(p.node_id.key, &p.dial_infos) { - Ok(v) => v, - Err(e) => { - return Err(format!( - "couldn't register node {} at {:?}: {}", - p.node_id.key, &p.dial_infos, e - )); - } - }; + let nr = self + .register_node_with_dial_info(p.node_id.key, &p.dial_infos) + .map_err(map_to_string) + .map_err(logthru_rtab!( + "couldn't register node {} at {:?}", + p.node_id.key, + &p.dial_infos + ))?; out.push(nr); } Ok(out) @@ -585,7 +568,7 @@ impl RoutingTable { // Ask bootstrap server for nodes closest to our own node let closest_nodes = match self.find_self(node_ref.clone()).await { Err(e) => { - error!( + log_rtab!(error "reverse_find_node: find_self failed for {:?}: {}", &node_ref, e ); @@ -599,7 +582,7 @@ impl RoutingTable { for closest_nr in closest_nodes { match self.find_self(closest_nr.clone()).await { Err(e) => { - error!( + log_rtab!(error "reverse_find_node: closest node find_self failed for {:?}: {}", &closest_nr, e ); @@ -617,36 +600,28 @@ impl RoutingTable { c.network.bootstrap.clone() }; - debug!("--- bootstrap_task"); + log_rtab!("--- bootstrap_task"); // Map all bootstrap entries to a single key with multiple dialinfo let mut bsmap: BTreeMap> = BTreeMap::new(); for b in bootstrap { - let ndis = match NodeDialInfoSingle::from_str(b.as_str()) { - Err(_) => { - return Err(format!("Invalid dial info in bootstrap entry: {}", b)); - } - Ok(v) => v, - }; + let ndis = NodeDialInfoSingle::from_str(b.as_str()) + .map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?; let node_id = ndis.node_id.key; bsmap .entry(node_id) .or_insert_with(Vec::new) .push(ndis.dial_info); } - trace!(" bootstrap list: {:?}", bsmap); + log_rtab!(" bootstrap list: {:?}", bsmap); // Run all bootstrap operations concurrently let mut unord = FuturesUnordered::new(); for (k, v) in bsmap { - let nr = match self.register_node_with_dial_info(k, &v) { - Ok(nr) => nr, - Err(e) => { - return Err(format!("Couldn't add bootstrap node: {}", e)); - } - }; - - trace!(" bootstrapping {} with {:?}", k.encode(), &v); + let nr = self + .register_node_with_dial_info(k, &v) + .map_err(logthru_rtab!("Couldn't add bootstrap node: {}", k))?; + log_rtab!(" bootstrapping {} with {:?}", k.encode(), &v); unord.push(self.reverse_find_node(nr, true)); } while unord.next().await.is_some() {} @@ -659,7 +634,7 @@ impl RoutingTable { // Ask our remaining peers to give us more peers before we go // back to the bootstrap servers to keep us from bothering them too much async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> { - trace!("--- peer_minimum_refresh task"); + log_rtab!("--- peer_minimum_refresh task"); // get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too let noderefs = { @@ -672,7 +647,7 @@ impl RoutingTable { } noderefs }; - trace!(" refreshing with nodes: {:?}", noderefs); + log_rtab!(" refreshing with nodes: {:?}", noderefs); // do peer minimum search concurrently let mut unord = FuturesUnordered::new(); @@ -688,14 +663,14 @@ impl RoutingTable { // Ping each node in the routing table if they need to be pinged // to determine their reliability async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { - trace!("--- ping_validator task"); + log_rtab!("--- ping_validator task"); let rpc = self.rpc_processor(); let mut inner = self.inner.lock(); for b in &mut inner.buckets { for (k, entry) in b.entries_mut() { if entry.needs_ping(cur_ts) { let nr = NodeRef::new(self.clone(), *k, entry); - debug!( + log_rtab!( " --- ping validating: {:?} ({})", nr, entry.state_debug_info(cur_ts) @@ -709,7 +684,7 @@ impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { - trace!("--- rolling_transfers task"); + log_rtab!("--- rolling_transfers task"); let inner = &mut *self.inner.lock(); // Roll our own node's transfers diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 45ba5785..c9f56d00 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -98,14 +98,18 @@ where let wordvec = builder .into_reader() .canonicalize() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) } fn reader_to_vec<'a, T>(reader: capnp::message::Reader) -> Result, RPCError> where T: capnp::message::ReaderSegments + 'a, { - let wordvec = reader.canonicalize().map_err(map_error_capnp_error!())?; + let wordvec = reader + .canonicalize() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) } @@ -222,7 +226,7 @@ impl RPCProcessor { // xxx find node but stop if we find the exact node we want // xxx return whatever node is closest after the timeout - Err(rpc_error_unimplemented("search_dht_single_key")) + Err(rpc_error_unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error)) } // Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references @@ -234,7 +238,7 @@ impl RPCProcessor { _timeout: Option, ) -> Result, RPCError> { // xxx return closest nodes after the timeout - Err(rpc_error_unimplemented("search_dht_multi_key")) + Err(rpc_error_unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error)) } // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference @@ -254,8 +258,7 @@ impl RPCProcessor { if nr.node_id() != node_id { // found a close node, but not exact within our configured resolve_node timeout - error!("XXX: Timeout"); - return Err(RPCError::Timeout); + return Err(RPCError::Timeout).map_err(logthru_rpc!()); } Ok(nr) @@ -559,7 +562,8 @@ impl RPCProcessor { let request_operation = request_rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; let reply_vec = reader_to_vec(reply_msg)?; @@ -567,12 +571,14 @@ impl RPCProcessor { match request_operation .get_respond_to() .which() - .map_err(map_error_internal!("invalid request operation"))? + .map_err(map_error_internal!("invalid request operation")) + .map_err(logthru_rpc!())? { veilid_capnp::operation::respond_to::None(_) => { // Do not respond // -------------- - return Err(rpc_error_internal("no response requested")); + return Err(rpc_error_internal("no response requested")) + .map_err(logthru_rpc!()); } veilid_capnp::operation::respond_to::Sender(_) => { // Respond to envelope source node, possibly through a relay if the request arrived that way @@ -594,17 +600,20 @@ impl RPCProcessor { // No private route was specified for the return // but we are using a safety route, so we must create an empty private route let mut pr_builder = ::capnp::message::Builder::new_default(); - let private_route = self.new_stub_private_route( - request_rpcreader.header.envelope.get_sender_id(), - &mut pr_builder, - )?; + let private_route = self + .new_stub_private_route( + request_rpcreader.header.envelope.get_sender_id(), + &mut pr_builder, + ) + .map_err(logthru_rpc!())?; out = self.wrap_with_route(Some(sr), private_route, reply_vec)?; // first out_node_id = sr .hops .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route"))? + .ok_or_else(|| rpc_error_internal("no hop in safety route")) + .map_err(logthru_rpc!())? .dial_info .node_id .key; @@ -618,7 +627,10 @@ impl RPCProcessor { // Extract private route for reply let private_route = match pr { Ok(v) => v, - Err(_) => return Err(rpc_error_internal("invalid private route")), + Err(_) => { + return Err(rpc_error_internal("invalid private route")) + .map_err(logthru_rpc!()) + } }; // Reply with 'route' operation @@ -627,23 +639,27 @@ impl RPCProcessor { None => { // If no safety route, the first node is the first hop of the private route if !private_route.has_first_hop() { - return Err(rpc_error_internal("private route has no hops")); + return Err(rpc_error_internal("private route has no hops")) + .map_err(logthru_rpc!()); } let hop = private_route .get_first_hop() .map_err(map_error_internal!("not a valid first hop"))?; decode_public_key( &hop.get_dial_info() - .map_err(map_error_internal!("not a valid dial info"))? + .map_err(map_error_internal!("not a valid dial info")) + .map_err(logthru_rpc!())? .get_node_id() - .map_err(map_error_internal!("not a valid node id"))?, + .map_err(map_error_internal!("not a valid node id")) + .map_err(logthru_rpc!())?, ) } Some(sr) => { // If safety route is in use, first node is the first hop of the safety route sr.hops .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route"))? + .ok_or_else(|| rpc_error_internal("no hop in safety route")) + .map_err(logthru_rpc!())? .dial_info .node_id .key @@ -774,7 +790,8 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; // Don't bother unless we are going to answer if !self.wants_answer(&operation)? { @@ -813,7 +830,8 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; // This should never want an answer if self.wants_answer(&operation)? { @@ -902,11 +920,12 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; // find_node must always want an answer if !self.wants_answer(&operation)? { - return Err(RPCError::InvalidFormat); + return Err(RPCError::InvalidFormat).map_err(logthru_rpc!()); } // get findNodeQ reader @@ -916,12 +935,17 @@ impl RPCProcessor { }; // ensure find_node peerinfo matches the envelope - let target_node_id = - decode_public_key(&fnq_reader.get_node_id().map_err(map_error_capnp_error!())?); + let target_node_id = decode_public_key( + &fnq_reader + .get_node_id() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?, + ); let peer_info = decode_peer_info( &fnq_reader .get_peer_info() - .map_err(map_error_capnp_error!())?, + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?, )?; if peer_info.node_id.key != rpcreader.header.envelope.get_sender_id() { return Err(RPCError::InvalidFormat); @@ -958,7 +982,7 @@ impl RPCProcessor { // transform |e| RoutingTable::transform_to_peer_info(e, peer_scope, &own_peer_info), ); - trace!(">>>> Returning {} closest peers", closest_nodes.len()); + log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); // Send find_node answer let mut reply_msg = ::capnp::message::Builder::new_default(); @@ -1022,7 +1046,8 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; // This should never want an answer if self.wants_answer(&operation)? { @@ -1072,7 +1097,8 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; operation.get_op_id() }; @@ -1091,7 +1117,8 @@ impl RPCProcessor { let operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; match operation .get_detail() @@ -1300,11 +1327,13 @@ impl RPCProcessor { let response_operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; let info_a = match response_operation .get_detail() .which() - .map_err(map_error_capnp_notinschema!())? + .map_err(map_error_capnp_notinschema!()) + .map_err(logthru_rpc!())? { veilid_capnp::operation::detail::InfoA(a) => { a.map_err(map_error_internal!("Invalid InfoA"))? @@ -1450,11 +1479,13 @@ impl RPCProcessor { let response_operation = rpcreader .reader .get_root::() - .map_err(map_error_capnp_error!())?; + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; let find_node_a = match response_operation .get_detail() .which() - .map_err(map_error_capnp_notinschema!())? + .map_err(map_error_capnp_notinschema!()) + .map_err(logthru_rpc!())? { veilid_capnp::operation::detail::FindNodeA(a) => { a.map_err(map_error_internal!("Invalid FindNodeA"))? diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 01aee3d3..b3ad639c 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -586,7 +586,7 @@ impl core::str::FromStr for NodeDialInfoSingle { cur_url = &cur_url[6..]; proto = ProtocolType::WSS; } else { - return Err("unknown protocol".to_owned()); + return Err(format!("unknown protocol: {}", url)); } // parse out node id if we have one diff --git a/veilid-core/src/xx/log_thru.rs b/veilid-core/src/xx/log_thru.rs index 4125b4dd..18eca04f 100644 --- a/veilid-core/src/xx/log_thru.rs +++ b/veilid-core/src/xx/log_thru.rs @@ -1,3 +1,5 @@ +pub use alloc::string::{String, ToString}; + pub fn map_to_string(arg: X) -> String { arg.to_string() } @@ -31,16 +33,16 @@ macro_rules! log_net { "{}", $text, )}; - (error $fmt:literal, $($arg:expr)+) => { - error!(target:"net", $fmt, $($arg)+); + (error $fmt:literal, $($arg:expr),+) => { + error!(target:"net", $fmt, $($arg),+); }; ($text:expr) => {trace!( target: "net", "{}", $text, )}; - ($fmt:literal, $($arg:expr)+) => { - trace!(target:"net", $fmt, $($arg)+); + ($fmt:literal, $($arg:expr),+) => { + trace!(target:"net", $fmt, $($arg),+); } } @@ -51,16 +53,16 @@ macro_rules! log_rpc { "{}", $text, )}; - (error $fmt:literal, $($arg:expr)+) => { - error!(target:"rpc", $fmt, $($arg)+); + (error $fmt:literal, $($arg:expr),+) => { + error!(target:"rpc", $fmt, $($arg),+); }; ($text:expr) => {trace!( target: "rpc", "{}", $text, )}; - ($fmt:literal, $($arg:expr)+) => { - trace!(target:"rpc", $fmt, $($arg)+); + ($fmt:literal, $($arg:expr),+) => { + trace!(target:"rpc", $fmt, $($arg),+); } } @@ -71,16 +73,16 @@ macro_rules! log_rtab { "{}", $text, )}; - (error $fmt:literal, $($arg:expr)+) => { - error!(target:"rtab", $fmt, $($arg)+); + (error $fmt:literal, $($arg:expr),+) => { + error!(target:"rtab", $fmt, $($arg),+); }; ($text:expr) => {trace!( target: "rtab", "{}", $text, )}; - ($fmt:literal, $($arg:expr)+) => { - trace!(target:"rtab", $fmt, $($arg)+); + ($fmt:literal, $($arg:expr),+) => { + trace!(target:"rtab", $fmt, $($arg),+); } } @@ -92,8 +94,8 @@ macro_rules! logthru_net { ($($level:ident)? $text:literal) => { logthru!($($level)? "net", $text) }; - ($($level:ident)? $fmt:literal, $($arg:expr)+) => { - logthru!($($level)? "net", $fmt, $($arg)+) + ($($level:ident)? $fmt:literal, $($arg:expr),+) => { + logthru!($($level)? "net", $fmt, $($arg),+) } } #[macro_export] @@ -104,8 +106,8 @@ macro_rules! logthru_rpc { ($($level:ident)? $text:literal) => { logthru!($($level)? "rpc", $text) }; - ($($level:ident)? $fmt:literal, $($arg:expr)+) => { - logthru!($($level)? "rpc", $fmt, $($arg)+) + ($($level:ident)? $fmt:literal, $($arg:expr),+) => { + logthru!($($level)? "rpc", $fmt, $($arg),+) } } @@ -117,8 +119,8 @@ macro_rules! logthru_rtab { ($($level:ident)? $text:literal) => { logthru!($($level)? "rtab", $text) }; - ($($level:ident)? $fmt:literal, $($arg:expr)+) => { - logthru!($($level)? "rtab", $fmt, $($arg)+) + ($($level:ident)? $fmt:literal, $($arg:expr),+) => { + logthru!($($level)? "rtab", $fmt, $($arg),+) } } @@ -142,12 +144,12 @@ macro_rules! logthru { ); e__ }); - (error $target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { + (error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { error!( target: $target, concat!("[{}] ", $fmt), e__, - $($arg)+ + $($arg),+ ); e__ }); @@ -169,12 +171,12 @@ macro_rules! logthru { ); e__ }); - ($target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { + ($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { trace!( target: $target, concat!("[{}] ", $fmt), e__, - $($arg)+ + $($arg),+ ); e__ }) diff --git a/veilid-core/src/xx/split_url.rs b/veilid-core/src/xx/split_url.rs index 6dfc4e02..cc38708d 100644 --- a/veilid-core/src/xx/split_url.rs +++ b/veilid-core/src/xx/split_url.rs @@ -8,6 +8,9 @@ // Only IP address and DNS hostname host fields are supported use super::IpAddr; +use alloc::borrow::ToOwned; +use alloc::string::String; +use alloc::vec::Vec; use core::fmt; use core::str::FromStr;