commit with debug code

This commit is contained in:
John Smith 2022-08-20 17:08:48 -04:00
parent 126bf28070
commit 53ae04aff9
3 changed files with 93 additions and 65 deletions

View File

@ -125,6 +125,7 @@ pub(crate) enum ContactMethod {
pub enum SendDataKind { pub enum SendDataKind {
Direct(ConnectionDescriptor), Direct(ConnectionDescriptor),
Indirect, Indirect,
Existing(ConnectionDescriptor),
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
@ -1260,7 +1261,7 @@ impl NetworkManager {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp()); node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
return Ok(NetworkResult::value(SendDataKind::Direct( return Ok(NetworkResult::value(SendDataKind::Existing(
connection_descriptor, connection_descriptor,
))); )));
} }
@ -1411,7 +1412,7 @@ impl NetworkManager {
// Ensure we can read the magic number // Ensure we can read the magic number
if data.len() < 4 { if data.len() < 4 {
log_net!(debug "short packet".green()); log_net!(debug "short packet");
return Ok(false); return Ok(false);
} }
@ -1428,7 +1429,13 @@ impl NetworkManager {
} }
// Decode envelope header (may fail signature validation) // Decode envelope header (may fail signature validation)
let envelope = Envelope::from_signed_data(data).wrap_err("envelope failed to decode")?; let envelope = match Envelope::from_signed_data(data) {
Ok(v) => v,
Err(e) => {
log_net!(debug "envelope failed to decode: {}", e);
return Ok(false);
}
};
// Get routing table and rpc processor // Get routing table and rpc processor
let (routing_table, rpc) = { let (routing_table, rpc) = {
@ -1453,18 +1460,20 @@ impl NetworkManager {
let ets = envelope.get_timestamp(); let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind { if let Some(tsbehind) = tsbehind {
if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) { if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) {
bail!( log_net!(debug
"envelope time was too far in the past: {}ms ", "envelope time was too far in the past: {}ms ",
timestamp_to_secs(ts - ets) * 1000f64 timestamp_to_secs(ts - ets) * 1000f64
); );
return Ok(false);
} }
} }
if let Some(tsahead) = tsahead { if let Some(tsahead) = tsahead {
if tsahead > 0 && (ts < ets && ets - ts > tsahead) { if tsahead > 0 && (ts < ets && ets - ts > tsahead) {
bail!( log_net!(debug
"envelope time was too far in the future: {}ms", "envelope time was too far in the future: {}ms",
timestamp_to_secs(ets - ts) * 1000f64 timestamp_to_secs(ets - ts) * 1000f64
); );
return Ok(false);
} }
} }
@ -1633,6 +1642,9 @@ impl NetworkManager {
connection_descriptor: ConnectionDescriptor, // the connection descriptor used connection_descriptor: ConnectionDescriptor, // the connection descriptor used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) { ) {
// xxx debug code
info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
let key = PublicAddressCheckCacheKey( let key = PublicAddressCheckCacheKey(
connection_descriptor.protocol_type(), connection_descriptor.protocol_type(),
connection_descriptor.address_type(), connection_descriptor.address_type(),
@ -1659,71 +1671,80 @@ impl NetworkManager {
let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid); let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid);
// Determine if our external address has likely changed // Determine if our external address has likely changed
let needs_public_address_detection = let needs_public_address_detection = if matches!(
if matches!(network_class, NetworkClass::InboundCapable) { network_class,
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed NetworkClass::InboundCapable
let dial_info_filter = connection_descriptor.make_dial_info_filter(); ) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo // Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details( .all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet), Some(RoutingDomain::PublicInternet),
&dial_info_filter, &dial_info_filter,
) )
.iter() .iter()
.map(|did| did.dial_info.socket_address()) .map(|did| did.dial_info.socket_address())
.collect(); .collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers // If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it // then we zap the network class and re-detect it
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let mut inconsistencies = 0; let mut inconsistencies = 0;
let mut changed = false; let mut changed = false;
// Iteration goes from most recent to least recent node/address pair // Iteration goes from most recent to least recent node/address pair
let pacc = inner let pacc = inner
.public_address_check_cache .public_address_check_cache
.entry(key) .entry(key)
.or_insert_with(|| LruCache::new(8)); .or_insert_with(|| LruCache::new(8));
for (_, a) in pacc { for (_, a) in pacc {
if !current_addresses.contains(a) { if !current_addresses.contains(a) {
inconsistencies += 1; inconsistencies += 1;
if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT { if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
changed = true; changed = true;
break;
}
}
}
// xxx debug code
if changed {
info!("XXX\npublic_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
.public_address_check_cache, current_addresses, inconsistencies);
}
changed
} else {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let mut inner = self.inner.lock();
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(8));
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true;
break; break;
} }
} }
} else {
current_address = Some(*a);
} }
changed }
} else { consistent
// If we are currently outbound only, we don't have any public dial info };
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let mut inner = self.inner.lock();
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(8));
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true;
break;
}
}
} else {
current_address = Some(*a);
}
}
consistent
};
if needs_public_address_detection { if needs_public_address_detection {
if detect_address_changes { if detect_address_changes {

View File

@ -88,6 +88,7 @@ struct RPCMessageEncoded {
data: RPCMessageData, data: RPCMessageData,
} }
#[derive(Debug)]
pub(crate) struct RPCMessage { pub(crate) struct RPCMessage {
header: RPCMessageHeader, header: RPCMessageHeader,
operation: RPCOperation, operation: RPCOperation,
@ -336,7 +337,10 @@ impl RPCProcessor {
inner inner
.waiting_rpc_table .waiting_rpc_table
.remove(&op_id) .remove(&op_id)
.ok_or_else(RPCError::else_internal("Unmatched operation id"))? .ok_or_else(RPCError::else_internal(format!(
"Unmatched operation id: {:#?}",
msg
)))?
}; };
eventual.resolve((Span::current().id(), msg)).await; eventual.resolve((Span::current().id(), msg)).await;
Ok(()) Ok(())

View File

@ -70,6 +70,9 @@ impl RPCProcessor {
SendDataKind::Indirect => { SendDataKind::Indirect => {
// Do nothing in this case, as the socket address returned here would be for any node other than ours // Do nothing in this case, as the socket address returned here would be for any node other than ours
} }
SendDataKind::Existing(_) => {
// Do nothing in this case, as an existing connection could not have a different public address or it would have been reset
}
} }
} }