diff --git a/Cargo.lock b/Cargo.lock index 85b0cb11..65985865 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3651,6 +3651,7 @@ dependencies = [ "capnpc", "cfg-if 0.1.10", "chacha20poly1305", + "chrono", "config 0.11.0", "console_error_panic_hook", "curve25519-dalek-ng", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index af8fbd4d..63d5149d 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -66,6 +66,7 @@ if-addrs = { path = "../external/if-addrs" } async_executors = { version = "^0", features = [ "async_std" ]} socket2 = "^0" bugsalot = "^0" +chrono = "^0" # Dependencies for WASM builds only [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index e42aae67..ab25d07d 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -14,10 +14,10 @@ impl DummyNetworkConnection { pub fn protocol_type(&self) -> ProtocolType { ProtocolType::UDP } - pub fn send(&self, _message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, _message: Vec) -> SystemPinBoxFuture> { Box::pin(async { Ok(()) }) } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { Box::pin(async { Ok(Vec::new()) }) } } @@ -42,7 +42,7 @@ impl NetworkConnection { Self::Wss(w) => w.protocol_type(), } } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { match self { Self::Dummy(d) => d.send(message), Self::RawTcp(t) => t.send(message), @@ -51,7 +51,7 @@ impl NetworkConnection { Self::Wss(w) => w.send(message), } } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { match self { Self::Dummy(d) => d.recv(), Self::RawTcp(t) => t.recv(), diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index d899d4c2..d19eb5e9 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -49,40 +49,58 @@ impl RawTcpNetworkConnection { ProtocolType::TCP } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); Box::pin(async move { if message.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("sending too large TCP message".to_owned()); } let len = message.len() as u16; let header = [b'V', b'L', len as u8, (len >> 8) as u8]; let mut inner = inner.lock().await; - inner.stream.write_all(&header).await.map_err(drop)?; - inner.stream.write_all(&message).await.map_err(drop) + inner + .stream + .write_all(&header) + .await + .map_err(map_to_string) + .map_err(logthru_net!())?; + inner + .stream + .write_all(&message) + .await + .map_err(map_to_string) + .map_err(logthru_net!()) }) } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { let inner = self.inner.clone(); Box::pin(async move { let mut header = [0u8; 4]; let mut inner = inner.lock().await; - inner.stream.read_exact(&mut header).await.map_err(drop)?; + inner + .stream + .read_exact(&mut header) + .await + .map_err(|e| format!("TCP recv error: {}", e))?; if header[0] != b'V' || header[1] != b'L' { - return Err(()); + return Err("received invalid TCP frame header".to_owned()); } let len = ((header[3] as usize) << 8) | (header[2] as usize); if len > MAX_MESSAGE_SIZE { - return Err(()); + return Err("received too large TCP frame".to_owned()); } let mut out: Vec = vec![0u8; len]; - inner.stream.read_exact(&mut out).await.map_err(drop)?; + inner + .stream + .read_exact(&mut out) + .await + .map_err(map_to_string)?; Ok(out) }) } @@ -130,7 +148,8 @@ impl RawTcpProtocolHandler { let peeklen = stream .peek(&mut peekbuf) .await - .map_err(|e| format!("could not peek tcp stream: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!("could not peek tcp stream"))?; assert_eq!(peeklen, PEEK_DETECT_LEN); let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)); @@ -159,20 +178,21 @@ impl RawTcpProtocolHandler { // for hole-punch compatibility let domain = Domain::for_address(remote_socket_addr); let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) - .map_err(|e| format!("could not create tcp socket: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!())?; if let Err(e) = socket.set_linger(None) { - warn!("Couldn't set TCP linger: {}", e); + log_net!("Couldn't set TCP linger: {}", e); } if let Err(e) = socket.set_nodelay(true) { - warn!("Couldn't set TCP nodelay: {}", e); + log_net!("Couldn't set TCP nodelay: {}", e); } if let Err(e) = socket.set_reuse_address(true) { - warn!("Couldn't set reuse address: {}", e); + log_net!("Couldn't set reuse address: {}", e); } cfg_if! { if #[cfg(unix)] { if let Err(e) = socket.set_reuse_port(true) { - warn!("Couldn't set reuse port: {}", e); + log_net!("Couldn't set reuse port: {}", e); } } } @@ -181,7 +201,7 @@ impl RawTcpProtocolHandler { if let Some(some_local_addr) = preferred_local_address { let socket2_addr = socket2::SockAddr::from(some_local_addr); if let Err(e) = socket.bind(&socket2_addr) { - warn!("failed to bind TCP socket: {}", e); + log_net!(error "failed to bind TCP socket: {}", e); } } @@ -189,14 +209,16 @@ impl RawTcpProtocolHandler { let remote_socket2_addr = socket2::SockAddr::from(remote_socket_addr); socket .connect(&remote_socket2_addr) - .map_err(|e| format!("couln't connect tcp: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!("addr={}", remote_socket_addr))?; let std_stream: std::net::TcpStream = socket.into(); let ts = TcpStream::from(std_stream); // See what local address we ended up with and turn this into a stream let local_address = ts .local_addr() - .map_err(|e| format!("couldn't get local address for tcp socket: {}", e))?; + .map_err(map_to_string) + .map_err(logthru_net!())?; let ps = AsyncPeekStream::new(ts); let peer_addr = PeerAddress::new( Address::from_socket_addr(remote_socket_addr).to_canonical(), @@ -215,9 +237,12 @@ impl RawTcpProtocolHandler { Ok(conn) } - pub async fn send_unbound_message(data: Vec, socket_addr: SocketAddr) -> Result<(), ()> { + pub async fn send_unbound_message( + data: Vec, + socket_addr: SocketAddr, + ) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("sending too large unbound TCP message".to_owned()); } trace!( "sending unbound message of length {} to {}", @@ -225,8 +250,10 @@ impl RawTcpProtocolHandler { socket_addr ); - let mut stream = TcpStream::connect(socket_addr).await.map_err(drop)?; - stream.write_all(&data).await.map_err(drop) + let mut stream = TcpStream::connect(socket_addr) + .await + .map_err(|e| format!("{}", e))?; + stream.write_all(&data).await.map_err(|e| format!("{}", e)) } } diff --git a/veilid-core/src/intf/native/network/protocol/udp.rs b/veilid-core/src/intf/native/network/protocol/udp.rs index c0db1ef4..0b5e2af0 100644 --- a/veilid-core/src/intf/native/network/protocol/udp.rs +++ b/veilid-core/src/intf/native/network/protocol/udp.rs @@ -30,9 +30,9 @@ impl RawUdpProtocolHandler { } } - pub async fn on_message(&self, data: &[u8], remote_addr: SocketAddr) -> Result { + pub async fn on_message(&self, data: &[u8], remote_addr: SocketAddr) -> Result { if data.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("received too large UDP message".to_owned()); } trace!( @@ -52,7 +52,7 @@ impl RawUdpProtocolHandler { remote_addr.port(), ProtocolType::UDP, ); - let local_socket_addr = socket.local_addr().map_err(drop)?; + let local_socket_addr = socket.local_addr().map_err(|e| format!("{}", e))?; network_manager .on_recv_envelope( data, @@ -61,9 +61,9 @@ impl RawUdpProtocolHandler { .await } - pub async fn send_message(&self, data: Vec, socket_addr: SocketAddr) -> Result<(), ()> { + pub async fn send_message(&self, data: Vec, socket_addr: SocketAddr) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("sending too large UDP message".to_owned()); } trace!( @@ -73,18 +73,25 @@ impl RawUdpProtocolHandler { ); let socket = self.inner.lock().socket.clone(); - let len = socket.send_to(&data, socket_addr).await.map_err(drop)?; + let len = socket + .send_to(&data, socket_addr) + .await + .map_err(|e| format!("{}", e))?; if len != data.len() { - Err(()) + Err("UDP partial send".to_owned()) } else { Ok(()) } } - pub async fn send_unbound_message(data: Vec, socket_addr: SocketAddr) -> Result<(), ()> { + pub async fn send_unbound_message( + data: Vec, + socket_addr: SocketAddr, + ) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("sending too large unbound UDP message".to_owned()); } + xxx continue here trace!( "sending unbound message of length {} to {}", data.len(), @@ -98,10 +105,15 @@ impl RawUdpProtocolHandler { SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) } }; - let socket = UdpSocket::bind(local_socket_addr).await.map_err(drop)?; - let len = socket.send_to(&data, socket_addr).await.map_err(drop)?; + let socket = UdpSocket::bind(local_socket_addr) + .await + .map_err(|e| format!("{}", e))?; + let len = socket + .send_to(&data, socket_addr) + .await + .map_err(|e| format!("{}", e))?; if len != data.len() { - Err(()) + Err("UDP partial unbound send".to_owned()) } else { Ok(()) } diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index 7468795b..ff3dcb08 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -87,22 +87,23 @@ where ProtocolType::WS } } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { + + pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); Box::pin(async move { if message.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("received too large WS message".to_owned()); } let mut inner = inner.lock().await; inner .ws_stream .send(Message::binary(message)) .await - .map_err(drop) + .map_err(map_to_string) }) } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { let inner = self.inner.clone(); Box::pin(async move { @@ -110,13 +111,18 @@ where let out = match inner.ws_stream.next().await { Some(Ok(Message::Binary(v))) => v, - _ => { - trace!("websocket recv failed"); - return Err(()); + Some(Ok(_)) => { + return Err("Unexpected WS message type".to_owned()); + } + Some(Err(e)) => { + return Err(e.to_string()); + } + None => { + return Err("WS stream closed".to_owned()); } }; if out.len() > MAX_MESSAGE_SIZE { - Err(()) + Err("sending too large WS message".to_owned()) } else { Ok(out) } @@ -193,17 +199,15 @@ impl WebsocketProtocolHandler { && peekbuf[request_path_len - 1] == b' ')); if !matches_path { - trace!("not websocket"); + log_net!("not websocket"); return Ok(false); } - trace!("found websocket"); + log_net!("found websocket"); - let ws_stream = match accept_async(ps).await { - Ok(s) => s, - Err(e) => { - return Err(format!("failed websockets handshake: {:?}", e)); - } - }; + let ws_stream = accept_async(ps) + .await + .map_err(map_to_string) + .map_err(logthru_net!("failed websockets handshake"))?; // Wrap the websocket in a NetworkConnection and register it let protocol_type = if self.inner.tls { diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 1d852402..068438e3 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -11,6 +11,11 @@ pub fn get_timestamp() -> u64 { } } +pub fn get_timestamp_string() -> String { + let dt = chrono::Utc::now(); + dt.time().format("%H:%M:%S.3f").to_string() +} + pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> { let mut rng = rand::thread_rng(); rng.try_fill_bytes(dest).map_err(|err| format!("{:?}", err)) diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index b07ea49f..7cd6a03f 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -33,13 +33,13 @@ impl NetworkConnection { Self::WS(w) => w.protocol_type(), } } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { match self { Self::Dummy(d) => d.send(message), Self::WS(w) => w.send(message), } } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String>> { match self { Self::Dummy(d) => d.recv(), Self::WS(w) => w.recv(), diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 3577ce24..536d7330 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -52,27 +52,32 @@ impl WebsocketNetworkConnection { ProtocolType::WS } } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { + pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); Box::pin(async move { if message.len() > MAX_MESSAGE_SIZE { - return Err(()); + return Err("sending too large WS message".to_owned()); } - inner.lock().ws.send_with_u8_array(&message).map_err(drop) + inner.lock().ws.send_with_u8_array(&message).map_err(map_to_string) }) } - pub fn recv(&self) -> SystemPinBoxFuture, ()>> { + pub fn recv(&self) -> SystemPinBoxFuture, String)>> { let inner = self.inner.clone(); Box::pin(async move { let out = match inner.lock().ws_stream.next().await { Some(WsMessage::Binary(v)) => v, - _ => { - trace!("websocket recv failed"); - return Err(()); + Some(_) => { + return Err("Unexpected WS message type".to_owned()); + } + Some(Err(e)) => { + return Err(|e| e.to_string()); + } + None => { + return Err("WS stream closed".to_owned()); } }; if out.len() > MAX_MESSAGE_SIZE { - Err(()) + Err("sending too large WS message".to_owned()) } else { Ok(out) } diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index 443ad34b..41547b7f 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -28,6 +28,18 @@ pub fn get_timestamp() -> u64 { } } +pub fn get_timestamp_string() -> String { + let date = Date::now(); + let hours = Date::get_utc_hours(date); + let minutes = Date::get_utc_minutes(date); + let seconds = Date::get_utc_seconds(date); + let milliseconds = Date::get_utc_milliseconds(date); + format!( + "{:02}:{:02}:{:02}.{}", + hours, minutes, seconds, milliseconds + ) +} + pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> { let len = dest.len(); let u32len = len / 4; diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 3275284b..ca6931e2 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -275,14 +275,14 @@ impl NetworkManager { .lock() .connection_add_channel_tx .as_ref() - .ok_or_else(|| "connection channel isn't open yet".to_owned())? + .ok_or_else(fn_string!("connection channel isn't open yet"))? .clone(); let this = self.clone(); let receiver_loop_future = Self::process_connection(this, descriptor, conn); - Ok(tx - .try_send(receiver_loop_future) + tx.try_send(receiver_loop_future) .await - .map_err(|_| "connection loop stopped".to_owned())?) + .map_err(map_to_string) + .map_err(logthru_net!(error "failed to start receiver loop")) } // Connection receiver loop @@ -299,7 +299,7 @@ impl NetworkManager { { Ok(e) => e, Err(err) => { - error!("{}", err); + error!(target: "net", "{}", err); return; } }; @@ -323,7 +323,10 @@ impl NetworkManager { }; match this.on_recv_envelope(message.as_slice(), &descriptor).await { Ok(_) => (), - Err(_) => break, + Err(e) => { + error!("{}", e); + break; + } }; } @@ -527,19 +530,16 @@ impl NetworkManager { &self, data: &[u8], descriptor: &ConnectionDescriptor, - ) -> Result { + ) -> Result { // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { - self.process_receipt(data).await.map_err(|s| { - trace!("receipt failed to process: {}", s); - })?; + self.process_receipt(data).await?; return Ok(true); } // Decode envelope header - let envelope = Envelope::from_data(data).map_err(|_| { - trace!("envelope failed to decode"); - })?; + let envelope = + Envelope::from_data(data).map_err(|_| "envelope failed to decode".to_owned())?; // Get routing table and rpc processor let (routing_table, lease_manager, rpc) = { @@ -560,22 +560,22 @@ impl NetworkManager { if !lease_manager.server_has_valid_relay_lease(&recipient_id) && !lease_manager.server_has_valid_relay_lease(&sender_id) { - trace!("received envelope not intended for this node"); - return Err(()); + return Err("received envelope not intended for this node".to_owned()); } // Resolve the node to send this to - let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|_| { - trace!("failed to resolve recipient node for relay, dropping packet..."); + let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|e| { + format!( + "failed to resolve recipient node for relay, dropping packet...: {:?}", + e + ) })?; // Re-send the packet to the leased node self.net() .send_data(relay_nr, data.to_vec()) .await - .map_err(|_| { - trace!("failed to forward envelope"); - })?; + .map_err(|e| format!("failed to forward envelope: {}", e))?; // Inform caller that we dealt with the envelope, but did not process it locally return Ok(false); } @@ -585,7 +585,9 @@ impl NetworkManager { // Decrypt the envelope body // xxx: punish nodes that send messages that fail to decrypt eventually - let body = envelope.decrypt_body(self.crypto(), data, &node_id_secret)?; + let body = envelope + .decrypt_body(self.crypto(), data, &node_id_secret) + .map_err(|_| "failed to decrypt envelope body".to_owned())?; // Get timestamp range let (tsbehind, tsahead) = { @@ -601,20 +603,18 @@ impl NetworkManager { let ets = envelope.get_timestamp(); if let Some(tsbehind) = tsbehind { if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) { - trace!( + return Err(format!( "envelope time was too far in the past: {}ms ", timestamp_to_secs(ts - ets) * 1000f64 - ); - return Err(()); + )); } } if let Some(tsahead) = tsahead { if tsahead > 0 && (ts < ets && ets - ts > tsahead) { - trace!( + return Err(format!( "envelope time was too far in the future: {}ms", timestamp_to_secs(ets - ts) * 1000f64 - ); - return Err(()); + )); } } @@ -625,9 +625,7 @@ impl NetworkManager { descriptor.clone(), ts, ) - .map_err(|_| { - trace!("node id registration failed"); - })?; + .map_err(|e| format!("node id registration failed: {}", e))?; source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version())); // xxx: deal with spoofing and flooding here? @@ -635,9 +633,7 @@ impl NetworkManager { // Pass message to RPC system rpc.enqueue_message(envelope, body, source_noderef) .await - .map_err(|_| { - trace!("enqueing rpc message failed"); - })?; + .map_err(|e| format!("enqueing rpc message failed: {}", e))?; // Inform caller that we dealt with the envelope locally Ok(true) diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs new file mode 100644 index 00000000..b63c1fab --- /dev/null +++ b/veilid-core/src/rpc_processor/debug.rs @@ -0,0 +1,282 @@ +use super::*; + +#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] +pub enum RPCError { + Timeout, + InvalidFormat, + Unimplemented(String), + Protocol(String), + Internal(String), +} +pub fn rpc_error_internal>(x: T) -> RPCError { + error!("RPCError Internal: {}", x.as_ref()); + RPCError::Internal(x.as_ref().to_owned()) +} +pub fn rpc_error_capnp_error(e: capnp::Error) -> RPCError { + error!("RPCError Protocol: {}", &e.description); + RPCError::Protocol(e.description) +} +pub fn rpc_error_capnp_notinschema(e: capnp::NotInSchema) -> RPCError { + error!("RPCError Protocol: not in schema: {}", &e.0); + RPCError::Protocol(format!("not in schema: {}", &e.0)) +} +pub fn rpc_error_unimplemented>(x: T) -> RPCError { + error!("RPCError Unimplemented: {}", x.as_ref()); + RPCError::Unimplemented(x.as_ref().to_owned()) +} + +impl fmt::Display for RPCError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RPCError::Timeout => write!(f, "[RPCError: Timeout]"), + RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"), + RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s), + RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s), + RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s), + } + } +} + +#[macro_export] +macro_rules! map_error_internal { + ($x:expr) => { + |_| rpc_error_internal($x) + }; +} +#[macro_export] +macro_rules! map_error_string { + () => { + |s| rpc_error_internal(&s) + }; +} + +#[macro_export] +macro_rules! map_error_capnp_error { + () => { + |e| rpc_error_capnp_error(e) + }; +} + +#[macro_export] +macro_rules! map_error_capnp_notinschema { + () => { + |e| rpc_error_capnp_notinschema(e) + }; +} + +#[macro_export] +macro_rules! map_error_panic { + () => { + |_| panic!("oops") + }; +} + +impl RPCProcessor { + pub(super) fn get_rpc_request_debug_info( + &self, + dest: &Destination, + message: &capnp::message::Reader, + safety_route_spec: &Option<&SafetyRouteSpec>, + ) -> String { + format!( + "REQ->{:?}{} {}", + dest, + match safety_route_spec { + None => "".to_owned(), + Some(srs) => format!("[{:?}]", srs), + }, + self.get_rpc_message_debug_info(message) + ) + } + pub(super) fn get_rpc_reply_debug_info( + &self, + request_rpcreader: &RPCMessageReader, + reply_msg: &capnp::message::Reader, + safety_route_spec: &Option<&SafetyRouteSpec>, + ) -> String { + let request_operation = match request_rpcreader + .reader + .get_root::() + { + Ok(v) => v, + Err(e) => { + return format!("invalid operation: {}", e); + } + }; + + let respond_to = match request_operation.get_respond_to().which() { + Ok(v) => v, + Err(e) => { + return format!("(respond_to not in schema: {:?})", e); + } + }; + let respond_to_str = match respond_to { + veilid_capnp::operation::respond_to::None(_) => "(None)".to_owned(), + veilid_capnp::operation::respond_to::Sender(_) => "Sender".to_owned(), + veilid_capnp::operation::respond_to::PrivateRoute(pr) => { + let pr_reader = match pr { + Ok(prr) => prr, + Err(e) => { + return e.to_string(); + } + }; + let private_route = match decode_private_route(&pr_reader) { + Ok(pr) => pr, + Err(e) => { + return e.to_string(); + } + }; + format!("[PR:{:?}]", private_route) + } + }; + format!( + "REPLY->{:?}{} {}", + respond_to_str, + match safety_route_spec { + None => "".to_owned(), + Some(srs) => format!("[SR:{:?}]", srs), + }, + self.get_rpc_message_debug_info(reply_msg) + ) + } + + pub(super) fn get_rpc_message_debug_info( + &self, + message: &capnp::message::Reader, + ) -> String { + let operation = match message.get_root::() { + Ok(v) => v, + Err(e) => { + return format!("invalid operation: {}", e); + } + }; + let op_id = operation.get_op_id(); + let detail = match operation.get_detail().which() { + Ok(v) => v, + Err(e) => { + return format!("(operation detail not in schema: {})", e); + } + }; + format!( + "#{} {}", + op_id, + self.get_rpc_operation_detail_debug_info(&detail) + ) + } + + #[allow(clippy::useless_format)] + pub(super) fn get_rpc_operation_detail_debug_info( + &self, + detail: &veilid_capnp::operation::detail::WhichReader, + ) -> String { + match detail { + veilid_capnp::operation::detail::InfoQ(_) => { + format!("InfoQ") + } + veilid_capnp::operation::detail::InfoA(_) => { + format!("InfoA") + } + veilid_capnp::operation::detail::ValidateDialInfo(_) => { + format!("ValidateDialInfo") + } + veilid_capnp::operation::detail::FindNodeQ(d) => { + let fnqr = match d { + Ok(fnqr) => fnqr, + Err(e) => { + return format!("(invalid detail: {})", e); + } + }; + let nidr = match fnqr.get_node_id() { + Ok(nidr) => nidr, + Err(e) => { + return format!("(invalid node id: {})", e); + } + }; + let pir = match fnqr.get_peer_info() { + Ok(pir) => pir, + Err(e) => { + return format!("(invalid peer_info: {})", e); + } + }; + let node_id = decode_public_key(&nidr); + let peer_info = match decode_peer_info(&pir) { + Ok(pi) => pi, + Err(e) => { + return e.to_string(); + } + }; + format!( + "FindNodeQ: node_id={} peer_info={:?}", + node_id.encode(), + peer_info + ) + } + veilid_capnp::operation::detail::FindNodeA(_) => { + format!("FindNodeA") + } + veilid_capnp::operation::detail::Route(_) => { + format!("Route") + } + veilid_capnp::operation::detail::GetValueQ(_) => { + format!("GetValueQ") + } + veilid_capnp::operation::detail::GetValueA(_) => { + format!("GetValueA") + } + veilid_capnp::operation::detail::SetValueQ(_) => { + format!("SetValueQ") + } + veilid_capnp::operation::detail::SetValueA(_) => { + format!("SetValueA") + } + veilid_capnp::operation::detail::WatchValueQ(_) => { + format!("WatchValueQ") + } + veilid_capnp::operation::detail::WatchValueA(_) => { + format!("WatchValueA") + } + veilid_capnp::operation::detail::ValueChanged(_) => { + format!("ValueChanged") + } + veilid_capnp::operation::detail::SupplyBlockQ(_) => { + format!("SupplyBlockQ") + } + veilid_capnp::operation::detail::SupplyBlockA(_) => { + format!("SupplyBlockA") + } + veilid_capnp::operation::detail::FindBlockQ(_) => { + format!("FindBlockQ") + } + veilid_capnp::operation::detail::FindBlockA(_) => { + format!("FindBlockA") + } + veilid_capnp::operation::detail::SignalQ(_) => { + format!("SignalQ") + } + veilid_capnp::operation::detail::SignalA(_) => { + format!("SignalA") + } + veilid_capnp::operation::detail::ReturnReceipt(_) => { + format!("ReturnReceipt") + } + veilid_capnp::operation::detail::StartTunnelQ(_) => { + format!("StartTunnelQ") + } + veilid_capnp::operation::detail::StartTunnelA(_) => { + format!("StartTunnelA") + } + veilid_capnp::operation::detail::CompleteTunnelQ(_) => { + format!("CompleteTunnelQ") + } + veilid_capnp::operation::detail::CompleteTunnelA(_) => { + format!("CompleteTunnelA") + } + veilid_capnp::operation::detail::CancelTunnelQ(_) => { + format!("CancelTunnelQ") + } + veilid_capnp::operation::detail::CancelTunnelA(_) => { + format!("CancelTunnelA") + } + } + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b845c4ea..45ba5785 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,4 +1,9 @@ mod coders; +mod debug; +mod private_route; + +pub use debug::*; +pub use private_route::*; use crate::dht::*; use crate::intf::utils::channel::*; @@ -18,77 +23,6 @@ use routing_table::*; type OperationId = u64; -#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] -pub enum RPCError { - Timeout, - InvalidFormat, - Unimplemented(String), - Protocol(String), - Internal(String), -} -pub fn rpc_error_internal>(x: T) -> RPCError { - error!("RPCError Internal: {}", x.as_ref()); - RPCError::Internal(x.as_ref().to_owned()) -} -pub fn rpc_error_capnp_error(e: capnp::Error) -> RPCError { - error!("RPCError Protocol: {}", &e.description); - RPCError::Protocol(e.description) -} -pub fn rpc_error_capnp_notinschema(e: capnp::NotInSchema) -> RPCError { - error!("RPCError Protocol: not in schema: {}", &e.0); - RPCError::Protocol(format!("not in schema: {}", &e.0)) -} -pub fn rpc_error_unimplemented>(x: T) -> RPCError { - error!("RPCError Unimplemented: {}", x.as_ref()); - RPCError::Unimplemented(x.as_ref().to_owned()) -} - -impl fmt::Display for RPCError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RPCError::Timeout => write!(f, "[RPCError: Timeout]"), - RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"), - RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s), - RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s), - RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s), - } - } -} - -#[macro_export] -macro_rules! map_error_internal { - ($x:expr) => { - |_| rpc_error_internal($x) - }; -} -#[macro_export] -macro_rules! map_error_string { - () => { - |s| rpc_error_internal(&s) - }; -} - -#[macro_export] -macro_rules! map_error_capnp_error { - () => { - |e| rpc_error_capnp_error(e) - }; -} - -#[macro_export] -macro_rules! map_error_capnp_notinschema { - () => { - |e| rpc_error_capnp_notinschema(e) - }; -} - -#[macro_export] -macro_rules! map_error_panic { - () => { - |_| panic!("oops") - }; -} - #[derive(Debug, Clone)] pub enum Destination { Direct(NodeRef), @@ -207,7 +141,7 @@ pub struct RPCProcessorInner { routing_table: RoutingTable, node_id: key::DHTKey, node_id_secret: key::DHTKeySecret, - channel: Option<(Sender, Receiver)>, + send_channel: Option>, timeout: u64, max_route_hop_count: usize, waiting_rpc_table: BTreeMap>, @@ -228,7 +162,7 @@ impl RPCProcessor { routing_table: network_manager.routing_table(), node_id: key::DHTKey::default(), node_id_secret: key::DHTKeySecret::default(), - channel: None, + send_channel: None, timeout: 10000000, max_route_hop_count: 7, waiting_rpc_table: BTreeMap::new(), @@ -327,197 +261,6 @@ impl RPCProcessor { Ok(nr) } - ////////////////////////////////////////////////////////////////////// - fn new_stub_private_route<'a, T>( - &self, - dest_node_id: key::DHTKey, - builder: &'a mut ::capnp::message::Builder, - ) -> Result, RPCError> - where - T: capnp::message::Allocator + 'a, - { - let mut pr = builder.init_root::(); - - let mut pr_pk = pr.reborrow().init_public_key(); - encode_public_key(&dest_node_id, &mut pr_pk)?; - pr.set_hop_count(0u8); - // leave firstHop as null - Ok(pr.into_reader()) - } - - fn encode_safety_route<'a>( - &self, - safety_route: &SafetyRouteSpec, - private_route: veilid_capnp::private_route::Reader<'a>, - builder: &'a mut veilid_capnp::safety_route::Builder<'a>, - ) -> Result<(), RPCError> { - // Ensure the total hop count isn't too long for our config - let pr_hopcount = private_route.get_hop_count() as usize; - let sr_hopcount = safety_route.hops.len(); - let hopcount = 1 + sr_hopcount + pr_hopcount; - if hopcount > self.inner.lock().max_route_hop_count { - return Err(rpc_error_internal("hop count too long for route")); - } - - // Build the safety route - let mut sr_pk = builder.reborrow().init_public_key(); - encode_public_key(&safety_route.public_key, &mut sr_pk)?; - - builder.set_hop_count( - u8::try_from(sr_hopcount) - .map_err(map_error_internal!("hop count too large for safety route"))?, - ); - - // Build all the hops in the safety route - let mut hops_builder = builder.reborrow().init_hops(); - if sr_hopcount == 0 { - hops_builder - .set_private(private_route) - .map_err(map_error_internal!( - "invalid private route while encoding safety route" - ))?; - } else { - // start last blob-to-encrypt data off as private route - let mut blob_data = { - let mut pr_message = ::capnp::message::Builder::new_default(); - pr_message - .set_root_canonical(private_route) - .map_err(map_error_internal!( - "invalid private route while encoding safety route" - ))?; - let mut blob_data = builder_to_vec(pr_message)?; - // append the private route tag so we know how to decode it later - blob_data.push(1u8); - blob_data - }; - - // Encode each hop from inside to outside - // skips the outermost hop since that's entering the - // safety route and does not include the dialInfo - // (outer hop is a RouteHopData, not a RouteHop). - // Each loop mutates 'nonce', and 'blob_data' - let mut nonce = Crypto::get_random_nonce(); - for h in (1..sr_hopcount).rev() { - // Get blob to encrypt for next hop - blob_data = { - // RouteHop - let mut rh_message = ::capnp::message::Builder::new_default(); - let mut rh_builder = rh_message.init_root::(); - let mut di_builder = rh_builder.reborrow().init_dial_info(); - encode_node_dial_info_single(&safety_route.hops[h].dial_info, &mut di_builder)?; - // RouteHopData - let mut rhd_builder = rh_builder.init_next_hop(); - // Add the nonce - let mut rhd_nonce = rhd_builder.reborrow().init_nonce(); - encode_nonce(&nonce, &mut rhd_nonce); - // Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr)) - let dh_secret = self - .crypto - .cached_dh( - &safety_route.hops[h].dial_info.node_id.key, - &safety_route.secret_key, - ) - .map_err(map_error_internal!("dh failed"))?; - let enc_msg_data = - Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None) - .map_err(map_error_internal!("encryption failed"))?; - - rhd_builder.set_blob(enc_msg_data.as_slice()); - let mut blob_data = builder_to_vec(rh_message)?; - - // append the route hop tag so we know how to decode it later - blob_data.push(0u8); - blob_data - }; - // Make another nonce for the next hop - nonce = Crypto::get_random_nonce(); - } - - // Encode first RouteHopData - let mut first_rhd_builder = hops_builder.init_data(); - let mut first_rhd_nonce = first_rhd_builder.reborrow().init_nonce(); - encode_nonce(&nonce, &mut first_rhd_nonce); - let dh_secret = self - .crypto - .cached_dh( - &safety_route.hops[0].dial_info.node_id.key, - &safety_route.secret_key, - ) - .map_err(map_error_internal!("dh failed"))?; - let enc_msg_data = Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None) - .map_err(map_error_internal!("encryption failed"))?; - - first_rhd_builder.set_blob(enc_msg_data.as_slice()); - } - - Ok(()) - } - - // Wrap an operation inside a route - fn wrap_with_route<'a>( - &self, - safety_route: Option<&SafetyRouteSpec>, - private_route: veilid_capnp::private_route::Reader<'a>, - message_data: Vec, - ) -> Result, RPCError> { - // Get stuff before we lock inner - let op_id = self.get_next_op_id(); - // Encrypt routed operation - let nonce = Crypto::get_random_nonce(); - let pr_pk_reader = private_route - .get_public_key() - .map_err(map_error_internal!("public key is invalid"))?; - let pr_pk = decode_public_key(&pr_pk_reader); - let stub_safety_route = SafetyRouteSpec::new(); - let sr = safety_route.unwrap_or(&stub_safety_route); - let dh_secret = self - .crypto - .cached_dh(&pr_pk, &sr.secret_key) - .map_err(map_error_internal!("dh failed"))?; - let enc_msg_data = Crypto::encrypt(&message_data, &nonce, &dh_secret, None) - .map_err(map_error_internal!("encryption failed"))?; - - // Prepare route operation - let route_msg = { - let mut route_msg = ::capnp::message::Builder::new_default(); - let mut route_operation = route_msg.init_root::(); - - // Doesn't matter what this op id because there's no answer - // but it shouldn't conflict with any other op id either - route_operation.set_op_id(op_id); - - // Answers don't get a 'respond' - let mut respond_to = route_operation.reborrow().init_respond_to(); - respond_to.set_none(()); - - // Set up 'route' operation - let mut route = route_operation.reborrow().init_detail().init_route(); - - // Set the safety route we've constructed - let mut msg_sr = route.reborrow().init_safety_route(); - self.encode_safety_route(sr, private_route, &mut msg_sr)?; - - // Put in the encrypted operation we're routing - let mut msg_operation = route.init_operation(); - msg_operation.reborrow().init_signatures(0); - let mut route_nonce = msg_operation.reborrow().init_nonce(); - encode_nonce(&nonce, &mut route_nonce); - let data = msg_operation.reborrow().init_data( - enc_msg_data - .len() - .try_into() - .map_err(map_error_internal!("data too large"))?, - ); - data.copy_from_slice(enc_msg_data.as_slice()); - - route_msg - }; - - // Convert message to bytes and return it - let out = builder_to_vec(route_msg)?; - Ok(out) - } - // set up wait for reply fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue { let mut inner = self.inner.lock(); @@ -619,6 +362,8 @@ impl RPCProcessor { message: capnp::message::Reader, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { + log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec)); + let (op_id, wants_answer, is_ping) = { let operation = message .get_root::() @@ -626,6 +371,7 @@ impl RPCProcessor { let op_id = operation.get_op_id(); let wants_answer = self.wants_answer(&operation)?; let is_ping = operation.get_detail().has_info_q(); + (op_id, wants_answer, is_ping) }; @@ -727,7 +473,9 @@ impl RPCProcessor { let node_ref = match out_noderef { None => { // resolve node - self.resolve_node(out_node_id).await? + self.resolve_node(out_node_id) + .await + .map_err(logthru_rpc!(error))? } Some(nr) => { // got the node in the routing table already @@ -748,6 +496,7 @@ impl RPCProcessor { .network_manager() .send_envelope(node_ref.clone(), out) .await + .map_err(logthru_rpc!(error)) .map_err(RPCError::Internal) { // Make sure to clean up op id waiter in case of error @@ -792,6 +541,8 @@ impl RPCProcessor { reply_msg: capnp::message::Reader, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result<(), RPCError> { + log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec)); + // let out_node_id; let mut out_noderef: Option = None; @@ -821,7 +572,7 @@ impl RPCProcessor { veilid_capnp::operation::respond_to::None(_) => { // Do not respond // -------------- - return Ok(()); + return Err(rpc_error_internal("no response requested")); } veilid_capnp::operation::respond_to::Sender(_) => { // Respond to envelope source node, possibly through a relay if the request arrived that way @@ -1329,7 +1080,6 @@ impl RPCProcessor { } ////////////////////////////////////////////////////////////////////// - async fn process_rpc_message_version_0(&self, msg: RPCMessage) -> Result<(), RPCError> { let reader = capnp::message::Reader::new(msg.data, Default::default()); let rpcreader = RPCMessageReader { @@ -1442,9 +1192,10 @@ impl RPCProcessor { async fn rpc_worker(self, receiver: Receiver) { while let Ok(msg) = receiver.recv().await { - if let Err(e) = self.process_rpc_message(msg).await { - error!("Couldn't process rpc message: {}", e); - } + let _ = self + .process_rpc_message(msg) + .await + .map_err(logthru_rpc!("couldn't process rpc message")); } } @@ -1479,7 +1230,7 @@ impl RPCProcessor { inner.timeout = timeout; inner.max_route_hop_count = max_route_hop_count; let channel = channel(queue_size as usize); - inner.channel = Some(channel.clone()); + inner.send_channel = Some(channel.0.clone()); // spin up N workers trace!("Spinning up {} RPC workers", concurrency); @@ -1502,8 +1253,7 @@ impl RPCProcessor { envelope: envelope::Envelope, body: Vec, peer_noderef: NodeRef, - ) -> Result<(), ()> { - trace!("enqueue_message: body len = {}", body.len()); + ) -> Result<(), String> { let msg = RPCMessage { header: RPCMessageHeader { timestamp: get_timestamp(), @@ -1515,9 +1265,12 @@ impl RPCProcessor { }; let send_channel = { let inner = self.inner.lock(); - inner.channel.as_ref().unwrap().0.clone() + inner.send_channel.as_ref().unwrap().clone() }; - send_channel.try_send(msg).await.map_err(drop)?; + send_channel + .try_send(msg) + .await + .map_err(|e| format!("failed to enqueue received RPC message: {:?}", e))?; Ok(()) } diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs new file mode 100644 index 00000000..4b2fe939 --- /dev/null +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -0,0 +1,194 @@ +use super::*; + +impl RPCProcessor { + ////////////////////////////////////////////////////////////////////// + pub(super) fn new_stub_private_route<'a, T>( + &self, + dest_node_id: key::DHTKey, + builder: &'a mut ::capnp::message::Builder, + ) -> Result, RPCError> + where + T: capnp::message::Allocator + 'a, + { + let mut pr = builder.init_root::(); + + let mut pr_pk = pr.reborrow().init_public_key(); + encode_public_key(&dest_node_id, &mut pr_pk)?; + pr.set_hop_count(0u8); + // leave firstHop as null + Ok(pr.into_reader()) + } + + fn encode_safety_route<'a>( + &self, + safety_route: &SafetyRouteSpec, + private_route: veilid_capnp::private_route::Reader<'a>, + builder: &'a mut veilid_capnp::safety_route::Builder<'a>, + ) -> Result<(), RPCError> { + // Ensure the total hop count isn't too long for our config + let pr_hopcount = private_route.get_hop_count() as usize; + let sr_hopcount = safety_route.hops.len(); + let hopcount = 1 + sr_hopcount + pr_hopcount; + if hopcount > self.inner.lock().max_route_hop_count { + return Err(rpc_error_internal("hop count too long for route")); + } + + // Build the safety route + let mut sr_pk = builder.reborrow().init_public_key(); + encode_public_key(&safety_route.public_key, &mut sr_pk)?; + + builder.set_hop_count( + u8::try_from(sr_hopcount) + .map_err(map_error_internal!("hop count too large for safety route"))?, + ); + + // Build all the hops in the safety route + let mut hops_builder = builder.reborrow().init_hops(); + if sr_hopcount == 0 { + hops_builder + .set_private(private_route) + .map_err(map_error_internal!( + "invalid private route while encoding safety route" + ))?; + } else { + // start last blob-to-encrypt data off as private route + let mut blob_data = { + let mut pr_message = ::capnp::message::Builder::new_default(); + pr_message + .set_root_canonical(private_route) + .map_err(map_error_internal!( + "invalid private route while encoding safety route" + ))?; + let mut blob_data = builder_to_vec(pr_message)?; + // append the private route tag so we know how to decode it later + blob_data.push(1u8); + blob_data + }; + + // Encode each hop from inside to outside + // skips the outermost hop since that's entering the + // safety route and does not include the dialInfo + // (outer hop is a RouteHopData, not a RouteHop). + // Each loop mutates 'nonce', and 'blob_data' + let mut nonce = Crypto::get_random_nonce(); + for h in (1..sr_hopcount).rev() { + // Get blob to encrypt for next hop + blob_data = { + // RouteHop + let mut rh_message = ::capnp::message::Builder::new_default(); + let mut rh_builder = rh_message.init_root::(); + let mut di_builder = rh_builder.reborrow().init_dial_info(); + encode_node_dial_info_single(&safety_route.hops[h].dial_info, &mut di_builder)?; + // RouteHopData + let mut rhd_builder = rh_builder.init_next_hop(); + // Add the nonce + let mut rhd_nonce = rhd_builder.reborrow().init_nonce(); + encode_nonce(&nonce, &mut rhd_nonce); + // Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr)) + let dh_secret = self + .crypto + .cached_dh( + &safety_route.hops[h].dial_info.node_id.key, + &safety_route.secret_key, + ) + .map_err(map_error_internal!("dh failed"))?; + let enc_msg_data = + Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None) + .map_err(map_error_internal!("encryption failed"))?; + + rhd_builder.set_blob(enc_msg_data.as_slice()); + let mut blob_data = builder_to_vec(rh_message)?; + + // append the route hop tag so we know how to decode it later + blob_data.push(0u8); + blob_data + }; + // Make another nonce for the next hop + nonce = Crypto::get_random_nonce(); + } + + // Encode first RouteHopData + let mut first_rhd_builder = hops_builder.init_data(); + let mut first_rhd_nonce = first_rhd_builder.reborrow().init_nonce(); + encode_nonce(&nonce, &mut first_rhd_nonce); + let dh_secret = self + .crypto + .cached_dh( + &safety_route.hops[0].dial_info.node_id.key, + &safety_route.secret_key, + ) + .map_err(map_error_internal!("dh failed"))?; + let enc_msg_data = Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None) + .map_err(map_error_internal!("encryption failed"))?; + + first_rhd_builder.set_blob(enc_msg_data.as_slice()); + } + + Ok(()) + } + + // Wrap an operation inside a route + pub(super) fn wrap_with_route<'a>( + &self, + safety_route: Option<&SafetyRouteSpec>, + private_route: veilid_capnp::private_route::Reader<'a>, + message_data: Vec, + ) -> Result, RPCError> { + // Get stuff before we lock inner + let op_id = self.get_next_op_id(); + // Encrypt routed operation + let nonce = Crypto::get_random_nonce(); + let pr_pk_reader = private_route + .get_public_key() + .map_err(map_error_internal!("public key is invalid"))?; + let pr_pk = decode_public_key(&pr_pk_reader); + let stub_safety_route = SafetyRouteSpec::new(); + let sr = safety_route.unwrap_or(&stub_safety_route); + let dh_secret = self + .crypto + .cached_dh(&pr_pk, &sr.secret_key) + .map_err(map_error_internal!("dh failed"))?; + let enc_msg_data = Crypto::encrypt(&message_data, &nonce, &dh_secret, None) + .map_err(map_error_internal!("encryption failed"))?; + + // Prepare route operation + let route_msg = { + let mut route_msg = ::capnp::message::Builder::new_default(); + let mut route_operation = route_msg.init_root::(); + + // Doesn't matter what this op id because there's no answer + // but it shouldn't conflict with any other op id either + route_operation.set_op_id(op_id); + + // Answers don't get a 'respond' + let mut respond_to = route_operation.reborrow().init_respond_to(); + respond_to.set_none(()); + + // Set up 'route' operation + let mut route = route_operation.reborrow().init_detail().init_route(); + + // Set the safety route we've constructed + let mut msg_sr = route.reborrow().init_safety_route(); + self.encode_safety_route(sr, private_route, &mut msg_sr)?; + + // Put in the encrypted operation we're routing + let mut msg_operation = route.init_operation(); + msg_operation.reborrow().init_signatures(0); + let mut route_nonce = msg_operation.reborrow().init_nonce(); + encode_nonce(&nonce, &mut route_nonce); + let data = msg_operation.reborrow().init_data( + enc_msg_data + .len() + .try_into() + .map_err(map_error_internal!("data too large"))?, + ); + data.copy_from_slice(enc_msg_data.as_slice()); + + route_msg + }; + + // Convert message to bytes and return it + let out = builder_to_vec(route_msg)?; + Ok(out) + } +} diff --git a/veilid-core/src/xx/log_thru.rs b/veilid-core/src/xx/log_thru.rs new file mode 100644 index 00000000..4125b4dd --- /dev/null +++ b/veilid-core/src/xx/log_thru.rs @@ -0,0 +1,181 @@ +pub fn map_to_string(arg: X) -> String { + arg.to_string() +} + +/* +trait LogThru { + fn log_thru F>(self, op: O) -> Result; +} + +impl LogThru for Result { + fn log_thru F>(self, op: O) -> Result { + match self { + Ok(t) => Ok(t), + Err(e) => Err(op(e)), + } + } +} +*/ + +#[macro_export] +macro_rules! fn_string { + ($text:expr) => { + || $text.to_string() + }; +} + +#[macro_export] +macro_rules! log_net { + (error $text:expr) => {error!( + target: "net", + "{}", + $text, + )}; + (error $fmt:literal, $($arg:expr)+) => { + error!(target:"net", $fmt, $($arg)+); + }; + ($text:expr) => {trace!( + target: "net", + "{}", + $text, + )}; + ($fmt:literal, $($arg:expr)+) => { + trace!(target:"net", $fmt, $($arg)+); + } +} + +#[macro_export] +macro_rules! log_rpc { + (error $text:expr) => { error!( + target: "rpc", + "{}", + $text, + )}; + (error $fmt:literal, $($arg:expr)+) => { + error!(target:"rpc", $fmt, $($arg)+); + }; + ($text:expr) => {trace!( + target: "rpc", + "{}", + $text, + )}; + ($fmt:literal, $($arg:expr)+) => { + trace!(target:"rpc", $fmt, $($arg)+); + } +} + +#[macro_export] +macro_rules! log_rtab { + (error $text:expr) => { error!( + target: "rtab", + "{}", + $text, + )}; + (error $fmt:literal, $($arg:expr)+) => { + error!(target:"rtab", $fmt, $($arg)+); + }; + ($text:expr) => {trace!( + target: "rtab", + "{}", + $text, + )}; + ($fmt:literal, $($arg:expr)+) => { + trace!(target:"rtab", $fmt, $($arg)+); + } +} + +#[macro_export] +macro_rules! logthru_net { + ($($level:ident)?) => { + logthru!($($level)? "net") + }; + ($($level:ident)? $text:literal) => { + logthru!($($level)? "net", $text) + }; + ($($level:ident)? $fmt:literal, $($arg:expr)+) => { + logthru!($($level)? "net", $fmt, $($arg)+) + } +} +#[macro_export] +macro_rules! logthru_rpc { + ($($level:ident)?) => { + logthru!($($level)? "rpc") + }; + ($($level:ident)? $text:literal) => { + logthru!($($level)? "rpc", $text) + }; + ($($level:ident)? $fmt:literal, $($arg:expr)+) => { + logthru!($($level)? "rpc", $fmt, $($arg)+) + } +} + +#[macro_export] +macro_rules! logthru_rtab { + ($($level:ident)?) => { + logthru!($($level)? "rtab") + }; + ($($level:ident)? $text:literal) => { + logthru!($($level)? "rtab", $text) + }; + ($($level:ident)? $fmt:literal, $($arg:expr)+) => { + logthru!($($level)? "rtab", $fmt, $($arg)+) + } +} + +#[macro_export] +macro_rules! logthru { + // error + (error $target:literal) => (|e__| { + error!( + target: $target, + "[{}]", + e__, + ); + e__ + }); + (error $target:literal, $text:literal) => (|e__| { + error!( + target: $target, + "[{}] {}", + e__, + $text + ); + e__ + }); + (error $target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { + error!( + target: $target, + concat!("[{}] ", $fmt), + e__, + $($arg)+ + ); + e__ + }); + // trace + ($target:literal) => (|e__| { + trace!( + target: $target, + "[{}]", + e__, + ); + e__ + }); + ($target:literal, $text:literal) => (|e__| { + trace!( + target: $target, + "[{}] {}", + e__, + $text + ); + e__ + }); + ($target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { + trace!( + target: $target, + concat!("[{}] ", $fmt), + e__, + $($arg)+ + ); + e__ + }) +} diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index aa906297..da3988c6 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -5,6 +5,7 @@ mod eventual_value; mod eventual_value_clone; mod ip_addr_port; mod ip_extra; +mod log_thru; mod single_future; mod single_shot_eventual; mod split_url; @@ -13,6 +14,7 @@ mod tools; pub use cfg_if::*; pub use log::*; +pub use log_thru::*; pub use parking_lot::*; pub use split_url::*; pub use static_assertions::*;