core fixes

This commit is contained in:
John Smith
2022-11-29 19:22:33 -05:00
parent f7582fabb2
commit 5c0a500971
60 changed files with 98 additions and 1160 deletions

View File

@@ -1,4 +1,3 @@
pub mod test_async_tag_lock;
pub mod test_host_interface;
pub mod test_protected_store;
pub mod test_table_store;

View File

@@ -1,158 +0,0 @@
use crate::xx::*;
pub async fn test_simple_no_contention() {
info!("test_simple_no_contention");
let table = AsyncTagLockTable::new();
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let a2 = SocketAddr::new("6.9.6.9".parse().unwrap(), 6969);
{
let g1 = table.lock_tag(a1).await;
let g2 = table.lock_tag(a2).await;
drop(g2);
drop(g1);
}
{
let g1 = table.lock_tag(a1).await;
let g2 = table.lock_tag(a2).await;
drop(g1);
drop(g2);
}
assert_eq!(table.len(), 0);
}
pub async fn test_simple_single_contention() {
info!("test_simple_single_contention");
let table = AsyncTagLockTable::new();
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let g1 = table.lock_tag(a1).await;
info!("locked");
let t1 = spawn(async move {
// move the guard into the task
let _g1_take = g1;
// hold the guard for a bit
info!("waiting");
sleep(1000).await;
// release the guard
info!("released");
});
// wait to lock again, will contend until spawned task exits
let _g1_b = table.lock_tag(a1).await;
info!("locked");
// Ensure task is joined
t1.await;
assert_eq!(table.len(), 1);
}
pub async fn test_simple_double_contention() {
info!("test_simple_double_contention");
let table = AsyncTagLockTable::new();
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let a2 = SocketAddr::new("6.9.6.9".parse().unwrap(), 6969);
let g1 = table.lock_tag(a1).await;
let g2 = table.lock_tag(a2).await;
info!("locked");
let t1 = spawn(async move {
// move the guard into the tas
let _g1_take = g1;
// hold the guard for a bit
info!("waiting");
sleep(1000).await;
// release the guard
info!("released");
});
let t2 = spawn(async move {
// move the guard into the task
let _g2_take = g2;
// hold the guard for a bit
info!("waiting");
sleep(500).await;
// release the guard
info!("released");
});
// wait to lock again, will contend until spawned task exits
let _g1_b = table.lock_tag(a1).await;
// wait to lock again, should complete immediately
let _g2_b = table.lock_tag(a2).await;
info!("locked");
// Ensure tasks are joined
t1.await;
t2.await;
assert_eq!(table.len(), 2);
}
pub async fn test_parallel_single_contention() {
info!("test_parallel_single_contention");
let table = AsyncTagLockTable::new();
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let table1 = table.clone();
let t1 = spawn(async move {
// lock the tag
let _g = table1.lock_tag(a1).await;
info!("locked t1");
// hold the guard for a bit
info!("waiting t1");
sleep(500).await;
// release the guard
info!("released t1");
});
let table2 = table.clone();
let t2 = spawn(async move {
// lock the tag
let _g = table2.lock_tag(a1).await;
info!("locked t2");
// hold the guard for a bit
info!("waiting t2");
sleep(500).await;
// release the guard
info!("released t2");
});
let table3 = table.clone();
let t3 = spawn(async move {
// lock the tag
let _g = table3.lock_tag(a1).await;
info!("locked t3");
// hold the guard for a bit
info!("waiting t3");
sleep(500).await;
// release the guard
info!("released t3");
});
// Ensure tasks are joined
t1.await;
t2.await;
t3.await;
assert_eq!(table.len(), 0);
}
pub async fn test_all() {
test_simple_no_contention().await;
test_simple_single_contention().await;
test_parallel_single_contention().await;
}

View File

