checkpoint

This commit is contained in:
John Smith 2022-11-20 22:30:45 -05:00
parent 5453eb2e92
commit 749ba91b8b
19 changed files with 165 additions and 96 deletions

View File

@ -846,7 +846,10 @@ impl RPCProcessor {
//////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", skip(self, encoded_msg), err)]
async fn process_rpc_message(&self, encoded_msg: RPCMessageEncoded) -> Result<(), RPCError> {
async fn process_rpc_message(
&self,
encoded_msg: RPCMessageEncoded,
) -> Result<NetworkResult<()>, RPCError> {
// Decode operation appropriately based on header detail
let msg = match &encoded_msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
@ -990,11 +993,19 @@ impl RPCProcessor {
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id);
let _ = self
let res = match self
.process_rpc_message(msg)
.instrument(rpc_worker_span)
.await
.map_err(logthru_rpc!("couldn't process rpc message"));
{
Err(e) => {
log_rpc!(error "couldn't process rpc message: {}", e);
continue;
}
Ok(v) => v,
};
network_result_value_or_log!(debug res => {});
}
}

View File

@ -39,8 +39,11 @@ impl RPCProcessor {
)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_app_call_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the question
let app_call_q = match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
@ -76,7 +79,7 @@ impl RPCProcessor {
TimeoutOr::Timeout => {
// No message sent on timeout, but this isn't an error
log_rpc!(debug "App call timed out for id {}", id);
return Ok(());
return Ok(NetworkResult::timeout());
}
TimeoutOr::Value(v) => v,
};
@ -89,7 +92,7 @@ impl RPCProcessor {
.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(())
Ok(res)
}
/// Exposed to API for apps to return app call answers

View File

@ -18,8 +18,11 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_app_message(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the statement
let app_message = match msg.operation.into_kind() {
RPCOperationKind::Statement(s) => match s.into_detail() {
@ -37,6 +40,6 @@ impl RPCProcessor {
message,
}));
Ok(())
Ok(NetworkResult::value(()))
}
}

View File

@ -1,8 +1,8 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_cancel_tunnel_q"))

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_complete_tunnel_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_complete_tunnel_q"))
}

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_find_block_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_find_block_q"))
}

View File

@ -66,13 +66,16 @@ impl RPCProcessor {
)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_find_node_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"not processing find node request over private route",
))
}
@ -130,6 +133,6 @@ impl RPCProcessor {
.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(())
Ok(res)
}
}

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_get_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_get_value_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
//tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_get_value_q"))
}

View File

@ -30,12 +30,17 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_node_info_update(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("node_info_update must be direct"));
return Ok(NetworkResult::invalid_message(
"node_info_update must be direct",
));
}
};
let sender_node_id = detail.envelope.get_sender_id();
@ -52,8 +57,10 @@ impl RPCProcessor {
// Update our routing table with signed node info
if !self.filter_node_info(routing_domain, &node_info_update.signed_node_info) {
log_rpc!(debug "node info doesn't belong in {:?} routing domain: {}", routing_domain, sender_node_id);
return Ok(());
return Ok(NetworkResult::invalid_message(format!(
"node info doesn't belong in {:?} routing domain: {}",
routing_domain, sender_node_id
)));
}
self.routing_table().register_node_with_signed_node_info(
@ -63,6 +70,6 @@ impl RPCProcessor {
false,
);
Ok(())
Ok(NetworkResult::value(()))
}
}

View File

@ -20,8 +20,11 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_return_receipt(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the statement
let RPCOperationReturnReceipt { receipt } = match msg.operation.into_kind() {
RPCOperationKind::Statement(s) => match s.into_detail() {
@ -34,30 +37,22 @@ impl RPCProcessor {
// Handle it
let network_manager = self.network_manager();
match msg.header.detail {
let res = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
network_result_value_or_log!(debug
network_manager
.handle_in_band_receipt(receipt, detail.peer_noderef)
.await => {}
);
.await
}
RPCMessageHeaderDetail::SafetyRouted(_) => {
network_result_value_or_log!(debug
network_manager
.handle_safety_receipt(receipt)
.await => {}
);
network_manager.handle_safety_receipt(receipt).await
}
RPCMessageHeaderDetail::PrivateRouted(detail) => {
network_result_value_or_log!(debug
network_manager
.handle_private_receipt(receipt, detail.private_route)
.await => {}
);
}
.await
}
};
Ok(())
Ok(res)
}
}

View File

