fix tokio

This commit is contained in:
John Smith 2022-06-29 10:13:49 -04:00
parent d3f872eb1f
commit 018d7da429
20 changed files with 115 additions and 54 deletions

3
Cargo.lock generated
View File

@ -5168,7 +5168,10 @@ dependencies = [
"serial_test", "serial_test",
"signal-hook", "signal-hook",
"signal-hook-async-std", "signal-hook-async-std",
"stop-token",
"tokio", "tokio",
"tokio-stream",
"tokio-util 0.6.10",
"tracing", "tracing",
"tracing-appender", "tracing-appender",
"tracing-journald", "tracing-journald",

View File

@ -196,7 +196,6 @@ impl ClientApiConnection {
stream.set_nodelay(true).map_err(map_to_string)?; stream.set_nodelay(true).map_err(map_to_string)?;
// Create the VAT network // Create the VAT network
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();

View File

@ -45,7 +45,7 @@ flume = { version = "^0", features = ["async"] }
enumset = { version= "^1", features = ["serde"] } enumset = { version= "^1", features = ["serde"] }
backtrace = { version = "^0", optional = true } backtrace = { version = "^0", optional = true }
owo-colors = "^3" owo-colors = "^3"
stop-token = "^0" stop-token = { version = "^0", default-features = false }
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }
curve25519-dalek = { package = "curve25519-dalek-ng", version = "^4", default_features = false, features = ["alloc", "u64_backend"] } curve25519-dalek = { package = "curve25519-dalek-ng", version = "^4", default_features = false, features = ["alloc", "u64_backend"] }

View File

@ -61,8 +61,14 @@ pub fn veilid_version() -> (u32, u32, u32) {
#[cfg(target_os = "android")] #[cfg(target_os = "android")]
pub use intf::utils::android::{veilid_core_setup_android, veilid_core_setup_android_no_log}; pub use intf::utils::android::{veilid_core_setup_android, veilid_core_setup_android_no_log};
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 12] = [ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 18] = [
"mio", "mio",
"h2",
"hyper",
"tower",
"tonic",
"tokio_util",
"want",
"serial_test", "serial_test",
"async_std", "async_std",
"async_io", "async_io",

View File

@ -46,6 +46,7 @@ impl ConnectionTable {
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
for table in &mut self.conn_by_descriptor { for table in &mut self.conn_by_descriptor {
for (_, v) in table.drain() { for (_, v) in table.drain() {
trace!("connection table join: {:?}", v);
unord.push(v); unord.push(v);
} }
} }

View File

@ -319,9 +319,9 @@ impl NetworkManager {
let components = self.inner.lock().components.clone(); let components = self.inner.lock().components.clone();
if let Some(components) = components { if let Some(components) = components {
components.net.shutdown().await; components.net.shutdown().await;
components.connection_manager.shutdown().await;
components.rpc_processor.shutdown().await; components.rpc_processor.shutdown().await;
components.receipt_manager.shutdown().await; components.receipt_manager.shutdown().await;
components.connection_manager.shutdown().await;
} }
// reset the state // reset the state

View File

@ -561,7 +561,7 @@ impl Network {
// Drop the stop // Drop the stop
drop(inner.stop_source.take()); drop(inner.stop_source.take());
} }
debug!("stopping {} low level network tasks", unord.len(),); debug!("stopping {} low level network tasks", unord.len());
// Wait for everything to stop // Wait for everything to stop
while unord.next().await.is_some() {} while unord.next().await.is_some() {}

View File

@ -208,6 +208,7 @@ impl Network {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let listener = TcpListener::from(std_listener); let listener = TcpListener::from(std_listener);
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
std_listener.set_nonblocking(true).expect("failed to set nonblocking");
let listener = TcpListener::from_std(std_listener).map_err(map_to_string)?; let listener = TcpListener::from_std(std_listener).map_err(map_to_string)?;
} }
} }

View File

