add more error telemetry

This commit is contained in:
Christien Rioux 2023-07-13 14:16:14 -04:00
parent a06f24e287
commit 5977b6a141
4 changed files with 92 additions and 30 deletions

View File

@ -807,17 +807,6 @@ impl NetworkManager {
body: B, body: B,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataKind>> {
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone(); let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
if !node_ref.same_entry(&destination_node_ref) {
log_net!(
"sending envelope to {:?} via {:?}",
destination_node_ref,
node_ref
);
} else {
log_net!("sending envelope to {:?}", node_ref);
}
let best_node_id = destination_node_ref.best_node_id(); let best_node_id = destination_node_ref.best_node_id();
// Get node's envelope versions and see if we can send to it // Get node's envelope versions and see if we can send to it
@ -832,6 +821,17 @@ impl NetworkManager {
// Build the envelope to send // Build the envelope to send
let out = self.build_envelope(best_node_id, envelope_version, body)?; let out = self.build_envelope(best_node_id, envelope_version, body)?;
if !node_ref.same_entry(&destination_node_ref) {
log_net!(
"sending envelope to {:?} via {:?}, len={}",
destination_node_ref,
node_ref,
out.len()
);
} else {
log_net!("sending envelope to {:?}, len={}", node_ref, out.len());
}
// Send the envelope via whatever means necessary // Send the envelope via whatever means necessary
self.send_data(node_ref, out).await self.send_data(node_ref, out).await
} }

View File

@ -31,8 +31,24 @@ impl RawUdpProtocolHandler {
} }
// Insert into assembly buffer // Insert into assembly buffer
let Some(message) = self.assembly_buffer.insert_frame(&data[0..size], remote_addr) else { let message = match self
continue; .assembly_buffer
.insert_frame(&data[0..size], remote_addr)
{
NetworkResult::Value(Some(v)) => v,
NetworkResult::Value(None) => {
continue;
}
nres => {
#[cfg(feature = "network-result-extra")]
log_network_result!(
"UDP::recv_message insert_frame failed: {:?} <= size={} remote_addr={}",
nres,
size,
remote_addr
);
continue;
}
}; };
// Check length of reassembled message (same for all protocols) // Check length of reassembled message (same for all protocols)

View File

