diff --git a/Cargo.lock b/Cargo.lock index 556f52af..22646141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -845,27 +845,6 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13e2d432d1601d61d1e11140d04e9d239b5cf7316fa1106523c3d86eea19c29d" -[[package]] -name = "capnp-futures" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d520e0af228b92de357f230f4987ee4f9786f2b8aa24b9cfe53f5b11c17198" -dependencies = [ - "capnp", - "futures", -] - -[[package]] -name = "capnp-rpc" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ab8e869783e491cbcc350427a5e775aa4d8a1deaa5198d74332957cfa430779" -dependencies = [ - "capnp", - "capnp-futures", - "futures", -] - [[package]] name = "capnpc" version = "0.17.1" @@ -5927,6 +5906,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" +dependencies = [ + "serde", + "stable_deref_trait", +] + [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -6161,9 +6150,6 @@ dependencies = [ "async-std", "async-tungstenite 0.8.0", "bugsalot", - "capnp", - "capnp-rpc", - "capnpc", "cfg-if 1.0.0", "clap 3.2.25", "config", @@ -6366,6 +6352,7 @@ dependencies = [ "tracing-subscriber", "url", "veilid-core", + "wg", "windows-service", ] @@ -6676,6 +6663,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "wg" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f390449c16e0679435fc97a6b49d24e67f09dd05fea1de54db1b60902896d273" +dependencies = [ + "atomic-waker", + "parking_lot 0.12.1", + "triomphe", +] + [[package]] name = "which" version = "4.4.0" diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index f98dd173..cc58e0d3 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -3,7 +3,6 @@ name = "veilid-cli" version = "0.1.0" authors = ["John Smith "] edition = "2021" -build = "build.rs" license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" [[bin]] @@ -36,8 +35,6 @@ serde = "^1" serde_derive = "^1" parking_lot = "^0" cfg-if = "^1" -capnp = "^0" -capnp-rpc = "^0" config = { version = "^0", features = ["yaml"] } bugsalot = { git = "https://github.com/crioux/bugsalot.git" } flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } @@ -49,6 +46,3 @@ json = "^0" [dev-dependencies] serial_test = "^0" - -[build-dependencies] -capnpc = "^0" diff --git a/veilid-cli/build.rs b/veilid-cli/build.rs deleted file mode 100644 index fa57d860..00000000 --- a/veilid-cli/build.rs +++ /dev/null @@ -1,7 +0,0 @@ -fn main() { - ::capnpc::CompilerCommand::new() - .file("../veilid-server/proto/veilid-client.capnp") - .src_prefix("../veilid-server/") - .run() - .expect("compiling schema"); -} diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index b2681257..bdabcb96 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -1,8 +1,5 @@ use crate::command_processor::*; use crate::tools::*; -use crate::veilid_client_capnp::*; -use capnp::capability::Promise; -use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, Disconnector, RpcSystem}; use futures::future::FutureExt; use serde::de::DeserializeOwned; use std::cell::RefCell; @@ -11,25 +8,6 @@ use std::rc::Rc; use veilid_core::tools::*; use veilid_core::*; -macro_rules! capnp_failed { - ($ex:expr) => {{ - let msg = format!("Capnp Error: {}", $ex); - error!("{}", msg); - Promise::err(capnp::Error::failed(msg)) - }}; -} - -macro_rules! pry_result { - ($ex:expr) => { - match $ex { - Ok(v) => v, - Err(e) => { - return capnp_failed!(e); - } - } - }; -} - fn map_to_internal_error(e: T) -> VeilidAPIError { VeilidAPIError::Internal { message: e.to_string(), diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index 9239478f..73ac351d 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -18,11 +18,6 @@ mod settings; mod tools; mod ui; -#[allow(clippy::all)] -pub mod veilid_client_capnp { - include!(concat!(env!("OUT_DIR"), "/proto/veilid_client_capnp.rs")); -} - fn parse_command_line(default_config_path: &OsStr) -> Result { let matches = Command::new("veilid-cli") .version("0.1") diff --git a/veilid-core/src/veilid_api/json_api/mod.rs b/veilid-core/src/veilid_api/json_api/mod.rs index 01257559..4e0c4bde 100644 --- a/veilid-core/src/veilid_api/json_api/mod.rs +++ b/veilid-core/src/veilid_api/json_api/mod.rs @@ -16,10 +16,10 @@ pub use process::*; pub struct Request { /// Operation Id (pairs with Response, or empty if unidirectional) #[serde(default)] - id: u32, + pub id: u32, /// The request operation variant #[serde(flatten)] - op: RequestOp, + pub op: RequestOp, } #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] @@ -33,15 +33,18 @@ pub enum RecvMessage { pub struct Response { /// Operation Id (pairs with Request, or empty if unidirectional) #[serde(default)] - id: u32, + pub id: u32, /// The response operation variant #[serde(flatten)] - op: ResponseOp, + pub op: ResponseOp, } #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(tag = "op")] pub enum RequestOp { + Control { + args: Vec, + }, GetState, Attach, Detach, @@ -131,6 +134,10 @@ pub struct NewPrivateRouteResult { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(tag = "op")] pub enum ResponseOp { + Control { + #[serde(flatten)] + result: ApiResult, + }, GetState { #[serde(flatten)] result: ApiResult, diff --git a/veilid-core/src/veilid_api/json_api/process.rs b/veilid-core/src/veilid_api/json_api/process.rs index 15304f84..84eb97eb 100644 --- a/veilid-core/src/veilid_api/json_api/process.rs +++ b/veilid-core/src/veilid_api/json_api/process.rs @@ -1,7 +1,7 @@ use super::*; use futures_util::FutureExt; -fn to_json_api_result( +pub fn to_json_api_result( r: VeilidAPIResult, ) -> json_api::ApiResult { match r { @@ -10,7 +10,7 @@ fn to_json_api_result( } } -fn to_json_api_result_with_string( +pub fn to_json_api_result_with_string( r: VeilidAPIResult, ) -> json_api::ApiResultWithString { match r { @@ -19,7 +19,7 @@ fn to_json_api_result_with_string( } } -fn to_json_api_result_with_vec_string( +pub fn to_json_api_result_with_vec_string( r: VeilidAPIResult, ) -> json_api::ApiResultWithVecString { match r { @@ -28,14 +28,14 @@ fn to_json_api_result_with_vec_string( } } -fn to_json_api_result_with_vec_u8(r: VeilidAPIResult>) -> json_api::ApiResultWithVecU8 { +pub fn to_json_api_result_with_vec_u8(r: VeilidAPIResult>) -> json_api::ApiResultWithVecU8 { match r { Err(e) => json_api::ApiResultWithVecU8::Err { error: e }, Ok(v) => json_api::ApiResultWithVecU8::Ok { value: v }, } } -fn to_json_api_result_with_vec_vec_u8( +pub fn to_json_api_result_with_vec_vec_u8( r: VeilidAPIResult>>, ) -> json_api::ApiResultWithVecVecU8 { match r { @@ -46,38 +46,45 @@ fn to_json_api_result_with_vec_vec_u8( } } +struct JsonRequestProcessorInner { + routing_contexts: BTreeMap, + table_dbs: BTreeMap, + table_db_transactions: BTreeMap, + crypto_systems: BTreeMap, +} + +#[derive(Clone)] pub struct JsonRequestProcessor { api: VeilidAPI, - routing_contexts: Mutex>, - table_dbs: Mutex>, - table_db_transactions: Mutex>, - crypto_systems: Mutex>, + inner: Arc>, } impl JsonRequestProcessor { pub fn new(api: VeilidAPI) -> Self { Self { api, - routing_contexts: Default::default(), - table_dbs: Default::default(), - table_db_transactions: Default::default(), - crypto_systems: Default::default(), + inner: Arc::new(Mutex::new(JsonRequestProcessorInner { + routing_contexts: Default::default(), + table_dbs: Default::default(), + table_db_transactions: Default::default(), + crypto_systems: Default::default(), + })), } } // Routing Context fn add_routing_context(&self, routing_context: RoutingContext) -> u32 { + let mut inner = self.inner.lock(); let mut next_id: u32 = 1; - let mut rc = self.routing_contexts.lock(); - while rc.contains_key(&next_id) { + while inner.routing_contexts.contains_key(&next_id) { next_id += 1; } - rc.insert(next_id, routing_context); + inner.routing_contexts.insert(next_id, routing_context); next_id } fn lookup_routing_context(&self, id: u32, rc_id: u32) -> Result { - let routing_contexts = self.routing_contexts.lock(); - let Some(routing_context) = routing_contexts.get(&rc_id).cloned() else { + let inner = self.inner.lock(); + let Some(routing_context) = inner.routing_contexts.get(&rc_id).cloned() else { return Err(Response { id, op: ResponseOp::RoutingContext(RoutingContextResponse { @@ -89,8 +96,8 @@ impl JsonRequestProcessor { Ok(routing_context) } fn release_routing_context(&self, id: u32) -> i32 { - let mut rc = self.routing_contexts.lock(); - if rc.remove(&id).is_none() { + let mut inner = self.inner.lock(); + if inner.routing_contexts.remove(&id).is_none() { return 0; } return 1; @@ -98,17 +105,17 @@ impl JsonRequestProcessor { // TableDB fn add_table_db(&self, table_db: TableDB) -> u32 { + let mut inner = self.inner.lock(); let mut next_id: u32 = 1; - let mut rc = self.table_dbs.lock(); - while rc.contains_key(&next_id) { + while inner.table_dbs.contains_key(&next_id) { next_id += 1; } - rc.insert(next_id, table_db); + inner.table_dbs.insert(next_id, table_db); next_id } fn lookup_table_db(&self, id: u32, db_id: u32) -> Result { - let table_dbs = self.table_dbs.lock(); - let Some(table_db) = table_dbs.get(&db_id).cloned() else { + let inner = self.inner.lock(); + let Some(table_db) = inner.table_dbs.get(&db_id).cloned() else { return Err(Response { id, op: ResponseOp::TableDb(TableDbResponse { @@ -120,8 +127,8 @@ impl JsonRequestProcessor { Ok(table_db) } fn release_table_db(&self, id: u32) -> i32 { - let mut rc = self.table_dbs.lock(); - if rc.remove(&id).is_none() { + let mut inner = self.inner.lock(); + if inner.table_dbs.remove(&id).is_none() { return 0; } return 1; @@ -129,12 +136,12 @@ impl JsonRequestProcessor { // TableDBTransaction fn add_table_db_transaction(&self, tdbt: TableDBTransaction) -> u32 { + let mut inner = self.inner.lock(); let mut next_id: u32 = 1; - let mut tdbts = self.table_db_transactions.lock(); - while tdbts.contains_key(&next_id) { + while inner.table_db_transactions.contains_key(&next_id) { next_id += 1; } - tdbts.insert(next_id, tdbt); + inner.table_db_transactions.insert(next_id, tdbt); next_id } fn lookup_table_db_transaction( @@ -142,8 +149,8 @@ impl JsonRequestProcessor { id: u32, tx_id: u32, ) -> Result { - let table_db_transactions = self.table_db_transactions.lock(); - let Some(table_db_transaction) = table_db_transactions.get(&tx_id).cloned() else { + let inner = self.inner.lock(); + let Some(table_db_transaction) = inner.table_db_transactions.get(&tx_id).cloned() else { return Err(Response { id, op: ResponseOp::TableDbTransaction(TableDbTransactionResponse { @@ -155,8 +162,8 @@ impl JsonRequestProcessor { Ok(table_db_transaction) } fn release_table_db_transaction(&self, id: u32) -> i32 { - let mut tdbts = self.table_db_transactions.lock(); - if tdbts.remove(&id).is_none() { + let mut inner = self.inner.lock(); + if inner.table_db_transactions.remove(&id).is_none() { return 0; } return 1; @@ -164,17 +171,17 @@ impl JsonRequestProcessor { // CryptoSystem fn add_crypto_system(&self, csv: CryptoSystemVersion) -> u32 { + let mut inner = self.inner.lock(); let mut next_id: u32 = 1; - let mut crypto_systems = self.crypto_systems.lock(); - while crypto_systems.contains_key(&next_id) { + while inner.crypto_systems.contains_key(&next_id) { next_id += 1; } - crypto_systems.insert(next_id, csv); + inner.crypto_systems.insert(next_id, csv); next_id } fn lookup_crypto_system(&self, id: u32, cs_id: u32) -> Result { - let crypto_systems = self.crypto_systems.lock(); - let Some(crypto_system) = crypto_systems.get(&cs_id).cloned() else { + let inner = self.inner.lock(); + let Some(crypto_system) = inner.crypto_systems.get(&cs_id).cloned() else { return Err(Response { id, op: ResponseOp::CryptoSystem(CryptoSystemResponse { @@ -186,8 +193,8 @@ impl JsonRequestProcessor { Ok(crypto_system) } fn release_crypto_system(&self, id: u32) -> i32 { - let mut crypto_systems = self.crypto_systems.lock(); - if crypto_systems.remove(&id).is_none() { + let mut inner = self.inner.lock(); + if inner.crypto_systems.remove(&id).is_none() { return 0; } return 1; @@ -528,10 +535,15 @@ impl JsonRequestProcessor { } } - pub async fn process_request(&self, request: Request) -> Response { + pub async fn process_request(self, request: Request) -> Response { let id = request.id; let op = match request.op { + RequestOp::Control { args: _args } => ResponseOp::Control { + result: to_json_api_result(VeilidAPIResult::Err(VeilidAPIError::unimplemented( + "control should be handled by veilid-core host application", + ))), + }, RequestOp::GetState => ResponseOp::GetState { result: to_json_api_result(self.api.get_state().await), }, diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index 8559ba47..8b3c3e7e 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -750,7 +750,7 @@ impl VeilidConfig { self.inner.read() } - fn safe_config(&self) -> VeilidConfigInner { + pub fn safe_config(&self) -> VeilidConfigInner { let mut safe_cfg = self.inner.read().clone(); // Remove secrets @@ -773,6 +773,11 @@ impl VeilidConfig { let out = f(&mut editedinner)?; // Validate Self::validate(&mut editedinner)?; + // See if things have changed + if *inner == editedinner { + // No changes, return early + return Ok(out); + } // Commit changes *inner = editedinner.clone(); out diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 57dd2ba1..11317371 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -10,13 +10,13 @@ name = "veilid-server" path = "src/main.rs" [features] -default = [ "rt-tokio", "veilid-core/default" ] -crypto-test = [ "rt-tokio", "veilid-core/crypto-test"] -crypto-test-none = [ "rt-tokio", "veilid-core/crypto-test-none"] +default = ["rt-tokio", "veilid-core/default"] +crypto-test = ["rt-tokio", "veilid-core/crypto-test"] +crypto-test-none = ["rt-tokio", "veilid-core/crypto-test-none"] -rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys" ] -rt-tokio = [ "veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio", "console-subscriber" ] -tracking = [ "veilid-core/tracking" ] +rt-async-std = ["veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"] +rt-tokio = ["veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio", "console-subscriber"] +tracking = ["veilid-core/tracking"] [dependencies] veilid-core = { path = "../veilid-core", default-features = false } @@ -55,6 +55,7 @@ rpassword = "^6" hostname = "^0" stop-token = { version = "^0", default-features = false } sysinfo = { version = "^0.28.4", default-features = false } +wg = "0.3.2" [target.'cfg(windows)'.dependencies] windows-service = "^0" @@ -70,4 +71,4 @@ nix = "^0" tracing-journald = "^0" [dev-dependencies] -serial_test = "^0" \ No newline at end of file +serial_test = "^0" diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index b3bc15cc..ededea30 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -1,19 +1,29 @@ use crate::settings::*; use crate::tools::*; -use crate::veilid_client_capnp::*; use crate::veilid_logs::VeilidLogs; use cfg_if::*; -use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt}; -use serde::*; -use std::cell::RefCell; +use futures_util::{future::try_join_all, stream::FuturesUnordered, StreamExt}; + +use parking_lot::Mutex; use std::collections::HashMap; -use std::fmt; use std::net::SocketAddr; +use std::sync::Arc; use stop_token::future::FutureExt; use stop_token::*; use tracing::*; +use veilid_core::tools::*; use veilid_core::*; +use wg::AsyncWaitGroup; +cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std::io::prelude::BufReadExt; + use async_std::io::WriteExt; + } else if #[cfg(feature="rt-tokio")] { + use tokio::io::AsyncBufReadExt; + use tokio::io::AsyncWriteExt; + } +} // struct VeilidServerImpl { // veilid_api: veilid_core::VeilidAPI, // veilid_logs: VeilidLogs, @@ -36,50 +46,11 @@ use veilid_core::*; // } // } -// #[instrument(level = "trace", skip_all)] -// fn shutdown( -// &mut self, -// _params: veilid_server::ShutdownParams, -// mut _results: veilid_server::ShutdownResults, -// ) -> Promise<(), ::capnp::Error> { -// trace!("VeilidServerImpl::shutdown"); - -// cfg_if::cfg_if! { -// if #[cfg(windows)] { -// assert!(false, "write me!"); -// } -// else { -// crate::server::shutdown(); -// } -// } - -// Promise::ok(()) -// } - -// #[instrument(level = "trace", skip_all)] -// fn change_log_level( -// &mut self, -// params: veilid_server::ChangeLogLevelParams, -// mut results: veilid_server::ChangeLogLevelResults, -// ) -> Promise<(), ::capnp::Error> { -// trace!("VeilidServerImpl::change_log_level"); - -// let layer = pry!(pry!(params.get()).get_layer()).to_owned(); -// let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned(); -// let log_level: veilid_core::VeilidConfigLogLevel = -// pry!(veilid_core::deserialize_json(&log_level_json) -// .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))); - -// let result = self.veilid_logs.change_log_level(layer, log_level); -// encode_api_result(&result, &mut results.get().init_result()); -// Promise::ok(()) -// } // } // --- Client API Server-Side --------------------------------- -type ClientApiAllFuturesJoinHandle = - JoinHandle, Box<(dyn std::error::Error + 'static)>>>; +type ClientApiAllFuturesJoinHandle = MustJoinHandle>>; struct ClientApiInner { veilid_api: veilid_core::VeilidAPI, @@ -87,8 +58,10 @@ struct ClientApiInner { settings: Settings, stop: Option, join_handle: Option, + update_channels: HashMap<(SocketAddr, SocketAddr), flume::Sender>, } +#[derive(Clone)] pub struct ClientApi { inner: Arc>, } @@ -99,23 +72,43 @@ impl ClientApi { veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs, settings: Settings, - ) -> Rc { - Rc::new(Self { - inner: RefCell::new(ClientApiInner { + ) -> Self { + Self { + inner: Arc::new(Mutex::new(ClientApiInner { veilid_api, veilid_logs, settings, stop: Some(StopSource::new()), join_handle: None, - }), - }) + update_channels: HashMap::new(), + })), + } + } + + #[instrument(level = "trace", skip_all)] + fn shutdown(&self) { + trace!("ClientApi::shutdown"); + + crate::server::shutdown(); + } + + #[instrument(level = "trace", skip_all)] + fn change_log_level( + &self, + layer: String, + log_level: VeilidConfigLogLevel, + ) -> VeilidAPIResult<()> { + trace!("ClientApi::change_log_level"); + + let veilid_logs = self.inner.lock().veilid_logs.clone(); + veilid_logs.change_log_level(layer, log_level) } #[instrument(level = "trace", skip(self))] - pub async fn stop(self: Rc) { + pub async fn stop(&self) { trace!("ClientApi::stop requested"); let jh = { - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock(); if inner.join_handle.is_none() { trace!("ClientApi stop ignored"); return; @@ -131,10 +124,7 @@ impl ClientApi { } #[instrument(level = "trace", skip(self), err)] - async fn handle_incoming( - self, - bind_addr: SocketAddr, - ) -> Result<(), Box> { + async fn handle_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> { let listener = TcpListener::bind(bind_addr).await?; debug!("Client API listening on: {:?}", bind_addr); @@ -147,55 +137,249 @@ impl ClientApi { } } + // Make wait group for all incoming connections + let awg = AsyncWaitGroup::new(); + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); - let incoming_loop = async move { - while let Ok(Some(stream_result)) = - incoming_stream.next().timeout_at(stop_token.clone()).await - { - let stream = stream_result?; - stream.set_nodelay(true)?; - cfg_if! { - if #[cfg(feature="rt-async-std")] { - use futures_util::AsyncReadExt; - let (reader, writer) = stream.split(); - } else if #[cfg(feature="rt-tokio")] { - use tokio_util::compat::*; - let (reader, writer) = stream.into_split(); - let reader = reader.compat(); - let writer = writer.compat_write(); - } + while let Ok(Some(stream_result)) = + incoming_stream.next().timeout_at(stop_token.clone()).await + { + // Get the stream to process + let stream = stream_result?; + stream.set_nodelay(true)?; + + // Increment wait group + awg.add(1); + let t_awg = awg.clone(); + + // Process the connection + spawn(self.clone().handle_connection(stream, t_awg)).detach(); + } + + // Wait for all connections to terminate + awg.wait().await; + + Ok(()) + } + + // Process control messages for the server + async fn process_control(self, args: Vec) -> VeilidAPIResult { + if args.len() == 0 { + apibail_generic!("no control request specified"); + } + if args[0] == "shutdown" { + if args.len() != 1 { + apibail_generic!("wrong number of arguments"); + } + self.shutdown(); + Ok("".to_owned()) + } else if args[0] == "change_log_level" { + if args.len() != 3 { + apibail_generic!("wrong number of arguments"); + } + let log_level: VeilidConfigLogLevel = deserialize_json(&args[2])?; + self.change_log_level(args[1].clone(), log_level)?; + Ok("".to_owned()) + } else if args[0] == "get_server_settings" { + if args.len() != 1 { + apibail_generic!("wrong number of arguments"); + } + let settings = self.inner.lock().settings.clone(); + let settings = &*settings.read(); + let settings_json_string = serialize_json(settings); + let mut settings_json = + json::parse(&settings_json_string).map_err(VeilidAPIError::internal)?; + settings_json["core"]["network"].remove("node_id_secret"); + settings_json["core"]["protected_store"].remove("device_encryption_key_password"); + settings_json["core"]["protected_store"].remove("new_device_encryption_key_password"); + let safe_settings_json = settings_json.to_string(); + Ok(safe_settings_json) + } else if args[0] == "emit_schema" { + if args.len() != 2 { + apibail_generic!("wrong number of arguments"); + } + + let mut schemas = HashMap::::new(); + veilid_core::json_api::emit_schemas(&mut schemas); + + let Some(schema) = schemas.get(&args[1]) else { + apibail_invalid_argument!("invalid schema", "schema", args[1].clone()); + }; + + Ok(schema.clone()) + } else { + apibail_generic!("unknown control message"); + } + } + + pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) { + // Get address of peer + let peer_addr = match stream.peer_addr() { + Ok(v) => v, + Err(e) => { + error!("can't get peer address: {}", e); + return; + } + }; + // Get local address + let local_addr = match stream.local_addr() { + Ok(v) => v, + Err(e) => { + error!("can't get local address: {}", e); + return; + } + }; + // Get connection tuple + let conn_tuple = (local_addr, peer_addr); + + debug!( + "Accepted Client API Connection: {:?} -> {:?}", + peer_addr, local_addr + ); + + // Make stop token to quit when stop() is requested externally + let stop_token = self.inner.lock().stop.as_ref().unwrap().token(); + + // Split into reader and writer halves + // with line buffering on the reader + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use futures_util::AsyncReadExt; + let (reader, mut writer) = stream.split(); + let mut reader = BufReader::new(reader); + } else if #[cfg(feature="rt-tokio")] { + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + } + } + + // Make request processor for this connection + let api = self.inner.lock().veilid_api.clone(); + let jrp = json_api::JsonRequestProcessor::new(api); + + // Futures to process unordered + let mut unord = FuturesUnordered::new(); + let (more_futures_tx, more_futures_rx) = flume::unbounded(); + + // Output to serialize + let (responses_tx, responses_rx) = flume::unbounded(); + + // Request receive processor + let this = self.clone(); + let recv_requests_future = async move { + // Start sending updates + this.inner + .lock() + .update_channels + .insert(conn_tuple, responses_tx.clone()); + + let mut line = String::new(); + while let Ok(size) = reader.read_line(&mut line).await { + // Eof? + if size == 0 { + break; } - xxx spawn json_api handler - spawn_local(rpc_system.map(drop)); - } - Ok::<(), Box>(()) - }; + // Put the processing in the async queue + let jrp = jrp.clone(); + let line = line.trim().to_owned(); + // Ignore newlines + if line.len() == 0 { + continue; + } + let responses_tx = responses_tx.clone(); + let this2 = this.clone(); + let process_request = async move { + // Unmarshal NDJSON - newline => json + // (trim all whitespace around input lines just to make things more permissive for API users) + let request: json_api::Request = deserialize_json(&line)?; - incoming_loop.await + // See if this is a control message or a veilid-core message + let response = if let json_api::RequestOp::Control { args } = request.op { + // Process control messages + json_api::Response { + id: request.id, + op: json_api::ResponseOp::Control { + result: json_api::to_json_api_result( + this2.process_control(args).await, + ), + }, + } + } else { + // Process with ndjson api + jrp.clone().process_request(request).await + }; + + // Marshal json + newline => NDJSON + let response_string = + serialize_json(json_api::RecvMessage::Response(response)) + "\n"; + if let Err(e) = responses_tx.send_async(response_string).await { + warn!("response not sent: {}", e) + } + VeilidAPIResult::Ok(()) + }; + if let Err(e) = more_futures_tx + .send_async(system_boxed(process_request)) + .await + { + warn!("request dropped: {}", e) + } + } + + // Stop sending updates + // Will cause send_responses_future to stop because we drop the responses_tx + this.inner.lock().update_channels.remove(&conn_tuple); + + VeilidAPIResult::Ok(()) + }; + unord.push(system_boxed(recv_requests_future)); + + // Response send processor + let send_responses_future = async move { + while let Ok(resp) = responses_rx.recv_async().await { + if let Err(e) = writer.write_all(resp.as_bytes()).await { + error!("failed to write response: {}", e) + } + } + VeilidAPIResult::Ok(()) + }; + unord.push(system_boxed(send_responses_future)); + + // Send and receive until we're done or a stop is requested + while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await { + match r { + Ok(()) => {} + Err(e) => { + warn!("JSON API Failure: {}", e); + } + } + // Add more futures if we had one that completed + // Allows processing requests in an async fashion + for fut in more_futures_rx.drain() { + unord.push(fut); + } + } + + debug!( + "Closed Client API Connection: {:?} -> {:?}", + peer_addr, local_addr + ); + + awg.done(); } #[instrument(level = "trace", skip(self))] - pub fn handle_update(self: Rc, veilid_update: veilid_core::VeilidUpdate) { - // serialize update - let veilid_update = serialize_json(veilid_update); + pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) { + // serialize update to NDJSON + let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n"; // Pass other updates to clients - self.send_request_to_all_clients(|_id, registration| { - match veilid_update - .len() - .try_into() - .map_err(|e| ::capnp::Error::failed(format!("{:?}", e))) - { - Ok(len) => { - let mut request = registration.client.update_request(); - let mut rpc_veilid_update = request.get().init_veilid_update(len); - rpc_veilid_update.push_str(&veilid_update); - Some(request.send()) - } - Err(_) => None, + let inner = self.inner.lock(); + for ch in inner.update_channels.values() { + if let Err(e) = ch.send(veilid_update.clone()) { + eprintln!("failed to send update: {}", e); } - }); + } } #[instrument(level = "trace", skip(self))] @@ -204,6 +388,6 @@ impl ClientApi { .iter() .map(|addr| self.clone().handle_incoming(*addr)); let bind_futures_join = try_join_all(bind_futures); - self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join)); + self.inner.lock().join_handle = Some(spawn(bind_futures_join)); } } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 1ffa64f1..2a16f91e 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -1,8 +1,6 @@ use crate::client_api; use crate::settings::*; -use crate::tools::*; use crate::veilid_logs::*; -use crate::*; use flume::{unbounded, Receiver, Sender}; use futures_util::select; use futures_util::FutureExt; @@ -11,7 +9,7 @@ use parking_lot::Mutex; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::*; -use veilid_core::tools::SingleShotEventual; +use veilid_core::tools::*; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ServerMode { @@ -140,7 +138,7 @@ pub async fn run_veilid_server_internal( break; } } - sleep(Duration::from_millis(100)).await; + sleep(100).await; } match veilid_api.debug("txtrecord".to_string()).await { Ok(v) => { diff --git a/veilid-server/src/tools.rs b/veilid-server/src/tools.rs index 0e7b51e3..06d87e1d 100644 --- a/veilid-server/src/tools.rs +++ b/veilid-server/src/tools.rs @@ -3,41 +3,43 @@ use core::future::Future; cfg_if! { if #[cfg(feature="rt-async-std")] { - pub use async_std::task::JoinHandle; +// pub use async_std::task::JoinHandle; pub use async_std::net::TcpListener; - //pub use async_std::net::TcpStream; + pub use async_std::net::TcpStream; + pub use async_std::io::BufReader; //pub use async_std::future::TimeoutError; - pub fn spawn + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { - async_std::task::spawn_local(f) - } - pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { - async_std::task::spawn_local(f) - } + //pub fn spawn_detached + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { + //async_std::task::spawn(f) + //} + // pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + // async_std::task::spawn_local(f) + // } // pub fn spawn_detached_local + 'static, T: 'static>(f: F) { // let _ = async_std::task::spawn_local(f); // } - pub use async_std::task::sleep; - pub use async_std::future::timeout; + //pub use async_std::task::sleep; + //pub use async_std::future::timeout; pub fn block_on, T>(f: F) -> T { async_std::task::block_on(f) } } else if #[cfg(feature="rt-tokio")] { - pub use tokio::task::JoinHandle; + //pub use tokio::task::JoinHandle; pub use tokio::net::TcpListener; - //pub use tokio::net::TcpStream; + pub use tokio::net::TcpStream; + pub use tokio::io::BufReader; //pub use tokio_util::compat::*; //pub use tokio::time::error::Elapsed as TimeoutError; - pub fn spawn + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { - tokio::task::spawn(f) - } - pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { - tokio::task::spawn_local(f) - } + //pub fn spawn_detached + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle { + //tokio::task::spawn(f) + //} + // pub fn spawn_local + 'static, T: 'static>(f: F) -> JoinHandle { + // tokio::task::spawn_local(f) + // } // pub fn spawn_detached_local + 'static, T: 'static>(f: F) { // let _ = tokio::task::spawn_local(f); // } - pub use tokio::time::sleep; - pub use tokio::time::timeout; + //pub use tokio::time::sleep; + //pub use tokio::time::timeout; pub fn block_on, T>(f: F) -> T { let rt = tokio::runtime::Runtime::new().unwrap(); let local = tokio::task::LocalSet::new(); diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 1d2aa389..68f10760 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -2,13 +2,11 @@ use crate::server::*; use crate::settings::Settings; use crate::tools::*; use crate::veilid_logs::*; -use crate::*; use clap::ArgMatches; use futures_util::StreamExt; use signal_hook::consts::signal::*; use signal_hook_async_std::Signals; -//use std::io::Read; -use tracing::*; +use veilid_core::tools::*; #[instrument(skip(signals))] async fn handle_signals(mut signals: Signals) { @@ -33,24 +31,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> EyreResult<()> { let mut daemon = daemonize::Daemonize::new(); let s = settings.read(); if let Some(pid_file) = s.daemon.pid_file.clone() { - daemon = daemon.pid_file(pid_file.clone()); //.chown_pid_file(true); - // daemon = daemon.exit_action(move || { - // // wait for pid file to exist before exiting parent - // let pid_path = std::path::Path::new(&pid_file); - // loop { - // if let Ok(mut f) = std::fs::File::open(pid_path) { - // let mut s = String::new(); - // if f.read_to_string(&mut s).is_ok() - // && !s.is_empty() - // && s.parse::().is_ok() - // { - // println!("pidfile found"); - // break; - // } - // } - // std::thread::sleep(std::time::Duration::from_millis(100)); - // } - // }) + daemon = daemon.pid_file(pid_file.clone()); } if let Some(chroot) = &s.daemon.chroot { daemon = daemon.chroot(chroot); diff --git a/veilid-tools/src/must_join_handle.rs b/veilid-tools/src/must_join_handle.rs index ff7231f1..a4afd1dc 100644 --- a/veilid-tools/src/must_join_handle.rs +++ b/veilid-tools/src/must_join_handle.rs @@ -16,6 +16,22 @@ impl MustJoinHandle { } } + pub fn detach(mut self) { + cfg_if! { + if #[cfg(feature="rt-async-std")] { + self.join_handle = None; + } else if #[cfg(feature="rt-tokio")] { + self.join_handle = None; + } else if #[cfg(target_arch = "wasm32")] { + self.join_handle.take().detach(); + self.completed = true; + } else { + compile_error!("needs executor implementation") + } + } + self.completed = true; + } + #[allow(unused_mut)] pub async fn abort(mut self) { if !self.completed {