From 018d7da429f8adc7c54a997c27248c8aa866df5a Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 29 Jun 2022 10:13:49 -0400 Subject: [PATCH] fix tokio --- Cargo.lock | 3 ++ veilid-cli/src/client_api_connection.rs | 1 - veilid-core/Cargo.toml | 2 +- veilid-core/src/intf/native/system.rs | 2 +- veilid-core/src/lib.rs | 8 +++- .../src/network_manager/connection_table.rs | 1 + veilid-core/src/network_manager/mod.rs | 2 +- veilid-core/src/network_manager/native/mod.rs | 2 +- .../src/network_manager/native/network_tcp.rs | 1 + .../src/network_manager/native/network_udp.rs | 29 ++++++-------- .../src/network_manager/network_connection.rs | 16 +++++++- veilid-core/src/routing_table/tasks.rs | 2 +- veilid-core/src/xx/must_join_handle.rs | 15 ++++++- veilid-core/src/xx/must_join_single_future.rs | 2 +- veilid-server/Cargo.toml | 5 ++- veilid-server/src/client_api.rs | 40 ++++++++++++++----- veilid-server/src/server.rs | 2 +- veilid-server/src/tools.rs | 12 +++--- veilid-server/src/unix.rs | 2 +- veilid-server/src/veilid_logs.rs | 22 +++++++--- 20 files changed, 115 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65f6433d..b3fa7110 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5168,7 +5168,10 @@ dependencies = [ "serial_test", "signal-hook", "signal-hook-async-std", + "stop-token", "tokio", + "tokio-stream", + "tokio-util 0.6.10", "tracing", "tracing-appender", "tracing-journald", diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index e12a8cc3..1c722cd2 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -196,7 +196,6 @@ impl ClientApiConnection { stream.set_nodelay(true).map_err(map_to_string)?; // Create the VAT network - cfg_if! { if #[cfg(feature="rt-async-std")] { let (reader, writer) = stream.split(); diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 54cf5810..3efd2207 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -45,7 +45,7 @@ flume = { version = "^0", features = ["async"] } enumset = { version= "^1", features = ["serde"] } backtrace = { version = "^0", optional = true } owo-colors = "^3" -stop-token = "^0" +stop-token = { version = "^0", default-features = false } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } curve25519-dalek = { package = "curve25519-dalek-ng", version = "^4", default_features = false, features = ["alloc", "u64_backend"] } diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index eb046681..0466201f 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -68,7 +68,7 @@ pub async fn sleep(millis: u32) { } } else { cfg_if! { - if #[cfg(feature="rt-async-std")] { + if #[cfg(feature="rt-async-std")] { async_std::task::sleep(Duration::from_millis(u64::from(millis))).await; } else if #[cfg(feature="rt-tokio")] { tokio::time::sleep(Duration::from_millis(u64::from(millis))).await; diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index e302af03..8e4b54f6 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -61,8 +61,14 @@ pub fn veilid_version() -> (u32, u32, u32) { #[cfg(target_os = "android")] pub use intf::utils::android::{veilid_core_setup_android, veilid_core_setup_android_no_log}; -pub static DEFAULT_LOG_IGNORE_LIST: [&str; 12] = [ +pub static DEFAULT_LOG_IGNORE_LIST: [&str; 18] = [ "mio", + "h2", + "hyper", + "tower", + "tonic", + "tokio_util", + "want", "serial_test", "async_std", "async_io", diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index f51dd73f..d526a383 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -46,6 +46,7 @@ impl ConnectionTable { let mut unord = FuturesUnordered::new(); for table in &mut self.conn_by_descriptor { for (_, v) in table.drain() { + trace!("connection table join: {:?}", v); unord.push(v); } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 31ac6a5f..f7d02974 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -319,9 +319,9 @@ impl NetworkManager { let components = self.inner.lock().components.clone(); if let Some(components) = components { components.net.shutdown().await; - components.connection_manager.shutdown().await; components.rpc_processor.shutdown().await; components.receipt_manager.shutdown().await; + components.connection_manager.shutdown().await; } // reset the state diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index f8dfd788..36518d2e 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -561,7 +561,7 @@ impl Network { // Drop the stop drop(inner.stop_source.take()); } - debug!("stopping {} low level network tasks", unord.len(),); + debug!("stopping {} low level network tasks", unord.len()); // Wait for everything to stop while unord.next().await.is_some() {} diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 355baf3d..d4050345 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -208,6 +208,7 @@ impl Network { if #[cfg(feature="rt-async-std")] { let listener = TcpListener::from(std_listener); } else if #[cfg(feature="rt-tokio")] { + std_listener.set_nonblocking(true).expect("failed to set nonblocking"); let listener = TcpListener::from_std(std_listener).map_err(map_to_string)?; } } diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 2c2a1bc2..3f6c8570 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -23,7 +23,7 @@ impl Network { // Run thread task to process stream of messages let this = self.clone(); - let jh = spawn_with_local_set(async move { + let jh = spawn(async move { trace!("UDP listener task spawned"); // Collect all our protocol handlers into a vector @@ -49,7 +49,7 @@ impl Network { for ph in protocol_handlers { let network_manager = network_manager.clone(); let stop_token = stop_token.clone(); - let jh = intf::spawn_local(async move { + let ph_future = async move { let mut data = vec![0u8; 65536]; loop { @@ -84,26 +84,18 @@ impl Network { } } } - }); + }; - protocol_handlers_unordered.push(jh); + protocol_handlers_unordered.push(ph_future); } // Now we wait for join handles to exit, // if any error out it indicates an error needing // us to completely restart the network - loop { - match protocol_handlers_unordered.next().await { - Some(v) => { - // true = stopped, false = errored - if !v { - // If any protocol handler fails, our socket died and we need to restart the network - this.inner.lock().network_needs_restart = true; - } - } - None => { - // All protocol handlers exited - break; - } + while let Some(v) = protocol_handlers_unordered.next().await { + // true = stopped, false = errored + if !v { + // If any protocol handler fails, our socket died and we need to restart the network + this.inner.lock().network_needs_restart = true; } } @@ -138,6 +130,7 @@ impl Network { if #[cfg(feature="rt-async-std")] { let udp_socket = UdpSocket::from(std_udp_socket); } else if #[cfg(feature="rt-tokio")] { + std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; } } @@ -158,6 +151,7 @@ impl Network { if #[cfg(feature="rt-async-std")] { let udp_socket = UdpSocket::from(std_udp_socket); } else if #[cfg(feature="rt-tokio")] { + std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; } } @@ -184,6 +178,7 @@ impl Network { if #[cfg(feature="rt-async-std")] { let udp_socket = UdpSocket::from(std_udp_socket); } else if #[cfg(feature="rt-tokio")] { + std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking"); let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index ea508670..61af8c86 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -137,7 +137,7 @@ impl NetworkConnection { let local_stop_token = stop_source.token(); // Spawn connection processor and pass in protocol connection - let processor = intf::spawn_local(Self::process_connection( + let processor = intf::spawn(Self::process_connection( connection_manager, local_stop_token, manager_stop_token, @@ -355,8 +355,20 @@ impl Future for NetworkConnection { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + let mut pending = 0usize; + + // Process all sub-futures, nulling them out when they return ready if let Some(mut processor) = self.processor.as_mut() { - Pin::new(&mut processor).poll(cx) + if Pin::new(&mut processor).poll(cx).is_ready() { + self.processor = None; + } else { + pending += 1 + } + } + + // Any sub-futures pending? + if pending > 0 { + task::Poll::Pending } else { task::Poll::Ready(()) } diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index da053095..a101f4bc 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -369,7 +369,7 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { let nr = NodeRef::new(self.clone(), k, v, None); - unord.push(intf::spawn_local(rpc.clone().rpc_call_status(nr))); + unord.push(intf::spawn(rpc.clone().rpc_call_status(nr))); } Option::<()>::None }); diff --git a/veilid-core/src/xx/must_join_handle.rs b/veilid-core/src/xx/must_join_handle.rs index 2b96e266..0f90de3a 100644 --- a/veilid-core/src/xx/must_join_handle.rs +++ b/veilid-core/src/xx/must_join_handle.rs @@ -59,12 +59,25 @@ impl Future for MustJoinHandle { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(self.join_handle.as_mut().unwrap()).poll(cx) { Poll::Ready(t) => { + if self.completed { + panic!("should not poll completed join handle"); + } self.completed = true; cfg_if! { if #[cfg(feature="rt-async-std")] { Poll::Ready(t) } else if #[cfg(feature="rt-tokio")] { - Poll::Ready(t.unwrap()) + match t { + Ok(t) => Poll::Ready(t), + Err(e) => { + if e.is_panic() { + // Resume the panic on the main task + std::panic::resume_unwind(e.into_panic()); + } else { + panic!("join error was not a panic, should not poll after abort"); + } + } + } }else if #[cfg(target_arch = "wasm32")] { Poll::Ready(t) } else { diff --git a/veilid-core/src/xx/must_join_single_future.rs b/veilid-core/src/xx/must_join_single_future.rs index 1fa9556b..4687fd24 100644 --- a/veilid-core/src/xx/must_join_single_future.rs +++ b/veilid-core/src/xx/must_join_single_future.rs @@ -202,7 +202,7 @@ cfg_if! { } // Run if we should do that if run { - self.unlock(Some(intf::spawn_with_local_set(future))); + self.unlock(Some(intf::spawn(future))); } // Return the prior result if we have one Ok((out, run)) diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index a36062d6..277bb1b8 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -13,7 +13,7 @@ path = "src/main.rs" [features] default = [ "rt-tokio" ] rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] -rt-tokio = [ "veilid-core/rt-tokio", "tokio", "opentelemetry/rt-tokio"] +rt-tokio = [ "veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio"] tracking = ["veilid-core/tracking"] [dependencies] @@ -28,6 +28,8 @@ opentelemetry-semantic-conventions = "^0" clap = "^3" async-std = { version = "^1", features = ["unstable"], optional = true } tokio = { version = "^1", features = ["full"], optional = true } +tokio-stream = { version = "^0", features = ["net"], optional = true } +tokio-util = { version = "^0", features = ["compat"], optional = true} async-tungstenite = { version = "^0", features = ["async-tls"] } directories = "^4" capnp = "^0" @@ -47,6 +49,7 @@ bugsalot = "^0" flume = { version = "^0", features = ["async"] } rpassword = "^6" hostname = "^0" +stop-token = { version = "^0", default-features = false } [target.'cfg(windows)'.dependencies] windows-service = "^0" diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 93aa9f4f..cd3310d2 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -2,16 +2,17 @@ use crate::tools::*; use crate::veilid_client_capnp::*; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use cfg_if::*; use failure::*; -use futures::io::AsyncReadExt; use futures::FutureExt as FuturesFutureExt; use futures::StreamExt; use std::cell::RefCell; use std::collections::HashMap; use std::net::SocketAddr; use std::rc::Rc; +use stop_token::future::FutureExt; +use stop_token::*; use tracing::*; -use veilid_core::xx::Eventual; use veilid_core::*; #[derive(Fail, Debug)] @@ -236,7 +237,7 @@ type ClientApiAllFuturesJoinHandle = struct ClientApiInner { veilid_api: veilid_core::VeilidAPI, registration_map: Rc>, - stop: Eventual, + stop: Option, join_handle: Option, } @@ -251,7 +252,7 @@ impl ClientApi { inner: RefCell::new(ClientApiInner { veilid_api, registration_map: Rc::new(RefCell::new(RegistrationMap::new())), - stop: Eventual::new(), + stop: Some(StopSource::new()), join_handle: None, }), }) @@ -266,7 +267,7 @@ impl ClientApi { trace!("ClientApi stop ignored"); return; } - inner.stop.resolve(); + drop(inner.stop.take()); inner.join_handle.take().unwrap() }; trace!("ClientApi::stop: waiting for stop"); @@ -286,15 +287,32 @@ impl ClientApi { debug!("Client API listening on: {:?}", bind_addr); // Process the incoming accept stream - // xxx switch to stoptoken and use stream wrapper for tokio - let mut incoming = listener.incoming(); - let stop = self.inner.borrow().stop.clone(); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let mut incoming_stream = listener.incoming(); + } else if #[cfg(feature="rt-tokio")] { + let mut incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener); + } + } + + let stop_token = self.inner.borrow().stop.as_ref().unwrap().token(); let incoming_loop = async move { - while let Some(stream_result) = stop.instance_none().race(incoming.next()).await { + while let Ok(Some(stream_result)) = + incoming_stream.next().timeout_at(stop_token.clone()).await + { let stream = stream_result?; stream.set_nodelay(true)?; - // xxx use tokio split code too - let (reader, writer) = stream.split(); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use futures::AsyncReadExt; + let (reader, writer) = stream.split(); + } else if #[cfg(feature="rt-tokio")] { + use tokio_util::compat::*; + let (reader, writer) = stream.into_split(); + let reader = reader.compat(); + let writer = writer.compat_write(); + } + } let network = twoparty::VatNetwork::new( reader, writer, diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 1b573915..6f76ff7e 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -154,7 +154,7 @@ pub async fn run_veilid_server_internal( veilid_api.shutdown().await; // Wait for update receiver to exit - update_receiver_jh.await; + let _ = update_receiver_jh.await; out } diff --git a/veilid-server/src/tools.rs b/veilid-server/src/tools.rs index 868ffb5c..6793d342 100644 --- a/veilid-server/src/tools.rs +++ b/veilid-server/src/tools.rs @@ -13,9 +13,9 @@ cfg_if! { pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { async_std::task::spawn_local(f) } - pub fn spawn_detached_local + 'static, T: 'static>(f: F) { - let _ = async_std::task::spawn_local(f); - } + // pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + // let _ = async_std::task::spawn_local(f); + // } pub use async_std::task::sleep; pub use async_std::future::timeout; pub fn block_on, T>(f: F) -> T { @@ -33,9 +33,9 @@ cfg_if! { pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { tokio::task::spawn_local(f) } - pub fn spawn_detached_local + 'static, T: 'static>(f: F) { - let _ = tokio::task::spawn_local(f); - } + // pub fn spawn_detached_local + 'static, T: 'static>(f: F) { + // let _ = tokio::task::spawn_local(f); + // } pub use tokio::time::sleep; pub use tokio::time::timeout; pub fn block_on, T>(f: F) -> T { diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 6d5c6da4..8a937d5a 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -116,7 +116,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String // Terminate the signal stream. handle.close(); - signals_task.await; + let _ = signals_task.await; res }) diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index f29fe069..6877c9f0 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -64,13 +64,23 @@ impl VeilidLogs { .into(); let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone(); + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let exporter = opentelemetry_otlp::new_exporter() + .grpcio() + .with_endpoint(grpc_endpoint); + let batch = opentelemetry::runtime::AsyncStd; + } else if #[cfg(feature="rt-tokio")] { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(format!("http://{}", grpc_endpoint)); + let batch = opentelemetry::runtime::Tokio; + } + } + let tracer = opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .grpcio() - .with_endpoint(grpc_endpoint), - ) + .with_exporter(exporter) .with_trace_config(opentelemetry::sdk::trace::config().with_resource( Resource::new(vec![KeyValue::new( opentelemetry_semantic_conventions::resource::SERVICE_NAME, @@ -82,7 +92,7 @@ impl VeilidLogs { ), )]), )) - .install_batch(opentelemetry::runtime::AsyncStd) + .install_batch(batch) .map_err(|e| format!("failed to install OpenTelemetry tracer: {}", e))?; let ignore_list = ignore_list.clone();