@ -234,23 +234,39 @@ impl AssemblyBuffer {
/// Receive a packet chunk and add to the message assembly /// Receive a packet chunk and add to the message assembly
/// if a message has been completely, return it /// if a message has been completely, return it
pub fn insert_frame(&self, frame: &[u8], remote_addr: SocketAddr) -> Option<Vec<u8>> { pub fn insert_frame(
&self,
frame: &[u8],
remote_addr: SocketAddr,
) -> NetworkResult<Option<Vec<u8>>> {
// If we receive a zero length frame, send it // If we receive a zero length frame, send it
if frame.len() == 0 { if frame.len() == 0 {
return Some(frame.to_vec()); return NetworkResult::value(Some(frame.to_vec()));
} }
// If we receive a frame smaller than or equal to the length of the header, drop it // If we receive a frame smaller than or equal to the length of the header, drop it
// or if this frame is larger than our max message length, then drop it // or if this frame is larger than our max message length, then drop it
if frame.len() <= HEADER_LEN || frame.len() > MAX_LEN { if frame.len() <= HEADER_LEN || frame.len() > MAX_LEN {
return None; #[cfg(feature = "network-result-extra")]
return NetworkResult::invalid_message(format!(
"invalid header length: frame.len={}",
frame.len()
));
#[cfg(not(feature = "network-result-extra"))]
return NetworkResult::invalid_message("invalid header length");
} }
// --- Decode the header // --- Decode the header
// Drop versions we don't understand // Drop versions we don't understand
if frame[0] != VERSION_1 { if frame[0] != VERSION_1 {
return None; #[cfg(feature = "network-result-extra")]
return NetworkResult::invalid_message(format!(
"invalid frame version: frame[0]={}",
frame[0]
));
#[cfg(not(feature = "network-result-extra"))]
return NetworkResult::invalid_message("invalid frame version");
} }
// Version 1 header // Version 1 header
let seq = SequenceType::from_be_bytes(frame[2..4].try_into().unwrap()); let seq = SequenceType::from_be_bytes(frame[2..4].try_into().unwrap());
@ -260,16 +276,30 @@ impl AssemblyBuffer {
// See if we have a whole message and not a fragment // See if we have a whole message and not a fragment
if off == 0 && len as usize == chunk.len() { if off == 0 && len as usize == chunk.len() {
return Some(chunk.to_vec()); return NetworkResult::value(Some(chunk.to_vec()));
} }
// Drop fragments with offsets greater than or equal to the message length // Drop fragments with offsets greater than or equal to the message length
if off >= len { if off >= len {
return None; #[cfg(feature = "network-result-extra")]
return NetworkResult::invalid_message(format!(
"offset greater than length: off={} >= len={}",
off, len
));
#[cfg(not(feature = "network-result-extra"))]
return NetworkResult::invalid_message("offset greater than length");
} }
// Drop fragments where the chunk would be applied beyond the message length // Drop fragments where the chunk would be applied beyond the message length
if off as usize + chunk.len() > len as usize { if off as usize + chunk.len() > len as usize {
return None; #[cfg(feature = "network-result-extra")]
return NetworkResult::invalid_message(format!(
"chunk applied beyond message length: off={} + chunk.len={} > len={}",
off,
chunk.len(),
len
));
#[cfg(not(feature = "network-result-extra"))]
return NetworkResult::invalid_message("chunk applied beyond message length");
} }
// Get or create the peer message assemblies // Get or create the peer message assemblies
@ -291,19 +321,18 @@ impl AssemblyBuffer {
e.remove(); e.remove();
} }
} }
NetworkResult::value(out)
out
} }
std::collections::hash_map::Entry::Vacant(v) => { std::collections::hash_map::Entry::Vacant(v) => {
// See if we have room for one more // See if we have room for one more
if peer_count == MAX_CONCURRENT_HOSTS { if peer_count == MAX_CONCURRENT_HOSTS {
return None; return NetworkResult::value(None);
} }
// Add the peer // Add the peer
let peer_messages = v.insert(PeerMessages::new()); let peer_messages = v.insert(PeerMessages::new());
// Insert the fragment and see what comes out // Insert the fragment and see what comes out
peer_messages.insert_fragment(seq, off, len, chunk) NetworkResult::value(peer_messages.insert_fragment(seq, off, len, chunk))
} }
} }
} }

View File

@ -52,7 +52,9 @@ pub async fn test_single_out_in() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.expect("should get one out"); .into_result()
.expect("should get a value")
.expect("should get something out");
// We should have gotten the same message // We should have gotten the same message
assert_eq!(r_message, message); assert_eq!(r_message, message);
@ -110,7 +112,10 @@ pub async fn test_one_frag_out_in() {
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input // Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.into_result()
.expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
if let Some(r_message) = r_message { if let Some(r_message) = r_message {
@ -172,7 +177,10 @@ pub async fn test_many_frags_out_in() {
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input // Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.into_result()
.expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
if let Some(r_message) = r_message { if let Some(r_message) = r_message {
@ -234,7 +242,10 @@ pub async fn test_many_frags_out_in_single_host() {
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input // Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.into_result()
.expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
if let Some(r_message) = r_message { if let Some(r_message) = r_message {
@ -309,7 +320,10 @@ pub async fn test_many_frags_with_drops() {
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input // Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.into_result()
.expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
if let Some(r_message) = r_message { if let Some(r_message) = r_message {
@ -383,7 +397,10 @@ pub async fn test_many_frags_reordered() {
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input // Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.into_result()
.expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
if let Some(r_message) = r_message { if let Some(r_message) = r_message {