This commit is contained in:
John Smith 2022-11-21 20:21:46 -05:00
parent 749ba91b8b
commit e98fae711e
20 changed files with 169 additions and 116 deletions

View File

@ -7,6 +7,7 @@ use cursive::event::*;
use cursive::theme::*;
use cursive::traits::*;
use cursive::utils::markup::StyledString;
use cursive::view::ScrollStrategy;
use cursive::views::*;
use cursive::Cursive;
use cursive::CursiveRunnable;
@ -215,7 +216,13 @@ impl UI {
UI::setup_quit_handler(s);
});
}
fn clear_handler(siv: &mut Cursive) {
cursive_flexi_logger_view::clear_log();
UI::update_cb(siv);
}
fn node_events(s: &mut Cursive) -> ViewRef<FlexiLoggerView> {
s.find_name("node-events").unwrap()
}
fn node_events_panel(
s: &mut Cursive,
) -> ViewRef<Panel<ResizedView<NamedView<ScrollView<FlexiLoggerView>>>>> {
@ -737,8 +744,12 @@ impl UI {
// Create layouts
let node_events_view = Panel::new(
FlexiLoggerView::new_scrollable()
FlexiLoggerView::new()
.with_name("node-events")
.scrollable()
.scroll_x(true)
.scroll_y(true)
.scroll_strategy(ScrollStrategy::StickToBottom)
.full_screen(),
)
.title_position(HAlign::Left)
@ -822,6 +833,7 @@ impl UI {
UI::setup_colors(&mut siv, &mut inner, settings);
UI::setup_quit_handler(&mut siv);
siv.set_global_callback(cursive::event::Event::Ctrl(Key::K), UI::clear_handler);
drop(inner);
drop(siv);

View File

@ -977,7 +977,8 @@ impl RPCProcessor {
self.unlocked_inner
.waiting_rpc_table
.complete_op_waiter(msg.operation.op_id(), msg)
.await
.await?;
Ok(NetworkResult::value(()))
}
}
}
@ -1005,7 +1006,13 @@ impl RPCProcessor {
Ok(v) => v,
};
network_result_value_or_log!(debug res => {});
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
network_result_value_or_log!(warn res => {});
} else {
network_result_value_or_log!(debug res => {});
}
}
}
}

View File

@ -88,11 +88,8 @@ impl RPCProcessor {
let app_call_a = RPCOperationAppCallA { message };
// Send status answer
let res = self
.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(res)
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
.await
}
/// Exposed to API for apps to return app call answers

View File

@ -13,9 +13,7 @@ impl RPCProcessor {
let statement = RPCStatement::new(RPCStatementDetail::AppMessage(app_message));
// Send the app message request
network_result_try!(self.statement(dest, statement).await?);
Ok(NetworkResult::value(()))
self.statement(dest, statement).await
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]

View File

@ -1,10 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_cancel_tunnel_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
Err(RPCError::unimplemented("process_cancel_tunnel_q"))
}
}

View File

@ -1,12 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_complete_tunnel_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_complete_tunnel_q"))
}
}

View File

@ -1,12 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_find_block_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_find_block_q"))
}
}

View File

@ -66,7 +66,7 @@ impl RPCProcessor {
)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_find_node_q(
&self,
msg: RPCMessage,
@ -129,10 +129,7 @@ impl RPCProcessor {
};
// Send status answer
let res = self
.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(res)
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
.await
}
}

View File

@ -1,12 +1,11 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_get_value_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
//tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_get_value_q"))
}
}

View File

@ -63,12 +63,21 @@ impl RPCProcessor {
)));
}
self.routing_table().register_node_with_signed_node_info(
routing_domain,
sender_node_id,
node_info_update.signed_node_info,
false,
);
if self
.routing_table()
.register_node_with_signed_node_info(
routing_domain,
sender_node_id,
node_info_update.signed_node_info,
false,
)
.is_none()
{
return Ok(NetworkResult::invalid_message(format!(
"could not register node info update {}",
sender_node_id
)));
}
Ok(NetworkResult::value(()))
}

View File

