assembly buffer

This commit is contained in:
John Smith 2023-06-23 12:05:28 -04:00
parent d21f580de2
commit e4f97cfefa
8 changed files with 219 additions and 9 deletions

2
Cargo.lock generated
View File

@ -6608,6 +6608,7 @@ dependencies = [
"cfg-if 1.0.0",
"console_error_panic_hook",
"eyre",
"flume",
"fn_name",
"futures-util",
"jni 0.21.1",
@ -6626,6 +6627,7 @@ dependencies = [
"paranoid-android",
"parking_lot 0.11.2",
"rand 0.7.3",
"range-set-blaze",
"rust-fsm",
"send_wrapper 0.6.0",
"serial_test",

View File

@ -4,8 +4,6 @@ pub mod udp;
pub mod wrtc;
pub mod ws;
mod assembly_buffer;
use super::*;
use std::io;

View File

@ -1,5 +1,4 @@
use super::*;
use assembly_buffer::*;
use sockets::*;
#[derive(Clone)]

View File

@ -36,6 +36,8 @@ rand = "^0.7"
rust-fsm = "^0"
backtrace = "^0"
fn_name = "^0"
range-set-blaze = "0.1.5"
flume = { version = "^0", features = ["async"] }
# Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android

View File

@ -11,7 +11,7 @@ const HEADER_LEN: usize = 8;
const MAX_LEN: usize = LengthType::MAX as usize;
// XXX: keep statistics on all drops and why we dropped them
// XXX: move to config
// XXX: move to config eventually?
const FRAGMENT_LEN: usize = 1280 - HEADER_LEN;
const MAX_CONCURRENT_HOSTS: usize = 256;
const MAX_ASSEMBLIES_PER_HOST: usize = 256;
@ -27,7 +27,7 @@ struct PeerKey {
#[derive(Clone, Eq, PartialEq)]
struct MessageAssembly {
timestamp: Timestamp,
timestamp: u64,
seq: SequenceType,
data: Vec<u8>,
parts: RangeSetBlaze<LengthType>,
@ -35,13 +35,115 @@ struct MessageAssembly {
#[derive(Clone, Eq, PartialEq)]
struct PeerMessages {
assemblies: LinkedList<MessageAssembly>,
total_buffer: usize,
assemblies: VecDeque<MessageAssembly>,
}
impl PeerMessages {
pub fn new() -> Self {
Self {
assemblies: LinkedList::new(),
total_buffer: 0,
assemblies: VecDeque::new(),
}
}
fn merge_in_data(
&mut self,
timestamp: u64,
ass: usize,
off: LengthType,
len: LengthType,
chunk: &[u8],
) -> bool {
let assembly = &mut self.assemblies[ass];
// Ensure the new fragment hasn't redefined the message length, reusing the same seq
if assembly.data.len() != len as usize {
// Drop the assembly and just go with the new fragment as starting a new assembly
let seq = assembly.seq;
drop(assembly);
self.remove_assembly(ass);
self.new_assembly(timestamp, seq, off, len, chunk);
return false;
}
let part_start = off;
let part_end = off + chunk.len() as LengthType - 1;
let part = RangeSetBlaze::from_iter([part_start..=part_end]);
// if fragments overlap, drop the old assembly and go with a new one
if !assembly.parts.is_disjoint(&part) {
let seq = assembly.seq;
drop(assembly);
self.remove_assembly(ass);
self.new_assembly(timestamp, seq, off, len, chunk);
return false;
}
// Merge part
assembly.parts |= part;
assembly.data[part_start as usize..=part_end as usize].copy_from_slice(chunk);
// Check to see if this part is done
if assembly.parts.ranges_len() == 1
&& assembly.parts.first().unwrap() == 0
&& assembly.parts.last().unwrap() == len - 1
{
return true;
}
false
}
fn new_assembly(
&mut self,
timestamp: u64,
seq: SequenceType,
off: LengthType,
len: LengthType,
chunk: &[u8],
) -> usize {
// ensure we have enough space for the new assembly
self.reclaim_space(len as usize);
// make the assembly
let part_start = off;
let part_end = off + chunk.len() as LengthType - 1;
let mut assembly = MessageAssembly {
timestamp,
seq,
data: vec![0u8; len as usize],
parts: RangeSetBlaze::from_iter([part_start..=part_end]),
};
assembly.data[part_start as usize..=part_end as usize].copy_from_slice(chunk);
// Add the buffer length in
self.total_buffer += assembly.data.len();
self.assemblies.push_front(assembly);
// Was pushed front, return the front index
0
}
fn remove_assembly(&mut self, index: usize) -> MessageAssembly {
let assembly = self.assemblies.remove(index).unwrap();
self.total_buffer -= assembly.data.len();
assembly
}
fn truncate_assemblies(&mut self, new_len: usize) {
for an in new_len..self.assemblies.len() {
self.total_buffer -= self.assemblies[an].data.len();
}
self.assemblies.truncate(new_len);
}
fn reclaim_space(&mut self, needed_space: usize) {
// If we have too many assemblies or too much buffer rotate some out
while self.assemblies.len() > (MAX_ASSEMBLIES_PER_HOST - 1)
|| self.total_buffer > (MAX_BUFFER_PER_HOST - needed_space)
{
self.remove_assembly(self.assemblies.len() - 1);
}
}
@ -56,7 +158,37 @@ impl PeerMessages {
let cur_ts = get_timestamp();
// Get the assembly this belongs to by its sequence number
for a in self.assemblies {}
let mut ass = None;
for an in 0..self.assemblies.len() {
// If this assembly's timestamp is too old, then everything after it will be too, drop em all
let age = cur_ts.saturating_sub(self.assemblies[an].timestamp);
if age > MAX_ASSEMBLY_AGE_US {
self.truncate_assemblies(an);
break;
}
// If this assembly has a matching seq, then assemble with it
if self.assemblies[an].seq == seq {
ass = Some(an);
}
}
if ass.is_none() {
// Add a new assembly to the front and return the first index
self.new_assembly(cur_ts, seq, off, len, chunk);
return None;
}
let ass = ass.unwrap();
// Now that we have an assembly, merge in the fragment
let done = self.merge_in_data(cur_ts, ass, off, len, chunk);
// If the assembly is now equal to the entire range, then return it
if done {
let assembly = self.remove_assembly(ass);
return Some(assembly.data);
}
// Otherwise, do nothing
None
}
}
@ -128,7 +260,7 @@ impl AssemblyBuffer {
// See if we have a whole message and not a fragment
if off == 0 && len as usize == chunk.len() {
return Some(frame.to_vec());
return Some(chunk.to_vec());
}
// Drop fragments with offsets greater than or equal to the message length

View File

@ -1,4 +1,5 @@
// mod bump_port;
mod assembly_buffer;
mod async_peek_stream;
mod async_tag_lock;
mod callback_state_machine;
@ -88,6 +89,7 @@ cfg_if! {
}
// pub use bump_port::*;
pub use assembly_buffer::*;
pub use async_peek_stream::*;
pub use async_tag_lock::*;
pub use callback_state_machine::*;

View File

@ -1,6 +1,7 @@
//! Test suite for Native
#![cfg(not(target_arch = "wasm32"))]
mod test_assembly_buffer;
mod test_async_peek_stream;
use super::*;
@ -16,6 +17,8 @@ pub async fn run_all_tests() {
test_async_peek_stream::test_all().await;
info!("TEST: exec_test_async_tag_lock");
test_async_tag_lock::test_all().await;
info!("TEST: exec_test_assembly_buffer");
test_assembly_buffer::test_all().await;
info!("Finished unit tests");
}
@ -96,5 +99,14 @@ cfg_if! {
test_async_tag_lock::test_all().await;
});
}
#[test]
#[serial]
fn run_test_assembly_buffer() {
setup();
block_on(async {
test_assembly_buffer::test_all().await;
});
}
}
}

View File

@ -0,0 +1,63 @@
use crate::*;
fn random_sockaddr() -> SocketAddr {
if get_random_u32() & 1 == 0 {
let mut addr = [0u8; 16];
random_bytes(&mut addr);
let port = get_random_u32() as u16;
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, 0))
} else {
let mut addr = [0u8; 4];
random_bytes(&mut addr);
let port = get_random_u32() as u16;
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(addr), port))
}
}
pub async fn test_single_out_in() {
let assbuf_out = AssemblyBuffer::new();
let assbuf_in = AssemblyBuffer::new();
let (net_tx, net_rx) = flume::unbounded();
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
let net_tx = net_tx.clone();
async move {
net_tx
.send_async((framed_chunk, remote_addr))
.await
.expect("should send");
Ok(NetworkResult::value(()))
}
};
for _ in 0..1000 {
let message = vec![1u8; 1000];
let remote_addr = random_sockaddr();
// Send single message below fragmentation limit
assert!(matches!(
assbuf_out
.split_message(message.clone(), remote_addr, sender)
.await,
Ok(NetworkResult::Value(()))
));
// Ensure we didn't fragment
let (frame, r_remote_addr) = net_rx.recv_async().await.expect("should recv");
// Send to input
let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr)
.expect("should get one out");
// We should have gotten the same message
assert_eq!(r_message, message);
assert_eq!(r_remote_addr, remote_addr);
}
// Shoud have consumed everything
assert!(net_rx.is_empty())
}
pub async fn test_all() {
test_single_out_in().await;
}