@ -23,7 +23,7 @@ impl Network {
// Run thread task to process stream of messages // Run thread task to process stream of messages
let this = self.clone(); let this = self.clone();
let jh = spawn_with_local_set(async move { let jh = spawn(async move {
trace!("UDP listener task spawned"); trace!("UDP listener task spawned");
// Collect all our protocol handlers into a vector // Collect all our protocol handlers into a vector
@ -49,7 +49,7 @@ impl Network {
for ph in protocol_handlers { for ph in protocol_handlers {
let network_manager = network_manager.clone(); let network_manager = network_manager.clone();
let stop_token = stop_token.clone(); let stop_token = stop_token.clone();
let jh = intf::spawn_local(async move { let ph_future = async move {
let mut data = vec![0u8; 65536]; let mut data = vec![0u8; 65536];
loop { loop {
@ -84,28 +84,20 @@ impl Network {
} }
} }
} }
}); };
protocol_handlers_unordered.push(jh); protocol_handlers_unordered.push(ph_future);
} }
// Now we wait for join handles to exit, // Now we wait for join handles to exit,
// if any error out it indicates an error needing // if any error out it indicates an error needing
// us to completely restart the network // us to completely restart the network
loop { while let Some(v) = protocol_handlers_unordered.next().await {
match protocol_handlers_unordered.next().await {
Some(v) => {
// true = stopped, false = errored // true = stopped, false = errored
if !v { if !v {
// If any protocol handler fails, our socket died and we need to restart the network // If any protocol handler fails, our socket died and we need to restart the network
this.inner.lock().network_needs_restart = true; this.inner.lock().network_needs_restart = true;
} }
} }
None => {
// All protocol handlers exited
break;
}
}
}
trace!("UDP listener task stopped"); trace!("UDP listener task stopped");
}); });
@ -138,6 +130,7 @@ impl Network {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let udp_socket = UdpSocket::from(std_udp_socket); let udp_socket = UdpSocket::from(std_udp_socket);
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?;
} }
} }
@ -158,6 +151,7 @@ impl Network {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let udp_socket = UdpSocket::from(std_udp_socket); let udp_socket = UdpSocket::from(std_udp_socket);
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?;
} }
} }
@ -184,6 +178,7 @@ impl Network {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let udp_socket = UdpSocket::from(std_udp_socket); let udp_socket = UdpSocket::from(std_udp_socket);
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
std_udp_socket.set_nonblocking(true).expect("failed to set nonblocking");
let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?; let udp_socket = UdpSocket::from_std(std_udp_socket).map_err(map_to_string)?;
} }
} }

View File