@ -1,7 +1,5 @@
use super::*;
xxx go through and convert errs to networkresult
impl RPCProcessor {
#[instrument(level = "trace", skip_all, err)]
async fn process_route_safety_route_hop(
@ -11,44 +9,48 @@ impl RPCProcessor {
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"Safety route hop count too high to process",
));
}
if route.safety_route.hop_count == 0 {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"Safety route hop count should not be zero if there are more hops",
));
}
if route_hop.next_hop.is_none() {
return Err(RPCError::protocol("Safety route hop must have next hop"));
return Ok(NetworkResult::invalid_message(
"Safety route hop must have next hop",
));
}
// Get next hop node ref
let next_hop_nr = match route_hop.node {
RouteNode::NodeId(id) => {
//
self.routing_table
.lookup_node_ref(id.key)
.ok_or_else(|| RPCError::network(format!("node hop {} not found", id.key)))
let Some(nr) = self.routing_table.lookup_node_ref(id.key) else {
return Ok(NetworkResult::invalid_message(format!("node hop {} not found", id.key)));
};
nr
}
RouteNode::PeerInfo(pi) => {
//
self.routing_table
let Some(nr) = self.routing_table
.register_node_with_signed_node_info(
RoutingDomain::PublicInternet,
pi.node_id.key,
pi.signed_node_info,
false,
)
.ok_or_else(|| {
RPCError::network(format!(
) else
{
return Ok(NetworkResult::invalid_message(format!(
"node hop {} could not be registered",
pi.node_id.key
))
})
)));
};
nr
}
}?;
};
// Pass along the route
let next_hop_route = RPCOperationRoute {
@ -62,13 +64,8 @@ impl RPCProcessor {
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
network_result_value_or_log!(debug
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await? => {
return Err(RPCError::network("unable to send route statement for next safety route hop"));
}
);
Ok(())
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await
}
#[instrument(level = "trace", skip_all, err)]
@ -124,14 +121,8 @@ impl RPCProcessor {
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
network_result_value_or_log!(debug
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await? => {
return Err(RPCError::network("unable to send route statement for private route hop"));
}
);
Ok(())
self.statement(Destination::direct(next_hop_nr), next_hop_route_stmt)
.await
}
/// Process a routed operation that came in over a safety route but no private route
@ -164,21 +155,23 @@ impl RPCProcessor {
.crypto
.cached_dh(&safety_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let body = Crypto::decrypt_aead(
let body = match Crypto::decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
&dh_secret,
None,
)
.map_err(RPCError::map_internal(
"decryption of routed operation failed",
))?;
) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!("decryption of routed operation failed: {}", e)));
}
};
// Pass message to RPC system
self.enqueue_safety_routed_message(sequencing, body)
.map_err(RPCError::internal)?;
Ok(())
Ok(NetworkResult::value(()))
}
/// Process a routed operation that came in over both a safety route and a private route
@ -195,7 +188,7 @@ impl RPCProcessor {
// Look up the private route and ensure it's one in our spec store
let rss = self.routing_table.route_spec_store();
let (secret_key, safety_spec) = rss
let Some((secret_key, safety_spec)) = rss
.validate_signatures(
&private_route.public_key,
&routed_operation.signatures,
@ -203,7 +196,9 @@ impl RPCProcessor {
sender_id,
)
.map_err(RPCError::protocol)?
.ok_or_else(|| RPCError::protocol("signatures did not validate for private route"))?;
else {
return Ok(NetworkResult::invalid_message("signatures did not validate for private route"));
};
// Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret)
// xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes?
@ -225,7 +220,7 @@ impl RPCProcessor {
self.enqueue_private_routed_message(private_route.public_key, safety_spec, body)
.map_err(RPCError::internal)?;
Ok(())
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip_all, err)]
@ -238,12 +233,12 @@ impl RPCProcessor {
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if safety_route.hop_count != 0 {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"Safety hop count should be zero if switched to private route",
));
}
if private_route.hop_count != 0 {
return Err(RPCError::protocol(
return Ok(NetworkResult::invalid_message(
"Private route hop count should be zero if we are at the end",
));
}
@ -271,7 +266,7 @@ impl RPCProcessor {
private_route: &PrivateRoute,
) -> Result<NetworkResult<()>, RPCError> {
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else {
return Err(RPCError::protocol("switching from safety route to private route requires first hop"));
return Ok(NetworkResult::invalid_message("switching from safety route to private route requires first hop"));
};
// Switching to private route from safety route
@ -355,12 +350,12 @@ impl RPCProcessor {
};
// Switching from full safety route to private route first hop
self.process_private_route_first_hop(
network_result_try!(self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
&private_route,
)
.await?;
.await?);
} else if dec_blob_tag == 0 {
// RouteHop
let route_hop = {
@ -371,8 +366,8 @@ impl RPCProcessor {
};
// Continue the full safety route with another hop
self.process_route_safety_route_hop(route, route_hop)
.await?;
network_result_try!(self.process_route_safety_route_hop(route, route_hop)
.await?);
} else {
return Ok(NetworkResult::invalid_message("invalid blob tag"));
}
@ -383,12 +378,12 @@ impl RPCProcessor {
match &private_route.hops {
PrivateRouteHops::FirstHop(_) => {
// Safety route was a stub, start with the beginning of the private route
self.process_private_route_first_hop(
network_result_try!(self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
private_route,
)
.await?;
.await?);
}
PrivateRouteHops::Data(route_hop_data) => {
// Decrypt the blob with DEC(nonce, DH(the PR's public key, this hop's secret)
@ -397,13 +392,17 @@ impl RPCProcessor {
.crypto
.cached_dh(&private_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let dec_blob_data = Crypto::decrypt_aead(
let dec_blob_data = match Crypto::decrypt_aead(
&route_hop_data.blob,
&route_hop_data.nonce,
&dh_secret,
None,
)
.map_err(RPCError::protocol)?;
) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!("unable to decrypt private route hop data: {}", e)));
}
};
let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
// Decode next RouteHop
@ -432,7 +431,7 @@ impl RPCProcessor {
}
// Make next PrivateRoute and pass it on
self.process_route_private_route_hop(
network_result_try!(self.process_route_private_route_hop(
route.operation,
route_hop.node,
route.safety_route.public_key,
@ -445,7 +444,7 @@ impl RPCProcessor {
.unwrap_or(PrivateRouteHops::Empty),
},
)
.await?;
.await?);
}
PrivateRouteHops::Empty => {
// Ensure hop count == 0
@ -456,12 +455,12 @@ impl RPCProcessor {
}
// No hops left, time to process the routed operation
self.process_routed_operation(
network_result_try!(self.process_routed_operation(
detail,
route.operation,
&route.safety_route,
private_route,
)?;
)?);
}
}
}

View File

@ -1,7 +1,7 @@
use super::*;
impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_set_value_q(
&self,
msg: RPCMessage,

View File

@ -26,9 +26,7 @@ impl RPCProcessor {
let statement = RPCStatement::new(RPCStatementDetail::Signal(signal));
// Send the signal request
network_result_try!(self.statement(dest, statement).await?);
Ok(NetworkResult::value(()))
self.statement(dest, statement).await
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
@ -56,11 +54,9 @@ impl RPCProcessor {
// Handle it
let network_manager = self.network_manager();
let res = network_manager
network_manager
.handle_signal(signal.signal_info)
.await
.map_err(RPCError::network)?;
Ok(res)
.map_err(RPCError::network)
}
}

View File

@ -6,7 +6,6 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_start_tunnel_q"))
}
}

View File

@ -179,7 +179,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(latency, opt_sender_info)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), ret, err)]
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_status_q(
&self,
msg: RPCMessage,
@ -250,10 +250,7 @@ impl RPCProcessor {
};
// Send status answer
let res = self
.answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)))
.await?;
tracing::Span::current().record("res", &tracing::field::display(res));
Ok(res)
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)))
.await
}
}

