assembly buffer tests

This commit is contained in:
John Smith 2023-06-23 15:55:02 -04:00
parent 296ca5cadb
commit b2bd4bcbbd
3 changed files with 379 additions and 4 deletions

22
.vscode/launch.json vendored
View File

@ -103,6 +103,28 @@
], ],
"cwd": "${workspaceFolder}/target/debug/" "cwd": "${workspaceFolder}/target/debug/"
}, },
{
"type": "lldb",
"request": "launch",
"name": "Debug veilid-tools unit test",
"cargo": {
"args": [
"test",
"--no-run",
"--features=rt-tokio",
"--manifest-path",
"veilid-tools/Cargo.toml"
],
"filter": {
"kind": "cdylib",
"name": "veilid-tools"
}
},
"args": [
"${selectedText}"
],
"cwd": "${workspaceFolder}/target/debug/"
},
{ {
"type": "lldb", "type": "lldb",
"request": "launch", "request": "launch",

View File

@ -282,7 +282,17 @@ impl AssemblyBuffer {
let peer_messages = e.get_mut(); let peer_messages = e.get_mut();
// Insert the fragment and see what comes out // Insert the fragment and see what comes out
peer_messages.insert_fragment(seq, off, len, chunk) let out = peer_messages.insert_fragment(seq, off, len, chunk);
// If we are returning a message, see if there are any more assemblies for this peer
// If not, remove the peer
if out.is_some() {
if peer_messages.assemblies.len() == 0 {
e.remove();
}
}
out
} }
std::collections::hash_map::Entry::Vacant(v) => { std::collections::hash_map::Entry::Vacant(v) => {
// See if we have room for one more // See if we have room for one more
@ -331,10 +341,10 @@ impl AssemblyBuffer {
&self, &self,
data: Vec<u8>, data: Vec<u8>,
remote_addr: SocketAddr, remote_addr: SocketAddr,
sender: S, mut sender: S,
) -> std::io::Result<NetworkResult<()>> ) -> std::io::Result<NetworkResult<()>>
where where
S: Fn(Vec<u8>, SocketAddr) -> F, S: FnMut(Vec<u8>, SocketAddr) -> F,
F: Future<Output = std::io::Result<NetworkResult<()>>>, F: Future<Output = std::io::Result<NetworkResult<()>>>,
{ {
if data.len() > MAX_LEN { if data.len() > MAX_LEN {

View File

@ -1,3 +1,5 @@
use rand::seq::SliceRandom;
use crate::*; use crate::*;
fn random_sockaddr() -> SocketAddr { fn random_sockaddr() -> SocketAddr {
@ -15,6 +17,7 @@ fn random_sockaddr() -> SocketAddr {
} }
pub async fn test_single_out_in() { pub async fn test_single_out_in() {
info!("-- test_single_out_in");
let assbuf_out = AssemblyBuffer::new(); let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new(); let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded(); let (net_tx, net_rx) = flume::unbounded();
@ -30,7 +33,9 @@ pub async fn test_single_out_in() {
}; };
for _ in 0..1000 { for _ in 0..1000 {
let message = vec![1u8; 1000]; let random_len = (get_random_u32() % 1000) as usize;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = random_sockaddr(); let remote_addr = random_sockaddr();
// Send single message below fragmentation limit // Send single message below fragmentation limit
@ -58,6 +63,344 @@ pub async fn test_single_out_in() {
assert!(net_rx.is_empty()) assert!(net_rx.is_empty())
} }
pub async fn test_one_frag_out_in() {
info!("-- test_one_frag_out_in");
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let net_tx = net_tx.clone();
async move {
net_tx
.send_async((framed_chunk, remote_addr))
.await
.expect("should send");
Ok(NetworkResult::value(()))
}
};
let mut all_sent = HashSet::new();
// Sending
println!("sending");
for _ in 0..10000 {
let random_len = (get_random_u32() % 1000) as usize + 1280;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = random_sockaddr();
// Send single message above fragmentation limit
all_sent.insert((message.clone(), remote_addr));
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
}
println!("all_sent len={}", all_sent.len());
println!("fragments sent = {}", net_rx.len());
drop(net_tx);
// Receiving
println!("receiving");
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr);
// We should have gotten the same message
if let Some(r_message) = r_message {
assert!(all_sent.remove(&(r_message, r_remote_addr)));
}
}
println!("all_sent len={}", all_sent.len());
// Shoud have dropped no packets
assert_eq!(all_sent.len(), 0);
}
pub async fn test_many_frags_out_in() {
info!("-- test_many_frags_out_in");
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let net_tx = net_tx.clone();
async move {
net_tx
.send_async((framed_chunk, remote_addr))
.await
.expect("should send");
Ok(NetworkResult::value(()))
}
};
let mut all_sent = HashSet::new();
// Sending
let mut total_sent_size = 0usize;
println!("sending");
for _ in 0..1000 {
let random_len = (get_random_u32() % 65536) as usize;
total_sent_size += random_len;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = random_sockaddr();
// Send single message
all_sent.insert((message.clone(), remote_addr));
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
}
println!("all_sent len={}", all_sent.len());
println!("total_sent_size = {}", total_sent_size);
println!("fragments sent = {}", net_rx.len());
drop(net_tx);
// Receiving
println!("receiving");
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr);
// We should have gotten the same message
if let Some(r_message) = r_message {
assert!(all_sent.remove(&(r_message, r_remote_addr)));
}
}
println!("all_sent len={}", all_sent.len());
// Shoud have dropped no packets
assert_eq!(all_sent.len(), 0);
}
pub async fn test_many_frags_out_in_single_host() {
info!("-- test_many_frags_out_in_single_host");
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let net_tx = net_tx.clone();
async move {
net_tx
.send_async((framed_chunk, remote_addr))
.await
.expect("should send");
Ok(NetworkResult::value(()))
}
};
let mut all_sent = HashSet::new();
// Sending
let mut total_sent_size = 0usize;
println!("sending");
for _ in 0..1000 {
let random_len = (get_random_u32() % 65536) as usize;
total_sent_size += random_len;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678));
// Send single message
all_sent.insert((message.clone(), remote_addr));
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
}
println!("all_sent len={}", all_sent.len());
println!("total_sent_size = {}", total_sent_size);
println!("fragments sent = {}", net_rx.len());
drop(net_tx);
// Receiving
println!("receiving");
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr);
// We should have gotten the same message
if let Some(r_message) = r_message {
assert!(all_sent.remove(&(r_message, r_remote_addr)));
}
}
println!("all_sent len={}", all_sent.len());
// Shoud have dropped no packets
assert_eq!(all_sent.len(), 0);
}
pub async fn test_many_frags_with_drops() {
info!("-- test_many_frags_with_drops");
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let first = Arc::new(AtomicBool::new(true));
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let net_tx = net_tx.clone();
let first = first.clone();
async move {
// Send only first packet, drop rest
if first.swap(false, Ordering::Relaxed) {
net_tx
.send_async((framed_chunk, remote_addr))
.await
.expect("should send");
}
Ok(NetworkResult::value(()))
}
};
let mut all_sent = HashSet::new();
// Sending
let mut total_sent_size = 0usize;
let mut total_fragged = 0usize;
println!("sending");
for _ in 0..1000 {
let random_len = (get_random_u32() % 65536) as usize;
if random_len > 1280 {
total_fragged += 1;
}
total_sent_size += random_len;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = random_sockaddr();
// Send single message
all_sent.insert((message.clone(), remote_addr));
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
first.store(true, Ordering::Relaxed);
}
println!("all_sent len={}", all_sent.len());
println!("total_sent_size = {}", total_sent_size);
println!("fragments sent = {}", net_rx.len());
println!("total_fragged = {}", total_fragged);
drop(net_tx);
// Receiving
println!("receiving");
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr);
// We should have gotten the same message
if let Some(r_message) = r_message {
assert!(all_sent.remove(&(r_message, r_remote_addr)));
}
}
println!("all_sent len={}", all_sent.len());
// Shoud have dropped all fragged packets
assert_eq!(all_sent.len(), total_fragged);
}
pub async fn test_many_frags_reordered() {
info!("-- test_many_frags_reordered");
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let reorder_buffer = Arc::new(Mutex::new(Vec::new()));
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let reorder_buffer = reorder_buffer.clone();
async move {
reorder_buffer.lock().push((framed_chunk, remote_addr));
Ok(NetworkResult::Value(()))
}
};
let mut all_sent = HashSet::new();
// Sending
let mut total_sent_size = 0usize;
let mut rng = rand::thread_rng();
println!("sending");
for _ in 0..1000 {
let random_len = (get_random_u32() % 65536) as usize;
total_sent_size += random_len;
let mut message = vec![1u8; random_len];
random_bytes(&mut message);
let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678));
// Send single message
all_sent.insert((message.clone(), remote_addr));
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
// Shuffle fragments
let items = {
let mut rbinner = reorder_buffer.lock();
rbinner.shuffle(&mut rng);
let items = rbinner.clone();
rbinner.clear();
items
};
for p in items {
net_tx.send_async(p).await.expect("should send");
}
}
println!("all_sent len={}", all_sent.len());
println!("total_sent_size = {}", total_sent_size);
println!("fragments sent = {}", net_rx.len());
drop(net_tx);
// Receiving
println!("receiving");
while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
// Send to input
let r_message = assbuf_in.insert_frame(&frame, r_remote_addr);
// We should have gotten the same message
if let Some(r_message) = r_message {
assert!(all_sent.remove(&(r_message, r_remote_addr)));
}
}
println!("all_sent len={}", all_sent.len());
// Shoud have dropped no packets
assert_eq!(all_sent.len(), 0);
}
pub async fn test_all() { pub async fn test_all() {
test_single_out_in().await; test_single_out_in().await;
test_one_frag_out_in().await;
test_many_frags_out_in().await;
test_many_frags_out_in_single_host().await;
test_many_frags_with_drops().await;
test_many_frags_reordered().await;
} }