fix status response

This commit is contained in:
John Smith 2022-08-25 19:21:50 -04:00
parent 317db3cf44
commit 1bd22bf6ba
3 changed files with 41 additions and 13 deletions

View File

@ -1383,25 +1383,28 @@ impl NetworkManager {
async fn on_recv_envelope( async fn on_recv_envelope(
&self, &self,
data: &[u8], data: &[u8],
descriptor: ConnectionDescriptor, connection_descriptor: ConnectionDescriptor,
) -> EyreResult<bool> { ) -> EyreResult<bool> {
let root = span!( let root = span!(
parent: None, parent: None,
Level::TRACE, Level::TRACE,
"on_recv_envelope", "on_recv_envelope",
"data.len" = data.len(), "data.len" = data.len(),
"descriptor" = ?descriptor "descriptor" = ?connection_descriptor
); );
let _root_enter = root.enter(); let _root_enter = root.enter();
log_net!( log_net!(
"envelope of {} bytes received from {:?}", "envelope of {} bytes received from {:?}",
data.len(), data.len(),
descriptor connection_descriptor
); );
// Network accounting // Network accounting
self.stats_packet_rcvd(descriptor.remote_address().to_ip_addr(), data.len() as u64); self.stats_packet_rcvd(
connection_descriptor.remote_address().to_ip_addr(),
data.len() as u64,
);
// If this is a zero length packet, just drop it, because these are used for hole punching // If this is a zero length packet, just drop it, because these are used for hole punching
// and possibly other low-level network connectivity tasks and will never require // and possibly other low-level network connectivity tasks and will never require
@ -1418,7 +1421,7 @@ impl NetworkManager {
// Is this a direct bootstrap request instead of an envelope? // Is this a direct bootstrap request instead of an envelope?
if data[0..4] == *BOOT_MAGIC { if data[0..4] == *BOOT_MAGIC {
network_result_value_or_log!(debug self.handle_boot_request(descriptor).await? => {}); network_result_value_or_log!(debug self.handle_boot_request(connection_descriptor).await? => {});
return Ok(true); return Ok(true);
} }
@ -1528,7 +1531,7 @@ impl NetworkManager {
// Cache the envelope information in the routing table // Cache the envelope information in the routing table
let source_noderef = match routing_table.register_node_with_existing_connection( let source_noderef = match routing_table.register_node_with_existing_connection(
envelope.get_sender_id(), envelope.get_sender_id(),
descriptor, connection_descriptor,
ts, ts,
) { ) {
None => { None => {
@ -1543,7 +1546,7 @@ impl NetworkManager {
// xxx: deal with spoofing and flooding here? // xxx: deal with spoofing and flooding here?
// Pass message to RPC system // Pass message to RPC system
rpc.enqueue_message(envelope, body, source_noderef)?; rpc.enqueue_message(envelope, body, source_noderef, connection_descriptor)?;
// Inform caller that we dealt with the envelope locally // Inform caller that we dealt with the envelope locally
Ok(true) Ok(true)
@ -1756,7 +1759,13 @@ impl NetworkManager {
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet); routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
net.reset_network_class(); net.reset_network_class();
} else { } else {
let inner = self.inner.lock();
warn!("Public address may have changed. Restarting the server may be required."); warn!("Public address may have changed. Restarting the server may be required.");
info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
info!(
"public_address_check_cache: {:#?}",
inner.public_address_check_cache
);
} }
} }
} }

View File

@ -36,11 +36,15 @@ use stop_token::future::FutureExt;
type OperationId = u64; type OperationId = u64;
/// Where to send an RPC message
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Destination { pub enum Destination {
Direct(NodeRef), // Send to node (target noderef) /// Send to node (target noderef)
Relay(NodeRef, DHTKey), // Send to node for relay purposes (relay noderef, target nodeid) Direct(NodeRef),
PrivateRoute(PrivateRoute), // Send to private route (privateroute) /// Send to node for relay purposes (relay noderef, target nodeid)
Relay(NodeRef, DHTKey),
/// Send to private route (privateroute)
PrivateRoute(PrivateRoute),
} }
impl fmt::Display for Destination { impl fmt::Display for Destination {
@ -59,12 +63,19 @@ impl fmt::Display for Destination {
} }
} }
/// The decoded header of an RPC message
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct RPCMessageHeader { struct RPCMessageHeader {
timestamp: u64, // time the message was received, not sent /// Time the message was received, not sent
timestamp: u64,
/// The decoded header of the envelope
envelope: Envelope, envelope: Envelope,
/// The length in bytes of the rpc message body
body_len: u64, body_len: u64,
peer_noderef: NodeRef, // ensures node doesn't get evicted from routing table until we're done with it /// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it
peer_noderef: NodeRef,
/// The connection from the peer sent the message (not the original sender)
connection_descriptor: ConnectionDescriptor,
} }
#[derive(Debug)] #[derive(Debug)]
@ -993,6 +1004,7 @@ impl RPCProcessor {
envelope: Envelope, envelope: Envelope,
body: Vec<u8>, body: Vec<u8>,
peer_noderef: NodeRef, peer_noderef: NodeRef,
connection_descriptor: ConnectionDescriptor,
) -> EyreResult<()> { ) -> EyreResult<()> {
let msg = RPCMessageEncoded { let msg = RPCMessageEncoded {
header: RPCMessageHeader { header: RPCMessageHeader {
@ -1000,6 +1012,7 @@ impl RPCProcessor {
envelope, envelope,
body_len: body.len() as u64, body_len: body.len() as u64,
peer_noderef, peer_noderef,
connection_descriptor,
}, },
data: RPCMessageData { contents: body }, data: RPCMessageData { contents: body },
}; };

View File

@ -85,6 +85,7 @@ impl RPCProcessor {
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)] #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id, res), err)]
pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> {
let peer_noderef = msg.header.peer_noderef.clone(); let peer_noderef = msg.header.peer_noderef.clone();
let connection_descriptor = msg.header.connection_descriptor;
// Get the question // Get the question
let status_q = match msg.operation.kind() { let status_q = match msg.operation.kind() {
@ -105,7 +106,12 @@ impl RPCProcessor {
// Make status answer // Make status answer
let node_status = self.network_manager().generate_node_status(); let node_status = self.network_manager().generate_node_status();
let sender_info = Self::generate_sender_info(peer_noderef).await; // Filter the noderef down to the protocol used by the incoming connection
let filtered_peer_noderef =
peer_noderef.filtered_clone(connection_descriptor.make_dial_info_filter());
// Get the peer address in the returned sender info
let sender_info = Self::generate_sender_info(filtered_peer_noderef).await;
let status_a = RPCOperationStatusA { let status_a = RPCOperationStatusA {
node_status, node_status,
sender_info, sender_info,