switch to flume

This commit is contained in:
John Smith 2022-03-10 10:18:47 -05:00
parent 909aa14fe2
commit bb82811975
7 changed files with 51 additions and 23 deletions

35
Cargo.lock generated
View File

@ -1524,6 +1524,19 @@ dependencies = [
"yansi", "yansi",
] ]
[[package]]
name = "flume"
version = "0.10.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project 1.0.10",
"spin 0.9.2",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -2269,6 +2282,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "nanorand"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958"
dependencies = [
"getrandom 0.2.5",
]
[[package]] [[package]]
name = "nb-connect" name = "nb-connect"
version = "1.2.0" version = "1.2.0"
@ -3212,7 +3234,7 @@ dependencies = [
"cc", "cc",
"libc", "libc",
"once_cell", "once_cell",
"spin", "spin 0.5.2",
"untrusted", "untrusted",
"web-sys", "web-sys",
"winapi", "winapi",
@ -3753,6 +3775,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
dependencies = [
"lock_api 0.4.6",
]
[[package]] [[package]]
name = "stable_deref_trait" name = "stable_deref_trait"
version = "1.2.0" version = "1.2.0"
@ -4145,7 +4176,6 @@ version = "0.1.0"
dependencies = [ dependencies = [
"android_logger", "android_logger",
"anyhow", "anyhow",
"async-channel",
"async-lock", "async-lock",
"async-std", "async-std",
"async-tls", "async-tls",
@ -4166,6 +4196,7 @@ dependencies = [
"digest 0.9.0", "digest 0.9.0",
"directories", "directories",
"ed25519-dalek", "ed25519-dalek",
"flume",
"futures-util", "futures-util",
"generic-array 0.14.5", "generic-array 0.14.5",
"getrandom 0.2.5", "getrandom 0.2.5",

View File

@ -33,6 +33,7 @@ lazy_static = "^1"
directories = "^4" directories = "^4"
once_cell = "^1" once_cell = "^1"
json = "^0" json = "^0"
flume = { version = "^0", features = ["async"] }
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }
@ -90,7 +91,6 @@ getrandom = { version = "^0", features = ["js"] }
ws_stream_wasm = "^0" ws_stream_wasm = "^0"
async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]} async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]}
async-lock = "^2" async-lock = "^2"
async-channel = { version = "^1" }
# Configuration for WASM32 'web-sys' crate # Configuration for WASM32 'web-sys' crate
[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]

View File

