diff --git a/veilid-server/src/cmdline.rs b/veilid-server/src/cmdline.rs index f7617e11..8d317744 100644 --- a/veilid-server/src/cmdline.rs +++ b/veilid-server/src/cmdline.rs @@ -1,317 +1,317 @@ -use crate::settings::*; -use crate::*; -use clap::{Arg, ArgMatches, Command}; -use std::ffi::OsStr; -use std::path::Path; -use std::str::FromStr; -use veilid_core::{DHTKey, DHTKeySecret}; - -fn do_clap_matches(default_config_path: &OsStr) -> Result { - let matches = Command::new("veilid-server") - .version("0.1") - .about("Veilid Server") - .color(clap::ColorChoice::Auto) - .arg( - Arg::new("daemon") - .long("daemon") - .short('d') - .help("Run in daemon mode in the background"), - ) - .arg( - Arg::new("foreground") - .long("foreground") - .short('f') - .conflicts_with("daemon") - .help("Run in the foreground"), - ) - .arg( - Arg::new("config-file") - .short('c') - .long("config-file") - .takes_value(true) - .value_name("FILE") - .default_value_os(default_config_path) - .allow_invalid_utf8(true) - .help("Specify a configuration file to use"), - ) - .arg( - Arg::new("set-config") - .short('s') - .long("set-config") - .takes_value(true) - .multiple_occurrences(true) - .help("Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true") - ) - .arg( - Arg::new("attach") - .long("attach") - .takes_value(true) - .value_name("BOOL") - .possible_values(&["false", "true"]) - .help("Automatically attach the server to the Veilid network"), - ) - // Dev options - .arg( - Arg::new("debug") - .long("debug") - .help("Turn on debug logging on the terminal"), - ) - .arg( - Arg::new("trace") - .long("trace") - .conflicts_with("debug") - .help("Turn on trace logging on the terminal"), - ) - .arg( - Arg::new("otlp") - .long("otlp") - .takes_value(true) - .value_name("endpoint") - .default_missing_value("localhost:4317") - .help("Turn on OpenTelemetry tracing") - .long_help("This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'"), - ) - .arg( - Arg::new("subnode-index") - .long("subnode-index") - .takes_value(true) - .help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"), - ) - .arg( - Arg::new("generate-dht-key") - .long("generate-dht-key") - .help("Only generate a new dht key and print it"), - ) - .arg( - Arg::new("set-node-id") - .long("set-node-id") - .takes_value(true) - .value_name("ID") - .help("Set the node id and secret key") - .long_help("To specify both node id and secret key on the command line, use a ID:SECRET syntax with a colon, like:\n zsVXz5aTU98vZxwTcDmvpcnO5g1B2jRO3wpdNiDrRgw:gJzQLmzuBvA-dFvEmLcYvLoO5bh7hzCWFzfpJHapZKg\nIf no colon is used, the node id is specified, and a prompt appears to enter the secret key interactively.") - ) - .arg( - Arg::new("delete-protected-store") - .long("delete-protected-store") - .help("Delete the entire contents of the protected store (DANGER, NO UNDO!)"), - ) - .arg( - Arg::new("delete-table-store") - .long("delete-table-store") - .help("Delete the entire contents of the table store (DANGER, NO UNDO!)"), - ) - .arg( - Arg::new("delete-block-store") - .long("delete-block-store") - .help("Delete the entire contents of the block store (DANGER, NO UNDO!)"), - ) - .arg( - Arg::new("dump-config") - .long("dump-config") - .help("Instead of running the server, print the configuration it would use to the console"), - ) - .arg( - Arg::new("dump-txt-record") - .long("dump-txt-record") - .help("Prints the bootstrap TXT record for this node and then quits") - ) - .arg( - Arg::new("bootstrap") - .long("bootstrap") - .takes_value(true) - .value_name("BOOTSTRAP_LIST") - .help("Specify a list of bootstrap hostnames to use") - ) - .arg( - Arg::new("bootstrap-nodes") - .conflicts_with("bootstrap") - .long("bootstrap-nodes") - .takes_value(true) - .value_name("BOOTSTRAP_NODE_LIST") - .help("Specify a list of bootstrap node dialinfos to use"), - ) - .arg( - Arg::new("local") - .long("local") - .help("Enable local peer scope") - ); - - #[cfg(debug_assertions)] - let matches = matches.arg( - Arg::new("wait-for-debug") - .long("wait-for-debug") - .help("Wait for debugger to attach"), - ); - - Ok(matches.get_matches()) -} - -pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> { - // Get command line options - let default_config_path = Settings::get_default_config_path(); - let matches = do_clap_matches(default_config_path.as_os_str()) - .wrap_err("failed to parse command line: {}")?; - - // Check for one-off commands - #[cfg(debug_assertions)] - if matches.occurrences_of("wait-for-debug") != 0 { - use bugsalot::debugger; - debugger::wait_until_attached(None).expect("state() not implemented on this platform"); - } - - // Attempt to load configuration - let settings_path = if let Some(config_file) = matches.value_of_os("config-file") { - if Path::new(config_file).exists() { - Some(config_file) - } else { - None - } - } else { - None - }; - - let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?; - - // write lock the settings - let mut settingsrw = settings.write(); - - // Set config from command line - if matches.occurrences_of("daemon") != 0 { - settingsrw.daemon.enabled = true; - settingsrw.logging.terminal.enabled = false; - } - if matches.occurrences_of("foreground") != 0 { - settingsrw.daemon.enabled = false; - } - if matches.occurrences_of("subnode-index") != 0 { - let subnode_index = match matches.value_of("subnode-index") { - Some(x) => x.parse().wrap_err("couldn't parse subnode index")?, - None => { - bail!("value not specified for subnode-index"); - } - }; - if subnode_index == 0 { - bail!("value of subnode_index should be between 1 and 65535"); - } - settingsrw.testing.subnode_index = subnode_index; - } - - if matches.occurrences_of("debug") != 0 { - settingsrw.logging.terminal.enabled = true; - settingsrw.logging.terminal.level = LogLevel::Debug; - } - if matches.occurrences_of("trace") != 0 { - settingsrw.logging.terminal.enabled = true; - settingsrw.logging.terminal.level = LogLevel::Trace; - } - if matches.occurrences_of("otlp") != 0 { - settingsrw.logging.otlp.enabled = true; - settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str( - &matches - .value_of("otlp") - .expect("should not be null because of default missing value") - .to_string(), - ) - .wrap_err("failed to parse OTLP address")?; - settingsrw.logging.otlp.level = LogLevel::Trace; - } - if matches.is_present("attach") { - settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); - } - if matches.is_present("local") { - settingsrw.core.network.enable_local_peer_scope = true; - } - if matches.occurrences_of("delete-protected-store") != 0 { - settingsrw.core.protected_store.delete = true; - } - if matches.occurrences_of("delete-block-store") != 0 { - settingsrw.core.block_store.delete = true; - } - if matches.occurrences_of("delete-table-store") != 0 { - settingsrw.core.table_store.delete = true; - } - if matches.occurrences_of("dump-txt-record") != 0 { - // Turn off terminal logging so we can be interactive - settingsrw.logging.terminal.enabled = false; - } - if let Some(v) = matches.value_of("set-node-id") { - // Turn off terminal logging so we can be interactive - settingsrw.logging.terminal.enabled = false; - - // Split or get secret - let (k, s) = if let Some((k, s)) = v.split_once(':') { - let k = DHTKey::try_decode(k).wrap_err("failed to decode node id from command line")?; - let s = DHTKeySecret::try_decode(s)?; - (k, s) - } else { - let k = DHTKey::try_decode(v)?; - let buffer = rpassword::prompt_password("Enter secret key (will not echo): ") - .wrap_err("invalid secret key")?; - let buffer = buffer.trim().to_string(); - let s = DHTKeySecret::try_decode(&buffer)?; - (k, s) - }; - settingsrw.core.network.node_id = k; - settingsrw.core.network.node_id_secret = s; - } - - if matches.occurrences_of("bootstrap") != 0 { - let bootstrap_list = match matches.value_of("bootstrap") { - Some(x) => { - println!("Overriding bootstrap list with: "); - let mut out: Vec = Vec::new(); - for x in x.split(',') { - let x = x.trim().to_string(); - println!(" {}", x); - out.push(x); - } - out - } - None => { - bail!("value not specified for bootstrap"); - } - }; - settingsrw.core.network.bootstrap = bootstrap_list; - } - - if matches.occurrences_of("bootstrap-nodes") != 0 { - let bootstrap_list = match matches.value_of("bootstrap-nodes") { - Some(x) => { - println!("Overriding bootstrap node list with: "); - let mut out: Vec = Vec::new(); - for x in x.split(',') { - let x = x.trim(); - println!(" {}", x); - out.push( - ParsedNodeDialInfo::from_str(x) - .wrap_err("unable to parse dial info in bootstrap node list")?, - ); - } - out - } - None => { - bail!("value not specified for bootstrap node list"); - } - }; - settingsrw.core.network.bootstrap_nodes = bootstrap_list; - } - drop(settingsrw); - - // Set specific config settings - if let Some(set_configs) = matches.values_of("set-config") { - for set_config in set_configs { - if let Some((k, v)) = set_config.split_once('=') { - let k = k.trim(); - let v = v.trim(); - settings.set(k, v)?; - } - } - } - - // Apply subnode index if we're testing - settings - .apply_subnode_index() - .wrap_err("failed to apply subnode index")?; - - Ok((settings, matches)) -} +use crate::settings::*; +use crate::*; +use clap::{Arg, ArgMatches, Command}; +use std::ffi::OsStr; +use std::path::Path; +use std::str::FromStr; +use veilid_core::{DHTKey, DHTKeySecret}; + +fn do_clap_matches(default_config_path: &OsStr) -> Result { + let matches = Command::new("veilid-server") + .version("0.1") + .about("Veilid Server") + .color(clap::ColorChoice::Auto) + .arg( + Arg::new("daemon") + .long("daemon") + .short('d') + .help("Run in daemon mode in the background"), + ) + .arg( + Arg::new("foreground") + .long("foreground") + .short('f') + .conflicts_with("daemon") + .help("Run in the foreground"), + ) + .arg( + Arg::new("config-file") + .short('c') + .long("config-file") + .takes_value(true) + .value_name("FILE") + .default_value_os(default_config_path) + .allow_invalid_utf8(true) + .help("Specify a configuration file to use"), + ) + .arg( + Arg::new("set-config") + .short('s') + .long("set-config") + .takes_value(true) + .multiple_occurrences(true) + .help("Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true") + ) + .arg( + Arg::new("attach") + .long("attach") + .takes_value(true) + .value_name("BOOL") + .possible_values(&["false", "true"]) + .help("Automatically attach the server to the Veilid network"), + ) + // Dev options + .arg( + Arg::new("debug") + .long("debug") + .help("Turn on debug logging on the terminal"), + ) + .arg( + Arg::new("trace") + .long("trace") + .conflicts_with("debug") + .help("Turn on trace logging on the terminal"), + ) + .arg( + Arg::new("otlp") + .long("otlp") + .takes_value(true) + .value_name("endpoint") + .default_missing_value("localhost:4317") + .help("Turn on OpenTelemetry tracing") + .long_help("This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'"), + ) + .arg( + Arg::new("subnode-index") + .long("subnode-index") + .takes_value(true) + .help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"), + ) + .arg( + Arg::new("generate-dht-key") + .long("generate-dht-key") + .help("Only generate a new dht key and print it"), + ) + .arg( + Arg::new("set-node-id") + .long("set-node-id") + .takes_value(true) + .value_name("ID") + .help("Set the node id and secret key") + .long_help("To specify both node id and secret key on the command line, use a ID:SECRET syntax with a colon, like:\n zsVXz5aTU98vZxwTcDmvpcnO5g1B2jRO3wpdNiDrRgw:gJzQLmzuBvA-dFvEmLcYvLoO5bh7hzCWFzfpJHapZKg\nIf no colon is used, the node id is specified, and a prompt appears to enter the secret key interactively.") + ) + .arg( + Arg::new("delete-protected-store") + .long("delete-protected-store") + .help("Delete the entire contents of the protected store (DANGER, NO UNDO!)"), + ) + .arg( + Arg::new("delete-table-store") + .long("delete-table-store") + .help("Delete the entire contents of the table store (DANGER, NO UNDO!)"), + ) + .arg( + Arg::new("delete-block-store") + .long("delete-block-store") + .help("Delete the entire contents of the block store (DANGER, NO UNDO!)"), + ) + .arg( + Arg::new("dump-config") + .long("dump-config") + .help("Instead of running the server, print the configuration it would use to the console"), + ) + .arg( + Arg::new("dump-txt-record") + .long("dump-txt-record") + .help("Prints the bootstrap TXT record for this node and then quits") + ) + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .takes_value(true) + .value_name("BOOTSTRAP_LIST") + .help("Specify a list of bootstrap hostnames to use") + ) + .arg( + Arg::new("bootstrap-nodes") + .conflicts_with("bootstrap") + .long("bootstrap-nodes") + .takes_value(true) + .value_name("BOOTSTRAP_NODE_LIST") + .help("Specify a list of bootstrap node dialinfos to use"), + ) + .arg( + Arg::new("local") + .long("local") + .help("Enable local peer scope") + ); + + #[cfg(debug_assertions)] + let matches = matches.arg( + Arg::new("wait-for-debug") + .long("wait-for-debug") + .help("Wait for debugger to attach"), + ); + + Ok(matches.get_matches()) +} + +pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> { + // Get command line options + let default_config_path = Settings::get_default_config_path(); + let matches = do_clap_matches(default_config_path.as_os_str()) + .wrap_err("failed to parse command line: {}")?; + + // Check for one-off commands + #[cfg(debug_assertions)] + if matches.occurrences_of("wait-for-debug") != 0 { + use bugsalot::debugger; + debugger::wait_until_attached(None).expect("state() not implemented on this platform"); + } + + // Attempt to load configuration + let settings_path = if let Some(config_file) = matches.value_of_os("config-file") { + if Path::new(config_file).exists() { + Some(config_file) + } else { + None + } + } else { + None + }; + + let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?; + + // write lock the settings + let mut settingsrw = settings.write(); + + // Set config from command line + if matches.occurrences_of("daemon") != 0 { + settingsrw.daemon.enabled = true; + settingsrw.logging.terminal.enabled = false; + } + if matches.occurrences_of("foreground") != 0 { + settingsrw.daemon.enabled = false; + } + if matches.occurrences_of("subnode-index") != 0 { + let subnode_index = match matches.value_of("subnode-index") { + Some(x) => x.parse().wrap_err("couldn't parse subnode index")?, + None => { + bail!("value not specified for subnode-index"); + } + }; + if subnode_index == 0 { + bail!("value of subnode_index should be between 1 and 65535"); + } + settingsrw.testing.subnode_index = subnode_index; + } + + if matches.occurrences_of("debug") != 0 { + settingsrw.logging.terminal.enabled = true; + settingsrw.logging.terminal.level = LogLevel::Debug; + } + if matches.occurrences_of("trace") != 0 { + settingsrw.logging.terminal.enabled = true; + settingsrw.logging.terminal.level = LogLevel::Trace; + } + if matches.occurrences_of("otlp") != 0 { + settingsrw.logging.otlp.enabled = true; + settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str( + &matches + .value_of("otlp") + .expect("should not be null because of default missing value") + .to_string(), + ) + .wrap_err("failed to parse OTLP address")?; + settingsrw.logging.otlp.level = LogLevel::Trace; + } + if matches.is_present("attach") { + settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); + } + if matches.is_present("local") { + settingsrw.core.network.enable_local_peer_scope = true; + } + if matches.occurrences_of("delete-protected-store") != 0 { + settingsrw.core.protected_store.delete = true; + } + if matches.occurrences_of("delete-block-store") != 0 { + settingsrw.core.block_store.delete = true; + } + if matches.occurrences_of("delete-table-store") != 0 { + settingsrw.core.table_store.delete = true; + } + if matches.occurrences_of("dump-txt-record") != 0 { + // Turn off terminal logging so we can be interactive + settingsrw.logging.terminal.enabled = false; + } + if let Some(v) = matches.value_of("set-node-id") { + // Turn off terminal logging so we can be interactive + settingsrw.logging.terminal.enabled = false; + + // Split or get secret + let (k, s) = if let Some((k, s)) = v.split_once(':') { + let k = DHTKey::try_decode(k).wrap_err("failed to decode node id from command line")?; + let s = DHTKeySecret::try_decode(s)?; + (k, s) + } else { + let k = DHTKey::try_decode(v)?; + let buffer = rpassword::prompt_password("Enter secret key (will not echo): ") + .wrap_err("invalid secret key")?; + let buffer = buffer.trim().to_string(); + let s = DHTKeySecret::try_decode(&buffer)?; + (k, s) + }; + settingsrw.core.network.node_id = k; + settingsrw.core.network.node_id_secret = s; + } + + if matches.occurrences_of("bootstrap") != 0 { + let bootstrap_list = match matches.value_of("bootstrap") { + Some(x) => { + println!("Overriding bootstrap list with: "); + let mut out: Vec = Vec::new(); + for x in x.split(',') { + let x = x.trim().to_string(); + println!(" {}", x); + out.push(x); + } + out + } + None => { + bail!("value not specified for bootstrap"); + } + }; + settingsrw.core.network.bootstrap = bootstrap_list; + } + + if matches.occurrences_of("bootstrap-nodes") != 0 { + let bootstrap_list = match matches.value_of("bootstrap-nodes") { + Some(x) => { + println!("Overriding bootstrap node list with: "); + let mut out: Vec = Vec::new(); + for x in x.split(',') { + let x = x.trim(); + println!(" {}", x); + out.push( + ParsedNodeDialInfo::from_str(x) + .wrap_err("unable to parse dial info in bootstrap node list")?, + ); + } + out + } + None => { + bail!("value not specified for bootstrap node list"); + } + }; + settingsrw.core.network.bootstrap_nodes = bootstrap_list; + } + drop(settingsrw); + + // Set specific config settings + if let Some(set_configs) = matches.values_of("set-config") { + for set_config in set_configs { + if let Some((k, v)) = set_config.split_once('=') { + let k = k.trim(); + let v = v.trim(); + settings.set(k, v)?; + } + } + } + + // Apply subnode index if we're testing + settings + .apply_subnode_index() + .wrap_err("failed to apply subnode index")?; + + Ok((settings, matches)) +} diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index bb807bb4..f5f2af2f 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -1,168 +1,168 @@ -use crate::client_api; -use crate::settings::*; -use crate::tools::*; -use crate::veilid_logs::*; -use crate::*; -use flume::{unbounded, Receiver, Sender}; -use lazy_static::*; -use parking_lot::Mutex; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tracing::*; -use veilid_core::xx::SingleShotEventual; - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ServerMode { - Normal, - ShutdownImmediate, - DumpTXTRecord, -} - -lazy_static! { - static ref SHUTDOWN_SWITCH: Mutex>> = - Mutex::new(Some(SingleShotEventual::new(Some(())))); -} - -#[instrument] -pub fn shutdown() { - let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); - if let Some(shutdown_switch) = shutdown_switch { - shutdown_switch.resolve(()); - } -} - -pub async fn run_veilid_server( - settings: Settings, - server_mode: ServerMode, - veilid_logs: VeilidLogs, -) -> EyreResult<()> { - run_veilid_server_internal(settings, server_mode, veilid_logs).await -} - -//#[instrument(err, skip_all)] -pub async fn run_veilid_server_internal( - settings: Settings, - server_mode: ServerMode, - veilid_logs: VeilidLogs, -) -> EyreResult<()> { - trace!(?settings, ?server_mode); - - let settingsr = settings.read(); - - // Create client api state change pipe - let (sender, receiver): ( - Sender, - Receiver, - ) = unbounded(); - - // Create VeilidCore setup - let update_callback = Arc::new(move |change: veilid_core::VeilidUpdate| { - if sender.send(change).is_err() { - error!("error sending veilid update callback"); - } - }); - let config_callback = settings.get_core_config_callback(); - - // Start Veilid Core and get API - let veilid_api = veilid_core::api_startup(update_callback, config_callback) - .await - .wrap_err("VeilidCore startup failed")?; - - // Start client api if one is requested - let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) { - let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone()); - some_capi - .clone() - .run(settingsr.client_api.listen_address.addrs.clone()); - Some(some_capi) - } else { - None - }; - - // Drop rwlock on settings - let auto_attach = settingsr.auto_attach || !matches!(server_mode, ServerMode::Normal); - drop(settingsr); - - // Process all updates - let capi2 = capi.clone(); - let update_receiver_jh = spawn_local(async move { - while let Ok(change) = receiver.recv_async().await { - if let Some(capi) = &capi2 { - // Handle state changes on main thread for capnproto rpc - capi.clone().handle_update(change); - } - } - }); - - // Auto-attach if desired - let mut out = Ok(()); - if auto_attach { - info!("Auto-attach to the Veilid network"); - if let Err(e) = veilid_api.attach().await { - out = Err(eyre!( - "Auto-attaching to the Veilid network failed: {:?}", - e - )); - shutdown(); - } - } - - // Process dump-txt-record - if matches!(server_mode, ServerMode::DumpTXTRecord) { - let start_time = Instant::now(); - while Instant::now().duration_since(start_time) < Duration::from_secs(10) { - match veilid_api.get_state().await { - Ok(vs) => { - if vs.network.started { - break; - } - } - Err(e) => { - out = Err(eyre!("Getting state failed: {:?}", e)); - break; - } - } - sleep(Duration::from_millis(100)).await; - } - match veilid_api.debug("txtrecord".to_string()).await { - Ok(v) => { - print!("{}", v); - } - Err(e) => { - out = Err(eyre!("Getting TXT record failed: {:?}", e)); - } - }; - shutdown(); - } - - // Process shutdown-immediate - if matches!(server_mode, ServerMode::ShutdownImmediate) { - shutdown(); - } - - // Idle while waiting to exit - let shutdown_switch = { - let shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); - (*shutdown_switch_locked).as_ref().map(|ss| ss.instance()) - }; - if let Some(shutdown_switch) = shutdown_switch { - shutdown_switch.await; - } - - // Stop the client api if we have one - if let Some(c) = capi.as_mut().cloned() { - c.stop().await; - } - - // Shut down Veilid API to release state change sender - veilid_api.shutdown().await; - - // Wait for update receiver to exit - let _ = update_receiver_jh.await; - - // Finally, drop logs - // this is explicit to ensure we don't accidentally drop them too soon via a move - drop(veilid_logs); - - out -} +use crate::client_api; +use crate::settings::*; +use crate::tools::*; +use crate::veilid_logs::*; +use crate::*; +use flume::{unbounded, Receiver, Sender}; +use lazy_static::*; +use parking_lot::Mutex; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing::*; +use veilid_core::xx::SingleShotEventual; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ServerMode { + Normal, + ShutdownImmediate, + DumpTXTRecord, +} + +lazy_static! { + static ref SHUTDOWN_SWITCH: Mutex>> = + Mutex::new(Some(SingleShotEventual::new(Some(())))); +} + +#[instrument] +pub fn shutdown() { + let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); + if let Some(shutdown_switch) = shutdown_switch { + shutdown_switch.resolve(()); + } +} + +pub async fn run_veilid_server( + settings: Settings, + server_mode: ServerMode, + veilid_logs: VeilidLogs, +) -> EyreResult<()> { + run_veilid_server_internal(settings, server_mode, veilid_logs).await +} + +//#[instrument(err, skip_all)] +pub async fn run_veilid_server_internal( + settings: Settings, + server_mode: ServerMode, + veilid_logs: VeilidLogs, +) -> EyreResult<()> { + trace!(?settings, ?server_mode); + + let settingsr = settings.read(); + + // Create client api state change pipe + let (sender, receiver): ( + Sender, + Receiver, + ) = unbounded(); + + // Create VeilidCore setup + let update_callback = Arc::new(move |change: veilid_core::VeilidUpdate| { + if sender.send(change).is_err() { + error!("error sending veilid update callback"); + } + }); + let config_callback = settings.get_core_config_callback(); + + // Start Veilid Core and get API + let veilid_api = veilid_core::api_startup(update_callback, config_callback) + .await + .wrap_err("VeilidCore startup failed")?; + + // Start client api if one is requested + let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) { + let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone()); + some_capi + .clone() + .run(settingsr.client_api.listen_address.addrs.clone()); + Some(some_capi) + } else { + None + }; + + // Drop rwlock on settings + let auto_attach = settingsr.auto_attach || !matches!(server_mode, ServerMode::Normal); + drop(settingsr); + + // Process all updates + let capi2 = capi.clone(); + let update_receiver_jh = spawn_local(async move { + while let Ok(change) = receiver.recv_async().await { + if let Some(capi) = &capi2 { + // Handle state changes on main thread for capnproto rpc + capi.clone().handle_update(change); + } + } + }); + + // Auto-attach if desired + let mut out = Ok(()); + if auto_attach { + info!("Auto-attach to the Veilid network"); + if let Err(e) = veilid_api.attach().await { + out = Err(eyre!( + "Auto-attaching to the Veilid network failed: {:?}", + e + )); + shutdown(); + } + } + + // Process dump-txt-record + if matches!(server_mode, ServerMode::DumpTXTRecord) { + let start_time = Instant::now(); + while Instant::now().duration_since(start_time) < Duration::from_secs(10) { + match veilid_api.get_state().await { + Ok(vs) => { + if vs.network.started { + break; + } + } + Err(e) => { + out = Err(eyre!("Getting state failed: {:?}", e)); + break; + } + } + sleep(Duration::from_millis(100)).await; + } + match veilid_api.debug("txtrecord".to_string()).await { + Ok(v) => { + print!("{}", v); + } + Err(e) => { + out = Err(eyre!("Getting TXT record failed: {:?}", e)); + } + }; + shutdown(); + } + + // Process shutdown-immediate + if matches!(server_mode, ServerMode::ShutdownImmediate) { + shutdown(); + } + + // Idle while waiting to exit + let shutdown_switch = { + let shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); + (*shutdown_switch_locked).as_ref().map(|ss| ss.instance()) + }; + if let Some(shutdown_switch) = shutdown_switch { + shutdown_switch.await; + } + + // Stop the client api if we have one + if let Some(c) = capi.as_mut().cloned() { + c.stop().await; + } + + // Shut down Veilid API to release state change sender + veilid_api.shutdown().await; + + // Wait for update receiver to exit + let _ = update_receiver_jh.await; + + // Finally, drop logs + // this is explicit to ensure we don't accidentally drop them too soon via a move + drop(veilid_logs); + + out +} diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index d451b940..7a08c835 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -1,208 +1,208 @@ -use crate::settings::*; -use crate::*; -use cfg_if::*; -use opentelemetry::sdk::*; -use opentelemetry::*; -use opentelemetry_otlp::WithExportConfig; -use parking_lot::*; -use std::collections::BTreeMap; -use std::path::*; -use std::sync::Arc; -use tracing_appender::*; -use tracing_subscriber::prelude::*; -use tracing_subscriber::*; - -struct VeilidLogsInner { - _guard: Option, - filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>, -} - -#[derive(Clone)] -pub struct VeilidLogs { - inner: Arc>, -} - -impl VeilidLogs { - pub fn setup(settings: Settings) -> EyreResult { - let settingsr = settings.read(); - - // Set up subscriber and layers - let subscriber = Registry::default(); - let mut layers = Vec::new(); - let mut filters = BTreeMap::new(); - - // Error layer - // XXX: Spantrace capture causes rwlock deadlocks/crashes - // XXX: - //layers.push(tracing_error::ErrorLayer::default().boxed()); - - // Terminal logger - if settingsr.logging.terminal.enabled { - let filter = veilid_core::VeilidLayerFilter::new( - convert_loglevel(settingsr.logging.terminal.level), - None, - ); - let layer = fmt::Layer::new() - .compact() - .with_writer(std::io::stdout) - .with_filter(filter.clone()); - filters.insert("terminal", filter); - layers.push(layer.boxed()); - } - - // OpenTelemetry logger - if settingsr.logging.otlp.enabled { - let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone(); - - cfg_if! { - if #[cfg(feature="rt-async-std")] { - let exporter = opentelemetry_otlp::new_exporter() - .grpcio() - .with_endpoint(grpc_endpoint); - let batch = opentelemetry::runtime::AsyncStd; - } else if #[cfg(feature="rt-tokio")] { - let exporter = opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(format!("http://{}", grpc_endpoint)); - let batch = opentelemetry::runtime::Tokio; - } - } - - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter) - .with_trace_config(opentelemetry::sdk::trace::config().with_resource( - Resource::new(vec![KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_NAME, - format!( - "veilid_server:{}", - hostname::get() - .map(|s| s.to_string_lossy().into_owned()) - .unwrap_or_else(|_| "unknown".to_owned()) - ), - )]), - )) - .install_batch(batch) - .wrap_err("failed to install OpenTelemetry tracer")?; - - let filter = veilid_core::VeilidLayerFilter::new( - convert_loglevel(settingsr.logging.otlp.level), - None, - ); - let layer = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(filter.clone()); - filters.insert("otlp", filter); - layers.push(layer.boxed()); - } - - // File logger - let mut guard = None; - if settingsr.logging.file.enabled { - let log_path = Path::new(&settingsr.logging.file.path); - let full_path = std::env::current_dir() - .unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string())) - .join(log_path); - let log_parent = full_path - .parent() - .unwrap_or(Path::new(&MAIN_SEPARATOR.to_string())) - .canonicalize() - .wrap_err(format!( - "File log path parent does not exist: {}", - settingsr.logging.file.path - ))?; - let log_filename = full_path.file_name().ok_or(eyre!( - "File log filename not specified in path: {}", - settingsr.logging.file.path - ))?; - - let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename)); - let (non_blocking_appender, non_blocking_guard) = - tracing_appender::non_blocking(appender); - guard = Some(non_blocking_guard); - - let filter = veilid_core::VeilidLayerFilter::new( - convert_loglevel(settingsr.logging.file.level), - None, - ); - let layer = fmt::Layer::new() - .compact() - .with_writer(non_blocking_appender) - .with_filter(filter.clone()); - - filters.insert("file", filter); - layers.push(layer.boxed()); - } - - // API logger - if settingsr.logging.api.enabled { - let filter = veilid_core::VeilidLayerFilter::new( - convert_loglevel(settingsr.logging.api.level), - None, - ); - let layer = veilid_core::ApiTracingLayer::get().with_filter(filter.clone()); - filters.insert("api", filter); - layers.push(layer.boxed()); - } - - // Systemd Journal logger - cfg_if! { - if #[cfg(target_os = "linux")] { - if settingsr.logging.system.enabled { - let filter = veilid_core::VeilidLayerFilter::new( - convert_loglevel(settingsr.logging.system.level), - None, - ); - let layer =tracing_journald::layer().wrap_err("failed to set up journald logging")? - .with_filter(filter.clone()); - filters.insert("system", filter); - layers.push(layer.boxed()); - } - } - } - - let subscriber = subscriber.with(layers); - subscriber - .try_init() - .wrap_err("failed to initialize logging")?; - - Ok(VeilidLogs { - inner: Arc::new(Mutex::new(VeilidLogsInner { - _guard: guard, - filters, - })), - }) - } - - pub fn change_log_level( - &self, - layer: String, - log_level: veilid_core::VeilidConfigLogLevel, - ) -> Result<(), veilid_core::VeilidAPIError> { - // get layer to change level on - let layer = if layer == "all" { "".to_owned() } else { layer }; - - // change log level on appropriate layer - let inner = self.inner.lock(); - if layer.is_empty() { - // Change all layers - for f in inner.filters.values() { - f.set_max_level(log_level); - } - } else { - // Change a specific layer - let f = match inner.filters.get(layer.as_str()) { - Some(f) => f, - None => { - return Err(veilid_core::VeilidAPIError::InvalidArgument { - context: "change_log_level".to_owned(), - argument: "layer".to_owned(), - value: layer, - }); - } - }; - f.set_max_level(log_level); - } - Ok(()) - } -} +use crate::settings::*; +use crate::*; +use cfg_if::*; +use opentelemetry::sdk::*; +use opentelemetry::*; +use opentelemetry_otlp::WithExportConfig; +use parking_lot::*; +use std::collections::BTreeMap; +use std::path::*; +use std::sync::Arc; +use tracing_appender::*; +use tracing_subscriber::prelude::*; +use tracing_subscriber::*; + +struct VeilidLogsInner { + _guard: Option, + filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>, +} + +#[derive(Clone)] +pub struct VeilidLogs { + inner: Arc>, +} + +impl VeilidLogs { + pub fn setup(settings: Settings) -> EyreResult { + let settingsr = settings.read(); + + // Set up subscriber and layers + let subscriber = Registry::default(); + let mut layers = Vec::new(); + let mut filters = BTreeMap::new(); + + // Error layer + // XXX: Spantrace capture causes rwlock deadlocks/crashes + // XXX: + //layers.push(tracing_error::ErrorLayer::default().boxed()); + + // Terminal logger + if settingsr.logging.terminal.enabled { + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(settingsr.logging.terminal.level), + None, + ); + let layer = fmt::Layer::new() + .compact() + .with_writer(std::io::stdout) + .with_filter(filter.clone()); + filters.insert("terminal", filter); + layers.push(layer.boxed()); + } + + // OpenTelemetry logger + if settingsr.logging.otlp.enabled { + let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone(); + + cfg_if! { + if #[cfg(feature="rt-async-std")] { + let exporter = opentelemetry_otlp::new_exporter() + .grpcio() + .with_endpoint(grpc_endpoint); + let batch = opentelemetry::runtime::AsyncStd; + } else if #[cfg(feature="rt-tokio")] { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(format!("http://{}", grpc_endpoint)); + let batch = opentelemetry::runtime::Tokio; + } + } + + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(opentelemetry::sdk::trace::config().with_resource( + Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + format!( + "veilid_server:{}", + hostname::get() + .map(|s| s.to_string_lossy().into_owned()) + .unwrap_or_else(|_| "unknown".to_owned()) + ), + )]), + )) + .install_batch(batch) + .wrap_err("failed to install OpenTelemetry tracer")?; + + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(settingsr.logging.otlp.level), + None, + ); + let layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter.clone()); + filters.insert("otlp", filter); + layers.push(layer.boxed()); + } + + // File logger + let mut guard = None; + if settingsr.logging.file.enabled { + let log_path = Path::new(&settingsr.logging.file.path); + let full_path = std::env::current_dir() + .unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string())) + .join(log_path); + let log_parent = full_path + .parent() + .unwrap_or(Path::new(&MAIN_SEPARATOR.to_string())) + .canonicalize() + .wrap_err(format!( + "File log path parent does not exist: {}", + settingsr.logging.file.path + ))?; + let log_filename = full_path.file_name().ok_or(eyre!( + "File log filename not specified in path: {}", + settingsr.logging.file.path + ))?; + + let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename)); + let (non_blocking_appender, non_blocking_guard) = + tracing_appender::non_blocking(appender); + guard = Some(non_blocking_guard); + + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(settingsr.logging.file.level), + None, + ); + let layer = fmt::Layer::new() + .compact() + .with_writer(non_blocking_appender) + .with_filter(filter.clone()); + + filters.insert("file", filter); + layers.push(layer.boxed()); + } + + // API logger + if settingsr.logging.api.enabled { + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(settingsr.logging.api.level), + None, + ); + let layer = veilid_core::ApiTracingLayer::get().with_filter(filter.clone()); + filters.insert("api", filter); + layers.push(layer.boxed()); + } + + // Systemd Journal logger + cfg_if! { + if #[cfg(target_os = "linux")] { + if settingsr.logging.system.enabled { + let filter = veilid_core::VeilidLayerFilter::new( + convert_loglevel(settingsr.logging.system.level), + None, + ); + let layer = tracing_journald::layer().wrap_err("failed to set up journald logging")? + .with_filter(filter.clone()); + filters.insert("system", filter); + layers.push(layer.boxed()); + } + } + } + + let subscriber = subscriber.with(layers); + subscriber + .try_init() + .wrap_err("failed to initialize logging")?; + + Ok(VeilidLogs { + inner: Arc::new(Mutex::new(VeilidLogsInner { + _guard: guard, + filters, + })), + }) + } + + pub fn change_log_level( + &self, + layer: String, + log_level: veilid_core::VeilidConfigLogLevel, + ) -> Result<(), veilid_core::VeilidAPIError> { + // get layer to change level on + let layer = if layer == "all" { "".to_owned() } else { layer }; + + // change log level on appropriate layer + let inner = self.inner.lock(); + if layer.is_empty() { + // Change all layers + for f in inner.filters.values() { + f.set_max_level(log_level); + } + } else { + // Change a specific layer + let f = match inner.filters.get(layer.as_str()) { + Some(f) => f, + None => { + return Err(veilid_core::VeilidAPIError::InvalidArgument { + context: "change_log_level".to_owned(), + argument: "layer".to_owned(), + value: layer, + }); + } + }; + f.set_max_level(log_level); + } + Ok(()) + } +}