diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 18cacbf6..4939baf8 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -807,17 +807,6 @@ impl NetworkManager { body: B, ) -> EyreResult> { let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone(); - - if !node_ref.same_entry(&destination_node_ref) { - log_net!( - "sending envelope to {:?} via {:?}", - destination_node_ref, - node_ref - ); - } else { - log_net!("sending envelope to {:?}", node_ref); - } - let best_node_id = destination_node_ref.best_node_id(); // Get node's envelope versions and see if we can send to it @@ -832,6 +821,17 @@ impl NetworkManager { // Build the envelope to send let out = self.build_envelope(best_node_id, envelope_version, body)?; + if !node_ref.same_entry(&destination_node_ref) { + log_net!( + "sending envelope to {:?} via {:?}, len={}", + destination_node_ref, + node_ref, + out.len() + ); + } else { + log_net!("sending envelope to {:?}, len={}", node_ref, out.len()); + } + // Send the envelope via whatever means necessary self.send_data(node_ref, out).await } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index feab4092..824c0532 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -31,8 +31,24 @@ impl RawUdpProtocolHandler { } // Insert into assembly buffer - let Some(message) = self.assembly_buffer.insert_frame(&data[0..size], remote_addr) else { - continue; + let message = match self + .assembly_buffer + .insert_frame(&data[0..size], remote_addr) + { + NetworkResult::Value(Some(v)) => v, + NetworkResult::Value(None) => { + continue; + } + nres => { + #[cfg(feature = "network-result-extra")] + log_network_result!( + "UDP::recv_message insert_frame failed: {:?} <= size={} remote_addr={}", + nres, + size, + remote_addr + ); + continue; + } }; // Check length of reassembled message (same for all protocols) diff --git a/veilid-tools/src/assembly_buffer.rs b/veilid-tools/src/assembly_buffer.rs index d60e7108..3f3bd948 100644 --- a/veilid-tools/src/assembly_buffer.rs +++ b/veilid-tools/src/assembly_buffer.rs @@ -234,23 +234,39 @@ impl AssemblyBuffer { /// Receive a packet chunk and add to the message assembly /// if a message has been completely, return it - pub fn insert_frame(&self, frame: &[u8], remote_addr: SocketAddr) -> Option> { + pub fn insert_frame( + &self, + frame: &[u8], + remote_addr: SocketAddr, + ) -> NetworkResult>> { // If we receive a zero length frame, send it if frame.len() == 0 { - return Some(frame.to_vec()); + return NetworkResult::value(Some(frame.to_vec())); } // If we receive a frame smaller than or equal to the length of the header, drop it // or if this frame is larger than our max message length, then drop it if frame.len() <= HEADER_LEN || frame.len() > MAX_LEN { - return None; + #[cfg(feature = "network-result-extra")] + return NetworkResult::invalid_message(format!( + "invalid header length: frame.len={}", + frame.len() + )); + #[cfg(not(feature = "network-result-extra"))] + return NetworkResult::invalid_message("invalid header length"); } // --- Decode the header // Drop versions we don't understand if frame[0] != VERSION_1 { - return None; + #[cfg(feature = "network-result-extra")] + return NetworkResult::invalid_message(format!( + "invalid frame version: frame[0]={}", + frame[0] + )); + #[cfg(not(feature = "network-result-extra"))] + return NetworkResult::invalid_message("invalid frame version"); } // Version 1 header let seq = SequenceType::from_be_bytes(frame[2..4].try_into().unwrap()); @@ -260,16 +276,30 @@ impl AssemblyBuffer { // See if we have a whole message and not a fragment if off == 0 && len as usize == chunk.len() { - return Some(chunk.to_vec()); + return NetworkResult::value(Some(chunk.to_vec())); } // Drop fragments with offsets greater than or equal to the message length if off >= len { - return None; + #[cfg(feature = "network-result-extra")] + return NetworkResult::invalid_message(format!( + "offset greater than length: off={} >= len={}", + off, len + )); + #[cfg(not(feature = "network-result-extra"))] + return NetworkResult::invalid_message("offset greater than length"); } // Drop fragments where the chunk would be applied beyond the message length if off as usize + chunk.len() > len as usize { - return None; + #[cfg(feature = "network-result-extra")] + return NetworkResult::invalid_message(format!( + "chunk applied beyond message length: off={} + chunk.len={} > len={}", + off, + chunk.len(), + len + )); + #[cfg(not(feature = "network-result-extra"))] + return NetworkResult::invalid_message("chunk applied beyond message length"); } // Get or create the peer message assemblies @@ -291,19 +321,18 @@ impl AssemblyBuffer { e.remove(); } } - - out + NetworkResult::value(out) } std::collections::hash_map::Entry::Vacant(v) => { // See if we have room for one more if peer_count == MAX_CONCURRENT_HOSTS { - return None; + return NetworkResult::value(None); } // Add the peer let peer_messages = v.insert(PeerMessages::new()); // Insert the fragment and see what comes out - peer_messages.insert_fragment(seq, off, len, chunk) + NetworkResult::value(peer_messages.insert_fragment(seq, off, len, chunk)) } } } diff --git a/veilid-tools/src/tests/native/test_assembly_buffer.rs b/veilid-tools/src/tests/native/test_assembly_buffer.rs index 52550dc3..73268516 100644 --- a/veilid-tools/src/tests/native/test_assembly_buffer.rs +++ b/veilid-tools/src/tests/native/test_assembly_buffer.rs @@ -52,7 +52,9 @@ pub async fn test_single_out_in() { // Send to input let r_message = assbuf_in .insert_frame(&frame, r_remote_addr) - .expect("should get one out"); + .into_result() + .expect("should get a value") + .expect("should get something out"); // We should have gotten the same message assert_eq!(r_message, message); @@ -110,7 +112,10 @@ pub async fn test_one_frag_out_in() { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { // Send to input - let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .into_result() + .expect("should get a value"); // We should have gotten the same message if let Some(r_message) = r_message { @@ -172,7 +177,10 @@ pub async fn test_many_frags_out_in() { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { // Send to input - let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .into_result() + .expect("should get a value"); // We should have gotten the same message if let Some(r_message) = r_message { @@ -234,7 +242,10 @@ pub async fn test_many_frags_out_in_single_host() { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { // Send to input - let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .into_result() + .expect("should get a value"); // We should have gotten the same message if let Some(r_message) = r_message { @@ -309,7 +320,10 @@ pub async fn test_many_frags_with_drops() { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { // Send to input - let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .into_result() + .expect("should get a value"); // We should have gotten the same message if let Some(r_message) = r_message { @@ -383,7 +397,10 @@ pub async fn test_many_frags_reordered() { while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await { // Send to input - let r_message = assbuf_in.insert_frame(&frame, r_remote_addr); + let r_message = assbuf_in + .insert_frame(&frame, r_remote_addr) + .into_result() + .expect("should get a value"); // We should have gotten the same message if let Some(r_message) = r_message {