@ -137,7 +137,7 @@ impl NetworkConnection {
let local_stop_token = stop_source.token(); let local_stop_token = stop_source.token();
// Spawn connection processor and pass in protocol connection // Spawn connection processor and pass in protocol connection
let processor = intf::spawn_local(Self::process_connection( let processor = intf::spawn(Self::process_connection(
connection_manager, connection_manager,
local_stop_token, local_stop_token,
manager_stop_token, manager_stop_token,
@ -355,8 +355,20 @@ impl Future for NetworkConnection {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
let mut pending = 0usize;
// Process all sub-futures, nulling them out when they return ready
if let Some(mut processor) = self.processor.as_mut() { if let Some(mut processor) = self.processor.as_mut() {
Pin::new(&mut processor).poll(cx) if Pin::new(&mut processor).poll(cx).is_ready() {
self.processor = None;
} else {
pending += 1
}
}
// Any sub-futures pending?
if pending > 0 {
task::Poll::Pending
} else { } else {
task::Poll::Ready(()) task::Poll::Ready(())
} }

View File

@ -369,7 +369,7 @@ impl RoutingTable {
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) {
let nr = NodeRef::new(self.clone(), k, v, None); let nr = NodeRef::new(self.clone(), k, v, None);
unord.push(intf::spawn_local(rpc.clone().rpc_call_status(nr))); unord.push(intf::spawn(rpc.clone().rpc_call_status(nr)));
} }
Option::<()>::None Option::<()>::None
}); });

View File

@ -59,12 +59,25 @@ impl<T: 'static> Future for MustJoinHandle<T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(self.join_handle.as_mut().unwrap()).poll(cx) { match Pin::new(self.join_handle.as_mut().unwrap()).poll(cx) {
Poll::Ready(t) => { Poll::Ready(t) => {
if self.completed {
panic!("should not poll completed join handle");
}
self.completed = true; self.completed = true;
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
Poll::Ready(t) Poll::Ready(t)
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
Poll::Ready(t.unwrap()) match t {
Ok(t) => Poll::Ready(t),
Err(e) => {
if e.is_panic() {
// Resume the panic on the main task
std::panic::resume_unwind(e.into_panic());
} else {
panic!("join error was not a panic, should not poll after abort");
}
}
}
}else if #[cfg(target_arch = "wasm32")] { }else if #[cfg(target_arch = "wasm32")] {
Poll::Ready(t) Poll::Ready(t)
} else { } else {

View File

@ -202,7 +202,7 @@ cfg_if! {
} }
// Run if we should do that // Run if we should do that
if run { if run {
self.unlock(Some(intf::spawn_with_local_set(future))); self.unlock(Some(intf::spawn(future)));
} }
// Return the prior result if we have one // Return the prior result if we have one
Ok((out, run)) Ok((out, run))

View File

@ -13,7 +13,7 @@ path = "src/main.rs"
[features] [features]
default = [ "rt-tokio" ] default = [ "rt-tokio" ]
rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"]
rt-tokio = [ "veilid-core/rt-tokio", "tokio", "opentelemetry/rt-tokio"] rt-tokio = [ "veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio"]
tracking = ["veilid-core/tracking"] tracking = ["veilid-core/tracking"]
[dependencies] [dependencies]
@ -28,6 +28,8 @@ opentelemetry-semantic-conventions = "^0"
clap = "^3" clap = "^3"
async-std = { version = "^1", features = ["unstable"], optional = true } async-std = { version = "^1", features = ["unstable"], optional = true }
tokio = { version = "^1", features = ["full"], optional = true } tokio = { version = "^1", features = ["full"], optional = true }
tokio-stream = { version = "^0", features = ["net"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true}
async-tungstenite = { version = "^0", features = ["async-tls"] } async-tungstenite = { version = "^0", features = ["async-tls"] }
directories = "^4" directories = "^4"
capnp = "^0" capnp = "^0"
@ -47,6 +49,7 @@ bugsalot = "^0"
flume = { version = "^0", features = ["async"] } flume = { version = "^0", features = ["async"] }
rpassword = "^6" rpassword = "^6"
hostname = "^0" hostname = "^0"
stop-token = { version = "^0", default-features = false }
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
windows-service = "^0" windows-service = "^0"

View File

@ -2,16 +2,17 @@ use crate::tools::*;
use crate::veilid_client_capnp::*; use crate::veilid_client_capnp::*;
use capnp::capability::Promise; use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use cfg_if::*;
use failure::*; use failure::*;
use futures::io::AsyncReadExt;
use futures::FutureExt as FuturesFutureExt; use futures::FutureExt as FuturesFutureExt;
use futures::StreamExt; use futures::StreamExt;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use stop_token::future::FutureExt;
use stop_token::*;
use tracing::*; use tracing::*;
use veilid_core::xx::Eventual;
use veilid_core::*; use veilid_core::*;
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
@ -236,7 +237,7 @@ type ClientApiAllFuturesJoinHandle =
struct ClientApiInner { struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI, veilid_api: veilid_core::VeilidAPI,
registration_map: Rc<RefCell<RegistrationMap>>, registration_map: Rc<RefCell<RegistrationMap>>,
stop: Eventual, stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>, join_handle: Option<ClientApiAllFuturesJoinHandle>,
} }
@ -251,7 +252,7 @@ impl ClientApi {
inner: RefCell::new(ClientApiInner { inner: RefCell::new(ClientApiInner {
veilid_api, veilid_api,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())), registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
stop: Eventual::new(), stop: Some(StopSource::new()),
join_handle: None, join_handle: None,
}), }),
}) })
@ -266,7 +267,7 @@ impl ClientApi {
trace!("ClientApi stop ignored"); trace!("ClientApi stop ignored");
return; return;
} }
inner.stop.resolve(); drop(inner.stop.take());
inner.join_handle.take().unwrap() inner.join_handle.take().unwrap()
}; };
trace!("ClientApi::stop: waiting for stop"); trace!("ClientApi::stop: waiting for stop");
@ -286,15 +287,32 @@ impl ClientApi {
debug!("Client API listening on: {:?}", bind_addr); debug!("Client API listening on: {:?}", bind_addr);
// Process the incoming accept stream // Process the incoming accept stream
// xxx switch to stoptoken and use stream wrapper for tokio cfg_if! {
let mut incoming = listener.incoming(); if #[cfg(feature="rt-async-std")] {
let stop = self.inner.borrow().stop.clone(); let mut incoming_stream = listener.incoming();
} else if #[cfg(feature="rt-tokio")] {
let mut incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener);
}
}
let stop_token = self.inner.borrow().stop.as_ref().unwrap().token();
let incoming_loop = async move { let incoming_loop = async move {
while let Some(stream_result) = stop.instance_none().race(incoming.next()).await { while let Ok(Some(stream_result)) =
incoming_stream.next().timeout_at(stop_token.clone()).await
{
let stream = stream_result?; let stream = stream_result?;
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
// xxx use tokio split code too cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures::AsyncReadExt;
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();
} else if #[cfg(feature="rt-tokio")] {
use tokio_util::compat::*;
let (reader, writer) = stream.into_split();
let reader = reader.compat();
let writer = writer.compat_write();
}
}
let network = twoparty::VatNetwork::new( let network = twoparty::VatNetwork::new(
reader, reader,
writer, writer,

View File

@ -154,7 +154,7 @@ pub async fn run_veilid_server_internal(
veilid_api.shutdown().await; veilid_api.shutdown().await;
// Wait for update receiver to exit // Wait for update receiver to exit
update_receiver_jh.await; let _ = update_receiver_jh.await;
out out
} }

View File

@ -13,9 +13,9 @@ cfg_if! {
pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(f: F) -> JoinHandle<T> { pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(f: F) -> JoinHandle<T> {
async_std::task::spawn_local(f) async_std::task::spawn_local(f)
} }
pub fn spawn_detached_local<F: Future<Output = T> + 'static, T: 'static>(f: F) { // pub fn spawn_detached_local<F: Future<Output = T> + 'static, T: 'static>(f: F) {
let _ = async_std::task::spawn_local(f); // let _ = async_std::task::spawn_local(f);
} // }
pub use async_std::task::sleep; pub use async_std::task::sleep;
pub use async_std::future::timeout; pub use async_std::future::timeout;
pub fn block_on<F: Future<Output = T>, T>(f: F) -> T { pub fn block_on<F: Future<Output = T>, T>(f: F) -> T {
@ -33,9 +33,9 @@ cfg_if! {
pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(f: F) -> JoinHandle<T> { pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(f: F) -> JoinHandle<T> {
tokio::task::spawn_local(f) tokio::task::spawn_local(f)
} }
pub fn spawn_detached_local<F: Future<Output = T> + 'static, T: 'static>(f: F) { // pub fn spawn_detached_local<F: Future<Output = T> + 'static, T: 'static>(f: F) {
let _ = tokio::task::spawn_local(f); // let _ = tokio::task::spawn_local(f);
} // }
pub use tokio::time::sleep; pub use tokio::time::sleep;
pub use tokio::time::timeout; pub use tokio::time::timeout;
pub fn block_on<F: Future<Output = T>, T>(f: F) -> T { pub fn block_on<F: Future<Output = T>, T>(f: F) -> T {

View File

@ -116,7 +116,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
// Terminate the signal stream. // Terminate the signal stream.
handle.close(); handle.close();
signals_task.await; let _ = signals_task.await;
res res
}) })

View File

@ -64,13 +64,23 @@ impl VeilidLogs {
.into(); .into();
let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone(); let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone();
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let exporter = opentelemetry_otlp::new_exporter()
.grpcio()
.with_endpoint(grpc_endpoint);
let batch = opentelemetry::runtime::AsyncStd;
} else if #[cfg(feature="rt-tokio")] {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(format!("http://{}", grpc_endpoint));
let batch = opentelemetry::runtime::Tokio;
}
}
let tracer = opentelemetry_otlp::new_pipeline() let tracer = opentelemetry_otlp::new_pipeline()
.tracing() .tracing()
.with_exporter( .with_exporter(exporter)
opentelemetry_otlp::new_exporter()
.grpcio()
.with_endpoint(grpc_endpoint),
)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource( .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new( Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME, opentelemetry_semantic_conventions::resource::SERVICE_NAME,
@ -82,7 +92,7 @@ impl VeilidLogs {
), ),
)]), )]),
)) ))
.install_batch(opentelemetry::runtime::AsyncStd) .install_batch(batch)
.map_err(|e| format!("failed to install OpenTelemetry tracer: {}", e))?; .map_err(|e| format!("failed to install OpenTelemetry tracer: {}", e))?;
let ignore_list = ignore_list.clone(); let ignore_list = ignore_list.clone();