diff --git a/Cargo.lock b/Cargo.lock index 0cbe99c5..3b29885a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1524,6 +1524,19 @@ dependencies = [ "yansi", ] +[[package]] +name = "flume" +version = "0.10.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project 1.0.10", + "spin 0.9.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2269,6 +2282,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nanorand" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +dependencies = [ + "getrandom 0.2.5", +] + [[package]] name = "nb-connect" version = "1.2.0" @@ -3212,7 +3234,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -3753,6 +3775,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +dependencies = [ + "lock_api 0.4.6", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4145,7 +4176,6 @@ version = "0.1.0" dependencies = [ "android_logger", "anyhow", - "async-channel", "async-lock", "async-std", "async-tls", @@ -4166,6 +4196,7 @@ dependencies = [ "digest 0.9.0", "directories", "ed25519-dalek", + "flume", "futures-util", "generic-array 0.14.5", "getrandom 0.2.5", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 9c6eeca8..6f7e29e1 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -33,6 +33,7 @@ lazy_static = "^1" directories = "^4" once_cell = "^1" json = "^0" +flume = { version = "^0", features = ["async"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } @@ -90,7 +91,6 @@ getrandom = { version = "^0", features = ["js"] } ws_stream_wasm = "^0" async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]} async-lock = "^2" -async-channel = { version = "^1" } # Configuration for WASM32 'web-sys' crate [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] diff --git a/veilid-core/src/api_logger.rs b/veilid-core/src/api_logger.rs index 42baeeb0..60ab81ae 100644 --- a/veilid-core/src/api_logger.rs +++ b/veilid-core/src/api_logger.rs @@ -9,7 +9,7 @@ struct ApiLoggerInner { level: LevelFilter, filter_ignore: Cow<'static, [Cow<'static, str>]>, join_handle: Option<JoinHandle<()>>, - tx: async_channel::Sender<(VeilidLogLevel, String)>, + tx: Option<flume::Sender<(VeilidLogLevel, String)>>, } #[derive(Clone)] @@ -21,9 +21,9 @@ static API_LOGGER: OnceCell<ApiLogger> = OnceCell::new(); impl ApiLogger { fn new_inner(level: LevelFilter, update_callback: UpdateCallback) -> ApiLoggerInner { - let (tx, rx) = async_channel::unbounded::<(VeilidLogLevel, String)>(); + let (tx, rx) = flume::unbounded::<(VeilidLogLevel, String)>(); let join_handle: Option<JoinHandle<()>> = Some(spawn(async move { - while let Ok(v) = rx.recv().await { + while let Ok(v) = rx.recv_async().await { (update_callback)(VeilidUpdate::Log { log_level: v.0, message: v.1, @@ -35,7 +35,7 @@ impl ApiLogger { level, filter_ignore: Default::default(), join_handle, - tx, + tx: Some(tx), } } @@ -60,7 +60,7 @@ impl ApiLogger { // Terminate channel if let Some(inner) = (*inner).as_mut() { - inner.tx.close(); + inner.tx = None; join_handle = inner.join_handle.take(); } *inner = None; @@ -139,7 +139,9 @@ impl Log for ApiLogger { let s = format!("{}{}{}", tgt, loc, record.args()); - let _ = inner.tx.try_send((ll, s)); + if let Some(tx) = &inner.tx { + let _ = tx.try_send((ll, s)); + } } } } diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index d7d5e751..793bc356 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -15,7 +15,7 @@ const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize; struct ConnectionManagerInner { connection_table: ConnectionTable, connection_processor_jh: Option<JoinHandle<()>>, - connection_add_channel_tx: Option<async_channel::Sender<SystemPinBoxFuture<()>>>, + connection_add_channel_tx: Option<flume::Sender<SystemPinBoxFuture<()>>>, } impl core::fmt::Debug for ConnectionManagerInner { @@ -70,7 +70,7 @@ impl ConnectionManager { pub async fn startup(&self) { trace!("startup connection manager"); let mut inner = self.arc.inner.lock().await; - let cac = async_channel::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config + let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config inner.connection_add_channel_tx = Some(cac.0); let rx = cac.1.clone(); let this = self.clone(); @@ -196,12 +196,12 @@ impl ConnectionManager { // Process connection oriented sockets in the background // This never terminates and must have its task cancelled once started // Task cancellation is performed by shutdown() by dropping the join handle - async fn connection_processor(self, rx: async_channel::Receiver<SystemPinBoxFuture<()>>) { + async fn connection_processor(self, rx: flume::Receiver<SystemPinBoxFuture<()>>) { let mut connection_futures: FuturesUnordered<SystemPinBoxFuture<()>> = FuturesUnordered::new(); loop { // Either process an existing connection, or receive a new one to add to our list - match select(connection_futures.next(), Box::pin(rx.recv())).await { + match select(connection_futures.next(), Box::pin(rx.recv_async())).await { Either::Left((x, _)) => { // Processed some connection to completion, or there are none left match x { @@ -210,7 +210,7 @@ impl ConnectionManager { } None => { // No connections to process, wait for one - match rx.recv().await { + match rx.recv_async().await { Ok(v) => { connection_futures.push(v); } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 0bc3481d..100aa6ac 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -329,14 +329,12 @@ impl NetworkInterfaces { // returns Ok(false) if refresh had no changes, Ok(true) if changes were present pub async fn refresh(&mut self) -> Result<bool, String> { self.valid = false; - eprintln!("a"); let last_interfaces = core::mem::take(&mut self.interfaces); let mut platform_support = PlatformSupport::new().map_err(logthru_net!())?; platform_support .get_interfaces(&mut self.interfaces) .await?; - eprintln!("b"); self.valid = true; @@ -344,8 +342,6 @@ impl NetworkInterfaces { if changed { trace!("NetworkInterfaces refreshed: {:#?}?", self); } - eprintln!("c"); -xxx investigate why things get stuck here. threading and dart issue with logging ? Ok(changed) } pub fn len(&self) -> usize { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ac326083..61ab221e 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -144,7 +144,7 @@ pub struct RPCProcessorInner { routing_table: RoutingTable, node_id: key::DHTKey, node_id_secret: key::DHTKeySecret, - send_channel: Option<async_channel::Sender<RPCMessage>>, + send_channel: Option<flume::Sender<RPCMessage>>, timeout: u64, max_route_hop_count: usize, waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>, @@ -1246,8 +1246,8 @@ impl RPCProcessor { } } - async fn rpc_worker(self, receiver: async_channel::Receiver<RPCMessage>) { - while let Ok(msg) = receiver.recv().await { + async fn rpc_worker(self, receiver: flume::Receiver<RPCMessage>) { + while let Ok(msg) = receiver.recv_async().await { let _ = self .process_rpc_message(msg) .await @@ -1285,7 +1285,7 @@ impl RPCProcessor { } inner.timeout = timeout; inner.max_route_hop_count = max_route_hop_count; - let channel = async_channel::bounded(queue_size as usize); + let channel = flume::bounded(queue_size as usize); inner.send_channel = Some(channel.0.clone()); // spin up N workers diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index a8410f1a..488b149e 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -70,7 +70,6 @@ cfg_if! { pub use async_std::pin::Pin; pub use async_std::sync::Mutex as AsyncMutex; pub use async_std::sync::MutexGuard as AsyncMutexGuard; - pub use async_std::channel as async_channel; pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>;