@@ -1,469 +1,13 @@
use crate::xx::*;
use crate::*;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use js_sys::*;
} else {
use std::time::{Duration, SystemTime};
}
}
pub async fn test_log() {
info!("testing log");
}
pub async fn test_get_timestamp() {
info!("testing get_timestamp");
let t1 = get_timestamp();
let t2 = get_timestamp();
assert!(t2 >= t1);
}
pub async fn test_eventual() {
info!("testing Eventual");
{
let e1 = Eventual::new();
let i1 = e1.instance_clone(1u32);
let i2 = e1.instance_clone(2u32);
let i3 = e1.instance_clone(3u32);
drop(i3);
let i4 = e1.instance_clone(4u32);
drop(i2);
let jh = spawn(async move {
sleep(1000).await;
e1.resolve();
});
assert_eq!(i1.await, 1u32);
assert_eq!(i4.await, 4u32);
jh.await;
}
{
let e1 = Eventual::new();
let i1 = e1.instance_clone(1u32);
let i2 = e1.instance_clone(2u32);
let i3 = e1.instance_clone(3u32);
let i4 = e1.instance_clone(4u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
let i5 = e1.instance_clone(5u32);
let i6 = e1.instance_clone(6u32);
assert_eq!(i1.await, 1u32);
assert_eq!(i5.await, 5u32);
assert_eq!(i6.await, 6u32);
});
sleep(1000).await;
let resolved = e1_c1.resolve();
drop(i2);
drop(i3);
assert_eq!(i4.await, 4u32);
resolved.await;
jh.await;
}
{
let e1 = Eventual::new();
let i1 = e1.instance_clone(1u32);
let i2 = e1.instance_clone(2u32);
let e1_c1 = e1.clone();
let jh = spawn(async move {
assert_eq!(i1.await, 1u32);
assert_eq!(i2.await, 2u32);
});
sleep(1000).await;
e1_c1.resolve().await;
jh.await;
e1_c1.reset();
//
let j1 = e1.instance_clone(1u32);
let j2 = e1.instance_clone(2u32);
let jh = spawn(async move {
assert_eq!(j1.await, 1u32);
assert_eq!(j2.await, 2u32);
});
sleep(1000).await;
e1_c1.resolve().await;
jh.await;
e1_c1.reset();
}
}
pub async fn test_eventual_value() {
info!("testing Eventual Value");
{
let e1 = EventualValue::<u32>::new();
let i1 = e1.instance();
let i2 = e1.instance();
let i3 = e1.instance();
drop(i3);
let i4 = e1.instance();
drop(i2);
let e1_c1 = e1.clone();
let jh = spawn(async move {
sleep(1000).await;
e1_c1.resolve(3u32);
});
i1.await;
i4.await;
jh.await;
assert_eq!(e1.take_value(), Some(3u32));
}
{
let e1 = EventualValue::new();
let i1 = e1.instance();
let i2 = e1.instance();
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let i5 = e1.instance();
let i6 = e1.instance();
i1.await;
i5.await;
i6.await;
});
sleep(1000).await;
let resolved = e1_c1.resolve(4u16);
drop(i2);
drop(i3);
i4.await;
resolved.await;
jh.await;
assert_eq!(e1_c1.take_value(), Some(4u16));
}
{
let e1 = EventualValue::new();
assert_eq!(e1.take_value(), None);
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
i1.await;
i2.await;
});
sleep(1000).await;
e1_c1.resolve(5u32).await;
jh.await;
assert_eq!(e1_c1.take_value(), Some(5u32));
e1_c1.reset();
assert_eq!(e1_c1.take_value(), None);
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
j1.await;
j2.await;
});
sleep(1000).await;
e1_c1.resolve(6u32).await;
jh.await;
assert_eq!(e1_c1.take_value(), Some(6u32));
e1_c1.reset();
assert_eq!(e1_c1.take_value(), None);
}
}
pub async fn test_eventual_value_clone() {
info!("testing Eventual Value Clone");
{
let e1 = EventualValueClone::<u32>::new();
let i1 = e1.instance();
let i2 = e1.instance();
let i3 = e1.instance();
drop(i3);
let i4 = e1.instance();
drop(i2);
let jh = spawn(async move {
sleep(1000).await;
e1.resolve(3u32);
});
assert_eq!(i1.await, 3);
assert_eq!(i4.await, 3);
jh.await;
}
{
let e1 = EventualValueClone::new();
let i1 = e1.instance();
let i2 = e1.instance();
let i3 = e1.instance();
let i4 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
let i5 = e1.instance();
let i6 = e1.instance();
assert_eq!(i1.await, 4);
assert_eq!(i5.await, 4);
assert_eq!(i6.await, 4);
});
sleep(1000).await;
let resolved = e1_c1.resolve(4u16);
drop(i2);
drop(i3);
assert_eq!(i4.await, 4);
resolved.await;
jh.await;
}
{
let e1 = EventualValueClone::new();
let i1 = e1.instance();
let i2 = e1.instance();
let e1_c1 = e1.clone();
let jh = spawn(async move {
assert_eq!(i1.await, 5);
assert_eq!(i2.await, 5);
});
sleep(1000).await;
e1_c1.resolve(5u32).await;
jh.await;
e1_c1.reset();
//
let j1 = e1.instance();
let j2 = e1.instance();
let jh = spawn(async move {
assert_eq!(j1.await, 6);
assert_eq!(j2.await, 6);
});
sleep(1000).await;
e1_c1.resolve(6u32).await;
jh.await;
e1_c1.reset();
}
}
pub async fn test_interval() {
info!("testing interval");
let tick: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
let stopper = interval(1000, move || {
let tick = tick.clone();
async move {
let mut tick = tick.lock();
trace!("tick {}", tick);
*tick += 1;
}
});
sleep(5500).await;
stopper.await;
}
pub async fn test_timeout() {
info!("testing timeout");
let tick: Arc<Mutex<u32>> = Arc::new(Mutex::new(0u32));
let tick_1 = tick.clone();
assert!(
timeout(2500, async move {
let mut tick = tick_1.lock();
trace!("tick {}", tick);
sleep(1000).await;
*tick += 1;
trace!("tick {}", tick);
sleep(1000).await;
*tick += 1;
trace!("tick {}", tick);
sleep(1000).await;
*tick += 1;
trace!("tick {}", tick);
sleep(1000).await;
*tick += 1;
})
.await
.is_err(),
"should have timed out"
);
let ticks = *tick.lock();
assert!(ticks <= 2);
}
pub async fn test_sleep() {
info!("testing sleep");
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
let t1 = Date::now();
intf::sleep(1000).await;
let t2 = Date::now();
assert!((t2-t1) >= 1000.0);
} else {
let sys_time = SystemTime::now();
let one_sec = Duration::from_secs(1);
sleep(1000).await;
assert!(sys_time.elapsed().unwrap() >= one_sec);
}
}
}
macro_rules! assert_split_url {
($url:expr, $scheme:expr, $host:expr) => {
assert_eq!(
SplitUrl::from_str($url),
Ok(SplitUrl::new($scheme, None, $host, None, None))
);
};
($url:expr, $scheme:expr, $host:expr, $port:expr) => {
assert_eq!(
SplitUrl::from_str($url),
Ok(SplitUrl::new($scheme, None, $host, $port, None))
);
};
($url:expr, $scheme:expr, $host:expr, $port:expr, $path:expr) => {
assert_eq!(
SplitUrl::from_str($url),
Ok(SplitUrl::new(
$scheme,
None,
$host,
$port,
Some(SplitUrlPath::new(
$path,
Option::<String>::None,
Option::<String>::None
))
))
);
};
($url:expr, $scheme:expr, $host:expr, $port:expr, $path:expr, $frag:expr, $query:expr) => {
assert_eq!(
SplitUrl::from_str($url),
Ok(SplitUrl::new(
$scheme,
None,
$host,
$port,
Some(SplitUrlPath::new($path, $frag, $query))
))
);
};
}
macro_rules! assert_split_url_parse {
($url:expr) => {
let url = $url;
let su1 = SplitUrl::from_str(url).expect("should parse");
assert_eq!(su1.to_string(), url);
};
}
fn host<S: AsRef<str>>(s: S) -> SplitUrlHost {
SplitUrlHost::Hostname(s.as_ref().to_owned())
}
fn ip<S: AsRef<str>>(s: S) -> SplitUrlHost {
SplitUrlHost::IpAddr(IpAddr::from_str(s.as_ref()).unwrap())
}
pub async fn test_split_url() {
info!("testing split_url");
assert_split_url!("http://foo", "http", host("foo"));
assert_split_url!("http://foo:1234", "http", host("foo"), Some(1234));
assert_split_url!("http://foo:1234/", "http", host("foo"), Some(1234), "");
assert_split_url!(
"http://foo:1234/asdf/qwer",
"http",
host("foo"),
Some(1234),
"asdf/qwer"
);
assert_split_url!("http://foo/", "http", host("foo"), None, "");
assert_split_url!("http://11.2.3.144/", "http", ip("11.2.3.144"), None, "");
assert_split_url!("http://[1111::2222]/", "http", ip("1111::2222"), None, "");
assert_split_url!(
"http://[1111::2222]:123/",
"http",
ip("1111::2222"),
Some(123),
""
);
assert_split_url!(
"http://foo/asdf/qwer",
"http",
host("foo"),
None,
"asdf/qwer"
);
assert_split_url!(
"http://foo/asdf/qwer#3",
"http",
host("foo"),
None,
"asdf/qwer",
Some("3"),
Option::<String>::None
);
assert_split_url!(
"http://foo/asdf/qwer?xxx",
"http",
host("foo"),
None,
"asdf/qwer",
Option::<String>::None,
Some("xxx")
);
assert_split_url!(
"http://foo/asdf/qwer#yyy?xxx",
"http",
host("foo"),
None,
"asdf/qwer",
Some("yyy"),
Some("xxx")
);
assert_err!(SplitUrl::from_str("://asdf"));
assert_err!(SplitUrl::from_str(""));
assert_err!(SplitUrl::from_str("::"));
assert_err!(SplitUrl::from_str("://:"));
assert_err!(SplitUrl::from_str("a://:"));
assert_err!(SplitUrl::from_str("a://:1243"));
assert_err!(SplitUrl::from_str("a://:65536"));
assert_err!(SplitUrl::from_str("a://:-16"));
assert_err!(SplitUrl::from_str("a:///"));
assert_err!(SplitUrl::from_str("a:///qwer:"));
assert_err!(SplitUrl::from_str("a:///qwer://"));
assert_err!(SplitUrl::from_str("a://qwer://"));
assert_err!(SplitUrl::from_str("a://[1111::2222]:/"));
assert_err!(SplitUrl::from_str("a://[1111::2222]:"));
assert_split_url_parse!("sch://foo:bar@baz.com:1234/fnord#qux?zuz");
assert_split_url_parse!("sch://foo:bar@baz.com:1234/fnord#qux");
assert_split_url_parse!("sch://foo:bar@baz.com:1234/fnord?zuz");
assert_split_url_parse!("sch://foo:bar@baz.com:1234/fnord/");
assert_split_url_parse!("sch://foo:bar@baz.com:1234//");
assert_split_url_parse!("sch://foo:bar@baz.com:1234");
assert_split_url_parse!("sch://foo:bar@[1111::2222]:1234");
assert_split_url_parse!("sch://foo:bar@[::]:1234");
assert_split_url_parse!("sch://foo:bar@1.2.3.4:1234");
assert_split_url_parse!("sch://@baz.com:1234");
assert_split_url_parse!("sch://baz.com/asdf/asdf");
assert_split_url_parse!("sch://baz.com/");
assert_split_url_parse!("s://s");
}
cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
use intf::network_interfaces::NetworkInterfaces;
pub async fn test_network_interfaces() {
info!("testing network interfaces");
let t1 = get_timestamp();
let interfaces = intf::utils::network_interfaces::NetworkInterfaces::new();
let interfaces = NetworkInterfaces::new();
let count = 100;
for x in 0..count {
info!("loop {}", x);
@@ -479,99 +23,7 @@ cfg_if! {
}
}
pub async fn test_get_random_u64() {
info!("testing random number generator for u64");
let t1 = get_timestamp();
let count = 10000;
for _ in 0..count {
let _ = get_random_u64();
}
let t2 = get_timestamp();
let tdiff = ((t2 - t1) as f64) / 1000000.0f64;
info!(
"running network interface test with {} iterations took {} seconds",
count, tdiff
);
}
pub async fn test_get_random_u32() {
info!("testing random number generator for u32");
let t1 = get_timestamp();
let count = 10000;
for _ in 0..count {
let _ = get_random_u32();
}
let t2 = get_timestamp();
let tdiff = ((t2 - t1) as f64) / 1000000.0f64;
info!(
"running network interface test with {} iterations took {} seconds",
count, tdiff
);
}
pub async fn test_must_join_single_future() {
info!("testing must join single future");
let sf = MustJoinSingleFuture::<u32>::new();
assert_eq!(sf.check().await, Ok(None));
assert_eq!(
sf.single_spawn(async {
sleep(2000).await;
69
})
.await,
Ok((None, true))
);
assert_eq!(sf.check().await, Ok(None));
assert_eq!(sf.single_spawn(async { panic!() }).await, Ok((None, false)));
assert_eq!(sf.join().await, Ok(Some(69)));
assert_eq!(
sf.single_spawn(async {
sleep(1000).await;
37
})
.await,
Ok((None, true))
);
sleep(2000).await;
assert_eq!(
sf.single_spawn(async {
sleep(1000).await;
27
})
.await,
Ok((Some(37), true))
);
sleep(2000).await;
assert_eq!(sf.join().await, Ok(Some(27)));
assert_eq!(sf.check().await, Ok(None));
}
pub async fn test_tools() {
info!("testing retry_falloff_log");
let mut last_us = 0u64;
for x in 0..1024 {
let cur_us = x as u64 * 1000000u64;
if retry_falloff_log(last_us, cur_us, 10_000_000u64, 6_000_000_000u64, 2.0f64) {
info!(" retry at {} secs", timestamp_to_secs(cur_us));
last_us = cur_us;
}
}
}
pub async fn test_all() {
test_log().await;
test_get_timestamp().await;
test_tools().await;
test_split_url().await;
test_get_random_u64().await;
test_get_random_u32().await;
test_sleep().await;
#[cfg(not(target_arch = "wasm32"))]
test_network_interfaces().await; XXX KEEP THIS IN NATIVE TESTS
test_must_join_single_future().await;
test_eventual().await;
test_eventual_value().await;
test_eventual_value_clone().await;
test_interval().await;
test_timeout().await;
test_network_interfaces().await;
}

View File

@@ -1,5 +1,4 @@
use super::test_veilid_config::*;
use crate::xx::*;
use crate::*;
async fn startup() -> VeilidAPI {

View File

@@ -1,5 +1,4 @@
use super::test_veilid_config::*;
use crate::xx::*;
use crate::*;
async fn startup() -> VeilidAPI {

View File

@@ -1,7 +1,7 @@
#![allow(clippy::bool_assert_comparison)]
use crate::xx::*;
use crate::*;
cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
use std::fs::File;

View File

@@ -1,5 +1,4 @@
use super::test_veilid_config::*;
use crate::xx::*;
use crate::*;
pub async fn test_startup_shutdown() {

View File

@@ -1,13 +1,9 @@
//! Test suite for Native
#![cfg(not(target_arch = "wasm32"))]
mod test_async_peek_stream;
use crate::xx::*;
use crate::crypto::tests::*;
use crate::network_manager::tests::*;
use crate::tests::common::*;
use crate::*;
#[cfg(all(target_os = "android", feature = "android_tests"))]
use jni::{objects::JClass, objects::JObject, JNIEnv};
@@ -59,8 +55,6 @@ pub fn run_all_tests() {
exec_test_veilid_core();
info!("TEST: exec_test_veilid_config");
exec_test_veilid_config();
info!("TEST: exec_test_async_peek_stream");
exec_test_async_peek_stream();
info!("TEST: exec_test_connection_table");
exec_test_connection_table();
info!("TEST: exec_test_table_store");
@@ -71,8 +65,6 @@ pub fn run_all_tests() {
exec_test_crypto();
info!("TEST: exec_test_envelope_receipt");
exec_test_envelope_receipt();
info!("TEST: exec_test_async_tag_lock");
exec_test_async_tag_lock();
info!("Finished unit tests");
}
@@ -108,11 +100,6 @@ fn exec_test_veilid_config() {
test_veilid_config::test_all().await;
})
}
fn exec_test_async_peek_stream() {
block_on(async {
test_async_peek_stream::test_all().await;
})
}
fn exec_test_connection_table() {
block_on(async {
test_connection_table::test_all().await;
@@ -138,11 +125,7 @@ fn exec_test_envelope_receipt() {
test_envelope_receipt::test_all().await;
})
}
fn exec_test_async_tag_lock() {
block_on(async {
test_async_tag_lock::test_all().await;
})
}
///////////////////////////////////////////////////////////////////////////
cfg_if! {
if #[cfg(test)] {
@@ -190,13 +173,6 @@ cfg_if! {
exec_test_veilid_config();
}
#[test]
#[serial]
fn run_test_async_peek_stream() {
setup();
exec_test_async_peek_stream();
}
#[test]
#[serial]
fn run_test_connection_table() {
@@ -232,11 +208,5 @@ cfg_if! {
exec_test_envelope_receipt();
}
#[test]
#[serial]
fn run_test_async_tag_lock() {
setup();
exec_test_async_tag_lock();
}
}
}

View File

@@ -1,352 +0,0 @@
use crate::xx::*;
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::FutureExt;
use async_std::task::sleep;
} else if #[cfg(feature="rt-tokio")] {
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;
use tokio_util::compat::*;
}
}
use futures_util::{AsyncReadExt, AsyncWriteExt};
use std::io;
static MESSAGE: &[u8; 62] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let local_addr = listener.local_addr()?;
let accept_future = async {
let (accepted_stream, peer_address) = listener.accept().await?;
trace!("connection from {}", peer_address);
accepted_stream.set_nodelay(true)?;
Result::<TcpStream, io::Error>::Ok(accepted_stream)
};
let connect_future = async {
sleep(Duration::from_secs(1)).await;
let connected_stream = TcpStream::connect(local_addr).await?;
connected_stream.set_nodelay(true)?;
Result::<TcpStream, io::Error>::Ok(connected_stream)
};
cfg_if! {
if #[cfg(feature="rt-async-std")] {
accept_future.try_join(connect_future).await
} else if #[cfg(feature="rt-tokio")] {
tokio::try_join!(accept_future, connect_future)
}
}
}
async fn make_async_peek_stream_loopback() -> (AsyncPeekStream, AsyncPeekStream) {
let (acc, conn) = make_tcp_loopback().await.unwrap();
#[cfg(feature = "rt-tokio")]
let acc = acc.compat();
#[cfg(feature = "rt-tokio")]
let conn = conn.compat();
let aps_a = AsyncPeekStream::new(acc);
let aps_c = AsyncPeekStream::new(conn);
(aps_a, aps_c)
}
#[cfg(feature = "rt-tokio")]
async fn make_stream_loopback() -> (Compat<TcpStream>, Compat<TcpStream>) {
let (a, c) = make_tcp_loopback().await.unwrap();
(a.compat(), c.compat())
}
#[cfg(feature = "rt-async-std")]
async fn make_stream_loopback() -> (TcpStream, TcpStream) {
make_tcp_loopback().await.unwrap()
}
pub async fn test_nothing() {
info!("test_nothing");
let (mut a, mut c) = make_stream_loopback().await;
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
}
pub async fn test_no_peek() {
info!("test_no_peek");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
}
pub async fn test_peek_all_read() {
info!("test_peek_all_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek everything
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len(), 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
assert_eq!(peekbuf1, outbuf);
}
pub async fn test_peek_some_read() {
info!("test_peek_some_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 2, 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
}
pub async fn test_peek_some_peek_some_read() {
info!("test_peek_some_peek_some_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(peeksize1 + 1, 0u8);
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
assert_eq!(peekbuf2, outbuf[0..peeksize2].to_vec());
}
pub async fn test_peek_some_read_peek_some_read() {
info!("test_peek_some_read_peek_some_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(2, 0u8);
c.read_exact(&mut inbuf2).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
assert_eq!(inbuf1, outbuf[0..peeksize1 - 1].to_vec());
assert_eq!(peekbuf2, outbuf[peeksize1 - 1..peeksize1 + 1].to_vec());
assert_eq!(inbuf2, peekbuf2);
}
pub async fn test_peek_some_read_peek_all_read() {
info!("test_peek_some_read_peek_all_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 + 1, 0u8);
c.read_exact(&mut inbuf1).await.unwrap();
// peek past end
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(outbuf.len(), 0u8);
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, outbuf.len() - (peeksize1 + 1));
// read remaining
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(peeksize2, 0u8);
c.read_exact(&mut inbuf2).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
assert_eq!(inbuf1, outbuf[0..peeksize1 + 1].to_vec());
assert_eq!(
peekbuf2[0..peeksize2].to_vec(),
outbuf[peeksize1 + 1..outbuf.len()].to_vec()
);
assert_eq!(inbuf2, peekbuf2[0..peeksize2].to_vec());
}
pub async fn test_peek_some_read_peek_some_read_all_read() {
info!("test_peek_some_read_peek_some_read_peek_all_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(1, 0u8);
c.read_exact(&mut inbuf2).await.unwrap();
// read remaining
let mut inbuf3: Vec<u8> = Vec::new();
inbuf3.resize(outbuf.len() - peeksize1, 0u8);
c.read_exact(&mut inbuf3).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
assert_eq!(inbuf1, outbuf[0..peeksize1 - 1].to_vec());
assert_eq!(
peekbuf2[0..peeksize2].to_vec(),
outbuf[peeksize1 - 1..peeksize1 + 1].to_vec()
);
assert_eq!(inbuf2, peekbuf2[0..1].to_vec());
assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec());
}
pub async fn test_peek_exact_read_peek_exact_read_all_read() {
info!("test_peek_exact_read_peek_exact_read_all_read");
let (mut a, mut c) = make_async_peek_stream_loopback().await;
// write everything
let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let peeksize1 = c.peek_exact(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let peeksize2 = c.peek_exact(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(1, 0u8);
c.read_exact(&mut inbuf2).await.unwrap();
// read remaining
let mut inbuf3: Vec<u8> = Vec::new();
inbuf3.resize(outbuf.len() - peeksize1, 0u8);
c.read_exact(&mut inbuf3).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
assert_eq!(inbuf1, outbuf[0..peeksize1 - 1].to_vec());
assert_eq!(
peekbuf2[0..peeksize2].to_vec(),
outbuf[peeksize1 - 1..peeksize1 + 1].to_vec()
);
assert_eq!(inbuf2, peekbuf2[0..1].to_vec());
assert_eq!(inbuf3, outbuf[peeksize1..outbuf.len()].to_vec());
}
pub async fn test_all() {
test_nothing().await;
test_no_peek().await;
test_peek_all_read().await;
test_peek_some_read().await;
test_peek_some_peek_some_read().await;
test_peek_some_read_peek_some_read().await;
test_peek_some_read_peek_all_read().await;
test_peek_some_read_peek_some_read_all_read().await;
test_peek_exact_read_peek_exact_read_all_read().await;
}