From 749ba91b8bf411bf70f611a405a69b6b4a210440 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 20 Nov 2022 22:30:45 -0500 Subject: [PATCH] checkpoint --- veilid-core/src/rpc_processor/mod.rs | 17 ++++++-- veilid-core/src/rpc_processor/rpc_app_call.rs | 11 +++-- .../src/rpc_processor/rpc_app_message.rs | 9 ++-- .../src/rpc_processor/rpc_cancel_tunnel.rs | 4 +- .../src/rpc_processor/rpc_complete_tunnel.rs | 7 +++- .../src/rpc_processor/rpc_find_block.rs | 7 +++- .../src/rpc_processor/rpc_find_node.rs | 11 +++-- .../src/rpc_processor/rpc_get_value.rs | 7 +++- .../src/rpc_processor/rpc_node_info_update.rs | 19 ++++++--- .../src/rpc_processor/rpc_return_receipt.rs | 35 +++++++--------- veilid-core/src/rpc_processor/rpc_route.rs | 41 +++++++++++-------- .../src/rpc_processor/rpc_set_value.rs | 5 ++- veilid-core/src/rpc_processor/rpc_signal.rs | 18 ++++---- .../src/rpc_processor/rpc_start_tunnel.rs | 7 +++- veilid-core/src/rpc_processor/rpc_status.rs | 19 +++++---- .../src/rpc_processor/rpc_supply_block.rs | 7 +++- .../rpc_processor/rpc_validate_dial_info.rs | 25 +++++++---- .../src/rpc_processor/rpc_value_changed.rs | 5 ++- .../src/rpc_processor/rpc_watch_value.rs | 7 +++- 19 files changed, 165 insertions(+), 96 deletions(-) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 460323ad..ec18eb91 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -846,7 +846,10 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// #[instrument(level = "trace", skip(self, encoded_msg), err)] - async fn process_rpc_message(&self, encoded_msg: RPCMessageEncoded) -> Result<(), RPCError> { + async fn process_rpc_message( + &self, + encoded_msg: RPCMessageEncoded, + ) -> Result, RPCError> { // Decode operation appropriately based on header detail let msg = match &encoded_msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => { @@ -990,11 +993,19 @@ impl RPCProcessor { let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv"); // xxx: causes crash (Missing otel data span extensions) // rpc_worker_span.follows_from(span_id); - let _ = self + let res = match self .process_rpc_message(msg) .instrument(rpc_worker_span) .await - .map_err(logthru_rpc!("couldn't process rpc message")); + { + Err(e) => { + log_rpc!(error "couldn't process rpc message: {}", e); + continue; + } + + Ok(v) => v, + }; + network_result_value_or_log!(debug res => {}); } } diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index 8f57c4af..3fc60207 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -39,8 +39,11 @@ impl RPCProcessor { ))) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_app_call_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Get the question let app_call_q = match msg.operation.kind() { RPCOperationKind::Question(q) => match q.detail() { @@ -76,7 +79,7 @@ impl RPCProcessor { TimeoutOr::Timeout => { // No message sent on timeout, but this isn't an error log_rpc!(debug "App call timed out for id {}", id); - return Ok(()); + return Ok(NetworkResult::timeout()); } TimeoutOr::Value(v) => v, }; @@ -89,7 +92,7 @@ impl RPCProcessor { .answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a))) .await?; tracing::Span::current().record("res", &tracing::field::display(res)); - Ok(()) + Ok(res) } /// Exposed to API for apps to return app call answers diff --git a/veilid-core/src/rpc_processor/rpc_app_message.rs b/veilid-core/src/rpc_processor/rpc_app_message.rs index 0b069280..29f152b4 100644 --- a/veilid-core/src/rpc_processor/rpc_app_message.rs +++ b/veilid-core/src/rpc_processor/rpc_app_message.rs @@ -18,8 +18,11 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_app_message( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Get the statement let app_message = match msg.operation.into_kind() { RPCOperationKind::Statement(s) => match s.into_detail() { @@ -37,6 +40,6 @@ impl RPCProcessor { message, })); - Ok(()) + Ok(NetworkResult::value(())) } } diff --git a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs index 4d7d046b..04a7a332 100644 --- a/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_cancel_tunnel.rs @@ -1,8 +1,8 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_cancel_tunnel_q")) diff --git a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs index 66975971..9270921b 100644 --- a/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_complete_tunnel.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_complete_tunnel_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_complete_tunnel_q")) } diff --git a/veilid-core/src/rpc_processor/rpc_find_block.rs b/veilid-core/src/rpc_processor/rpc_find_block.rs index 8c0dcf94..4d7ca42b 100644 --- a/veilid-core/src/rpc_processor/rpc_find_block.rs +++ b/veilid-core/src/rpc_processor/rpc_find_block.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_find_block_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_find_block_q")) } diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index c75d820b..4f80df2d 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -66,13 +66,16 @@ impl RPCProcessor { ))) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_find_node_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Ensure this never came over a private route, safety route is okay though match &msg.header.detail { RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol( + return Ok(NetworkResult::invalid_message( "not processing find node request over private route", )) } @@ -130,6 +133,6 @@ impl RPCProcessor { .answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a))) .await?; tracing::Span::current().record("res", &tracing::field::display(res)); - Ok(()) + Ok(res) } } diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 23fb5175..65aad72f 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_get_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_get_value_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { //tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_get_value_q")) } diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index b264baeb..a676f6b2 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -30,12 +30,17 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_node_info_update( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { let detail = match msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => detail, RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol("node_info_update must be direct")); + return Ok(NetworkResult::invalid_message( + "node_info_update must be direct", + )); } }; let sender_node_id = detail.envelope.get_sender_id(); @@ -52,8 +57,10 @@ impl RPCProcessor { // Update our routing table with signed node info if !self.filter_node_info(routing_domain, &node_info_update.signed_node_info) { - log_rpc!(debug "node info doesn't belong in {:?} routing domain: {}", routing_domain, sender_node_id); - return Ok(()); + return Ok(NetworkResult::invalid_message(format!( + "node info doesn't belong in {:?} routing domain: {}", + routing_domain, sender_node_id + ))); } self.routing_table().register_node_with_signed_node_info( @@ -63,6 +70,6 @@ impl RPCProcessor { false, ); - Ok(()) + Ok(NetworkResult::value(())) } } diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index c4ae3b4c..a5537369 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -20,8 +20,11 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_return_receipt( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Get the statement let RPCOperationReturnReceipt { receipt } = match msg.operation.into_kind() { RPCOperationKind::Statement(s) => match s.into_detail() { @@ -34,30 +37,22 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); - match msg.header.detail { + let res = match msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => { - network_result_value_or_log!(debug - network_manager - .handle_in_band_receipt(receipt, detail.peer_noderef) - .await => {} - ); + network_manager + .handle_in_band_receipt(receipt, detail.peer_noderef) + .await } RPCMessageHeaderDetail::SafetyRouted(_) => { - network_result_value_or_log!(debug - network_manager - .handle_safety_receipt(receipt) - .await => {} - ); + network_manager.handle_safety_receipt(receipt).await } RPCMessageHeaderDetail::PrivateRouted(detail) => { - network_result_value_or_log!(debug - network_manager - .handle_private_receipt(receipt, detail.private_route) - .await => {} - ); + network_manager + .handle_private_receipt(receipt, detail.private_route) + .await } - } + }; - Ok(()) + Ok(res) } } diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index ac2513db..1e360ab7 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -1,12 +1,14 @@ use super::*; +xxx go through and convert errs to networkresult + impl RPCProcessor { #[instrument(level = "trace", skip_all, err)] async fn process_route_safety_route_hop( &self, route: RPCOperationRoute, route_hop: RouteHop, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Make sure hop count makes sense if route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count { return Err(RPCError::protocol( @@ -72,11 +74,11 @@ impl RPCProcessor { #[instrument(level = "trace", skip_all, err)] async fn process_route_private_route_hop( &self, - mut routed_operation: RoutedOperation, + routed_operation: RoutedOperation, next_route_node: RouteNode, safety_route_public_key: DHTKey, next_private_route: PrivateRoute, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Make sure hop count makes sense if next_private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count { return Err(RPCError::protocol( @@ -143,7 +145,7 @@ impl RPCProcessor { detail: RPCMessageHeaderDetailDirect, routed_operation: RoutedOperation, safety_route: &SafetyRoute, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Get sequencing preference let sequencing = if detail .connection_descriptor @@ -187,7 +189,7 @@ impl RPCProcessor { routed_operation: RoutedOperation, safety_route: &SafetyRoute, private_route: &PrivateRoute, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Get sender id let sender_id = detail.envelope.get_sender_id(); @@ -233,7 +235,7 @@ impl RPCProcessor { routed_operation: RoutedOperation, safety_route: &SafetyRoute, private_route: &PrivateRoute, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { // Make sure hop count makes sense if safety_route.hop_count != 0 { return Err(RPCError::protocol( @@ -267,7 +269,7 @@ impl RPCProcessor { operation: RoutedOperation, sr_pubkey: DHTKey, private_route: &PrivateRoute, - ) -> Result<(), RPCError> { + ) -> Result, RPCError> { let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else { return Err(RPCError::protocol("switching from safety route to private route requires first hop")); }; @@ -290,13 +292,16 @@ impl RPCProcessor { .await } - #[instrument(level = "trace", skip(self, msg), err)] - pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), ret, err)] + pub(crate) async fn process_route( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Get header detail, must be direct and not inside a route itself let detail = match msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => detail, RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol( + return Ok(NetworkResult::invalid_message( "route operation can not be inside route", )) } @@ -314,7 +319,7 @@ impl RPCProcessor { // Process routed operation version // xxx switch this to a Crypto trait factory method per issue#140 if route.operation.version != MAX_CRYPTO_VERSION { - return Err(RPCError::protocol( + return Ok(NetworkResult::invalid_message( "routes operation crypto is not valid version", )); } @@ -334,7 +339,7 @@ impl RPCProcessor { // See if this is last hop in safety route, if so, we're decoding a PrivateRoute not a RouteHop let Some(dec_blob_tag) = dec_blob_data.pop() else { - return Err(RPCError::protocol("no bytes in blob")); + return Ok(NetworkResult::invalid_message("no bytes in blob")); }; let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?; @@ -369,7 +374,7 @@ impl RPCProcessor { self.process_route_safety_route_hop(route, route_hop) .await?; } else { - return Err(RPCError::protocol("invalid blob tag")); + return Ok(NetworkResult::invalid_message("invalid blob tag")); } } // No safety route left, now doing private route @@ -411,7 +416,9 @@ impl RPCProcessor { // Ensure hop count > 0 if private_route.hop_count == 0 { - return Err(RPCError::protocol("route should not be at the end")); + return Ok(NetworkResult::invalid_message( + "route should not be at the end", + )); } // Sign the operation if this is not our last hop @@ -443,7 +450,9 @@ impl RPCProcessor { PrivateRouteHops::Empty => { // Ensure hop count == 0 if private_route.hop_count != 0 { - return Err(RPCError::protocol("route should be at the end")); + return Ok(NetworkResult::invalid_message( + "route should be at the end", + )); } // No hops left, time to process the routed operation @@ -458,6 +467,6 @@ impl RPCProcessor { } } - Ok(()) + Ok(NetworkResult::value(())) } } diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 8ff11f49..9f89e13c 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -2,7 +2,10 @@ use super::*; impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_set_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + pub(crate) async fn process_set_value_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_set_value_q")) } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 0180fb6c..592c2ed8 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -31,14 +31,17 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_signal( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Can't allow anything other than direct packets here, as handling reverse connections // or anything like via signals over private routes would deanonymize the route match &msg.header.detail { RPCMessageHeaderDetail::Direct(_) => {} RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol("signal must be direct")); + return Ok(NetworkResult::invalid_message("signal must be direct")); } }; @@ -53,14 +56,11 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); - network_result_value_or_log!(debug network_manager + let res = network_manager .handle_signal(signal.signal_info) .await - .map_err(RPCError::network)? => { - return Ok(()); - } - ); + .map_err(RPCError::network)?; - Ok(()) + Ok(res) } } diff --git a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs index 616f7fb0..95218748 100644 --- a/veilid-core/src/rpc_processor/rpc_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/rpc_start_tunnel.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_start_tunnel_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_start_tunnel_q")) } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index 32e13a43..4bbce6e1 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -179,8 +179,11 @@ impl RPCProcessor { Ok(NetworkResult::value(Answer::new(latency, opt_sender_info))) } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] - pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)] + pub(crate) async fn process_status_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // Get the question let status_q = match msg.operation.kind() { RPCOperationKind::Question(q) => match q.detail() { @@ -200,14 +203,16 @@ impl RPCProcessor { match routing_domain { RoutingDomain::PublicInternet => { if !matches!(node_status, NodeStatus::PublicInternet(_)) { - log_rpc!(debug "node status doesn't match PublicInternet routing domain"); - return Ok(()); + return Ok(NetworkResult::invalid_message( + "node status doesn't match PublicInternet routing domain", + )); } } RoutingDomain::LocalNetwork => { if !matches!(node_status, NodeStatus::LocalNetwork(_)) { - log_rpc!(debug "node status doesn't match LocalNetwork routing domain"); - return Ok(()); + return Ok(NetworkResult::invalid_message( + "node status doesn't match LocalNetwork routing domain", + )); } } } @@ -249,6 +254,6 @@ impl RPCProcessor { .answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a))) .await?; tracing::Span::current().record("res", &tracing::field::display(res)); - Ok(()) + Ok(res) } } diff --git a/veilid-core/src/rpc_processor/rpc_supply_block.rs b/veilid-core/src/rpc_processor/rpc_supply_block.rs index 2de9a72e..9a98f3ff 100644 --- a/veilid-core/src/rpc_processor/rpc_supply_block.rs +++ b/veilid-core/src/rpc_processor/rpc_supply_block.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_supply_block_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_supply_block_q")) diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 14e3c905..954b8d44 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -54,12 +54,17 @@ impl RPCProcessor { } } - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_validate_dial_info( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { let detail = match msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => detail, RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { - return Err(RPCError::protocol("validate_dial_info must be direct")); + return Ok(NetworkResult::invalid_message( + "validate_dial_info must be direct", + )); } }; @@ -117,7 +122,7 @@ impl RPCProcessor { // Find nodes matching filter to redirect this to let peers = routing_table.find_fast_public_nodes_filtered(node_count, filters); if peers.is_empty() { - return Err(RPCError::internal(format!( + return Ok(NetworkResult::no_connection_other(format!( "no peers able to reach dialinfo '{:?}'", dial_info ))); @@ -139,13 +144,17 @@ impl RPCProcessor { // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement) + let res = network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement) .await? => { - return Ok(()); + continue; } ); + return Ok(NetworkResult::value(())); } - return Ok(()); + + return Ok(NetworkResult::no_connection_other( + "could not redirect, no peers were reachable", + )); }; // Otherwise send a return receipt directly @@ -156,6 +165,6 @@ impl RPCProcessor { .await .map_err(RPCError::network)?; - Ok(()) + Ok(NetworkResult::value(())) } } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index dfa631ea..6c46d4d7 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -2,7 +2,10 @@ use super::*; impl RPCProcessor { #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> Result<(), RPCError> { + pub(crate) async fn process_value_changed( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_value_changed")) diff --git a/veilid-core/src/rpc_processor/rpc_watch_value.rs b/veilid-core/src/rpc_processor/rpc_watch_value.rs index b55f2a8d..8526e037 100644 --- a/veilid-core/src/rpc_processor/rpc_watch_value.rs +++ b/veilid-core/src/rpc_processor/rpc_watch_value.rs @@ -1,8 +1,11 @@ use super::*; impl RPCProcessor { - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] + pub(crate) async fn process_watch_value_q( + &self, + msg: RPCMessage, + ) -> Result, RPCError> { // tracing::Span::current().record("res", &tracing::field::display(res)); Err(RPCError::unimplemented("process_watch_value_q")) }