View File

@ -6,8 +6,6 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_supply_block_q"))
}
}

View File

@ -144,7 +144,7 @@ impl RPCProcessor {
// Send the validate_dial_info request
// This can only be sent directly, as relays can not validate dial info
let res = network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement)
network_result_value_or_log!(debug self.statement(Destination::direct(peer), statement)
.await? => {
continue;
}

View File

@ -6,8 +6,6 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_value_changed"))
}
}

View File

@ -6,7 +6,6 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_watch_value_q"))
}
}

View File

@ -303,24 +303,73 @@ macro_rules! network_result_try {
};
}
#[macro_export]
macro_rules! log_network_result {
($text:expr) => {
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
info!(target: "network_result", "{}", $text)
} else {
debug!(target: "network_result", "{}", $text)
}
}
};
($fmt:literal, $($arg:expr),+) => {
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
info!(target: "network_result", $fmt, $($arg),+);
} else {
debug!(target: "network_result", $fmt, $($arg),+);
}
}
};
}
#[macro_export]
macro_rules! network_result_value_or_log {
($level: ident $r: expr => $f:tt) => {
match $r {
NetworkResult::Timeout => {
log_net!($level "{} at {}@{}:{}", "Timeout".green(), file!(), line!(), column!());
log_network_result!(
"{} at {}@{}:{}",
"Timeout".cyan(),
file!(),
line!(),
column!()
);
$f
}
NetworkResult::NoConnection(e) => {
log_net!($level "{}({}) at {}@{}:{}", "No connection".green(), e.to_string(), file!(), line!(), column!());
log_network_result!(
"{}({}) at {}@{}:{}",
"No connection".cyan(),
e.to_string(),
file!(),
line!(),
column!()
);
$f
}
NetworkResult::AlreadyExists(e) => {
log_net!($level "{}({}) at {}@{}:{}", "Already exists".green(), e.to_string(), file!(), line!(), column!());
log_network_result!(
"{}({}) at {}@{}:{}",
"Already exists".cyan(),
e.to_string(),
file!(),
line!(),
column!()
);
$f
}
NetworkResult::InvalidMessage(s) => {
log_net!($level "{}({}) at {}@{}:{}", "Invalid message".green(), s, file!(), line!(), column!());
log_network_result!(
"{}({}) at {}@{}:{}",
"Invalid message".cyan(),
s,
file!(),
line!(),
column!()
);
$f
}
NetworkResult::Value(v) => v,