@ -9,7 +9,7 @@ struct ApiLoggerInner {
level: LevelFilter, level: LevelFilter,
filter_ignore: Cow<'static, [Cow<'static, str>]>, filter_ignore: Cow<'static, [Cow<'static, str>]>,
join_handle: Option<JoinHandle<()>>, join_handle: Option<JoinHandle<()>>,
tx: async_channel::Sender<(VeilidLogLevel, String)>, tx: Option<flume::Sender<(VeilidLogLevel, String)>>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -21,9 +21,9 @@ static API_LOGGER: OnceCell<ApiLogger> = OnceCell::new();
impl ApiLogger { impl ApiLogger {
fn new_inner(level: LevelFilter, update_callback: UpdateCallback) -> ApiLoggerInner { fn new_inner(level: LevelFilter, update_callback: UpdateCallback) -> ApiLoggerInner {
let (tx, rx) = async_channel::unbounded::<(VeilidLogLevel, String)>(); let (tx, rx) = flume::unbounded::<(VeilidLogLevel, String)>();
let join_handle: Option<JoinHandle<()>> = Some(spawn(async move { let join_handle: Option<JoinHandle<()>> = Some(spawn(async move {
while let Ok(v) = rx.recv().await { while let Ok(v) = rx.recv_async().await {
(update_callback)(VeilidUpdate::Log { (update_callback)(VeilidUpdate::Log {
log_level: v.0, log_level: v.0,
message: v.1, message: v.1,
@ -35,7 +35,7 @@ impl ApiLogger {
level, level,
filter_ignore: Default::default(), filter_ignore: Default::default(),
join_handle, join_handle,
tx, tx: Some(tx),
} }
} }
@ -60,7 +60,7 @@ impl ApiLogger {
// Terminate channel // Terminate channel
if let Some(inner) = (*inner).as_mut() { if let Some(inner) = (*inner).as_mut() {
inner.tx.close(); inner.tx = None;
join_handle = inner.join_handle.take(); join_handle = inner.join_handle.take();
} }
*inner = None; *inner = None;
@ -139,7 +139,9 @@ impl Log for ApiLogger {
let s = format!("{}{}{}", tgt, loc, record.args()); let s = format!("{}{}{}", tgt, loc, record.args());
let _ = inner.tx.try_send((ll, s)); if let Some(tx) = &inner.tx {
let _ = tx.try_send((ll, s));
}
} }
} }
} }

View File

@ -15,7 +15,7 @@ const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize;
struct ConnectionManagerInner { struct ConnectionManagerInner {
connection_table: ConnectionTable, connection_table: ConnectionTable,
connection_processor_jh: Option<JoinHandle<()>>, connection_processor_jh: Option<JoinHandle<()>>,
connection_add_channel_tx: Option<async_channel::Sender<SystemPinBoxFuture<()>>>, connection_add_channel_tx: Option<flume::Sender<SystemPinBoxFuture<()>>>,
} }
impl core::fmt::Debug for ConnectionManagerInner { impl core::fmt::Debug for ConnectionManagerInner {
@ -70,7 +70,7 @@ impl ConnectionManager {
pub async fn startup(&self) { pub async fn startup(&self) {
trace!("startup connection manager"); trace!("startup connection manager");
let mut inner = self.arc.inner.lock().await; let mut inner = self.arc.inner.lock().await;
let cac = async_channel::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
inner.connection_add_channel_tx = Some(cac.0); inner.connection_add_channel_tx = Some(cac.0);
let rx = cac.1.clone(); let rx = cac.1.clone();
let this = self.clone(); let this = self.clone();
@ -196,12 +196,12 @@ impl ConnectionManager {
// Process connection oriented sockets in the background // Process connection oriented sockets in the background
// This never terminates and must have its task cancelled once started // This never terminates and must have its task cancelled once started
// Task cancellation is performed by shutdown() by dropping the join handle // Task cancellation is performed by shutdown() by dropping the join handle
async fn connection_processor(self, rx: async_channel::Receiver<SystemPinBoxFuture<()>>) { async fn connection_processor(self, rx: flume::Receiver<SystemPinBoxFuture<()>>) {
let mut connection_futures: FuturesUnordered<SystemPinBoxFuture<()>> = let mut connection_futures: FuturesUnordered<SystemPinBoxFuture<()>> =
FuturesUnordered::new(); FuturesUnordered::new();
loop { loop {
// Either process an existing connection, or receive a new one to add to our list // Either process an existing connection, or receive a new one to add to our list
match select(connection_futures.next(), Box::pin(rx.recv())).await { match select(connection_futures.next(), Box::pin(rx.recv_async())).await {
Either::Left((x, _)) => { Either::Left((x, _)) => {
// Processed some connection to completion, or there are none left // Processed some connection to completion, or there are none left
match x { match x {
@ -210,7 +210,7 @@ impl ConnectionManager {
} }
None => { None => {
// No connections to process, wait for one // No connections to process, wait for one
match rx.recv().await { match rx.recv_async().await {
Ok(v) => { Ok(v) => {
connection_futures.push(v); connection_futures.push(v);
} }

View File

@ -329,14 +329,12 @@ impl NetworkInterfaces {
// returns Ok(false) if refresh had no changes, Ok(true) if changes were present // returns Ok(false) if refresh had no changes, Ok(true) if changes were present
pub async fn refresh(&mut self) -> Result<bool, String> { pub async fn refresh(&mut self) -> Result<bool, String> {
self.valid = false; self.valid = false;
eprintln!("a");
let last_interfaces = core::mem::take(&mut self.interfaces); let last_interfaces = core::mem::take(&mut self.interfaces);
let mut platform_support = PlatformSupport::new().map_err(logthru_net!())?; let mut platform_support = PlatformSupport::new().map_err(logthru_net!())?;
platform_support platform_support
.get_interfaces(&mut self.interfaces) .get_interfaces(&mut self.interfaces)
.await?; .await?;
eprintln!("b");
self.valid = true; self.valid = true;
@ -344,8 +342,6 @@ impl NetworkInterfaces {
if changed { if changed {
trace!("NetworkInterfaces refreshed: {:#?}?", self); trace!("NetworkInterfaces refreshed: {:#?}?", self);
} }
eprintln!("c");
xxx investigate why things get stuck here. threading and dart issue with logging ?
Ok(changed) Ok(changed)
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {

View File

@ -144,7 +144,7 @@ pub struct RPCProcessorInner {
routing_table: RoutingTable, routing_table: RoutingTable,
node_id: key::DHTKey, node_id: key::DHTKey,
node_id_secret: key::DHTKeySecret, node_id_secret: key::DHTKeySecret,
send_channel: Option<async_channel::Sender<RPCMessage>>, send_channel: Option<flume::Sender<RPCMessage>>,
timeout: u64, timeout: u64,
max_route_hop_count: usize, max_route_hop_count: usize,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>, waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>,
@ -1246,8 +1246,8 @@ impl RPCProcessor {
} }
} }
async fn rpc_worker(self, receiver: async_channel::Receiver<RPCMessage>) { async fn rpc_worker(self, receiver: flume::Receiver<RPCMessage>) {
while let Ok(msg) = receiver.recv().await { while let Ok(msg) = receiver.recv_async().await {
let _ = self let _ = self
.process_rpc_message(msg) .process_rpc_message(msg)
.await .await
@ -1285,7 +1285,7 @@ impl RPCProcessor {
} }
inner.timeout = timeout; inner.timeout = timeout;
inner.max_route_hop_count = max_route_hop_count; inner.max_route_hop_count = max_route_hop_count;
let channel = async_channel::bounded(queue_size as usize); let channel = flume::bounded(queue_size as usize);
inner.send_channel = Some(channel.0.clone()); inner.send_channel = Some(channel.0.clone());
// spin up N workers // spin up N workers

View File

@ -70,7 +70,6 @@ cfg_if! {
pub use async_std::pin::Pin; pub use async_std::pin::Pin;
pub use async_std::sync::Mutex as AsyncMutex; pub use async_std::sync::Mutex as AsyncMutex;
pub use async_std::sync::MutexGuard as AsyncMutexGuard; pub use async_std::sync::MutexGuard as AsyncMutexGuard;
pub use async_std::channel as async_channel;
pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr };
pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>; pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>;
pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>;