From 39eb13f34d08519d482163018c10bc59d183f4e7 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 20 Jul 2022 09:39:38 -0400 Subject: [PATCH] networkresult --- veilid-core/src/network_manager/mod.rs | 285 +++++++++++------- veilid-core/src/network_manager/native/mod.rs | 11 +- .../native/network_class_discovery.rs | 30 +- .../native/protocol/sockets.rs | 8 +- .../network_manager/native/protocol/tcp.rs | 24 +- .../src/network_manager/native/protocol/ws.rs | 18 +- .../src/network_manager/network_connection.rs | 17 +- veilid-core/src/receipt_manager.rs | 8 +- veilid-core/src/routing_table/find_nodes.rs | 82 +++-- veilid-core/src/routing_table/mod.rs | 29 +- veilid-core/src/routing_table/node_ref.rs | 2 - veilid-core/src/routing_table/tasks.rs | 56 ++-- veilid-core/src/rpc_processor/mod.rs | 172 ++++++----- .../src/rpc_processor/rpc_cancel_tunnel.rs | 3 + .../src/rpc_processor/rpc_complete_tunnel.rs | 2 + veilid-core/src/rpc_processor/rpc_error.rs | 6 +- .../src/rpc_processor/rpc_find_block.rs | 2 + .../src/rpc_processor/rpc_find_node.rs | 31 +- .../src/rpc_processor/rpc_get_value.rs | 2 + .../src/rpc_processor/rpc_node_info_update.rs | 22 +- .../src/rpc_processor/rpc_return_receipt.rs | 18 +- veilid-core/src/rpc_processor/rpc_route.rs | 4 +- .../src/rpc_processor/rpc_set_value.rs | 2 + veilid-core/src/rpc_processor/rpc_signal.rs | 18 +- .../src/rpc_processor/rpc_start_tunnel.rs | 2 + veilid-core/src/rpc_processor/rpc_status.rs | 38 ++- .../src/rpc_processor/rpc_supply_block.rs | 3 + .../rpc_processor/rpc_validate_dial_info.rs | 26 +- .../src/rpc_processor/rpc_value_changed.rs | 3 + .../src/rpc_processor/rpc_watch_value.rs | 2 + veilid-core/src/xx/network_result.rs | 77 ++++- veilid-core/src/xx/timeout_or.rs | 17 +- 32 files changed, 613 insertions(+), 407 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 8ba0f07c..b9028b57 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -526,30 +526,38 @@ impl NetworkManager { } // Process a received out-of-band receipt - #[instrument(level = "trace", skip(self, receipt_data), err)] + #[instrument(level = "trace", skip(self, receipt_data), ret)] pub async fn handle_out_of_band_receipt>( &self, receipt_data: R, - ) -> EyreResult<()> { + ) -> NetworkResult<()> { let receipt_manager = self.receipt_manager(); - let receipt = Receipt::from_signed_data(receipt_data.as_ref()) - .wrap_err("failed to parse signed out-of-band receipt")?; + let receipt = match Receipt::from_signed_data(receipt_data.as_ref()) { + Err(e) => { + return NetworkResult::invalid_message(e.to_string()); + } + Ok(v) => v, + }; receipt_manager.handle_receipt(receipt, None).await } // Process a received in-band receipt - #[instrument(level = "trace", skip(self, receipt_data), err)] + #[instrument(level = "trace", skip(self, receipt_data), ret)] pub async fn handle_in_band_receipt>( &self, receipt_data: R, inbound_nr: NodeRef, - ) -> EyreResult<()> { + ) -> NetworkResult<()> { let receipt_manager = self.receipt_manager(); - let receipt = Receipt::from_signed_data(receipt_data.as_ref()) - .wrap_err("failed to parse signed in-band receipt")?; + let receipt = match Receipt::from_signed_data(receipt_data.as_ref()) { + Err(e) => { + return NetworkResult::invalid_message(e.to_string()); + } + Ok(v) => v, + }; receipt_manager .handle_receipt(receipt, Some(inbound_nr)) @@ -558,32 +566,51 @@ impl NetworkManager { // Process a received signal #[instrument(level = "trace", skip(self), err)] - pub async fn handle_signal(&self, signal_info: SignalInfo) -> EyreResult<()> { + pub async fn handle_signal( + &self, + _sender_id: DHTKey, + signal_info: SignalInfo, + ) -> EyreResult> { match signal_info { SignalInfo::ReverseConnect { receipt, peer_info } => { let routing_table = self.routing_table(); let rpc = self.rpc_processor(); // Add the peer info to our routing table - let peer_nr = routing_table.register_node_with_signed_node_info( + let peer_nr = match routing_table.register_node_with_signed_node_info( peer_info.node_id.key, peer_info.signed_node_info, - )?; + ) { + None => { + return Ok(NetworkResult::invalid_message( + "unable to register reverse connect peerinfo", + )) + } + Some(nr) => nr, + }; // Make a reverse connection to the peer and send the receipt to it rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) .await - .wrap_err("rpc failure")?; + .wrap_err("rpc failure") } SignalInfo::HolePunch { receipt, peer_info } => { let routing_table = self.routing_table(); let rpc = self.rpc_processor(); // Add the peer info to our routing table - let mut peer_nr = routing_table.register_node_with_signed_node_info( + let mut peer_nr = match routing_table.register_node_with_signed_node_info( peer_info.node_id.key, peer_info.signed_node_info, - )?; + ) { + None => { + return Ok(NetworkResult::invalid_message( + //sender_id, + "unable to register hole punch connect peerinfo", + )); + } + Some(nr) => nr, + }; // Get the udp direct dialinfo for the hole punch peer_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP)); @@ -599,23 +626,23 @@ impl NetworkManager { // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole - self.net() - .send_data_to_dial_info( - hole_punch_dial_info_detail.dial_info.clone(), - Vec::new(), - ) - .await?; + network_result_try!( + self.net() + .send_data_to_dial_info( + hole_punch_dial_info_detail.dial_info.clone(), + Vec::new(), + ) + .await? + ); // XXX: do we need a delay here? or another hole punch packet? // Return the receipt using the same dial info send the receipt to it rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) .await - .wrap_err("rpc failure")?; + .wrap_err("rpc failure") } } - - Ok(()) } // Builds an envelope for sending over the network @@ -652,7 +679,7 @@ impl NetworkManager { node_ref: NodeRef, envelope_node_id: Option, body: B, - ) -> EyreResult { + ) -> EyreResult> { let via_node_id = node_ref.node_id(); let envelope_node_id = envelope_node_id.unwrap_or(via_node_id); @@ -684,18 +711,17 @@ impl NetworkManager { }; // Build the envelope to send - let out = self - .build_envelope(envelope_node_id, version, body) - .map_err(logthru_rpc!(error))?; + let out = self.build_envelope(envelope_node_id, version, body)?; // Send the envelope via whatever means necessary - let send_data_kind = self.send_data(node_ref.clone(), out).await?; + let send_data_kind = network_result_try!(self.send_data(node_ref.clone(), out).await?); // If we asked to relay from the start, then this is always indirect - if envelope_node_id != via_node_id { - return Ok(SendDataKind::GlobalIndirect); - } - Ok(send_data_kind) + Ok(NetworkResult::value(if envelope_node_id != via_node_id { + SendDataKind::GlobalIndirect + } else { + send_data_kind + })) } // Called by the RPC handler when we want to issue an direct receipt @@ -712,19 +738,13 @@ impl NetworkManager { // should not be subject to our ability to decode it // Send receipt directly - match self + network_result_value_or_log!(debug self .net() .send_data_unbound_to_dial_info(dial_info, rcpt_data) - .await? - { - NetworkResult::Timeout => { - log_net!(debug "Timeout sending out of band receipt"); + .await? => { + return Ok(()); } - NetworkResult::NoConnection(e) => { - log_net!(debug "No connection sending out of band receipt: {}", e); - } - NetworkResult::Value(()) => {} - }; + ); Ok(()) } @@ -851,7 +871,7 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { // Build a return receipt for the signal let receipt_timeout = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); @@ -862,13 +882,15 @@ impl NetworkManager { // Issue the signal let rpc = self.rpc_processor(); - rpc.rpc_call_signal( - Destination::Relay(relay_nr.clone(), target_nr.node_id()), - None, - SignalInfo::ReverseConnect { receipt, peer_info }, - ) - .await - .wrap_err("failed to send signal")?; + network_result_try!(rpc + .rpc_call_signal( + Destination::Relay(relay_nr.clone(), target_nr.node_id()), + None, + SignalInfo::ReverseConnect { receipt, peer_info }, + ) + .await + .wrap_err("failed to send signal")?); + // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedOutOfBand => { @@ -876,7 +898,8 @@ impl NetworkManager { } ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, ReceiptEvent::Expired => { - bail!("reverse connect receipt expired from {:?}", target_nr); + //bail!("reverse connect receipt expired from {:?}", target_nr); + return Ok(NetworkResult::timeout()); } ReceiptEvent::Cancelled => { bail!("reverse connect receipt cancelled from {:?}", target_nr); @@ -896,8 +919,10 @@ impl NetworkManager { .send_data_to_existing_connection(descriptor, data) .await? { - None => Ok(()), - Some(_) => bail!("unable to send over reverse connection"), + None => Ok(NetworkResult::value(())), + Some(_) => Ok(NetworkResult::no_connection_other( + "unable to send over reverse connection", + )), } } else { bail!("no reverse connection available") @@ -912,7 +937,7 @@ impl NetworkManager { relay_nr: NodeRef, target_nr: NodeRef, data: Vec, - ) -> EyreResult<()> { + ) -> EyreResult> { // Ensure we are filtered down to UDP (the only hole punch protocol supported today) assert!(target_nr .filter_ref() @@ -932,19 +957,22 @@ impl NetworkManager { // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole - self.net() - .send_data_to_dial_info(hole_punch_did.dial_info, Vec::new()) - .await?; + network_result_try!( + self.net() + .send_data_to_dial_info(hole_punch_did.dial_info, Vec::new()) + .await? + ); // Issue the signal let rpc = self.rpc_processor(); - rpc.rpc_call_signal( - Destination::Relay(relay_nr.clone(), target_nr.node_id()), - None, - SignalInfo::HolePunch { receipt, peer_info }, - ) - .await - .wrap_err("failed to send signal")?; + network_result_try!(rpc + .rpc_call_signal( + Destination::Relay(relay_nr.clone(), target_nr.node_id()), + None, + SignalInfo::HolePunch { receipt, peer_info }, + ) + .await + .wrap_err("failed to send signal")?); // Wait for the return receipt let inbound_nr = match eventual_value.await.take_value().unwrap() { @@ -977,8 +1005,10 @@ impl NetworkManager { .send_data_to_existing_connection(descriptor, data) .await? { - None => Ok(()), - Some(_) => bail!("unable to send over hole punch"), + None => Ok(NetworkResult::value(())), + Some(_) => Ok(NetworkResult::no_connection_other( + "unable to send over hole punch", + )), } } else { bail!("no hole punch available") @@ -998,7 +1028,7 @@ impl NetworkManager { &self, node_ref: NodeRef, data: Vec, - ) -> SendPinBoxFuture> { + ) -> SendPinBoxFuture>> { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on @@ -1010,9 +1040,9 @@ impl NetworkManager { { None => { return Ok(if descriptor.matches_peer_scope(PeerScope::Local) { - SendDataKind::LocalDirect + NetworkResult::value(SendDataKind::LocalDirect) } else { - SendDataKind::GlobalDirect + NetworkResult::value(SendDataKind::GlobalDirect) }); } Some(d) => d, @@ -1021,13 +1051,17 @@ impl NetworkManager { data }; - log_net!("send_data via dialinfo to {:?}", node_ref); // If we don't have last_connection, try to reach out to the peer via its dial info - match this.get_contact_method(node_ref.clone()) { + let contact_method = this.get_contact_method(node_ref.clone()); + log_net!( + "send_data via {:?} to dialinfo {:?}", + contact_method, + node_ref + ); + match contact_method { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { - this.send_data(relay_nr, data) - .await - .map(|_| SendDataKind::GlobalIndirect) + network_result_try!(this.send_data(relay_nr, data).await?); + Ok(NetworkResult::value(SendDataKind::GlobalIndirect)) } ContactMethod::Direct(dial_info) => { let send_data_kind = if dial_info.is_local() { @@ -1035,26 +1069,33 @@ impl NetworkManager { } else { SendDataKind::GlobalDirect }; - this.net() - .send_data_to_dial_info(dial_info, data) - .await - .map(|_| send_data_kind) + network_result_try!(this.net().send_data_to_dial_info(dial_info, data).await?); + Ok(NetworkResult::value(send_data_kind)) } - ContactMethod::SignalReverse(relay_nr, target_node_ref) => this - .do_reverse_connect(relay_nr, target_node_ref, data) - .await - .map(|_| SendDataKind::GlobalDirect), - ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => this - .do_hole_punch(relay_nr, target_node_ref, data) - .await - .map(|_| SendDataKind::GlobalDirect), - ContactMethod::Unreachable => Err(eyre!("Can't send to this node")), + ContactMethod::SignalReverse(relay_nr, target_node_ref) => { + network_result_try!( + this.do_reverse_connect(relay_nr, target_node_ref, data) + .await? + ); + Ok(NetworkResult::value(SendDataKind::GlobalDirect)) + } + ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { + network_result_try!(this.do_hole_punch(relay_nr, target_node_ref, data).await?); + Ok(NetworkResult::value(SendDataKind::GlobalDirect)) + } + ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other( + "Can't send to this node", + )), } }) } // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) - async fn handle_boot_request(&self, descriptor: ConnectionDescriptor) -> EyreResult<()> { + #[instrument(level = "trace", skip(self), ret, err)] + async fn handle_boot_request( + &self, + descriptor: ConnectionDescriptor, + ) -> EyreResult> { let routing_table = self.routing_table(); // Get a bunch of nodes with the various @@ -1075,9 +1116,11 @@ impl NetworkManager { { None => { // Bootstrap reply was sent - Ok(()) + Ok(NetworkResult::value(())) } - Some(_) => Err(eyre!("bootstrap reply could not be sent")), + Some(_) => Ok(NetworkResult::no_connection_other( + "bootstrap reply could not be sent", + )), } } @@ -1109,12 +1152,20 @@ impl NetworkManager { // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler - #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] async fn on_recv_envelope( &self, data: &[u8], descriptor: ConnectionDescriptor, ) -> EyreResult { + let root = span!( + parent: None, + Level::TRACE, + "on_recv_envelope", + "data.len" = data.len(), + "descriptor" = ?descriptor + ); + let _root_enter = root.enter(); + log_net!( "envelope of {} bytes received from {:?}", data.len(), @@ -1133,18 +1184,19 @@ impl NetworkManager { // Ensure we can read the magic number if data.len() < 4 { - bail!("short packet"); + log_net!(debug "short packet".green()); + return Ok(false); } // Is this a direct bootstrap request instead of an envelope? if data[0..4] == *BOOT_MAGIC { - self.handle_boot_request(descriptor).await?; + network_result_value_or_log!(debug self.handle_boot_request(descriptor).await? => {}); return Ok(true); } // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { - self.handle_out_of_band_receipt(data).await?; + network_result_value_or_log!(debug self.handle_out_of_band_receipt(data).await => {}); return Ok(true); } @@ -1198,7 +1250,7 @@ impl NetworkManager { // This is a costly operation, so only outbound-relay permitted // nodes are allowed to do this, for example PWA users - let relay_nr = if self.check_client_whitelist(sender_id) { + let some_relay_nr = if self.check_client_whitelist(sender_id) { // Full relay allowed, do a full resolve_node rpc.resolve_node(recipient_id).await.wrap_err( "failed to resolve recipient node for relay, dropping outbound relayed packet", @@ -1211,18 +1263,19 @@ impl NetworkManager { // We should, because relays are chosen by nodes that have established connectivity and // should be mutually in each others routing tables. The node needing the relay will be // pinging this node regularly to keep itself in the routing table - routing_table.lookup_node_ref(recipient_id).ok_or_else(|| { - eyre!( - "Inbound relay asked for recipient not in routing table: sender_id={:?} recipient={:?}", - sender_id, recipient_id - ) - })? + routing_table.lookup_node_ref(recipient_id) }; - // Relay the packet to the desired destination - self.send_data(relay_nr, data.to_vec()) - .await - .wrap_err("failed to forward envelope")?; + if let Some(relay_nr) = some_relay_nr { + // Relay the packet to the desired destination + log_net!("relaying {} bytes to {}", data.len(), relay_nr); + network_result_value_or_log!(debug self.send_data(relay_nr, data.to_vec()) + .await + .wrap_err("failed to forward envelope")? => { + return Ok(false); + } + ); + } // Inform caller that we dealt with the envelope, but did not process it locally return Ok(false); } @@ -1237,11 +1290,18 @@ impl NetworkManager { .wrap_err("failed to decrypt envelope body")?; // Cache the envelope information in the routing table - let source_noderef = routing_table.register_node_with_existing_connection( + let source_noderef = match routing_table.register_node_with_existing_connection( envelope.get_sender_id(), descriptor, ts, - )?; + ) { + None => { + // If the node couldn't be registered just skip this envelope, + // the error will have already been logged + return Ok(false); + } + Some(v) => v, + }; source_noderef.operate_mut(|e| e.set_min_max_version(envelope.get_min_max_version())); // xxx: deal with spoofing and flooding here? @@ -1305,13 +1365,14 @@ impl NetworkManager { let mut inner = self.inner.lock(); // Register new outbound relay - let nr = routing_table.register_node_with_signed_node_info( + if let Some(nr) = routing_table.register_node_with_signed_node_info( outbound_relay_peerinfo.node_id.key, outbound_relay_peerinfo.signed_node_info, - )?; - info!("Outbound relay node selected: {}", nr); - inner.relay_node = Some(nr); - node_info_changed = true; + ) { + info!("Outbound relay node selected: {}", nr); + inner.relay_node = Some(nr); + node_info_changed = true; + } } // Otherwise we must need an inbound relay } else { diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 84887884..6c29d157 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -309,10 +309,11 @@ impl Network { let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) .await .wrap_err("create socket failure")?; - h.send_message(data, peer_socket_addr) + network_result_try!(h + .send_message(data, peer_socket_addr) .await .map(NetworkResult::Value) - .wrap_err("send message failure")?; + .wrap_err("send message failure")?); } ProtocolType::TCP => { let peer_socket_addr = dial_info.to_socket_addr(); @@ -323,7 +324,7 @@ impl Network { ) .await .wrap_err("connect failure")?); - pnc.send(data).await.wrap_err("send failure")?; + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); } ProtocolType::WS | ProtocolType::WSS => { let pnc = network_result_try!(WebsocketProtocolHandler::connect( @@ -333,7 +334,7 @@ impl Network { ) .await .wrap_err("connect failure")?); - pnc.send(data).await.wrap_err("send failure")?; + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); } } // Network accounting @@ -408,7 +409,7 @@ impl Network { } }); - pnc.send(data).await.wrap_err("send failure")?; + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); self.network_manager() .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 7810fe82..0974c0df 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -78,18 +78,24 @@ impl DiscoveryContext { #[instrument(level = "trace", skip(self), ret)] async fn request_public_address(&self, node_ref: NodeRef) -> Option { let rpc = self.routing_table.rpc_processor(); - rpc.rpc_call_status(node_ref.clone()) - .await - .map_err(logthru_net!( - "failed to get status answer from {:?}", - node_ref - )) - .map(|sa| { - let ret = sa.answer.socket_address; - log_net!("request_public_address: {:?}", ret); - ret - }) - .unwrap_or(None) + let res = network_result_value_or_log!(debug match rpc.rpc_call_status(node_ref.clone()).await { + Ok(v) => v, + Err(e) => { + log_net!(error + "failed to get status answer from {:?}: {}", + node_ref, e + ); + return None; + } + } => { return None; } + ); + + log_net!( + "request_public_address {:?}: Value({:?})", + node_ref, + res.answer + ); + res.answer.socket_address } // find fast peers with a particular address type, and ask them to tell us what our external address is diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 942b1f35..dd23bf45 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -189,10 +189,10 @@ pub async fn nonblocking_connect( let async_stream = Async::new(std::net::TcpStream::from(socket))?; // The stream becomes writable when connected - intf::timeout(timeout_ms, async_stream.writable()) + timeout_or_try!(intf::timeout(timeout_ms, async_stream.writable()) .await .into_timeout_or() - .into_result()?; + .into_result()?); // Check low level error let async_stream = match async_stream.get_ref().take_error()? { @@ -203,9 +203,9 @@ pub async fn nonblocking_connect( // Convert back to inner and then return async version cfg_if! { if #[cfg(feature="rt-async-std")] { - Ok(TimeoutOr::Value(TcpStream::from(async_stream.into_inner()?))) + Ok(TimeoutOr::value(TcpStream::from(async_stream.into_inner()?))) } else if #[cfg(feature="rt-tokio")] { - Ok(TimeoutOr::Value(TcpStream::from_std(async_stream.into_inner()?)?)) + Ok(TimeoutOr::value(TcpStream::from_std(async_stream.into_inner()?)?)) } } } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index fcce1743..8608a2ad 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -51,7 +51,7 @@ impl RawTcpNetworkConnection { let len = message.len() as u16; let header = [b'V', b'L', len as u8, (len >> 8) as u8]; - stream.write_all(&header).await.into_network_result()?; + network_result_try!(stream.write_all(&header).await.into_network_result()?); stream.write_all(&message).await.into_network_result() } @@ -59,21 +59,14 @@ impl RawTcpNetworkConnection { pub async fn send(&self, message: Vec) -> io::Result> { let mut stream = self.stream.clone(); let out = Self::send_internal(&mut stream, message).await?; - tracing::Span::current().record( - "network_result", - &match &out { - NetworkResult::Timeout => "Timeout".to_owned(), - NetworkResult::NoConnection(e) => format!("No connection: {}", e), - NetworkResult::Value(()) => "Value(())".to_owned(), - }, - ); + tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } async fn recv_internal(stream: &mut AsyncPeekStream) -> io::Result>> { let mut header = [0u8; 4]; - stream.read_exact(&mut header).await.into_network_result()?; + network_result_try!(stream.read_exact(&mut header).await.into_network_result()?); if header[0] != b'V' || header[1] != b'L' { bail_io_error_other!("received invalid TCP frame header"); @@ -84,7 +77,7 @@ impl RawTcpNetworkConnection { } let mut out: Vec = vec![0u8; len]; - stream.read_exact(&mut out).await.into_network_result()?; + network_result_try!(stream.read_exact(&mut out).await.into_network_result()?); Ok(NetworkResult::Value(out)) } @@ -93,14 +86,7 @@ impl RawTcpNetworkConnection { pub async fn recv(&self) -> io::Result>> { let mut stream = self.stream.clone(); let out = Self::recv_internal(&mut stream).await?; - tracing::Span::current().record( - "network_result", - &match &out { - NetworkResult::Timeout => "Timeout".to_owned(), - NetworkResult::NoConnection(e) => format!("No connection: {}", e), - NetworkResult::Value(v) => format!("Value(len={})", v.len()), - }, - ); + tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 4c1d2385..36125858 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -92,14 +92,7 @@ where .await .map_err(to_io) .into_network_result()?; - tracing::Span::current().record( - "network_result", - &match &out { - NetworkResult::Timeout => "Timeout".to_owned(), - NetworkResult::NoConnection(e) => format!("No connection: {}", e), - NetworkResult::Value(()) => "Value(())".to_owned(), - }, - ); + tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } @@ -132,14 +125,7 @@ where )), }; - tracing::Span::current().record( - "network_result", - &match &out { - NetworkResult::Timeout => "Timeout".to_owned(), - NetworkResult::NoConnection(e) => format!("No connection: {}", e), - NetworkResult::Value(v) => format!("Value(len={})", v.len()), - }, - ); + tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 9adb29ba..770d7ef8 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -283,7 +283,12 @@ impl NetworkConnection { let receiver_fut = Self::recv_internal(&protocol_connection, stats.clone()) .then(|res| async { match res { - Ok(NetworkResult::Value(message)) => { + Ok(v) => { + + let message = network_result_value_or_log!(debug v => { + return RecvLoopAction::Finish; + }); + // Pass received messages up to the network manager for processing if let Err(e) = network_manager .on_recv_envelope(message.as_slice(), descriptor) @@ -295,16 +300,6 @@ impl NetworkConnection { RecvLoopAction::Recv } } - Ok(NetworkResult::Timeout) => { - // Connection unable to receive, closed - log_net!(debug "Timeout"); - RecvLoopAction::Finish - } - Ok(NetworkResult::NoConnection(e)) => { - // Connection unable to receive, closed - log_net!(debug "No connection: {}", e); - RecvLoopAction::Finish - } Err(e) => { // Connection unable to receive, closed log_net!(error e); diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index 9e174f07..b3d76139 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -395,7 +395,7 @@ impl ReceiptManager { &self, receipt: Receipt, inbound_noderef: Option, - ) -> EyreResult<()> { + ) -> NetworkResult<()> { let receipt_nonce = receipt.get_nonce(); let extra_data = receipt.get_extra_data(); @@ -421,13 +421,13 @@ impl ReceiptManager { Some(ss) => ss.token(), None => { // If we're stopping do nothing here - return Ok(()); + return NetworkResult::value(()); } }; let record = match inner.records_by_nonce.get(&receipt_nonce) { Some(r) => r.clone(), None => { - bail!("receipt not recorded"); + return NetworkResult::invalid_message("receipt not recorded"); } }; // Generate the callback future @@ -457,6 +457,6 @@ impl ReceiptManager { let _ = callback_future.timeout_at(stop_token).await; } - Ok(()) + NetworkResult::value(()) } } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 327e9893..c488a772 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -389,8 +389,8 @@ impl RoutingTable { best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), k, e, None)) } - #[instrument(level = "trace", skip(self), ret, err)] - pub fn register_find_node_answer(&self, peers: Vec) -> EyreResult> { + #[instrument(level = "trace", skip(self), ret)] + pub fn register_find_node_answer(&self, peers: Vec) -> Vec { let node_id = self.node_id(); // register nodes we'd found @@ -401,40 +401,57 @@ impl RoutingTable { continue; } + // node can not be its own relay + if let Some(rpi) = &p.signed_node_info.node_info.relay_peer_info { + if rpi.node_id == p.node_id { + continue; + } + } + // register the node if it's new - let nr = self - .register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone())?; - out.push(nr); + if let Some(nr) = + self.register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone()) + { + out.push(nr); + } } - Ok(out) + out } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_node(&self, node_ref: NodeRef, node_id: DHTKey) -> EyreResult> { + pub async fn find_node( + &self, + node_ref: NodeRef, + node_id: DHTKey, + ) -> EyreResult>> { let rpc_processor = self.rpc_processor(); - let res = rpc_processor - .clone() - .rpc_call_find_node( - Destination::Direct(node_ref.clone()), - node_id, - None, - rpc_processor.make_respond_to_sender(node_ref.clone()), - ) - .await?; + let res = network_result_try!( + rpc_processor + .clone() + .rpc_call_find_node( + Destination::Direct(node_ref.clone()), + node_id, + None, + rpc_processor.make_respond_to_sender(node_ref.clone()), + ) + .await? + ); // register nodes we'd found - self.register_find_node_answer(res.answer) + Ok(NetworkResult::value( + self.register_find_node_answer(res.answer), + )) } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult> { + pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult>> { let node_id = self.node_id(); self.find_node(node_ref, node_id).await } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult> { + pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult>> { let node_id = node_ref.node_id(); self.find_node(node_ref, node_id).await } @@ -445,26 +462,35 @@ impl RoutingTable { // and then contact those nodes to inform -them- that we exist // Ask bootstrap server for nodes closest to our own node - let closest_nodes = match self.find_self(node_ref.clone()).await { + let closest_nodes = network_result_value_or_log!(debug match self.find_self(node_ref.clone()).await { Err(e) => { log_rtab!(error - "reverse_find_node: find_self failed for {:?}: {:?}", + "find_self failed for {:?}: {:?}", &node_ref, e ); return; } Ok(v) => v, - }; + } => { + return; + }); // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - if let Err(e) = self.find_self(closest_nr.clone()).await { - log_rtab!(error - "reverse_find_node: closest node find_self failed for {:?}: {:?}", - &closest_nr, e - ); - } + network_result_value_or_log!(debug match self.find_self(closest_nr.clone()).await { + Err(e) => { + log_rtab!(error + "find_self failed for {:?}: {:?}", + &closest_nr, e + ); + continue; + } + Ok(v) => v, + } => { + // Do nothing with non-values + continue; + }); } } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 8e25add2..85260b8b 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -576,13 +576,14 @@ impl RoutingTable { // Create a node reference, possibly creating a bucket entry // the 'update_func' closure is called on the node, and, if created, // in a locked fashion as to ensure the bucket entry state is always valid - pub fn create_node_ref(&self, node_id: DHTKey, update_func: F) -> EyreResult + pub fn create_node_ref(&self, node_id: DHTKey, update_func: F) -> Option where F: FnOnce(&mut BucketEntryInner), { // Ensure someone isn't trying register this node itself if node_id == self.node_id() { - bail!("can't register own node"); + log_rtab!(debug "can't register own node"); + return None; } // Lock this entire operation @@ -627,7 +628,7 @@ impl RoutingTable { } }; - Ok(noderef) + Some(noderef) } pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option { @@ -645,22 +646,22 @@ impl RoutingTable { &self, node_id: DHTKey, signed_node_info: SignedNodeInfo, - ) -> EyreResult { + ) -> Option { // validate signed node info is not something malicious if node_id == self.node_id() { - bail!("can't register own node id in routing table"); + log_rtab!(debug "can't register own node id in routing table"); + return None; } if let Some(rpi) = &signed_node_info.node_info.relay_peer_info { if rpi.node_id.key == node_id { - bail!("node can not be its own relay"); + log_rtab!(debug "node can not be its own relay"); + return None; } } - let nr = self.create_node_ref(node_id, |e| { + self.create_node_ref(node_id, |e| { e.update_node_info(signed_node_info); - })?; - - Ok(nr) + }) } // Shortcut function to add a node to our routing table if it doesn't exist @@ -670,13 +671,11 @@ impl RoutingTable { node_id: DHTKey, descriptor: ConnectionDescriptor, timestamp: u64, - ) -> EyreResult { - let nr = self.create_node_ref(node_id, |e| { + ) -> Option { + self.create_node_ref(node_id, |e| { // set the most recent node address for connection finding and udp replies e.set_last_connection(descriptor, timestamp); - })?; - - Ok(nr) + }) } // Ticks about once per second diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index c55c25f9..c9e84c73 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -104,8 +104,6 @@ impl NodeRef { // Register relay node and return noderef self.routing_table .register_node_with_signed_node_info(t.node_id.key, t.signed_node_info) - .map_err(logthru_rtab!(error)) - .ok() .map(|mut nr| { nr.set_filter(self.filter_ref().cloned()); nr diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index fab020ab..bb8b9ebb 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -213,13 +213,13 @@ impl RoutingTable { for pi in peer_info { let k = pi.node_id.key; // Register the node - let nr = self.register_node_with_signed_node_info(k, pi.signed_node_info)?; - - // Add this our futures to process in parallel - unord.push( - // lets ask bootstrap to find ourselves now - self.reverse_find_node(nr, true), - ); + if let Some(nr) = self.register_node_with_signed_node_info(k, pi.signed_node_info) { + // Add this our futures to process in parallel + unord.push( + // lets ask bootstrap to find ourselves now + self.reverse_find_node(nr, true), + ); + } } } @@ -299,7 +299,7 @@ impl RoutingTable { log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); // Make invalid signed node info (no signature) - let nr = self.register_node_with_signed_node_info( + if let Some(nr) = self.register_node_with_signed_node_info( k, SignedNodeInfo::with_no_signature(NodeInfo { network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable @@ -309,27 +309,27 @@ impl RoutingTable { dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list relay_peer_info: None, // Bootstraps never require a relay themselves }), - )?; + ) { + // Add this our futures to process in parallel + let this = self.clone(); + unord.push(async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = this.find_target(nr.clone()).await; - // Add this our futures to process in parallel - let this = self.clone(); - unord.push(async move { - // Need VALID signed peer info, so ask bootstrap to find_node of itself - // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = this.find_target(nr.clone()).await; - - // Ensure we got the signed peer info - if !nr.operate(|e| e.has_valid_signed_node_info()) { - log_rtab!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable - } else { - // otherwise this bootstrap is valid, lets ask it to find ourselves now - this.reverse_find_node(nr, true).await - } - }); + // Ensure we got the signed peer info + if !nr.operate(|e| e.has_valid_signed_node_info()) { + log_rtab!(warn + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // If this node info is invalid, it will time out after being unpingable + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + this.reverse_find_node(nr, true).await + } + }); + } } // Wait for all bootstrap operations to complete before we complete the singlefuture diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index afa1eaeb..e214b138 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -120,7 +120,7 @@ where #[derive(Debug)] struct WaitableReply { op_id: OperationId, - eventual: EventualValue, + eventual: EventualValue<(Option, RPCMessage)>, timeout: u64, node_ref: NodeRef, send_ts: u64, @@ -153,10 +153,10 @@ pub struct RPCProcessorInner { routing_table: RoutingTable, node_id: DHTKey, node_id_secret: DHTKeySecret, - send_channel: Option>, + send_channel: Option, RPCMessageEncoded)>>, timeout: u64, max_route_hop_count: usize, - waiting_rpc_table: BTreeMap>, + waiting_rpc_table: BTreeMap, RPCMessage)>>, stop_source: Option, worker_join_handles: Vec>, } @@ -242,13 +242,14 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// // Search the DHT for a single node closest to a key and add it to the routing table and return the node reference + // If no node was found in the timeout, this returns None pub async fn search_dht_single_key( &self, _node_id: DHTKey, _count: u32, _fanout: u32, _timeout: Option, - ) -> Result { + ) -> Result, RPCError> { //let routing_table = self.routing_table(); // xxx find node but stop if we find the exact node we want @@ -270,7 +271,10 @@ impl RPCProcessor { // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference // Note: This routine can possible be recursive, hence the SendPinBoxFuture async form - pub fn resolve_node(&self, node_id: DHTKey) -> SendPinBoxFuture> { + pub fn resolve_node( + &self, + node_id: DHTKey, + ) -> SendPinBoxFuture, RPCError>> { let this = self.clone(); Box::pin(async move { let routing_table = this.routing_table(); @@ -280,7 +284,7 @@ impl RPCProcessor { // ensure we have some dial info for the entry already, // if not, we should do the find_node anyway if nr.has_any_dial_info() { - return Ok(nr); + return Ok(Some(nr)); } } @@ -298,9 +302,11 @@ impl RPCProcessor { .search_dht_single_key(node_id, count, fanout, timeout) .await?; - if nr.node_id() != node_id { - // found a close node, but not exact within our configured resolve_node timeout - return Err(RPCError::Timeout).map_err(logthru_rpc!()); + if let Some(nr) = &nr { + if nr.node_id() != node_id { + // found a close node, but not exact within our configured resolve_node timeout + return Ok(None); + } } Ok(nr) @@ -308,7 +314,7 @@ impl RPCProcessor { } // set up wait for reply - fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue { + fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<(Option, RPCMessage)> { let mut inner = self.inner.lock(); let e = EventualValue::new(); inner.waiting_rpc_table.insert(op_id, e.clone()); @@ -322,6 +328,7 @@ impl RPCProcessor { } // complete the reply + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] async fn complete_op_id_waiter(&self, msg: RPCMessage) -> Result<(), RPCError> { let op_id = msg.operation.op_id(); let eventual = { @@ -331,7 +338,7 @@ impl RPCProcessor { .remove(&op_id) .ok_or_else(RPCError::else_internal("Unmatched operation id"))? }; - eventual.resolve(msg).await; + eventual.resolve((Span::current().id(), msg)).await; Ok(()) } @@ -339,32 +346,38 @@ impl RPCProcessor { async fn do_wait_for_reply( &self, waitable_reply: &WaitableReply, - ) -> Result<(RPCMessage, u64), RPCError> { + ) -> Result, RPCError> { let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64) .map_err(RPCError::map_internal("invalid timeout"))?; // wait for eventualvalue let start_ts = intf::get_timestamp(); let res = intf::timeout(timeout_ms, waitable_reply.eventual.instance()) .await - .map_err(|_| RPCError::Timeout)?; - let rpcreader = res.take_value().unwrap(); - let end_ts = intf::get_timestamp(); - Ok((rpcreader, end_ts - start_ts)) + .into_timeout_or(); + Ok(res.map(|res| { + let (span_id, rpcreader) = res.take_value().unwrap(); + let end_ts = intf::get_timestamp(); + + Span::current().follows_from(span_id); + + (rpcreader, end_ts - start_ts) + })) } + #[instrument(level = "trace", skip(self, waitable_reply), err)] async fn wait_for_reply( &self, waitable_reply: WaitableReply, - ) -> Result<(RPCMessage, u64), RPCError> { + ) -> Result, RPCError> { let out = self.do_wait_for_reply(&waitable_reply).await; match &out { - Err(_) => { + Err(_) | Ok(TimeoutOr::Timeout) => { self.cancel_op_id_waiter(waitable_reply.op_id); self.routing_table() .stats_question_lost(waitable_reply.node_ref.clone()); } - Ok((rpcreader, _)) => { + Ok(TimeoutOr::Value((rpcreader, _))) => { // Note that we definitely received this node info since we got a reply waitable_reply.node_ref.set_seen_our_node_info(); @@ -522,7 +535,7 @@ impl RPCProcessor { dest: Destination, question: RPCQuestion, safety_route_spec: Option<&SafetyRouteSpec>, - ) -> Result { + ) -> Result, RPCError> { // Wrap question in operation let operation = RPCOperation::new_question(question); let op_id = operation.op_id(); @@ -540,16 +553,13 @@ impl RPCProcessor { // If we need to resolve the first hop, do it let node_ref = match node_ref { - None => { - // resolve node - self.resolve_node(node_id) - .await - .map_err(logthru_rpc!(error))? - } - Some(nr) => { - // got the node in the routing table already - nr - } + None => match self.resolve_node(node_id).await? { + None => { + return Ok(NetworkResult::no_connection_other(node_id)); + } + Some(nr) => nr, + }, + Some(nr) => nr, }; // Calculate answer timeout @@ -562,37 +572,31 @@ impl RPCProcessor { // Send question let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); - let send_data_kind = match self + let send_data_kind = network_result_try!(self .network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) .await - .map_err(RPCError::internal) - { - Ok(v) => v, - Err(e) => { + .map_err(RPCError::network)? => { // Make sure to clean up op id waiter in case of error self.cancel_op_id_waiter(op_id); - self.routing_table() .stats_failed_to_send(node_ref, send_ts, true); - - return Err(e); } - }; + ); // Successfully sent self.routing_table() .stats_question_sent(node_ref.clone(), send_ts, bytes, true); // Pass back waitable reply completion - Ok(WaitableReply { + Ok(NetworkResult::value(WaitableReply { op_id, eventual, timeout, node_ref, send_ts, send_data_kind, - }) + })) } // Issue a statement over the network, possibly using an anonymized route @@ -602,7 +606,7 @@ impl RPCProcessor { dest: Destination, statement: RPCStatement, safety_route_spec: Option<&SafetyRouteSpec>, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Wrap statement in operation let operation = RPCOperation::new_statement(statement); @@ -619,40 +623,33 @@ impl RPCProcessor { // If we need to resolve the first hop, do it let node_ref = match node_ref { - None => { - // resolve node - self.resolve_node(node_id) - .await - .map_err(logthru_rpc!(error))? - } - Some(nr) => { - // got the node in the routing table already - nr - } + None => match self.resolve_node(node_id).await? { + None => { + return Ok(NetworkResult::no_connection_other(node_id)); + } + Some(nr) => nr, + }, + Some(nr) => nr, }; // Send statement let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); - let _send_data_kind = match self + let _send_data_kind = network_result_try!(self .network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) .await - .map_err(RPCError::network) - { - Ok(v) => v, - Err(e) => { + .map_err(RPCError::network)? => { self.routing_table() .stats_failed_to_send(node_ref, send_ts, true); - return Err(e); } - }; + ); // Successfully sent self.routing_table() .stats_question_sent(node_ref.clone(), send_ts, bytes, true); - Ok(()) + Ok(NetworkResult::value(())) } // Convert the 'RespondTo' into a 'Destination' for a response @@ -694,7 +691,7 @@ impl RPCProcessor { request: RPCMessage, answer: RPCAnswer, safety_route_spec: Option<&SafetyRouteSpec>, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Wrap answer in operation let operation = RPCOperation::new_answer(&request.operation, answer); @@ -714,33 +711,31 @@ impl RPCProcessor { // If we need to resolve the first hop, do it let node_ref = match node_ref { - None => { - // resolve node - self.resolve_node(node_id).await? - } - Some(nr) => { - // got the node in the routing table already - nr - } + None => match self.resolve_node(node_id).await? { + None => { + return Ok(NetworkResult::no_connection_other(node_id)); + } + Some(nr) => nr, + }, + Some(nr) => nr, }; // Send the reply let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); - self.network_manager() + network_result_try!(self.network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) .await - .map_err(RPCError::network) - .map_err(|e| { + .map_err(RPCError::network)? => { self.routing_table() .stats_failed_to_send(node_ref.clone(), send_ts, false); - e - })?; + } + ); // Reply successfully sent self.routing_table().stats_answer_sent(node_ref, bytes); - Ok(()) + Ok(NetworkResult::value(())) } async fn generate_sender_info(peer_noderef: NodeRef) -> SenderInfo { @@ -752,6 +747,7 @@ impl RPCProcessor { } ////////////////////////////////////////////////////////////////////// + #[instrument(level = "trace", skip(self, encoded_msg), err)] async fn process_rpc_message_version_0( &self, encoded_msg: RPCMessageEncoded, @@ -781,12 +777,9 @@ impl RPCProcessor { "respond_to_sender_signed_node_info has invalid peer scope", )); } - let nr = self + opt_sender_nr = self .routing_table() - .register_node_with_signed_node_info(sender_node_id, sender_ni.clone()) - .map_err(map_to_string) - .map_err(RPCError::Internal)?; - opt_sender_nr = Some(nr); + .register_node_with_signed_node_info(sender_node_id, sender_ni.clone()); } _ => {} } @@ -864,6 +857,7 @@ impl RPCProcessor { } } + #[instrument(level = "trace", skip(self, msg), err)] async fn process_rpc_message(&self, msg: RPCMessageEncoded) -> Result<(), RPCError> { if msg.header.envelope.get_version() == 0 { self.process_rpc_message_version_0(msg).await @@ -875,8 +869,18 @@ impl RPCProcessor { } } - async fn rpc_worker(self, stop_token: StopToken, receiver: flume::Receiver) { - while let Ok(Ok(msg)) = receiver.recv_async().timeout_at(stop_token.clone()).await { + async fn rpc_worker( + self, + stop_token: StopToken, + receiver: flume::Receiver<(Option, RPCMessageEncoded)>, + ) { + while let Ok(Ok((span_id, msg))) = + receiver.recv_async().timeout_at(stop_token.clone()).await + { + let rpc_worker_span = span!(parent: span_id, Level::TRACE, "rpc_worker"); + //rpc_worker_span.follows_from(span_id); + let _enter = rpc_worker_span.enter(); + let _ = self .process_rpc_message(msg) .await @@ -963,6 +967,7 @@ impl RPCProcessor { debug!("finished rpc processor shutdown"); } + #[instrument(level = "trace", skip(self, body), err)] pub fn enqueue_message( &self, envelope: Envelope, @@ -982,8 +987,9 @@ impl RPCProcessor { let inner = self.inner.lock(); inner.send_channel.as_ref().unwrap().clone() }; + let span_id = Span::current().id(); send_channel - .try_send(msg) + .try_send((span_id, msg)) .wrap_err("failed to enqueue received RPC message")?; Ok(()) } diff --git a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs index bc4b1cc2..4d7d046b 100644 --- a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs @@ -1,7 +1,10 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); + Err(RPCError::unimplemented("process_cancel_tunnel_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs index 04d10e76..66975971 100644 --- a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_complete_tunnel_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_error.rs b/veilid-core/src/rpc_processor/rpc_error.rs index 16288a17..bf650778 100644 --- a/veilid-core/src/rpc_processor/rpc_error.rs +++ b/veilid-core/src/rpc_processor/rpc_error.rs @@ -1,9 +1,8 @@ use super::*; #[derive(ThisError, Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] +#[must_use] pub enum RPCError { - #[error("[RPCError: Timeout]")] - Timeout, #[error("[RPCError: Unreachable({0})]")] Unreachable(DHTKey), #[error("[RPCError: Unimplemented({0})]")] @@ -19,9 +18,6 @@ pub enum RPCError { } impl RPCError { - pub fn timeout() -> Self { - Self::Timeout - } pub fn unreachable(key: DHTKey) -> Self { Self::Unreachable(key) } diff --git a/veilid-core/src/rpc_processor/rpc_find_block.rs b/veilid-core/src/rpc_processor/rpc_find_block.rs index 97b4e61b..8c0dcf94 100644 --- a/veilid-core/src/rpc_processor/rpc_find_block.rs +++ b/veilid-core/src/rpc_processor/rpc_find_block.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_find_block_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index 071e96f3..e3f2213b 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -10,15 +10,19 @@ impl RPCProcessor { key: DHTKey, safety_route: Option<&SafetyRouteSpec>, respond_to: RespondTo, - ) -> Result>, RPCError> { + ) -> Result>>, RPCError> { let find_node_q = RPCOperationFindNodeQ { node_id: key }; let question = RPCQuestion::new(respond_to, RPCQuestionDetail::FindNodeQ(find_node_q)); // Send the find_node request - let waitable_reply = self.question(dest, question, safety_route).await?; + let waitable_reply = + network_result_try!(self.question(dest, question, safety_route).await?); // Wait for reply - let (msg, latency) = self.wait_for_reply(waitable_reply).await?; + let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), + TimeoutOr::Value(v) => v, + }; // Get the right answer type let find_node_a = match msg.operation.into_kind() { @@ -38,9 +42,13 @@ impl RPCProcessor { } } - Ok(Answer::new(latency, find_node_a.peers)) + Ok(NetworkResult::value(Answer::new( + latency, + find_node_a.peers, + ))) } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> { // Get the question let find_node_q = match msg.operation.kind() { @@ -74,11 +82,14 @@ impl RPCProcessor { }; // Send status answer - self.answer( - msg, - RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)), - None, - ) - .await + let res = self + .answer( + msg, + RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)), + None, + ) + .await?; + tracing::Span::current().record("res", &tracing::field::display(res)); + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index c72e7092..23fb5175 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_get_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + //tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_get_value_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index e33f83c1..7ef1c6d9 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -8,17 +8,18 @@ impl RPCProcessor { self, dest: Destination, safety_route: Option<&SafetyRouteSpec>, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { let signed_node_info = self.routing_table().get_own_signed_node_info(); let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); // Send the node_info_update request - self.statement(dest, statement, safety_route).await?; + network_result_try!(self.statement(dest, statement, safety_route).await?); - Ok(()) + Ok(NetworkResult::value(())) } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { let sender_node_id = msg.header.envelope.get_sender_id(); @@ -33,15 +34,14 @@ impl RPCProcessor { // Update our routing table with signed node info if !self.filter_peer_scope(&node_info_update.signed_node_info.node_info) { - return Err(RPCError::invalid_format( - "node_info_update has invalid peer scope", - )); + log_rpc!(debug + "node_info_update has invalid peer scope from {}", sender_node_id + ); + return Ok(()); } - let _ = self - .routing_table() - .register_node_with_signed_node_info(sender_node_id, node_info_update.signed_node_info) - .map_err(map_to_string) - .map_err(RPCError::Internal)?; + + self.routing_table() + .register_node_with_signed_node_info(sender_node_id, node_info_update.signed_node_info); Ok(()) } diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 009bd3b2..838020ae 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -9,18 +9,19 @@ impl RPCProcessor { dest: Destination, safety_route: Option<&SafetyRouteSpec>, receipt: D, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { let receipt = receipt.as_ref().to_vec(); let return_receipt = RPCOperationReturnReceipt { receipt }; let statement = RPCStatement::new(RPCStatementDetail::ReturnReceipt(return_receipt)); // Send the return_receipt request - self.statement(dest, statement, safety_route).await?; + network_result_try!(self.statement(dest, statement, safety_route).await?); - Ok(()) + Ok(NetworkResult::value(())) } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> Result<(), RPCError> { // Get the statement let RPCOperationReturnReceipt { receipt } = match msg.operation.into_kind() { @@ -33,9 +34,12 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); - network_manager - .handle_in_band_receipt(receipt, msg.header.peer_noderef) - .await - .map_err(RPCError::network) + network_result_value_or_log!(debug + network_manager + .handle_in_band_receipt(receipt, msg.header.peer_noderef) + .await => {} + ); + + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 00f12eb9..29729bb7 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -3,8 +3,10 @@ use super::*; impl RPCProcessor { // xxx do not process latency for routed messages - pub(crate) async fn process_route(&self, _rpcreader: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] + pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> { // xxx do not process latency for routed messages + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_route")) } } diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 5e1b2661..8ff11f49 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_set_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_set_value_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 880e9901..51d045cb 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -9,17 +9,18 @@ impl RPCProcessor { dest: Destination, safety_route: Option<&SafetyRouteSpec>, signal_info: SignalInfo, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { //let signed_node_info = self.routing_table().get_own_signed_node_info(); let signal = RPCOperationSignal { signal_info }; let statement = RPCStatement::new(RPCStatementDetail::Signal(signal)); // Send the signal request - self.statement(dest, statement, safety_route).await?; + network_result_try!(self.statement(dest, statement, safety_route).await?); - Ok(()) + Ok(NetworkResult::value(())) } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> { // Get the statement let signal = match msg.operation.into_kind() { @@ -32,9 +33,14 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); - network_manager - .handle_signal(signal.signal_info) + network_result_value_or_log!(debug network_manager + .handle_signal(msg.header.envelope.get_sender_id(), signal.signal_info) .await - .map_err(RPCError::network) + .map_err(RPCError::network)? => { + return Ok(()); + } + ); + + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs index 5ac8630b..616f7fb0 100644 --- a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_start_tunnel_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 3ac9adcb..643ba932 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -4,22 +4,29 @@ impl RPCProcessor { // Send StatusQ RPC request, receive StatusA answer // Can be sent via relays, but not via routes #[instrument(level = "trace", skip(self), ret, err)] - pub async fn rpc_call_status(self, peer: NodeRef) -> Result, RPCError> { + pub async fn rpc_call_status( + self, + peer: NodeRef, + ) -> Result>, RPCError> { let node_status = self.network_manager().generate_node_status(); let status_q = RPCOperationStatusQ { node_status }; let respond_to = self.make_respond_to_sender(peer.clone()); let question = RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q)); // Send the info request - let waitable_reply = self - .question(Destination::Direct(peer.clone()), question, None) - .await?; + let waitable_reply = network_result_try!( + self.question(Destination::Direct(peer.clone()), question, None) + .await? + ); // Note what kind of ping this was and to what peer scope let send_data_kind = waitable_reply.send_data_kind; // Wait for reply - let (msg, latency) = self.wait_for_reply(waitable_reply).await?; + let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), + TimeoutOr::Value(v) => v, + }; // Get the right answer type let status_a = match msg.operation.into_kind() { @@ -54,9 +61,13 @@ impl RPCProcessor { } } - Ok(Answer::new(latency, status_a.sender_info)) + Ok(NetworkResult::value(Answer::new( + latency, + status_a.sender_info, + ))) } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { let peer_noderef = msg.header.peer_noderef.clone(); @@ -86,11 +97,14 @@ impl RPCProcessor { }; // Send status answer - self.answer( - msg, - RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)), - None, - ) - .await + let res = self + .answer( + msg, + RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)), + None, + ) + .await?; + tracing::Span::current().record("res", &tracing::field::display(res)); + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_supply_block.rs b/veilid-core/src/rpc_processor/rpc_supply_block.rs index 2d46ae53..2de9a72e 100644 --- a/veilid-core/src/rpc_processor/rpc_supply_block.rs +++ b/veilid-core/src/rpc_processor/rpc_supply_block.rs @@ -1,7 +1,10 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); + Err(RPCError::unimplemented("process_supply_block_q")) } } diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index cca74db8..b75d5d44 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -32,20 +32,24 @@ impl RPCProcessor { // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - self.statement(Destination::Direct(peer), statement, None) - .await?; + network_result_value_or_log!(debug self.statement(Destination::Direct(peer), statement, None) + .await? => { + return Ok(false); + } + ); // Wait for receipt match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => Err(RPCError::internal( - "validate_dial_info receipt should be returned out-of-band", - )), + ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => { + log_net!(debug "validate_dial_info receipt should be returned out-of-band".green()); + Ok(false) + } ReceiptEvent::ReturnedOutOfBand => { log_net!(debug "validate_dial_info receipt returned"); Ok(true) } ReceiptEvent::Expired => { - log_net!(debug "validate_dial_info receipt expired"); + log_net!(debug "validate_dial_info receipt expired".green()); Ok(false) } ReceiptEvent::Cancelled => { @@ -54,6 +58,7 @@ impl RPCProcessor { } } + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> Result<(), RPCError> { // Get the statement let RPCOperationValidateDialInfo { @@ -137,8 +142,11 @@ impl RPCProcessor { // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - self.statement(Destination::Direct(peer), statement, None) - .await?; + network_result_value_or_log!(debug self.statement(Destination::Direct(peer), statement, None) + .await? => { + return Ok(()); + } + ); } return Ok(()); }; @@ -151,6 +159,8 @@ impl RPCProcessor { .await .map_err(RPCError::network)?; + // tracing::Span::current().record("res", &tracing::field::display(res)); + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 80a47658..dfa631ea 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -1,7 +1,10 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); + Err(RPCError::unimplemented("process_value_changed")) } } diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index 67151b14..b55f2a8d 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -1,7 +1,9 @@ use super::*; impl RPCProcessor { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_watch_value_q")) } } diff --git a/veilid-core/src/xx/network_result.rs b/veilid-core/src/xx/network_result.rs index 737c0229..8a198e0d 100644 --- a/veilid-core/src/xx/network_result.rs +++ b/veilid-core/src/xx/network_result.rs @@ -58,6 +58,7 @@ impl NetworkResultResultExt for NetworkResult> { match self { NetworkResult::Timeout => Ok(NetworkResult::::Timeout), NetworkResult::NoConnection(e) => Ok(NetworkResult::::NoConnection(e)), + NetworkResult::InvalidMessage(s) => Ok(NetworkResult::::InvalidMessage(s)), NetworkResult::Value(Ok(v)) => Ok(NetworkResult::::Value(v)), NetworkResult::Value(Err(e)) => Err(e), } @@ -124,9 +125,11 @@ impl FoldedNetworkResultExt for io::Result> { ////////////////////////////////////////////////////////////////// // Non-fallible network result +#[must_use] pub enum NetworkResult { Timeout, NoConnection(io::Error), + InvalidMessage(String), Value(T), } @@ -137,6 +140,13 @@ impl NetworkResult { pub fn no_connection(e: io::Error) -> Self { Self::NoConnection(e) } + pub fn no_connection_other(s: S) -> Self { + Self::NoConnection(io::Error::new(io::ErrorKind::Other, s.to_string())) + } + pub fn invalid_message(s: S) -> Self { + Self::InvalidMessage(s.to_string()) + } + pub fn value(value: T) -> Self { Self::Value(value) } @@ -150,11 +160,22 @@ impl NetworkResult { pub fn is_value(&self) -> bool { matches!(self, Self::Value(_)) } - + pub fn map X>(self, f: F) -> NetworkResult { + match self { + Self::Timeout => NetworkResult::::Timeout, + Self::NoConnection(e) => NetworkResult::::NoConnection(e), + Self::InvalidMessage(s) => NetworkResult::::InvalidMessage(s), + Self::Value(v) => NetworkResult::::Value(f(v)), + } + } pub fn into_result(self) -> Result { match self { Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")), Self::NoConnection(e) => Err(e), + Self::InvalidMessage(s) => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid message: {}", s), + )), Self::Value(v) => Ok(v), } } @@ -174,6 +195,7 @@ impl From> for Option { // match self { // Self::Timeout => Self::Timeout, // Self::NoConnection(e) => Self::NoConnection(e.clone()), +// Self::InvalidResponse(k, s) => Self::InvalidResponse(k, s.clone()), // Self::Value(t) => Self::Value(t.clone()), // } // } @@ -184,19 +206,23 @@ impl Debug for NetworkResult { match self { Self::Timeout => write!(f, "Timeout"), Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(), + Self::InvalidMessage(s) => f.debug_tuple("InvalidMessage").field(s).finish(), Self::Value(v) => f.debug_tuple("Value").field(v).finish(), } } } -impl Display for NetworkResult { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + +impl Display for NetworkResult { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - Self::Timeout => write!(f, ""), - Self::NoConnection(e) => write!(f, "No connection: {}", e.kind()), - Self::Value(v) => write!(f, "{}", v), + Self::Timeout => write!(f, "Timeout"), + Self::NoConnection(e) => write!(f, "NoConnection({})", e.kind()), + Self::InvalidMessage(s) => write!(f, "InvalidMessage({})", s), + Self::Value(_) => write!(f, "Value"), } } } + impl Error for NetworkResult {} ////////////////////////////////////////////////////////////////// @@ -208,6 +234,45 @@ macro_rules! network_result_try { match $r { NetworkResult::Timeout => return Ok(NetworkResult::Timeout), NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)), + NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)), + NetworkResult::Value(v) => v, + } + }; + ($r:expr => $f:tt) => { + match $r { + NetworkResult::Timeout => { + $f; + return Ok(NetworkResult::Timeout); + } + NetworkResult::NoConnection(e) => { + $f; + return Ok(NetworkResult::NoConnection(e)); + } + NetworkResult::InvalidMessage(s) => { + $f; + return Ok(NetworkResult::InvalidMessage(s)); + } + NetworkResult::Value(v) => v, + } + }; +} + +#[macro_export] +macro_rules! network_result_value_or_log { + ($level: ident $r: expr => $f:tt) => { + match $r { + NetworkResult::Timeout => { + log_net!($level "{} at {}@{}:{}", "Timeout".green(), file!(), line!(), column!()); + $f + } + NetworkResult::NoConnection(e) => { + log_net!($level "{}({}) at {}@{}:{}", "No connection".green(), e.to_string(), file!(), line!(), column!()); + $f + } + NetworkResult::InvalidMessage(s) => { + log_net!($level "{}({}) at {}@{}:{}", "No connection".green(), s, file!(), line!(), column!()); + $f + } NetworkResult::Value(v) => v, } }; diff --git a/veilid-core/src/xx/timeout_or.rs b/veilid-core/src/xx/timeout_or.rs index 10f42cd1..04f57d53 100644 --- a/veilid-core/src/xx/timeout_or.rs +++ b/veilid-core/src/xx/timeout_or.rs @@ -77,6 +77,7 @@ impl TimeoutOrResultExt for TimeoutOr> { ////////////////////////////////////////////////////////////////// // Non-fallible timeout +#[must_use] pub enum TimeoutOr { Timeout, Value(T), @@ -97,13 +98,25 @@ impl TimeoutOr { pub fn is_value(&self) -> bool { matches!(self, Self::Value(_)) } - - pub fn ok(self) -> Result { + pub fn map X>(self, f: F) -> TimeoutOr { + match self { + Self::Timeout => TimeoutOr::::Timeout, + Self::Value(v) => TimeoutOr::::Value(f(v)), + } + } + pub fn into_timeout_error(self) -> Result { match self { Self::Timeout => Err(TimeoutError {}), Self::Value(v) => Ok(v), } } + + pub fn into_option(self) -> Option { + match self { + Self::Timeout => None, + Self::Value(v) => Some(v), + } + } } impl From> for Option {