From fdbb4c63972b824353af851d17f13c39e267ff77 Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 27 Jun 2022 23:46:29 -0400 Subject: [PATCH] executor work --- Cargo.lock | 20 +- veilid-cli/Cargo.toml | 14 +- veilid-cli/src/client_api_connection.rs | 37 ++- veilid-cli/src/command_processor.rs | 22 +- veilid-cli/src/main.rs | 36 ++- veilid-cli/src/tools.rs | 40 +++ veilid-core/Cargo.toml | 11 +- veilid-core/src/api_tracing_layer.rs | 20 +- veilid-core/src/attachment_manager.rs | 6 +- veilid-core/src/core_context.rs | 6 +- veilid-core/src/dht/crypto.rs | 7 +- veilid-core/src/intf/mod.rs | 1 - veilid-core/src/intf/native/block_store.rs | 2 +- veilid-core/src/intf/native/system.rs | 119 +++++++-- veilid-core/src/intf/native/table_store.rs | 2 +- .../utils/network_interfaces/netlink.rs | 70 +++-- veilid-core/src/intf/table_db.rs | 2 +- veilid-core/src/intf/wasm/block_store.rs | 1 - veilid-core/src/intf/wasm/system.rs | 40 ++- veilid-core/src/intf/wasm/table_store.rs | 1 - veilid-core/src/lib.rs | 22 +- .../src/network_manager/connection_manager.rs | 4 +- veilid-core/src/network_manager/native/mod.rs | 8 +- .../src/network_manager/native/network_tcp.rs | 73 +++--- .../src/network_manager/native/network_udp.rs | 30 ++- .../network_manager/native/protocol/mod.rs | 18 +- .../native/protocol/sockets.rs | 18 +- .../network_manager/native/protocol/tcp.rs | 62 ++--- .../network_manager/native/protocol/udp.rs | 1 + .../src/network_manager/native/protocol/ws.rs | 87 ++++--- .../network_manager/native/start_protocols.rs | 4 - .../src/network_manager/network_connection.rs | 6 +- veilid-core/src/routing_table/bucket.rs | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 2 +- veilid-core/src/routing_table/debug.rs | 4 +- veilid-core/src/routing_table/find_nodes.rs | 5 +- veilid-core/src/routing_table/mod.rs | 1 - veilid-core/src/routing_table/tasks.rs | 4 +- veilid-core/src/rpc_processor/mod.rs | 23 +- .../src/tests/common/test_host_interface.rs | 41 +-- .../src/tests/common/test_protected_store.rs | 1 - .../src/tests/common/test_table_store.rs | 1 - veilid-core/src/tests/native/mod.rs | 31 ++- .../tests/native/test_async_peek_stream.rs | 37 ++- veilid-core/src/veilid_api/mod.rs | 2 +- veilid-core/src/xx/async_peek_stream.rs | 15 +- veilid-core/src/xx/mod.rs | 18 +- veilid-core/src/xx/must_join_handle.rs | 37 ++- veilid-core/src/xx/must_join_single_future.rs | 7 +- veilid-core/src/xx/single_future.rs | 242 ------------------ veilid-core/src/xx/tick_task.rs | 4 +- veilid-flutter/rust/Cargo.toml | 9 +- veilid-server/Cargo.toml | 6 +- veilid-server/src/client_api.rs | 15 +- veilid-server/src/main.rs | 7 +- veilid-server/src/server.rs | 5 +- veilid-server/src/tools.rs | 48 ++++ veilid-server/src/unix.rs | 8 +- veilid-server/src/windows.rs | 1 + 59 files changed, 726 insertions(+), 640 deletions(-) create mode 100644 veilid-cli/src/tools.rs delete mode 100644 veilid-core/src/xx/single_future.rs create mode 100644 veilid-server/src/tools.rs diff --git a/Cargo.lock b/Cargo.lock index be457c5c..65f6433d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -366,7 +366,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5c45a0dd44b7e6533ac4e7acc38ead1a3b39885f5bbb738140d30ea528abc7c" dependencies = [ - "async-std", "futures-io", "futures-util", "log", @@ -380,7 +379,6 @@ version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1b71b31561643aa8e7df3effe284fa83ab1a840e52294c5f4bd7bfd8b2becbb" dependencies = [ - "async-std", "async-tls", "futures-io", "futures-util", @@ -402,6 +400,7 @@ dependencies = [ "futures-util", "pin-project 1.0.10", "rustc_version", + "tokio", "wasm-bindgen-futures", ] @@ -2763,6 +2762,7 @@ dependencies = [ "futures", "libc", "log", + "tokio", ] [[package]] @@ -3040,6 +3040,8 @@ dependencies = [ "pin-project 1.0.10", "rand 0.8.5", "thiserror", + "tokio", + "tokio-stream", ] [[package]] @@ -3759,6 +3761,7 @@ dependencies = [ "netlink-proto", "nix 0.22.3", "thiserror", + "tokio", ] [[package]] @@ -4531,6 +4534,7 @@ checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ "bytes 1.1.0", "futures-core", + "futures-io", "futures-sink", "log", "pin-project-lite", @@ -4786,6 +4790,7 @@ dependencies = [ "smallvec", "thiserror", "tinyvec", + "tokio", "url", ] @@ -4805,6 +4810,7 @@ dependencies = [ "resolv-conf", "smallvec", "thiserror", + "tokio", "trust-dns-proto", ] @@ -4983,7 +4989,6 @@ version = "0.1.0" dependencies = [ "async-std", "async-tungstenite 0.8.0", - "async_executors", "bugsalot", "capnp", "capnp-rpc", @@ -5004,6 +5009,8 @@ dependencies = [ "serde_derive", "serial_test", "thiserror", + "tokio", + "tokio-util 0.6.10", "veilid-core", ] @@ -5081,10 +5088,14 @@ dependencies = [ "static_assertions", "stop-token", "thiserror", + "tokio", + "tokio-stream", + "tokio-util 0.6.10", "tracing", "tracing-error", "tracing-subscriber", "tracing-wasm", + "trust-dns-resolver", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", @@ -5104,7 +5115,6 @@ name = "veilid-flutter" version = "0.1.0" dependencies = [ "allo-isolate", - "async-std", "backtrace", "ffi-support", "futures", @@ -5117,6 +5127,7 @@ dependencies = [ "parking_lot 0.12.1", "serde", "serde_json", + "tokio", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -5157,6 +5168,7 @@ dependencies = [ "serial_test", "signal-hook", "signal-hook-async-std", + "tokio", "tracing", "tracing-appender", "tracing-journald", diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index 49de2989..dd7f24c4 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -10,6 +10,11 @@ license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" name = "veilid-cli" path = "src/main.rs" +[features] +default = [ "rt-tokio" ] +rt-async-std = [ "async-std", "veilid-core/rt-async-std" ] +rt-tokio = [ "tokio", "tokio-util", "veilid-core/rt-tokio" ] + [target.'cfg(unix)'.dependencies] cursive = { path = "../external/cursive/cursive", default-features = false, features = ["ncurses-backend", "toml", "rt-async-std"]} @@ -17,9 +22,10 @@ cursive = { path = "../external/cursive/cursive", default-features = false, feat cursive = { path = "../external/cursive/cursive", default-features = false, features = ["crossterm-backend", "toml", "rt-async-std"]} [dependencies] -async-std = { version = "^1.9", features = ["unstable", "attributes"] } -async-tungstenite = { version = "^0.8", features = ["async-std-runtime"] } -async_executors = { version = "^0", default-features = false, features = [ "async_std" ]} +async-std = { version = "^1.9", features = ["unstable", "attributes"], optional = true } +tokio = { version = "^1", features = ["full"], optional = true } +tokio-util = { version = "^0", features = ["compat"], optional = true} +async-tungstenite = { version = "^0.8" } cursive-flexi-logger-view = { path = "../external/cursive-flexi-logger-view" } cursive_buffered_backend = { path = "../external/cursive_buffered_backend" } # cursive-multiplex = "0.4.0" @@ -41,7 +47,7 @@ bugsalot = "^0" flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } thiserror = "^1" crossbeam-channel = "^0" -veilid-core = { path = "../veilid-core" } +veilid-core = { path = "../veilid-core", default_features = false} [dev-dependencies] serial_test = "^0" diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 20f3cce6..e12a8cc3 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -1,9 +1,8 @@ use crate::command_processor::*; +use crate::tools::*; use crate::veilid_client_capnp::*; -use async_executors::{AsyncStd, LocalSpawnHandleExt}; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, Disconnector, RpcSystem}; -use futures::io::AsyncReadExt; use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; @@ -140,9 +139,7 @@ impl ClientApiConnection { )); } - let rpc_jh = AsyncStd - .spawn_handle_local(rpc_system) - .map_err(|e| format!("failed to spawn rpc system: {}", e))?; + let rpc_jh = spawn_local(rpc_system); // Send the request and get the state object and the registration object let response = request @@ -173,23 +170,43 @@ impl ClientApiConnection { // object mapping from the server which we need for the update backchannel // Wait until rpc system completion or disconnect was requested - rpc_jh - .await - .map_err(|e| format!("client RPC system error: {}", e)) + + cfg_if! { + if #[cfg(feature="rt-async-std")] { + rpc_jh + .await + .map_err(|e| format!("client RPC system error: {}", e)) + } else if #[cfg(feature="rt-tokio")] { + rpc_jh + .await + .map_err(|e| format!("join error: {}", e))? + .map_err(|e| format!("client RPC system error: {}", e)) + } + } } async fn handle_connection(&mut self) -> Result<(), String> { trace!("ClientApiConnection::handle_connection"); let connect_addr = self.inner.borrow().connect_addr.unwrap(); // Connect the TCP socket - let stream = async_std::net::TcpStream::connect(connect_addr) + let stream = TcpStream::connect(connect_addr) .await .map_err(map_to_string)?; // If it succeed, disable nagle algorithm stream.set_nodelay(true).map_err(map_to_string)?; // Create the VAT network - let (reader, writer) = stream.split(); + + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let (reader, writer) = stream.split(); + } else if #[cfg(feature="rt-tokio")] { + let (reader, writer) = stream.into_split(); + let reader = reader.compat(); + let writer = writer.compat_write(); + } + } + let rpc_network = Box::new(twoparty::VatNetwork::new( reader, writer, diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index e05c9e11..3e160560 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -1,7 +1,7 @@ use crate::client_api_connection::*; use crate::settings::Settings; +use crate::tools::*; use crate::ui::*; -use async_std::prelude::FutureExt; use log::*; use std::cell::*; use std::net::SocketAddr; @@ -116,7 +116,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::cmd_shutdown"); let mut capi = self.capi(); let ui = self.ui(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { if let Err(e) = capi.server_shutdown().await { error!("Server command 'shutdown' failed to execute: {}", e); } @@ -129,7 +129,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::cmd_attach"); let mut capi = self.capi(); let ui = self.ui(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { if let Err(e) = capi.server_attach().await { error!("Server command 'attach' failed to execute: {}", e); } @@ -142,7 +142,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::cmd_detach"); let mut capi = self.capi(); let ui = self.ui(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { if let Err(e) = capi.server_detach().await { error!("Server command 'detach' failed to execute: {}", e); } @@ -155,7 +155,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::cmd_disconnect"); let mut capi = self.capi(); let ui = self.ui(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { capi.disconnect().await; ui.send_callback(callback); }); @@ -166,7 +166,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::cmd_debug"); let mut capi = self.capi(); let ui = self.ui(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { match capi.server_debug(rest.unwrap_or_default()).await { Ok(output) => ui.display_string_dialog("Debug Output", output, callback), Err(e) => { @@ -248,9 +248,7 @@ debug - send a debugging command to the Veilid server debug!("Connection lost, retrying in 2 seconds"); { let waker = self.inner_mut().connection_waker.instance_clone(()); - waker - .race(async_std::task::sleep(Duration::from_millis(2000))) - .await; + let _ = timeout(Duration::from_millis(2000), waker).await; } self.inner_mut().connection_waker.reset(); first = false; @@ -306,7 +304,7 @@ debug - send a debugging command to the Veilid server // pub fn stop_connection(&mut self) { // self.inner_mut().reconnect = false; // let mut capi = self.capi().clone(); - // async_std::task::spawn_local(async move { + // spawn_detached(async move { // capi.disconnect().await; // }); // } @@ -327,7 +325,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::attach"); let mut capi = self.capi(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { if let Err(e) = capi.server_attach().await { error!("Server command 'attach' failed to execute: {}", e); } @@ -338,7 +336,7 @@ debug - send a debugging command to the Veilid server trace!("CommandProcessor::detach"); let mut capi = self.capi(); - async_std::task::spawn_local(async move { + spawn_detached_local(async move { if let Err(e) = capi.server_detach().await { error!("Server command 'detach' failed to execute: {}", e); } diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index 79381ce6..c2c664a7 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -3,16 +3,17 @@ use veilid_core::xx::*; -use async_std::prelude::*; use clap::{Arg, ColorChoice, Command}; use flexi_logger::*; use std::ffi::OsStr; use std::net::ToSocketAddrs; use std::path::Path; +use tools::*; mod client_api_connection; mod command_processor; mod settings; +mod tools; mod ui; #[allow(clippy::all)] @@ -60,8 +61,7 @@ fn parse_command_line(default_config_path: &OsStr) -> Result Result<(), String> { +fn main() -> Result<(), String> { // Get command line options let default_config_path = settings::Settings::get_default_config_path(); let matches = parse_command_line(default_config_path.as_os_str())?; @@ -170,17 +170,29 @@ async fn main() -> Result<(), String> { comproc.set_server_address(server_addr); let mut comproc2 = comproc.clone(); let connection_future = comproc.connection_manager(); - // Start UI - let ui_future = async_std::task::spawn_local(async move { - sivui.run_async().await; - // When UI quits, close connection and command processor cleanly - comproc2.quit(); - capi.disconnect().await; + // Start async + block_on(async move { + // Start UI + let ui_future = async move { + sivui.run_async().await; + + // When UI quits, close connection and command processor cleanly + comproc2.quit(); + capi.disconnect().await; + }; + + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std::prelude::*; + // Wait for ui and connection to complete + let _ = ui_future.join(connection_future).await; + } else if #[cfg(feature="rt-tokio")] { + // Wait for ui and connection to complete + let _ = tokio::join!(ui_future, connection_future); + } + } }); - // Wait for ui and connection to complete - ui_future.join(connection_future).await; - Ok(()) } diff --git a/veilid-cli/src/tools.rs b/veilid-cli/src/tools.rs new file mode 100644 index 00000000..eb25ad23 --- /dev/null +++ b/veilid-cli/src/tools.rs @@ -0,0 +1,40 @@ +use cfg_if::*; +use core::future::Future; + +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub use async_std::task::JoinHandle; + pub use async_std::net::TcpStream; + pub use async_std::future::TimeoutError; + pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + async_std::task::spawn_local(f) + } + pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + let _ = async_std::task::spawn_local(f); + } + pub use async_std::task::sleep; + pub use async_std::future::timeout; + pub fn block_on, T>(f: F) -> T { + async_std::task::block_on(f) + } + } else if #[cfg(feature="rt-tokio")] { + pub use tokio::task::JoinHandle; + pub use tokio::net::TcpStream; + pub use tokio_util::compat::*; + pub use tokio::time::error::Elapsed as TimeoutError; + pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + tokio::task::spawn_local(f) + } + pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + let _ = tokio::task::spawn_local(f); + } + pub use tokio::time::sleep; + pub use tokio::time::timeout; + pub fn block_on, T>(f: F) -> T { + let rt = tokio::runtime::Runtime::new().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, f) + } + + } +} diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 7314d8d3..54cf5810 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -10,11 +10,13 @@ license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" crate-type = ["cdylib", "staticlib", "rlib"] [features] -default = [ "rt-async-std" ] +default = [] +rt-async-std = [ "async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink?/smol_socket" ] +rt-tokio = [ "tokio", "tokio-util", "tokio-stream", "trust-dns-resolver/tokio-runtime", "async_executors/tokio_tp", "async_executors/tokio_io", "async_executors/tokio_timer", "rtnetlink?/tokio_socket" ] + android_tests = [] ios_tests = [ "simplelog", "backtrace" ] tracking = [ "backtrace" ] -rt-async-std = [ "async-std", "async-tungstenite/async-std-runtime", "async-std-resolver", "async_executors/async_std", "rtnetlink?/smol_socket" ] [dependencies] tracing = { version = "^0", features = ["log", "attributes"] } @@ -59,9 +61,13 @@ rtnetlink = { version = "^0", default-features = false, optional = true } # Linux, Windows, Mac, iOS, Android [target.'cfg(not(target_arch = "wasm32"))'.dependencies] async-std = { version = "^1", features = ["unstable"], optional = true} +tokio = { version = "^1", features = ["full"], optional = true} +tokio-util = { version = "^0", features = ["compat"], optional = true} +tokio-stream = { version = "^0", features = ["net"], optional = true} async-io = { version = "^1" } async-tungstenite = { version = "^0", features = ["async-tls"] } async-std-resolver = { version = "^0", optional = true } +trust-dns-resolver = { version = "^0", optional = true } maplit = "^1" config = { version = "^0", features = ["yaml"] } keyring-manager = { path = "../external/keyring-manager" } @@ -77,7 +83,6 @@ data-encoding = { version = "^2" } serde = { version = "^1", features = ["derive" ] } serde_cbor = { version = "^0" } serde_json = { version = "^1" } -async_executors = { version = "^0", default-features = false } socket2 = "^0" bugsalot = "^0" chrono = "^0" diff --git a/veilid-core/src/api_tracing_layer.rs b/veilid-core/src/api_tracing_layer.rs index a75cffc0..8c3551d4 100644 --- a/veilid-core/src/api_tracing_layer.rs +++ b/veilid-core/src/api_tracing_layer.rs @@ -97,24 +97,6 @@ impl ApiTracingLayer { } } -fn display_current_thread_id() -> String { - cfg_if! { - if #[cfg(target_arch = "wasm32")] { - "".to_owned() - } else { - format!("({}:{:?})", - if let Some(n) = async_std::task::current().name() { - n.to_string() - } - else { - async_std::task::current().id().to_string() - }, - std::thread::current().id() - ) - } - } -} - impl registry::LookupSpan<'a>> Layer for ApiTracingLayer { fn enabled(&self, metadata: &tracing::Metadata<'_>, _: layer::Context<'_, S>) -> bool { if let Some(inner) = &mut *self.inner.lock() { @@ -188,7 +170,7 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa .and_then(|file| meta.line().map(|ln| format!("{}:{}", file, ln))) .unwrap_or_default(); - let message = format!("{}{} {}", origin, display_current_thread_id(), recorder); + let message = format!("{} {}", origin, recorder); (inner.update_callback)(VeilidUpdate::Log(VeilidStateLog { log_level, diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 2bd24d7c..cc142732 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -1,6 +1,5 @@ use crate::callback_state_machine::*; use crate::dht::Crypto; -use crate::intf::*; use crate::network_manager::*; use crate::routing_table::*; use crate::xx::*; @@ -306,9 +305,8 @@ impl AttachmentManager { // Create long-running connection maintenance routine let this = self.clone(); self.inner.lock().maintain_peers = true; - self.inner.lock().attachment_maintainer_jh = Some(MustJoinHandle::new(intf::spawn( - this.attachment_maintainer(), - ))); + self.inner.lock().attachment_maintainer_jh = + Some(intf::spawn(this.attachment_maintainer())); } #[instrument(level = "trace", skip(self))] diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index 387fb4b7..bf86e6c9 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -1,7 +1,6 @@ use crate::api_tracing_layer::*; use crate::attachment_manager::*; use crate::dht::Crypto; -use crate::intf::*; use crate::veilid_api::*; use crate::veilid_config::*; use crate::xx::*; @@ -10,7 +9,6 @@ cfg_if! { if #[cfg(target_arch = "wasm32")] { pub type UpdateCallback = Arc; } else { - pub type UpdateCallback = Arc; } } @@ -255,7 +253,9 @@ impl VeilidCoreContext { ///////////////////////////////////////////////////////////////////////////// -static INITIALIZED: AsyncMutex = AsyncMutex::new(false); +lazy_static::lazy_static! { + static ref INITIALIZED: AsyncMutex = AsyncMutex::new(false); +} #[instrument(err, skip_all)] pub async fn api_startup( diff --git a/veilid-core/src/dht/crypto.rs b/veilid-core/src/dht/crypto.rs index fc817cdb..59840591 100644 --- a/veilid-core/src/dht/crypto.rs +++ b/veilid-core/src/dht/crypto.rs @@ -1,5 +1,4 @@ use super::key::*; -use crate::intf::*; use crate::xx::*; use crate::*; use chacha20::cipher::{KeyIvInit, StreamCipher}; @@ -124,7 +123,7 @@ impl Crypto { // Schedule flushing let this = self.clone(); - let flush_future = interval(60000, move || { + let flush_future = intf::interval(60000, move || { let this = this.clone(); async move { if let Err(e) = this.flush().await { @@ -214,13 +213,13 @@ impl Crypto { pub fn get_random_nonce() -> Nonce { let mut nonce = [0u8; 24]; - random_bytes(&mut nonce).unwrap(); + intf::random_bytes(&mut nonce).unwrap(); nonce } pub fn get_random_secret() -> SharedSecret { let mut s = [0u8; 32]; - random_bytes(&mut s).unwrap(); + intf::random_bytes(&mut s).unwrap(); s } diff --git a/veilid-core/src/intf/mod.rs b/veilid-core/src/intf/mod.rs index 67774dfc..842e5351 100644 --- a/veilid-core/src/intf/mod.rs +++ b/veilid-core/src/intf/mod.rs @@ -1,5 +1,4 @@ mod table_db; -use crate::xx::*; #[cfg(target_arch = "wasm32")] mod wasm; diff --git a/veilid-core/src/intf/native/block_store.rs b/veilid-core/src/intf/native/block_store.rs index f8ef955c..5d8bd94f 100644 --- a/veilid-core/src/intf/native/block_store.rs +++ b/veilid-core/src/intf/native/block_store.rs @@ -1,4 +1,4 @@ -use crate::intf::*; +use crate::xx::*; use crate::*; struct BlockStoreInner { diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index 56a27a43..eb046681 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -1,14 +1,33 @@ #![allow(dead_code)] use crate::xx::*; -pub use async_executors::JoinHandle; -use async_executors::{AsyncStd, LocalSpawnHandleExt, SpawnHandleExt}; -use async_std_resolver::{config, resolver, resolver_from_system_conf, AsyncStdResolver}; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std_resolver::{config, resolver, resolver_from_system_conf, AsyncStdResolver as AsyncResolver}; + } else if #[cfg(feature="rt-tokio")] { + use trust_dns_resolver::{config, TokioAsyncResolver as AsyncResolver, error::ResolveError}; + + pub async fn resolver( + config: config::ResolverConfig, + options: config::ResolverOpts, + ) -> Result { + AsyncResolver::tokio(config, options) + } + + /// Constructs a new async-std based Resolver with the system configuration. + /// + /// This will use `/etc/resolv.conf` on Unix OSes and the registry on Windows. + #[cfg(any(unix, target_os = "windows"))] + pub async fn resolver_from_system_conf() -> Result { + AsyncResolver::tokio_from_system_conf() + } + } +} use rand::prelude::*; use std::time::{Duration, SystemTime, UNIX_EPOCH}; lazy_static::lazy_static! { - static ref RESOLVER: Arc>> = Arc::new(AsyncMutex::new(None)); + static ref RESOLVER: Arc>> = Arc::new(AsyncMutex::new(None)); } pub fn get_timestamp() -> u64 { @@ -40,9 +59,21 @@ pub fn get_random_u64() -> u64 { pub async fn sleep(millis: u32) { if millis == 0 { - async_std::task::yield_now().await; + cfg_if! { + if #[cfg(feature="rt-async-std")] { + async_std::task::yield_now().await; + } else if #[cfg(feature="rt-tokio")] { + tokio::task::yield_now().await; + } + } } else { - async_std::task::sleep(Duration::from_millis(u64::from(millis))).await; + cfg_if! { + if #[cfg(feature="rt-async-std")] { + async_std::task::sleep(Duration::from_millis(u64::from(millis))).await; + } else if #[cfg(feature="rt-tokio")] { + tokio::time::sleep(Duration::from_millis(u64::from(millis))).await; + } + } } } @@ -52,22 +83,64 @@ pub fn system_boxed<'a, Out>( Box::pin(future) } -pub fn spawn(future: impl Future + Send + 'static) -> JoinHandle +pub fn spawn(future: impl Future + Send + 'static) -> MustJoinHandle where Out: Send + 'static, { - AsyncStd - .spawn_handle(future) - .expect("async-std spawn should never error out") + cfg_if! { + if #[cfg(feature="rt-async-std")] { + MustJoinHandle::new(async_std::task::spawn(future)) + } else if #[cfg(feature="rt-tokio")] { + MustJoinHandle::new(tokio::task::spawn(future)) + } + } } -pub fn spawn_local(future: impl Future + 'static) -> JoinHandle +pub fn spawn_local(future: impl Future + 'static) -> MustJoinHandle where Out: 'static, { - AsyncStd - .spawn_handle_local(future) - .expect("async-std spawn_local should never error out") + cfg_if! { + if #[cfg(feature="rt-async-std")] { + MustJoinHandle::new(async_std::task::spawn_local(future)) + } else if #[cfg(feature="rt-tokio")] { + MustJoinHandle::new(tokio::task::spawn_local(future)) + } + } +} + +pub fn spawn_with_local_set( + future: impl Future + Send + 'static, +) -> MustJoinHandle +where + Out: Send + 'static, +{ + cfg_if! { + if #[cfg(feature="rt-async-std")] { + spawn(future) + } else if #[cfg(feature="rt-tokio")] { + MustJoinHandle::new(tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Handle::current(); + rt.block_on(async { + let local = tokio::task::LocalSet::new(); + local.run_until(future).await + }) + })) + } + } +} + +pub fn spawn_detached(future: impl Future + Send + 'static) +where + Out: Send + 'static, +{ + cfg_if! { + if #[cfg(feature="rt-async-std")] { + drop(async_std::task::spawn(future)); + } else if #[cfg(feature="rt-tokio")] { + drop(tokio::task::spawn(future)); + } + } } pub fn interval(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> @@ -90,13 +163,25 @@ where }) } -pub use async_std::future::TimeoutError; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub use async_std::future::TimeoutError; + } else if #[cfg(feature="rt-tokio")] { + pub use tokio::time::error::Elapsed as TimeoutError; + } +} pub async fn timeout(dur_ms: u32, f: F) -> Result where F: Future, { - async_std::future::timeout(Duration::from_millis(dur_ms as u64), f).await + cfg_if! { + if #[cfg(feature="rt-async-std")] { + async_std::future::timeout(Duration::from_millis(dur_ms as u64), f).await + } else if #[cfg(feature="rt-tokio")] { + tokio::time::timeout(Duration::from_millis(dur_ms as u64), f).await + } + } } pub fn get_concurrency() -> u32 { @@ -128,7 +213,7 @@ where } */ -async fn get_resolver() -> Result { +async fn get_resolver() -> Result { let mut resolver_lock = RESOLVER.lock().await; if let Some(r) = &*resolver_lock { Ok(r.clone()) diff --git a/veilid-core/src/intf/native/table_store.rs b/veilid-core/src/intf/native/table_store.rs index 0766d439..2038abf5 100644 --- a/veilid-core/src/intf/native/table_store.rs +++ b/veilid-core/src/intf/native/table_store.rs @@ -1,5 +1,5 @@ use crate::intf::table_db::*; -use crate::intf::*; +use crate::xx::*; use crate::*; use keyvaluedb_sqlite::*; use std::path::PathBuf; diff --git a/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs b/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs index 9efb91c4..d43b2a6c 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/netlink.rs @@ -11,7 +11,13 @@ use rtnetlink::packet::{ nlas::address::Nla, AddressMessage, AF_INET, AF_INET6, IFA_F_DADFAILED, IFA_F_DEPRECATED, IFA_F_OPTIMISTIC, IFA_F_PERMANENT, IFA_F_TEMPORARY, IFA_F_TENTATIVE, }; -use rtnetlink::{new_connection_with_socket, sys::SmolSocket, Handle, IpVersion}; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + use rtnetlink::{new_connection_with_socket, sys::SmolSocket as RTNetLinkSocket, Handle, IpVersion}; + } else if #[cfg(feature="rt-tokio")] { + use rtnetlink::{new_connection_with_socket, sys::TokioSocket as RTNetLinkSocket, Handle, IpVersion}; + } +} use std::convert::TryInto; use std::ffi::CStr; use std::io; @@ -54,24 +60,16 @@ fn flags_to_address_flags(flags: u32) -> AddressFlags { } pub struct PlatformSupportNetlink { - _connection_jh: intf::JoinHandle<()>, - handle: Handle, + connection_jh: Option>, + handle: Option, default_route_interfaces: BTreeSet, } impl PlatformSupportNetlink { pub fn new() -> Result { - // Get the netlink connection - let (connection, handle, _) = new_connection_with_socket::() - .map_err(map_to_string) - .map_err(logthru_net!(error))?; - - // Spawn a connection handler - let _connection_jh = intf::spawn(connection); - Ok(PlatformSupportNetlink { - _connection_jh, - handle, + connection_jh: None, + handle: None, default_route_interfaces: BTreeSet::new(), }) } @@ -79,7 +77,13 @@ impl PlatformSupportNetlink { // Figure out which interfaces have default routes async fn refresh_default_route_interfaces(&mut self) -> Result<(), String> { self.default_route_interfaces.clear(); - let mut routesv4 = self.handle.route().get(IpVersion::V4).execute(); + let mut routesv4 = self + .handle + .as_ref() + .unwrap() + .route() + .get(IpVersion::V4) + .execute(); while let Some(routev4) = routesv4.try_next().await.unwrap_or_default() { if let Some(index) = routev4.output_interface() { //println!("*** ipv4 route: {:#?}", routev4); @@ -88,7 +92,13 @@ impl PlatformSupportNetlink { } } } - let mut routesv6 = self.handle.route().get(IpVersion::V6).execute(); + let mut routesv6 = self + .handle + .as_ref() + .unwrap() + .route() + .get(IpVersion::V6) + .execute(); while let Some(routev6) = routesv6.try_next().await.unwrap_or_default() { if let Some(index) = routev6.output_interface() { //println!("*** ipv6 route: {:#?}", routev6); @@ -228,7 +238,7 @@ impl PlatformSupportNetlink { )) } - pub async fn get_interfaces( + async fn get_interfaces_internal( &mut self, interfaces: &mut BTreeMap, ) -> Result<(), String> { @@ -242,7 +252,7 @@ impl PlatformSupportNetlink { // Ask for all the addresses we have let mut names = BTreeMap::::new(); - let mut addresses = self.handle.address().get().execute(); + let mut addresses = self.handle.as_ref().unwrap().address().get().execute(); while let Some(msg) = addresses .try_next() .await @@ -302,4 +312,30 @@ impl PlatformSupportNetlink { Ok(()) } + + pub async fn get_interfaces( + &mut self, + interfaces: &mut BTreeMap, + ) -> Result<(), String> { + // Get the netlink connection + let (connection, handle, _) = new_connection_with_socket::() + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + // Spawn a connection handler + let connection_jh = intf::spawn(connection); + + // Save the connection + self.connection_jh = Some(connection_jh); + self.handle = Some(handle); + + // Do the work + let out = self.get_interfaces_internal(interfaces).await; + + // Clean up connection + drop(self.handle.take()); + self.connection_jh.take().unwrap().abort().await; + + out + } } diff --git a/veilid-core/src/intf/table_db.rs b/veilid-core/src/intf/table_db.rs index 653f970e..f6b4f844 100644 --- a/veilid-core/src/intf/table_db.rs +++ b/veilid-core/src/intf/table_db.rs @@ -1,5 +1,5 @@ -use crate::intf::*; use crate::xx::*; +use crate::*; use serde::{Deserialize, Serialize}; cfg_if! { diff --git a/veilid-core/src/intf/wasm/block_store.rs b/veilid-core/src/intf/wasm/block_store.rs index 9fc70c8e..d4873b2f 100644 --- a/veilid-core/src/intf/wasm/block_store.rs +++ b/veilid-core/src/intf/wasm/block_store.rs @@ -1,5 +1,4 @@ -use crate::intf::*; use crate::*; struct BlockStoreInner { diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index 5c3eba05..5d1c08d7 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -1,7 +1,6 @@ use super::utils; use crate::xx::*; use crate::*; -pub use async_executors::JoinHandle; use async_executors::{Bindgen, LocalSpawnHandleExt /*, SpawnHandleExt*/}; use core::fmt; use futures_util::future::{select, Either}; @@ -105,23 +104,42 @@ pub fn system_boxed<'a, Out>( Box::pin(future) } -pub fn spawn(future: impl Future + 'static) -> JoinHandle +pub fn spawn(future: impl Future + 'static) -> MustJoinHandle +where + Out: Send + 'static, +{ + MustJoinHandle::new(Bindgen + .spawn_handle_local(future) + .expect("wasm-bindgen-futures spawn should never error out")) +} + +pub fn spawn_local(future: impl Future + 'static) -> MustJoinHandle +where + Out: 'static, +{ + MustJoinHandle::new(Bindgen + .spawn_handle_local(future) + .expect("wasm-bindgen-futures spawn_local should never error out")) +} + +pub fn spawn_with_local_set( + future: impl Future + 'static, +) -> MustJoinHandle +where + Out: Send + 'static, +{ + spawn(future) +} + +pub fn spawn_detached(future: impl Future + 'static) where Out: Send + 'static, { Bindgen .spawn_handle_local(future) - .expect("wasm-bindgen-futures spawn should never error out") + .expect("wasm-bindgen-futures spawn_local should never error out").detach() } -pub fn spawn_local(future: impl Future + 'static) -> JoinHandle -where - Out: 'static, -{ - Bindgen - .spawn_handle_local(future) - .expect("wasm-bindgen-futures spawn_local should never error out") -} pub fn interval(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> where diff --git a/veilid-core/src/intf/wasm/table_store.rs b/veilid-core/src/intf/wasm/table_store.rs index 726b0203..03874d5a 100644 --- a/veilid-core/src/intf/wasm/table_store.rs +++ b/veilid-core/src/intf/wasm/table_store.rs @@ -1,5 +1,4 @@ use crate::intf::table_db::*; -use crate::intf::*; use crate::*; use keyvaluedb_web::*; diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 5a0f9824..e302af03 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -1,9 +1,19 @@ #![deny(clippy::all)] #![deny(unused_must_use)] -#[cfg(all(feature = "rt-async-std", feature = "rt-tokio"))] -compile_error!( - "feature \"rt-async-std\" and feature \"rt-tokio\" cannot be enabled at the same time" -); + +cfg_if::cfg_if! { + if #[cfg(target_arch = "wasm32")] { + #[cfg(any(feature = "rt-async-std", feature = "rt-tokio"))] + compile_error!("features \"rt-async-std\" and \"rt-tokio\" can not be specified for WASM"); + } else { + #[cfg(all(feature = "rt-async-std", feature = "rt-tokio"))] + compile_error!( + "feature \"rt-async-std\" and feature \"rt-tokio\" cannot be enabled at the same time" + ); + #[cfg(not(any(feature = "rt-async-std", feature = "rt-tokio")))] + compile_error!("exactly one of feature \"rt-async-std\" or feature \"rt-tokio\" must be specified"); + } +} #[macro_use] extern crate alloc; @@ -51,7 +61,9 @@ pub fn veilid_version() -> (u32, u32, u32) { #[cfg(target_os = "android")] pub use intf::utils::android::{veilid_core_setup_android, veilid_core_setup_android_no_log}; -pub static DEFAULT_LOG_IGNORE_LIST: [&str; 10] = [ +pub static DEFAULT_LOG_IGNORE_LIST: [&str; 12] = [ + "mio", + "serial_test", "async_std", "async_io", "polling", diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 544350c1..7afb6c3e 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -43,12 +43,12 @@ impl ConnectionManager { config: VeilidConfig, stop_source: StopSource, sender: flume::Sender, - async_processor_jh: JoinHandle<()>, + async_processor_jh: MustJoinHandle<()>, ) -> ConnectionManagerInner { ConnectionManagerInner { stop_source: Some(stop_source), sender: sender, - async_processor_jh: Some(MustJoinHandle::new(async_processor_jh)), + async_processor_jh: Some(async_processor_jh), connection_table: ConnectionTable::new(config), } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 803401f1..f8dfd788 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -5,7 +5,6 @@ mod protocol; mod start_protocols; use super::*; -use crate::intf::*; use crate::routing_table::*; use connection_manager::*; use network_tcp::*; @@ -15,10 +14,9 @@ use protocol::ws::WebsocketProtocolHandler; pub use protocol::*; use utils::network_interfaces::*; -use async_std::io; -use async_std::net::*; use async_tls::TlsAcceptor; use futures_util::StreamExt; +use std::io; // xxx: rustls ^0.20 //use rustls::{server::NoClientAuth, Certificate, PrivateKey, ServerConfig}; use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig}; @@ -26,7 +24,6 @@ use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; -use std::time::Duration; ///////////////////////////////////////////////////////////////// @@ -558,12 +555,13 @@ impl Network { let mut inner = self.inner.lock(); // take the join handles out for h in inner.join_handles.drain(..) { + trace!("joining: {:?}", h); unord.push(h); } // Drop the stop drop(inner.stop_source.take()); } - debug!("stopping {} low level network tasks", unord.len()); + debug!("stopping {} low level network tasks", unord.len(),); // Wait for everything to stop while unord.next().await.is_some() {} diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 85619d36..355baf3d 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -1,5 +1,4 @@ use super::*; -use crate::intf::*; use async_tls::TlsAcceptor; use sockets::*; use stop_token::future::FutureExt; @@ -43,46 +42,41 @@ impl Network { &self, tls_acceptor: &TlsAcceptor, stream: AsyncPeekStream, - tcp_stream: TcpStream, addr: SocketAddr, protocol_handlers: &[Box], - tls_connection_initial_timeout: u64, + tls_connection_initial_timeout_ms: u32, ) -> Result, String> { - let ts = tls_acceptor + let tls_stream = tls_acceptor .accept(stream) .await .map_err(map_to_string) .map_err(logthru_net!(debug "TLS stream failed handshake"))?; - let ps = AsyncPeekStream::new(CloneStream::new(ts)); + let ps = AsyncPeekStream::new(tls_stream); let mut first_packet = [0u8; PEEK_DETECT_LEN]; // Try the handlers but first get a chunk of data for them to process // Don't waste more than N seconds getting it though, in case someone // is trying to DoS us with a bunch of connections or something // read a chunk of the stream - io::timeout( - Duration::from_micros(tls_connection_initial_timeout), + intf::timeout( + tls_connection_initial_timeout_ms, ps.peek_exact(&mut first_packet), ) .await + .map_err(map_to_string)? .map_err(map_to_string)?; - self.try_handlers(ps, tcp_stream, addr, protocol_handlers) - .await + self.try_handlers(ps, addr, protocol_handlers).await } async fn try_handlers( &self, stream: AsyncPeekStream, - tcp_stream: TcpStream, addr: SocketAddr, protocol_accept_handlers: &[Box], ) -> Result, String> { for ah in protocol_accept_handlers.iter() { - if let Some(nc) = ah - .on_accept(stream.clone(), tcp_stream.clone(), addr) - .await? - { + if let Some(nc) = ah.on_accept(stream.clone(), addr).await? { return Ok(Some(nc)); } } @@ -92,11 +86,11 @@ impl Network { async fn tcp_acceptor( self, - tcp_stream: async_std::io::Result, + tcp_stream: io::Result, listener_state: Arc>, connection_manager: ConnectionManager, - connection_initial_timeout: u64, - tls_connection_initial_timeout: u64, + connection_initial_timeout_ms: u32, + tls_connection_initial_timeout_ms: u32, ) { let tcp_stream = match tcp_stream { Ok(v) => v, @@ -125,14 +119,16 @@ impl Network { log_net!("TCP connection from: {}", addr); // Create a stream we can peek on - let ps = AsyncPeekStream::new(tcp_stream.clone()); + #[cfg(feature = "rt-tokio")] + let tcp_stream = tcp_stream.compat(); + let ps = AsyncPeekStream::new(tcp_stream); ///////////////////////////////////////////////////////////// let mut first_packet = [0u8; PEEK_DETECT_LEN]; // read a chunk of the stream - if io::timeout( - Duration::from_micros(connection_initial_timeout), + if timeout( + connection_initial_timeout_ms, ps.peek_exact(&mut first_packet), ) .await @@ -153,14 +149,13 @@ impl Network { self.try_tls_handlers( ls.tls_acceptor.as_ref().unwrap(), ps, - tcp_stream, addr, &ls.tls_protocol_handlers, - tls_connection_initial_timeout, + tls_connection_initial_timeout_ms, ) .await } else { - self.try_handlers(ps, tcp_stream, addr, &ls.protocol_accept_handlers) + self.try_handlers(ps, addr, &ls.protocol_accept_handlers) .await }; @@ -192,11 +187,11 @@ impl Network { async fn spawn_socket_listener(&self, addr: SocketAddr) -> Result<(), String> { // Get config - let (connection_initial_timeout, tls_connection_initial_timeout) = { + let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = { let c = self.config.get(); ( - ms_to_us(c.network.connection_initial_timeout_ms), - ms_to_us(c.network.tls.connection_initial_timeout_ms), + c.network.connection_initial_timeout_ms, + c.network.tls.connection_initial_timeout_ms, ) }; @@ -209,7 +204,13 @@ impl Network { // Make an async tcplistener from the socket2 socket let std_listener: std::net::TcpListener = socket.into(); - let listener = TcpListener::from(std_listener); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let listener = TcpListener::from(std_listener); + } else if #[cfg(feature="rt-tokio")] { + let listener = TcpListener::from_std(std_listener).map_err(map_to_string)?; + } + } debug!("spawn_socket_listener: binding successful to {}", addr); @@ -229,8 +230,16 @@ impl Network { let jh = spawn(async move { // moves listener object in and get incoming iterator // when this task exists, the listener will close the socket - let _ = listener - .incoming() + + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let incoming_stream = listener.incoming(); + } else if #[cfg(feature="rt-tokio")] { + let incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener); + } + } + + let _ = incoming_stream .for_each_concurrent(None, |tcp_stream| { let this = this.clone(); let listener_state = listener_state.clone(); @@ -240,8 +249,8 @@ impl Network { tcp_stream, listener_state, connection_manager, - connection_initial_timeout, - tls_connection_initial_timeout, + connection_initial_timeout_ms, + tls_connection_initial_timeout_ms, ) }) .timeout_at(stop_token) @@ -255,7 +264,7 @@ impl Network { //////////////////////////////////////////////////////////// // Add to join handles - self.add_to_join_handles(MustJoinHandle::new(jh)); + self.add_to_join_handles(jh); Ok(()) } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 940a0131..2c2a1bc2 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -23,7 +23,7 @@ impl Network { // Run thread task to process stream of messages let this = self.clone(); - let jh = spawn(async move { + let jh = spawn_with_local_set(async move { trace!("UDP listener task spawned"); // Collect all our protocol handlers into a vector @@ -49,7 +49,7 @@ impl Network { for ph in protocol_handlers { let network_manager = network_manager.clone(); let stop_token = stop_token.clone(); - let jh = spawn_local(async move { + let jh = intf::spawn_local(async move { let mut data = vec![0u8; 65536]; loop { @@ -112,7 +112,7 @@ impl Network { //////////////////////////////////////////////////////////// // Add to join handle - self.add_to_join_handles(MustJoinHandle::new(jh)); + self.add_to_join_handles(jh); } Ok(()) @@ -134,7 +134,13 @@ impl Network { // Make an async UdpSocket from the socket2 socket let std_udp_socket: std::net::UdpSocket = socket.into(); - let udp_socket = UdpSocket::from(std_udp_socket); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let udp_socket = UdpSocket::from(std_udp_socket); + } else if #[cfg(feature="rt-tokio")] { + let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; + } + } let socket_arc = Arc::new(udp_socket); // Create protocol handler @@ -148,7 +154,13 @@ impl Network { if let Ok(socket) = new_bound_shared_udp_socket(socket_addr_v6) { // Make an async UdpSocket from the socket2 socket let std_udp_socket: std::net::UdpSocket = socket.into(); - let udp_socket = UdpSocket::from(std_udp_socket); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let udp_socket = UdpSocket::from(std_udp_socket); + } else if #[cfg(feature="rt-tokio")] { + let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; + } + } let socket_arc = Arc::new(udp_socket); // Create protocol handler @@ -168,7 +180,13 @@ impl Network { // Make an async UdpSocket from the socket2 socket let std_udp_socket: std::net::UdpSocket = socket.into(); - let udp_socket = UdpSocket::from(std_udp_socket); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let udp_socket = UdpSocket::from(std_udp_socket); + } else if #[cfg(feature="rt-tokio")] { + let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; + } + } let socket_arc = Arc::new(udp_socket); // Create protocol handler diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index e9ad74a6..26291f76 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -92,15 +92,15 @@ impl ProtocolNetworkConnection { } } - pub async fn close(&self) -> Result<(), String> { - match self { - Self::Dummy(d) => d.close(), - Self::RawTcp(t) => t.close().await, - Self::WsAccepted(w) => w.close().await, - Self::Ws(w) => w.close().await, - Self::Wss(w) => w.close().await, - } - } + // pub async fn close(&self) -> Result<(), String> { + // match self { + // Self::Dummy(d) => d.close(), + // Self::RawTcp(t) => t.close().await, + // Self::WsAccepted(w) => w.close().await, + // Self::Ws(w) => w.close().await, + // Self::Wss(w) => w.close().await, + // } + // } pub async fn send(&self, message: Vec) -> Result<(), String> { match self { diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index 2a691a9c..bd7aa6d2 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -1,7 +1,15 @@ use crate::xx::*; use crate::*; use async_io::Async; -use async_std::net::TcpStream; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub use async_std::net::{TcpStream, TcpListener, Shutdown, UdpSocket}; + } else if #[cfg(feature="rt-tokio")] { + pub use tokio::net::{TcpStream, TcpListener, UdpSocket}; + pub use tokio_util::compat::*; + } +} + use socket2::{Domain, Protocol, SockAddr, Socket, Type}; cfg_if! { @@ -218,5 +226,11 @@ pub async fn nonblocking_connect(socket: Socket, addr: SocketAddr) -> std::io::R }?; // Convert back to inner and then return async version - Ok(TcpStream::from(async_stream.into_inner()?)) + cfg_if! { + if #[cfg(feature="rt-async-std")] { + Ok(TcpStream::from(async_stream.into_inner()?)) + } else if #[cfg(feature="rt-tokio")] { + Ok(TcpStream::from_std(async_stream.into_inner()?)?) + } + } } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 8318ae9a..de54e18c 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -5,7 +5,6 @@ use sockets::*; pub struct RawTcpNetworkConnection { descriptor: ConnectionDescriptor, stream: AsyncPeekStream, - tcp_stream: TcpStream, } impl fmt::Debug for RawTcpNetworkConnection { @@ -15,31 +14,33 @@ impl fmt::Debug for RawTcpNetworkConnection { } impl RawTcpNetworkConnection { - pub fn new( - descriptor: ConnectionDescriptor, - stream: AsyncPeekStream, - tcp_stream: TcpStream, - ) -> Self { - Self { - descriptor, - stream, - tcp_stream, - } + pub fn new(descriptor: ConnectionDescriptor, stream: AsyncPeekStream) -> Self { + Self { descriptor, stream } } pub fn descriptor(&self) -> ConnectionDescriptor { self.descriptor.clone() } - #[instrument(level = "trace", err, skip(self))] - pub async fn close(&self) -> Result<(), String> { - // Make an attempt to flush the stream - self.stream.clone().close().await.map_err(map_to_string)?; - // Then forcibly close the socket - self.tcp_stream - .shutdown(Shutdown::Both) - .map_err(map_to_string) - } + // #[instrument(level = "trace", err, skip(self))] + // pub async fn close(&mut self) -> Result<(), String> { + // // Make an attempt to flush the stream + // self.stream.clone().close().await.map_err(map_to_string)?; + // // Then shut down the write side of the socket to effect a clean close + // cfg_if! { + // if #[cfg(feature="rt-async-std")] { + // self.tcp_stream + // .shutdown(async_std::net::Shutdown::Write) + // .map_err(map_to_string) + // } else if #[cfg(feature="rt-tokio")] { + // use tokio::io::AsyncWriteExt; + // self.tcp_stream.get_mut() + // .shutdown() + // .await + // .map_err(map_to_string) + // } + // } + // } async fn send_internal(stream: &mut AsyncPeekStream, message: Vec) -> Result<(), String> { log_net!("sending TCP message of size {}", message.len()); @@ -115,11 +116,10 @@ impl RawTcpProtocolHandler { } } - #[instrument(level = "trace", err, skip(self, stream, tcp_stream))] + #[instrument(level = "trace", err, skip(self, stream))] async fn on_accept_async( self, stream: AsyncPeekStream, - tcp_stream: TcpStream, socket_addr: SocketAddr, ) -> Result, String> { log_net!("TCP: on_accept_async: enter"); @@ -139,7 +139,6 @@ impl RawTcpProtocolHandler { let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_address)), stream, - tcp_stream, )); log_net!(debug "TCP: on_accept_async from: {}", socket_addr); @@ -173,7 +172,9 @@ impl RawTcpProtocolHandler { .local_addr() .map_err(map_to_string) .map_err(logthru_net!("could not get local address from TCP stream"))?; - let ps = AsyncPeekStream::new(ts.clone()); + #[cfg(feature = "rt-tokio")] + let ts = ts.compat(); + let ps = AsyncPeekStream::new(ts); // Wrap the stream in a network connection and return it let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( @@ -182,7 +183,6 @@ impl RawTcpProtocolHandler { SocketAddress::from_socket_addr(actual_local_address), ), ps, - ts, )); Ok(conn) @@ -216,7 +216,10 @@ impl RawTcpProtocolHandler { // .local_addr() // .map_err(map_to_string) // .map_err(logthru_net!("could not get local address from TCP stream"))?; - let mut ps = AsyncPeekStream::new(ts.clone()); + + #[cfg(feature = "rt-tokio")] + let ts = ts.compat(); + let mut ps = AsyncPeekStream::new(ts); // Send directly from the raw network connection // this builds the connection and tears it down immediately after the send @@ -252,7 +255,9 @@ impl RawTcpProtocolHandler { // .local_addr() // .map_err(map_to_string) // .map_err(logthru_net!("could not get local address from TCP stream"))?; - let mut ps = AsyncPeekStream::new(ts.clone()); + #[cfg(feature = "rt-tokio")] + let ts = ts.compat(); + let mut ps = AsyncPeekStream::new(ts); // Send directly from the raw network connection // this builds the connection and tears it down immediately after the send @@ -271,9 +276,8 @@ impl ProtocolAcceptHandler for RawTcpProtocolHandler { fn on_accept( &self, stream: AsyncPeekStream, - tcp_stream: TcpStream, peer_addr: SocketAddr, ) -> SystemPinBoxFuture, String>> { - Box::pin(self.clone().on_accept_async(stream, tcp_stream, peer_addr)) + Box::pin(self.clone().on_accept_async(stream, peer_addr)) } } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 961772b6..1d31f21b 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -1,4 +1,5 @@ use super::*; +use sockets::*; #[derive(Clone)] pub struct RawUdpProtocolHandler { diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 4b0ca448..72179b53 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -1,28 +1,35 @@ use super::*; -use async_std::io; + use async_tls::TlsConnector; use async_tungstenite::tungstenite::protocol::Message; use async_tungstenite::{accept_async, client_async, WebSocketStream}; -use futures_util::SinkExt; +use futures_util::{AsyncRead, AsyncWrite, SinkExt}; use sockets::*; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub type WebsocketNetworkConnectionWSS = + WebsocketNetworkConnection>; + pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection; + } else if #[cfg(feature="rt-tokio")] { + pub type WebsocketNetworkConnectionWSS = + WebsocketNetworkConnection>>; + pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection>; + } +} pub type WebSocketNetworkConnectionAccepted = WebsocketNetworkConnection; -pub type WebsocketNetworkConnectionWSS = - WebsocketNetworkConnection>; -pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection; pub struct WebsocketNetworkConnection where - T: io::Read + io::Write + Send + Unpin + 'static, + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { descriptor: ConnectionDescriptor, stream: CloneStream>, - tcp_stream: TcpStream, } impl fmt::Debug for WebsocketNetworkConnection where - T: io::Read + io::Write + Send + Unpin + 'static, + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", std::any::type_name::()) @@ -31,17 +38,12 @@ where impl WebsocketNetworkConnection where - T: io::Read + io::Write + Send + Unpin + 'static, + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - pub fn new( - descriptor: ConnectionDescriptor, - stream: WebSocketStream, - tcp_stream: TcpStream, - ) -> Self { + pub fn new(descriptor: ConnectionDescriptor, stream: WebSocketStream) -> Self { Self { descriptor, stream: CloneStream::new(stream), - tcp_stream, } } @@ -49,15 +51,15 @@ where self.descriptor.clone() } - #[instrument(level = "trace", err, skip(self))] - pub async fn close(&self) -> Result<(), String> { - // Make an attempt to flush the stream - self.stream.clone().close().await.map_err(map_to_string)?; - // Then forcibly close the socket - self.tcp_stream - .shutdown(Shutdown::Both) - .map_err(map_to_string) - } + // #[instrument(level = "trace", err, skip(self))] + // pub async fn close(&self) -> Result<(), String> { + // // Make an attempt to flush the stream + // self.stream.clone().close().await.map_err(map_to_string)?; + // // Then forcibly close the socket + // self.tcp_stream + // .shutdown(Shutdown::Both) + // .map_err(map_to_string) + // } #[instrument(level = "trace", err, skip(self, message), fields(message.len = message.len()))] pub async fn send(&self, message: Vec) -> Result<(), String> { @@ -101,7 +103,7 @@ struct WebsocketProtocolHandlerArc { tls: bool, local_address: SocketAddr, request_path: Vec, - connection_initial_timeout: u64, + connection_initial_timeout_ms: u32, } #[derive(Clone)] @@ -119,10 +121,10 @@ impl WebsocketProtocolHandler { } else { format!("GET /{}", c.network.protocol.wss.path.trim_end_matches('/')) }; - let connection_initial_timeout = if tls { - ms_to_us(c.network.tls.connection_initial_timeout_ms) + let connection_initial_timeout_ms = if tls { + c.network.tls.connection_initial_timeout_ms } else { - ms_to_us(c.network.connection_initial_timeout_ms) + c.network.connection_initial_timeout_ms }; Self { @@ -130,34 +132,30 @@ impl WebsocketProtocolHandler { tls, local_address, request_path: path.as_bytes().to_vec(), - connection_initial_timeout, + connection_initial_timeout_ms, }), } } - #[instrument(level = "trace", err, skip(self, ps, tcp_stream))] + #[instrument(level = "trace", err, skip(self, ps))] pub async fn on_accept_async( self, ps: AsyncPeekStream, - tcp_stream: TcpStream, socket_addr: SocketAddr, ) -> Result, String> { log_net!("WS: on_accept_async: enter"); let request_path_len = self.arc.request_path.len() + 2; let mut peekbuf: Vec = vec![0u8; request_path_len]; - match io::timeout( - Duration::from_micros(self.arc.connection_initial_timeout), + match timeout( + self.arc.connection_initial_timeout_ms, ps.peek_exact(&mut peekbuf), ) .await { Ok(_) => (), Err(e) => { - if e.kind() == io::ErrorKind::TimedOut { - return Err(e).map_err(map_to_string); - } - return Err(e).map_err(map_to_string); + return Err(e.to_string()); } } @@ -194,7 +192,6 @@ impl WebsocketProtocolHandler { SocketAddress::from_socket_addr(self.arc.local_address), ), ws_stream, - tcp_stream, )); log_net!(debug "{}: on_accept_async from: {}", if self.arc.tls { "WSS" } else { "WS" }, socket_addr); @@ -238,6 +235,9 @@ impl WebsocketProtocolHandler { // See what local address we ended up with let actual_local_addr = tcp_stream.local_addr().map_err(map_to_string)?; + #[cfg(feature = "rt-tokio")] + let tcp_stream = tcp_stream.compat(); + // Make our connection descriptor let descriptor = ConnectionDescriptor::new( dial_info.to_peer_address(), @@ -247,7 +247,7 @@ impl WebsocketProtocolHandler { if tls { let connector = TlsConnector::default(); let tls_stream = connector - .connect(domain.to_string(), tcp_stream.clone()) + .connect(domain.to_string(), tcp_stream) .await .map_err(map_to_string) .map_err(logthru_net!(error))?; @@ -257,15 +257,15 @@ impl WebsocketProtocolHandler { .map_err(logthru_net!(error))?; Ok(ProtocolNetworkConnection::Wss( - WebsocketNetworkConnection::new(descriptor, ws_stream, tcp_stream), + WebsocketNetworkConnection::new(descriptor, ws_stream), )) } else { - let (ws_stream, _response) = client_async(request, tcp_stream.clone()) + let (ws_stream, _response) = client_async(request, tcp_stream) .await .map_err(map_to_string) .map_err(logthru_net!(error))?; Ok(ProtocolNetworkConnection::Ws( - WebsocketNetworkConnection::new(descriptor, ws_stream, tcp_stream), + WebsocketNetworkConnection::new(descriptor, ws_stream), )) } } @@ -319,9 +319,8 @@ impl ProtocolAcceptHandler for WebsocketProtocolHandler { fn on_accept( &self, stream: AsyncPeekStream, - tcp_stream: TcpStream, peer_addr: SocketAddr, ) -> SystemPinBoxFuture, String>> { - Box::pin(self.clone().on_accept_async(stream, tcp_stream, peer_addr)) + Box::pin(self.clone().on_accept_async(stream, peer_addr)) } } diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index 81c2a139..b64ee3bd 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -319,7 +319,6 @@ impl Network { // Resolve statically configured public dialinfo let mut public_sockaddrs = public_address .to_socket_addrs() - .await .map_err(|e| format!("Unable to resolve address: {}\n{}", public_address, e))?; // Add all resolved addresses as public dialinfo @@ -416,7 +415,6 @@ impl Network { let global_socket_addrs = split_url .host_port(80) .to_socket_addrs() - .await .map_err(map_to_string) .map_err(logthru_net!(error))?; @@ -548,7 +546,6 @@ impl Network { let global_socket_addrs = split_url .host_port(443) .to_socket_addrs() - .await .map_err(map_to_string) .map_err(logthru_net!(error))?; @@ -662,7 +659,6 @@ impl Network { // Resolve statically configured public dialinfo let mut public_sockaddrs = public_address .to_socket_addrs() - .await .map_err(|e| format!("Unable to resolve address: {}\n{}", public_address, e))?; // Add all resolved addresses as public dialinfo diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 535be202..ea508670 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -6,7 +6,6 @@ cfg_if::cfg_if! { if #[cfg(target_arch = "wasm32")] { // No accept support for WASM } else { - use async_std::net::*; /////////////////////////////////////////////////////////// // Accept @@ -15,7 +14,6 @@ cfg_if::cfg_if! { fn on_accept( &self, stream: AsyncPeekStream, - tcp_stream: TcpStream, peer_addr: SocketAddr, ) -> SystemPinBoxFuture, String>>; } @@ -139,7 +137,7 @@ impl NetworkConnection { let local_stop_token = stop_source.token(); // Spawn connection processor and pass in protocol connection - let processor = MustJoinHandle::new(intf::spawn_local(Self::process_connection( + let processor = intf::spawn_local(Self::process_connection( connection_manager, local_stop_token, manager_stop_token, @@ -148,7 +146,7 @@ impl NetworkConnection { protocol_connection, inactivity_timeout, stats.clone(), - ))); + )); // Return the connection Self { diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index fbdbaf12..8312038d 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -82,7 +82,7 @@ impl Bucket { .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); - let cur_ts = get_timestamp(); + let cur_ts = intf::get_timestamp(); sorted_entries.sort_by(|a, b| -> core::cmp::Ordering { if a.0 == b.0 { return core::cmp::Ordering::Equal; diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 6c1b6e97..c2e77024 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -418,7 +418,7 @@ pub struct BucketEntry { impl BucketEntry { pub(super) fn new() -> Self { - let now = get_timestamp(); + let now = intf::get_timestamp(); Self { ref_count: AtomicU32::new(0), inner: RwLock::new(BucketEntryInner { diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 4731c5db..471e58a3 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -89,7 +89,7 @@ impl RoutingTable { pub fn debug_info_entries(&self, limit: usize, min_state: BucketEntryState) -> String { let inner = self.inner.read(); - let cur_ts = get_timestamp(); + let cur_ts = intf::get_timestamp(); let mut out = String::new(); @@ -148,7 +148,7 @@ impl RoutingTable { pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { let inner = self.inner.read(); - let cur_ts = get_timestamp(); + let cur_ts = intf::get_timestamp(); let mut out = String::new(); const COLS: usize = 16; diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 5601f723..509bd3fa 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -1,7 +1,6 @@ use super::*; use crate::dht::*; -use crate::intf::*; use crate::xx::*; use crate::*; @@ -219,7 +218,7 @@ impl RoutingTable { F: FnMut(DHTKey, Option>) -> bool, T: FnMut(DHTKey, Option>) -> O, { - let cur_ts = get_timestamp(); + let cur_ts = intf::get_timestamp(); let out = self.find_peers_with_sort_and_filter( node_count, cur_ts, @@ -301,7 +300,7 @@ impl RoutingTable { T: FnMut(DHTKey, Option>) -> O, F: FnMut(DHTKey, Option>) -> bool, { - let cur_ts = get_timestamp(); + let cur_ts = intf::get_timestamp(); let node_count = { let c = self.config.get(); c.network.dht.max_find_node_count as usize diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 039b3b1b..f98d79c8 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -7,7 +7,6 @@ mod stats_accounting; mod tasks; use crate::dht::*; -use crate::intf::*; use crate::network_manager::*; use crate::rpc_processor::*; use crate::xx::*; diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index 72aee5e2..da053095 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -369,9 +369,7 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { let nr = NodeRef::new(self.clone(), k, v, None); - unord.push(MustJoinHandle::new(intf::spawn_local( - rpc.clone().rpc_call_status(nr), - ))); + unord.push(intf::spawn_local(rpc.clone().rpc_call_status(nr))); } Option::<()>::None }); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6433ae35..d03b4cd4 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -6,7 +6,6 @@ pub use debug::*; pub use private_route::*; use crate::dht::*; -use crate::intf::*; use crate::xx::*; use capnp::message::ReaderSegments; use coders::*; @@ -228,7 +227,7 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// fn get_next_op_id(&self) -> OperationId { - get_random_u64() + intf::get_random_u64() } fn filter_peer_scope(&self, node_info: &NodeInfo) -> bool { @@ -365,12 +364,12 @@ impl RPCProcessor { let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64) .map_err(map_error_internal!("invalid timeout"))?; // wait for eventualvalue - let start_ts = get_timestamp(); - let res = timeout(timeout_ms, waitable_reply.eventual.instance()) + let start_ts = intf::get_timestamp(); + let res = intf::timeout(timeout_ms, waitable_reply.eventual.instance()) .await .map_err(|_| RPCError::Timeout)?; let rpcreader = res.take_value().unwrap(); - let end_ts = get_timestamp(); + let end_ts = intf::get_timestamp(); Ok((rpcreader, end_ts - start_ts)) } async fn wait_for_reply( @@ -390,7 +389,7 @@ impl RPCProcessor { waitable_reply.node_ref.set_seen_our_node_info(); // Reply received - let recv_ts = get_timestamp(); + let recv_ts = intf::get_timestamp(); self.routing_table().stats_answer_rcvd( waitable_reply.node_ref, waitable_reply.send_ts, @@ -554,7 +553,7 @@ impl RPCProcessor { log_rpc!(debug "==>> REQUEST({}) -> {:?}", self.get_rpc_message_debug_info(&message), dest); let bytes = out.len() as u64; - let send_ts = get_timestamp(); + let send_ts = intf::get_timestamp(); let send_data_kind = match self .network_manager() .send_envelope(node_ref.clone(), Some(out_node_id), out) @@ -745,7 +744,7 @@ impl RPCProcessor { }, node_ref); let bytes = out.len() as u64; - let send_ts = get_timestamp(); + let send_ts = intf::get_timestamp(); self.network_manager() .send_envelope(node_ref.clone(), Some(out_node_id), out) .await @@ -1399,7 +1398,7 @@ impl RPCProcessor { let mut timeout = ms_to_us(c.network.rpc.timeout_ms); let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize; if concurrency == 0 { - concurrency = get_concurrency() / 2; + concurrency = intf::get_concurrency() / 2; if concurrency == 0 { concurrency = 1; } @@ -1424,8 +1423,8 @@ impl RPCProcessor { for _ in 0..concurrency { let this = self.clone(); let receiver = channel.1.clone(); - let jh = spawn(Self::rpc_worker(this, inner.stop_source.as_ref().unwrap().token(), receiver)); - inner.worker_join_handles.push(MustJoinHandle::new(jh)); + let jh = intf::spawn(Self::rpc_worker(this, inner.stop_source.as_ref().unwrap().token(), receiver)); + inner.worker_join_handles.push(jh); } Ok(()) @@ -1466,7 +1465,7 @@ impl RPCProcessor { ) -> Result<(), String> { let msg = RPCMessage { header: RPCMessageHeader { - timestamp: get_timestamp(), + timestamp: intf::get_timestamp(), envelope, body_len: body.len() as u64, peer_noderef, diff --git a/veilid-core/src/tests/common/test_host_interface.rs b/veilid-core/src/tests/common/test_host_interface.rs index d0908819..0585e559 100644 --- a/veilid-core/src/tests/common/test_host_interface.rs +++ b/veilid-core/src/tests/common/test_host_interface.rs @@ -465,7 +465,8 @@ cfg_if! { let t1 = intf::get_timestamp(); let mut interfaces = intf::utils::network_interfaces::NetworkInterfaces::new(); let count = 100; - for _ in 0..count { + for x in 0..count { + info!("loop {}", x); if let Err(e) = interfaces.refresh().await { error!("error refreshing interfaces: {}", e); } @@ -508,43 +509,6 @@ pub async fn test_get_random_u32() { ); } -pub async fn test_single_future() { - info!("testing single future"); - let sf = SingleFuture::::new(); - assert_eq!(sf.check().await, Ok(None)); - assert_eq!( - sf.single_spawn(async { - intf::sleep(2000).await; - 69 - }) - .await, - Ok((None, true)) - ); - assert_eq!(sf.check().await, Ok(None)); - assert_eq!(sf.single_spawn(async { panic!() }).await, Ok((None, false))); - assert_eq!(sf.join().await, Ok(Some(69))); - assert_eq!( - sf.single_spawn(async { - intf::sleep(1000).await; - 37 - }) - .await, - Ok((None, true)) - ); - intf::sleep(2000).await; - assert_eq!( - sf.single_spawn(async { - intf::sleep(1000).await; - 27 - }) - .await, - Ok((Some(37), true)) - ); - intf::sleep(2000).await; - assert_eq!(sf.join().await, Ok(Some(27))); - assert_eq!(sf.check().await, Ok(None)); -} - pub async fn test_must_join_single_future() { info!("testing must join single future"); let sf = MustJoinSingleFuture::::new(); @@ -604,7 +568,6 @@ pub async fn test_all() { test_sleep().await; #[cfg(not(target_arch = "wasm32"))] test_network_interfaces().await; - test_single_future().await; test_must_join_single_future().await; test_eventual().await; test_eventual_value().await; diff --git a/veilid-core/src/tests/common/test_protected_store.rs b/veilid-core/src/tests/common/test_protected_store.rs index 8e770513..ef95d208 100644 --- a/veilid-core/src/tests/common/test_protected_store.rs +++ b/veilid-core/src/tests/common/test_protected_store.rs @@ -1,5 +1,4 @@ use super::test_veilid_config::*; -use crate::intf::*; use crate::xx::*; use crate::*; diff --git a/veilid-core/src/tests/common/test_table_store.rs b/veilid-core/src/tests/common/test_table_store.rs index d6ea5e83..ad58d834 100644 --- a/veilid-core/src/tests/common/test_table_store.rs +++ b/veilid-core/src/tests/common/test_table_store.rs @@ -1,5 +1,4 @@ use super::test_veilid_config::*; -use crate::intf::*; use crate::xx::*; use crate::*; diff --git a/veilid-core/src/tests/native/mod.rs b/veilid-core/src/tests/native/mod.rs index 987a6418..f5e161bd 100644 --- a/veilid-core/src/tests/native/mod.rs +++ b/veilid-core/src/tests/native/mod.rs @@ -69,53 +69,64 @@ pub fn run_all_tests() { info!("Finished unit tests"); } +#[cfg(feature = "rt-tokio")] +fn block_on, T>(f: F) -> T { + let rt = tokio::runtime::Runtime::new().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, f) +} +#[cfg(feature = "rt-async-std")] +fn block_on, T>(f: F) -> T { + async_std::task::block_on(f) +} + fn exec_test_host_interface() { - async_std::task::block_on(async { + block_on(async { test_host_interface::test_all().await; }); } fn exec_test_dht_key() { - async_std::task::block_on(async { + block_on(async { test_dht_key::test_all().await; }); } fn exec_test_veilid_core() { - async_std::task::block_on(async { + block_on(async { test_veilid_core::test_all().await; }); } fn exec_test_veilid_config() { - async_std::task::block_on(async { + block_on(async { test_veilid_config::test_all().await; }) } fn exec_test_async_peek_stream() { - async_std::task::block_on(async { + block_on(async { test_async_peek_stream::test_all().await; }) } fn exec_test_connection_table() { - async_std::task::block_on(async { + block_on(async { test_connection_table::test_all().await; }) } fn exec_test_table_store() { - async_std::task::block_on(async { + block_on(async { test_table_store::test_all().await; }) } fn exec_test_protected_store() { - async_std::task::block_on(async { + block_on(async { test_protected_store::test_all().await; }) } fn exec_test_crypto() { - async_std::task::block_on(async { + block_on(async { test_crypto::test_all().await; }) } fn exec_test_envelope_receipt() { - async_std::task::block_on(async { + block_on(async { test_envelope_receipt::test_all().await; }) } diff --git a/veilid-core/src/tests/native/test_async_peek_stream.rs b/veilid-core/src/tests/native/test_async_peek_stream.rs index 2e70f54f..b23e0759 100644 --- a/veilid-core/src/tests/native/test_async_peek_stream.rs +++ b/veilid-core/src/tests/native/test_async_peek_stream.rs @@ -1,7 +1,17 @@ use super::*; -use async_std::net::{TcpListener, TcpStream}; -use async_std::prelude::FutureExt; -use async_std::task; + +cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std::net::{TcpListener, TcpStream}; + use async_std::prelude::FutureExt; + use async_std::task::sleep; + } else if #[cfg(feature="rt-tokio")] { + use tokio::net::{TcpListener, TcpStream}; + use tokio::time::sleep; + use tokio_util::compat::*; + } +} + use futures_util::{AsyncReadExt, AsyncWriteExt}; use std::io; @@ -18,23 +28,40 @@ async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> { Result::::Ok(accepted_stream) }; let connect_future = async { - task::sleep(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; let connected_stream = TcpStream::connect(local_addr).await?; connected_stream.set_nodelay(true)?; Result::::Ok(connected_stream) }; - Ok(accept_future.try_join(connect_future).await?) + cfg_if! { + if #[cfg(feature="rt-async-std")] { + accept_future.try_join(connect_future).await + } else if #[cfg(feature="rt-tokio")] { + tokio::try_join!(accept_future, connect_future) + } + } } async fn make_async_peek_stream_loopback() -> (AsyncPeekStream, AsyncPeekStream) { let (acc, conn) = make_tcp_loopback().await.unwrap(); + #[cfg(feature = "rt-tokio")] + let acc = acc.compat(); + #[cfg(feature = "rt-tokio")] + let conn = conn.compat(); + let aps_a = AsyncPeekStream::new(acc); let aps_c = AsyncPeekStream::new(conn); (aps_a, aps_c) } +#[cfg(feature = "rt-tokio")] +async fn make_stream_loopback() -> (Compat, Compat) { + let (a, c) = make_tcp_loopback().await.unwrap(); + (a.compat(), c.compat()) +} +#[cfg(feature = "rt-async-std")] async fn make_stream_loopback() -> (TcpStream, TcpStream) { make_tcp_loopback().await.unwrap() } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index c3e98587..58cf4702 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1731,7 +1731,7 @@ impl fmt::Debug for VeilidAPIInner { impl Drop for VeilidAPIInner { fn drop(&mut self) { if let Some(context) = self.context.take() { - intf::spawn_local(api_shutdown(context)).detach(); + intf::spawn_detached(api_shutdown(context)); } } } diff --git a/veilid-core/src/xx/async_peek_stream.rs b/veilid-core/src/xx/async_peek_stream.rs index 93425364..57537e75 100644 --- a/veilid-core/src/xx/async_peek_stream.rs +++ b/veilid-core/src/xx/async_peek_stream.rs @@ -5,17 +5,8 @@ use task::{Context, Poll}; //////// /// -trait SendStream: AsyncRead + AsyncWrite + Send + Unpin { - fn clone_stream(&self) -> Box; -} -impl SendStream for S -where - S: AsyncRead + AsyncWrite + Send + Clone + Unpin + 'static, -{ - fn clone_stream(&self) -> Box { - Box::new(self.clone()) - } -} +trait SendStream: AsyncRead + AsyncWrite + Send + Unpin {} +impl SendStream for S where S: AsyncRead + AsyncWrite + Send + Unpin + 'static {} //////// /// @@ -126,7 +117,7 @@ where impl AsyncPeekStream { pub fn new(stream: S) -> Self where - S: AsyncRead + AsyncWrite + Send + Clone + Unpin + 'static, + S: AsyncRead + AsyncWrite + Send + Unpin + 'static, { Self { inner: Arc::new(Mutex::new(AsyncPeekStreamInner { diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index 36894216..ec679466 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -11,7 +11,7 @@ mod log_thru; mod must_join_handle; mod must_join_single_future; mod mutable_future; -mod single_future; +// mod single_future; mod single_shot_eventual; mod split_url; mod tick_task; @@ -67,6 +67,7 @@ cfg_if! { pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture = PinBox + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + 'a>; + pub use async_executors::JoinHandle as LowLevelJoinHandle; } else { pub use std::string::String; pub use std::vec::Vec; @@ -91,8 +92,17 @@ cfg_if! { pub use std::time::Duration; pub use std::pin::Pin; pub use std::ops::{FnOnce, FnMut, Fn}; - pub use async_std::sync::Mutex as AsyncMutex; - pub use async_std::sync::MutexGuard as AsyncMutexGuard; + cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub use async_std::sync::Mutex as AsyncMutex; + pub use async_std::sync::MutexGuard as AsyncMutexGuard; + pub use async_std::task::JoinHandle as LowLevelJoinHandle; + } else if #[cfg(feature="rt-tokio")] { + pub use tokio::sync::Mutex as AsyncMutex; + pub use tokio::sync::MutexGuard as AsyncMutexGuard; + pub use tokio::task::JoinHandle as LowLevelJoinHandle; + } + } pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub type SystemPinBoxFuture = PinBox + Send + 'static>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; @@ -111,7 +121,7 @@ pub use ip_extra::*; pub use must_join_handle::*; pub use must_join_single_future::*; pub use mutable_future::*; -pub use single_future::*; +// pub use single_future::*; pub use single_shot_eventual::*; pub use tick_task::*; pub use tools::*; diff --git a/veilid-core/src/xx/must_join_handle.rs b/veilid-core/src/xx/must_join_handle.rs index c6c8aa4a..a5023ff2 100644 --- a/veilid-core/src/xx/must_join_handle.rs +++ b/veilid-core/src/xx/must_join_handle.rs @@ -1,21 +1,40 @@ -use async_executors::JoinHandle; +use super::*; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; #[derive(Debug)] pub struct MustJoinHandle { - join_handle: JoinHandle, + join_handle: Option>, completed: bool, } impl MustJoinHandle { - pub fn new(join_handle: JoinHandle) -> Self { + pub fn new(join_handle: LowLevelJoinHandle) -> Self { Self { - join_handle, + join_handle: Some(join_handle), completed: false, } } + + pub async fn abort(mut self) { + if !self.completed { + cfg_if! { + if #[cfg(feature="rt-async-std")] { + if let Some(jh) = self.join_handle.take() { + jh.cancel().await; + self.completed = true; + } + } else if #[cfg(feature="rt-tokio")] { + if let Some(jh) = self.join_handle.take() { + jh.abort(); + let _ = jh.await; + self.completed = true; + } + } + } + } + } } impl Drop for MustJoinHandle { @@ -31,10 +50,16 @@ impl Future for MustJoinHandle { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.join_handle).poll(cx) { + match Pin::new(self.join_handle.as_mut().unwrap()).poll(cx) { Poll::Ready(t) => { self.completed = true; - Poll::Ready(t) + cfg_if! { + if #[cfg(feature="rt-async-std")] { + Poll::Ready(t) + } else if #[cfg(feature="rt-tokio")] { + Poll::Ready(t.unwrap()) + } + } } Poll::Pending => Poll::Pending, } diff --git a/veilid-core/src/xx/must_join_single_future.rs b/veilid-core/src/xx/must_join_single_future.rs index 6d8ffcab..1fa9556b 100644 --- a/veilid-core/src/xx/must_join_single_future.rs +++ b/veilid-core/src/xx/must_join_single_future.rs @@ -1,6 +1,5 @@ use super::*; -use crate::intf::*; -use cfg_if::*; +use crate::*; use core::task::Poll; use futures_util::poll; @@ -160,7 +159,7 @@ where // Run if we should do that if run { - self.unlock(Some(MustJoinHandle::new(spawn_local(future)))); + self.unlock(Some(intf::spawn_local(future))); } // Return the prior result if we have one @@ -203,7 +202,7 @@ cfg_if! { } // Run if we should do that if run { - self.unlock(Some(MustJoinHandle::new(spawn(future)))); + self.unlock(Some(intf::spawn_with_local_set(future))); } // Return the prior result if we have one Ok((out, run)) diff --git a/veilid-core/src/xx/single_future.rs b/veilid-core/src/xx/single_future.rs deleted file mode 100644 index 99c32c6f..00000000 --- a/veilid-core/src/xx/single_future.rs +++ /dev/null @@ -1,242 +0,0 @@ -use super::*; -use crate::intf::*; -use cfg_if::*; -use core::task::Poll; -use futures_util::poll; - -#[derive(Debug)] -struct SingleFutureInner -where - T: 'static, -{ - locked: bool, - join_handle: Option>, -} - -/// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task -/// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered -#[derive(Debug, Clone)] -pub struct SingleFuture -where - T: 'static, -{ - inner: Arc>>, -} - -impl Default for SingleFuture -where - T: 'static, -{ - fn default() -> Self { - Self::new() - } -} - -impl SingleFuture -where - T: 'static, -{ - pub fn new() -> Self { - Self { - inner: Arc::new(Mutex::new(SingleFutureInner { - locked: false, - join_handle: None, - })), - } - } - - fn try_lock(&self) -> Result>, ()> { - let mut inner = self.inner.lock(); - if inner.locked { - // If already locked error out - return Err(()); - } - inner.locked = true; - // If we got the lock, return what we have for a join handle if anything - Ok(inner.join_handle.take()) - } - - fn unlock(&self, jh: Option>) { - let mut inner = self.inner.lock(); - assert!(inner.locked); - assert!(inner.join_handle.is_none()); - inner.locked = false; - inner.join_handle = jh; - } - - // Check the result - pub async fn check(&self) -> Result, ()> { - let mut out: Option = None; - - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with nothing - self.unlock(None); - } else { - // Still running put the join handle back so we can check on it later - self.unlock(Some(jh)); - } - } else { - // No task, unlock with nothing - self.unlock(None); - } - - // Return the prior result if we have one - Ok(out) - } - - // Wait for the result - pub async fn join(&self) -> Result, ()> { - let mut out: Option = None; - - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, - // that's an error because you can only join - // these things once - return Err(()); - } - }; - if maybe_jh.is_some() { - let jh = maybe_jh.unwrap(); - // Wait for return value of the last execution - out = Some(jh.await); - // Task finished, unlock with nothing - } else { - // No task, unlock with nothing - } - self.unlock(None); - - // Return the prior result if we have one - Ok(out) - } - - // Cancel - pub async fn cancel(&self) -> Result, ()> { - let mut out: Option = None; - - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with nothing - } else { - // Still running but drop the join handle anyway to cancel the task, unlock with nothing - } - } - self.unlock(None); - - // Return the prior result if we have one - Ok(out) - } - - // Possibly spawn the future possibly returning the value of the last execution - cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub async fn single_spawn( - &self, - future: impl Future + 'static, - ) -> Result<(Option, bool), ()> { - let mut out: Option = None; - - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - let mut run = true; - - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with a new task - } else { - // Still running, don't run again, unlock with the current join handle - run = false; - self.unlock(Some(jh)); - } - } - - // Run if we should do that - if run { - self.unlock(Some(spawn_local(future))); - } - - // Return the prior result if we have one - Ok((out, run)) - } - } - } -} -cfg_if! { - if #[cfg(not(target_arch = "wasm32"))] { - impl SingleFuture - where - T: 'static + Send, - { - pub async fn single_spawn( - &self, - future: impl Future + Send + 'static, - ) -> Result<(Option, bool), ()> { - let mut out: Option = None; - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - let mut run = true; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with a new task - } else { - // Still running, don't run again, unlock with the current join handle - run = false; - self.unlock(Some(jh)); - } - } - // Run if we should do that - if run { - self.unlock(Some(spawn(future))); - } - // Return the prior result if we have one - Ok((out, run)) - } - } - } -} diff --git a/veilid-core/src/xx/tick_task.rs b/veilid-core/src/xx/tick_task.rs index ab2e5ebb..339a11ff 100644 --- a/veilid-core/src/xx/tick_task.rs +++ b/veilid-core/src/xx/tick_task.rs @@ -1,5 +1,5 @@ use super::*; -use crate::intf::*; +use crate::*; use core::sync::atomic::{AtomicU64, Ordering}; use once_cell::sync::OnceCell; @@ -90,7 +90,7 @@ impl TickTask { } pub async fn tick(&self) -> Result<(), String> { - let now = get_timestamp(); + let now = intf::get_timestamp(); let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire); if last_timestamp_us != 0u64 && (now - last_timestamp_us) < self.tick_period_us { diff --git a/veilid-flutter/rust/Cargo.toml b/veilid-flutter/rust/Cargo.toml index 6c216160..1c87f745 100644 --- a/veilid-flutter/rust/Cargo.toml +++ b/veilid-flutter/rust/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" crate-type = ["cdylib", "staticlib", "rlib"] [dependencies] -veilid-core = { path="../../veilid-core" } tracing = { version = "^0", features = ["log", "attributes"] } tracing-subscriber = "^0" parking_lot = "^0" @@ -19,18 +18,20 @@ futures = "^0" # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -async-std = { version = "^1", features = ["unstable"] } +veilid-core = { path="../../veilid-core", features = [ "rt-tokio" ] } +tokio = { version = "^1", features = ["full"] } allo-isolate = "^0" ffi-support = "^0" lazy_static = "^1" tracing-opentelemetry = "^0" -opentelemetry = { version = "^0", features = ["rt-async-std"] } -opentelemetry-otlp = { version = "^0", features = ["grpc-sys"] } +opentelemetry = { version = "^0", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "^0" } opentelemetry-semantic-conventions = "^0" hostname = "^0" # Dependencies for WASM builds only [target.'cfg(target_arch = "wasm32")'.dependencies] +veilid-core = { path="../../veilid-core" } # Dependencies for Android builds only [target.'cfg(target_os = "android")'.dependencies] diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 0b103842..a36062d6 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -11,8 +11,9 @@ name = "veilid-server" path = "src/main.rs" [features] -default = [ "rt-async-std" ] -rt-async-std = [ "veilid-core/rt-async-std", "async-std", "async-tungstenite/async-std-runtime", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] +default = [ "rt-tokio" ] +rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] +rt-tokio = [ "veilid-core/rt-tokio", "tokio", "opentelemetry/rt-tokio"] tracking = ["veilid-core/tracking"] [dependencies] @@ -26,6 +27,7 @@ opentelemetry-otlp = { version = "^0" } opentelemetry-semantic-conventions = "^0" clap = "^3" async-std = { version = "^1", features = ["unstable"], optional = true } +tokio = { version = "^1", features = ["full"], optional = true } async-tungstenite = { version = "^0", features = ["async-tls"] } directories = "^4" capnp = "^0" diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 0fab05f4..93aa9f4f 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -1,6 +1,5 @@ +use crate::tools::*; use crate::veilid_client_capnp::*; -use async_std::net::TcpListener; -use async_std::prelude::FutureExt; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use failure::*; @@ -232,7 +231,7 @@ impl veilid_server::Server for VeilidServerImpl { // --- Client API Server-Side --------------------------------- type ClientApiAllFuturesJoinHandle = - async_std::task::JoinHandle, Box<(dyn std::error::Error + 'static)>>>; + JoinHandle, Box<(dyn std::error::Error + 'static)>>>; struct ClientApiInner { veilid_api: veilid_core::VeilidAPI, @@ -286,13 +285,15 @@ impl ClientApi { let listener = TcpListener::bind(bind_addr).await?; debug!("Client API listening on: {:?}", bind_addr); - // Get the + // Process the incoming accept stream + // xxx switch to stoptoken and use stream wrapper for tokio let mut incoming = listener.incoming(); let stop = self.inner.borrow().stop.clone(); let incoming_loop = async move { while let Some(stream_result) = stop.instance_none().race(incoming.next()).await { let stream = stream_result?; stream.set_nodelay(true)?; + // xxx use tokio split code too let (reader, writer) = stream.split(); let network = twoparty::VatNetwork::new( reader, @@ -303,7 +304,7 @@ impl ClientApi { let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client)); - async_std::task::spawn_local(rpc_system.map(drop)); + spawn_local(rpc_system.map(drop)); } Ok::<(), Box>(()) }; @@ -332,7 +333,7 @@ impl ClientApi { if let Some(request_promise) = request(id, registration) { let registration_map2 = registration_map1.clone(); - async_std::task::spawn_local(request_promise.promise.map(move |r| match r { + spawn_local(request_promise.promise.map(move |r| match r { Ok(_) => { if let Some(ref mut s) = registration_map2.borrow_mut().registrations.get_mut(&id) @@ -385,6 +386,6 @@ impl ClientApi { .iter() .map(|addr| self.clone().handle_incoming(*addr, client.clone())); let bind_futures_join = futures::future::try_join_all(bind_futures); - self.inner.borrow_mut().join_handle = Some(async_std::task::spawn_local(bind_futures_join)); + self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join)); } } diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index cff57ec4..48ae3141 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -6,15 +6,16 @@ mod client_api; mod cmdline; mod server; mod settings; +mod tools; #[cfg(unix)] mod unix; mod veilid_logs; #[cfg(windows)] mod windows; -use async_std::task; use cfg_if::*; use server::*; +use tools::*; use tracing::*; use veilid_logs::*; @@ -59,7 +60,7 @@ fn main() -> Result<(), String> { // Handle non-normal server modes if !matches!(server_mode, ServerMode::Normal) { // run the server to set the node id and quit - return task::block_on(async { + return block_on(async { // Init combined console/file logger let _logs = VeilidLogs::setup(settings.clone())?; @@ -93,7 +94,7 @@ fn main() -> Result<(), String> { .expect("Error setting Ctrl-C handler"); // Run the server loop - task::block_on(async { + block_on(async { // Init combined console/file logger let _logs = VeilidLogs::setup(settings.clone())?; diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 31d619de..1b573915 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -1,5 +1,6 @@ use crate::client_api; use crate::settings::*; +use crate::tools::*; use flume::{unbounded, Receiver, Sender}; use lazy_static::*; use parking_lot::Mutex; @@ -77,7 +78,7 @@ pub async fn run_veilid_server_internal( // Process all updates let capi2 = capi.clone(); - let update_receiver_jh = async_std::task::spawn_local(async move { + let update_receiver_jh = spawn_local(async move { while let Ok(change) = receiver.recv_async().await { if let Some(capi) = &capi2 { // Handle state changes on main thread for capnproto rpc @@ -115,7 +116,7 @@ pub async fn run_veilid_server_internal( break; } } - async_std::task::sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; } match veilid_api.debug("txtrecord".to_string()).await { Ok(v) => { diff --git a/veilid-server/src/tools.rs b/veilid-server/src/tools.rs new file mode 100644 index 00000000..868ffb5c --- /dev/null +++ b/veilid-server/src/tools.rs @@ -0,0 +1,48 @@ +use cfg_if::*; +use core::future::Future; + +cfg_if! { + if #[cfg(feature="rt-async-std")] { + pub use async_std::task::JoinHandle; + pub use async_std::net::TcpListener; + //pub use async_std::net::TcpStream; + //pub use async_std::future::TimeoutError; + pub fn spawn + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { + async_std::task::spawn_local(f) + } + pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + async_std::task::spawn_local(f) + } + pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + let _ = async_std::task::spawn_local(f); + } + pub use async_std::task::sleep; + pub use async_std::future::timeout; + pub fn block_on, T>(f: F) -> T { + async_std::task::block_on(f) + } + } else if #[cfg(feature="rt-tokio")] { + pub use tokio::task::JoinHandle; + pub use tokio::net::TcpListener; + //pub use tokio::net::TcpStream; + //pub use tokio_util::compat::*; + pub use tokio::time::error::Elapsed as TimeoutError; + pub fn spawn + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { + tokio::task::spawn(f) + } + pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + tokio::task::spawn_local(f) + } + pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + let _ = tokio::task::spawn_local(f); + } + pub use tokio::time::sleep; + pub use tokio::time::timeout; + pub fn block_on, T>(f: F) -> T { + let rt = tokio::runtime::Runtime::new().unwrap(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, f) + } + + } +} diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index adbbc866..6d5c6da4 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -1,9 +1,9 @@ use crate::server::*; use crate::settings::Settings; +use crate::tools::*; use crate::veilid_logs::*; -use async_std::stream::StreamExt; -use async_std::task; use clap::ArgMatches; +use futures::StreamExt; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; use std::io::Read; @@ -96,7 +96,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String }; // Now, run the server - task::block_on(async { + block_on(async { // Init combined console/file logger let _logs = VeilidLogs::setup(settings.clone())?; @@ -110,7 +110,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String .map_err(|e| format!("failed to init signals: {}", e))?; let handle = signals.handle(); - let signals_task = async_std::task::spawn(handle_signals(signals)); + let signals_task = spawn(handle_signals(signals)); let res = run_veilid_server(settings, ServerMode::Normal).await; diff --git a/veilid-server/src/windows.rs b/veilid-server/src/windows.rs index 79070a34..19e28ea2 100644 --- a/veilid-server/src/windows.rs +++ b/veilid-server/src/windows.rs @@ -1,4 +1,5 @@ use crate::settings::*; +use crate::tools::*; use clap::ArgMatches; use log::*; use std::ffi::OsString;