@ -1,12 +1,14 @@
use super::*;
xxx go through and convert errs to networkresult
impl RPCProcessor {
#[instrument(level = "trace", skip_all, err)]
async fn process_route_safety_route_hop(
&self,
route: RPCOperationRoute,
route_hop: RouteHop,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::protocol(
@ -72,11 +74,11 @@ impl RPCProcessor {
#[instrument(level = "trace", skip_all, err)]
async fn process_route_private_route_hop(
&self,
mut routed_operation: RoutedOperation,
routed_operation: RoutedOperation,
next_route_node: RouteNode,
safety_route_public_key: DHTKey,
next_private_route: PrivateRoute,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if next_private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::protocol(
@ -143,7 +145,7 @@ impl RPCProcessor {
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Get sequencing preference
let sequencing = if detail
.connection_descriptor
@ -187,7 +189,7 @@ impl RPCProcessor {
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Get sender id
let sender_id = detail.envelope.get_sender_id();
@ -233,7 +235,7 @@ impl RPCProcessor {
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if safety_route.hop_count != 0 {
return Err(RPCError::protocol(
@ -267,7 +269,7 @@ impl RPCProcessor {
operation: RoutedOperation,
sr_pubkey: DHTKey,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else {
return Err(RPCError::protocol("switching from safety route to private route requires first hop"));
};
@ -290,13 +292,16 @@ impl RPCProcessor {
.await
}
#[instrument(level = "trace", skip(self, msg), err)]
pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), ret, err)]
pub(crate) async fn process_route(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get header detail, must be direct and not inside a route itself
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"route operation can not be inside route",
))
}
@ -314,7 +319,7 @@ impl RPCProcessor {
// Process routed operation version
// xxx switch this to a Crypto trait factory method per issue#140
if route.operation.version != MAX_CRYPTO_VERSION {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"routes operation crypto is not valid version",
));
}
@ -334,7 +339,7 @@ impl RPCProcessor {
// See if this is last hop in safety route, if so, we're decoding a PrivateRoute not a RouteHop
let Some(dec_blob_tag) = dec_blob_data.pop() else {
return Err(RPCError::protocol("no bytes in blob"));
return Ok(NetworkResult::invalid_message("no bytes in blob"));
};
let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
@ -369,7 +374,7 @@ impl RPCProcessor {
self.process_route_safety_route_hop(route, route_hop)
.await?;
} else {
return Err(RPCError::protocol("invalid blob tag"));
return Ok(NetworkResult::invalid_message("invalid blob tag"));
}
}
// No safety route left, now doing private route
@ -411,7 +416,9 @@ impl RPCProcessor {
// Ensure hop count > 0
if private_route.hop_count == 0 {
return Err(RPCError::protocol("route should not be at the end"));
return Ok(NetworkResult::invalid_message(
"route should not be at the end",
));
}
// Sign the operation if this is not our last hop
@ -443,7 +450,9 @@ impl RPCProcessor {
PrivateRouteHops::Empty => {
// Ensure hop count == 0
if private_route.hop_count != 0 {
return Err(RPCError::protocol("route should be at the end"));
return Ok(NetworkResult::invalid_message(
"route should be at the end",
));
}
// No hops left, time to process the routed operation
@ -458,6 +467,6 @@ impl RPCProcessor {
}
}
Ok(())
Ok(NetworkResult::value(()))
}
}

View File

@ -2,7 +2,10 @@ use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_set_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
pub(crate) async fn process_set_value_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_set_value_q"))
}

View File

@ -31,14 +31,17 @@ impl RPCProcessor {
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_signal(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Can't allow anything other than direct packets here, as handling reverse connections
// or anything like via signals over private routes would deanonymize the route
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) => {}
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("signal must be direct"));
return Ok(NetworkResult::invalid_message("signal must be direct"));
}
};
@ -53,14 +56,11 @@ impl RPCProcessor {
// Handle it
let network_manager = self.network_manager();
network_result_value_or_log!(debug network_manager
let res = network_manager
.handle_signal(signal.signal_info)
.await
.map_err(RPCError::network)? => {
return Ok(());
}
);
.map_err(RPCError::network)?;
Ok(())
Ok(res)
}
}

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_start_tunnel_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_start_tunnel_q"))
}

View File

@ -179,8 +179,11 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(latency, opt_sender_info)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_status_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the question
let status_q = match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
@ -200,14 +203,16 @@ impl RPCProcessor {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
log_rpc!(debug "node status doesn't match PublicInternet routing domain");
return Ok(());
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(node_status, NodeStatus::LocalNetwork(_)) {
log_rpc!(debug "node status doesn't match LocalNetwork routing domain");
return Ok(());
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
}
}
}
@ -249,6 +254,6 @@ impl RPCProcessor {
.answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(())
Ok(res)
}
}

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_supply_block_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_supply_block_q"))

View File

@ -54,12 +54,17 @@ impl RPCProcessor {
}
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_validate_dial_info(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Err(RPCError::protocol("validate_dial_info must be direct"));
return Ok(NetworkResult::invalid_message(
"validate_dial_info must be direct",
));
}
};
@ -117,7 +122,7 @@ impl RPCProcessor {
// Find nodes matching filter to redirect this to
let peers = routing_table.find_fast_public_nodes_filtered(node_count, filters);
if peers.is_empty() {
return Err(RPCError::internal(format!(
return Ok(NetworkResult::no_connection_other(format!(
"no peers able to reach dialinfo '{:?}'",
dial_info
)));
@ -139,13 +144,17 @@ impl RPCProcessor {
// Send the validate_dial_info request
// This can only be sent directly, as relays can not validate dial info
network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement)
let res = network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement)
.await? => {
return Ok(());
continue;
}
);
return Ok(NetworkResult::value(()));
}
return Ok(());
return Ok(NetworkResult::no_connection_other(
"could not redirect, no peers were reachable",
));
};
// Otherwise send a return receipt directly
@ -156,6 +165,6 @@ impl RPCProcessor {
.await
.map_err(RPCError::network)?;
Ok(())
Ok(NetworkResult::value(()))
}
}

View File

@ -2,7 +2,10 @@ use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> Result<(), RPCError> {
pub(crate) async fn process_value_changed(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_value_changed"))

View File

@ -1,8 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_watch_value_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_watch_value_q"))
}