This commit is contained in:
John Smith 2022-08-23 13:30:49 -04:00
parent 8cce818fea
commit 1b59633d73
3 changed files with 693 additions and 693 deletions

View File

@ -1,317 +1,317 @@
use crate::settings::*; use crate::settings::*;
use crate::*; use crate::*;
use clap::{Arg, ArgMatches, Command}; use clap::{Arg, ArgMatches, Command};
use std::ffi::OsStr; use std::ffi::OsStr;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use veilid_core::{DHTKey, DHTKeySecret}; use veilid_core::{DHTKey, DHTKeySecret};
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> { fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = Command::new("veilid-server") let matches = Command::new("veilid-server")
.version("0.1") .version("0.1")
.about("Veilid Server") .about("Veilid Server")
.color(clap::ColorChoice::Auto) .color(clap::ColorChoice::Auto)
.arg( .arg(
Arg::new("daemon") Arg::new("daemon")
.long("daemon") .long("daemon")
.short('d') .short('d')
.help("Run in daemon mode in the background"), .help("Run in daemon mode in the background"),
) )
.arg( .arg(
Arg::new("foreground") Arg::new("foreground")
.long("foreground") .long("foreground")
.short('f') .short('f')
.conflicts_with("daemon") .conflicts_with("daemon")
.help("Run in the foreground"), .help("Run in the foreground"),
) )
.arg( .arg(
Arg::new("config-file") Arg::new("config-file")
.short('c') .short('c')
.long("config-file") .long("config-file")
.takes_value(true) .takes_value(true)
.value_name("FILE") .value_name("FILE")
.default_value_os(default_config_path) .default_value_os(default_config_path)
.allow_invalid_utf8(true) .allow_invalid_utf8(true)
.help("Specify a configuration file to use"), .help("Specify a configuration file to use"),
) )
.arg( .arg(
Arg::new("set-config") Arg::new("set-config")
.short('s') .short('s')
.long("set-config") .long("set-config")
.takes_value(true) .takes_value(true)
.multiple_occurrences(true) .multiple_occurrences(true)
.help("Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true") .help("Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true")
) )
.arg( .arg(
Arg::new("attach") Arg::new("attach")
.long("attach") .long("attach")
.takes_value(true) .takes_value(true)
.value_name("BOOL") .value_name("BOOL")
.possible_values(&["false", "true"]) .possible_values(&["false", "true"])
.help("Automatically attach the server to the Veilid network"), .help("Automatically attach the server to the Veilid network"),
) )
// Dev options // Dev options
.arg( .arg(
Arg::new("debug") Arg::new("debug")
.long("debug") .long("debug")
.help("Turn on debug logging on the terminal"), .help("Turn on debug logging on the terminal"),
) )
.arg( .arg(
Arg::new("trace") Arg::new("trace")
.long("trace") .long("trace")
.conflicts_with("debug") .conflicts_with("debug")
.help("Turn on trace logging on the terminal"), .help("Turn on trace logging on the terminal"),
) )
.arg( .arg(
Arg::new("otlp") Arg::new("otlp")
.long("otlp") .long("otlp")
.takes_value(true) .takes_value(true)
.value_name("endpoint") .value_name("endpoint")
.default_missing_value("localhost:4317") .default_missing_value("localhost:4317")
.help("Turn on OpenTelemetry tracing") .help("Turn on OpenTelemetry tracing")
.long_help("This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'"), .long_help("This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'"),
) )
.arg( .arg(
Arg::new("subnode-index") Arg::new("subnode-index")
.long("subnode-index") .long("subnode-index")
.takes_value(true) .takes_value(true)
.help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"), .help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"),
) )
.arg( .arg(
Arg::new("generate-dht-key") Arg::new("generate-dht-key")
.long("generate-dht-key") .long("generate-dht-key")
.help("Only generate a new dht key and print it"), .help("Only generate a new dht key and print it"),
) )
.arg( .arg(
Arg::new("set-node-id") Arg::new("set-node-id")
.long("set-node-id") .long("set-node-id")
.takes_value(true) .takes_value(true)
.value_name("ID") .value_name("ID")
.help("Set the node id and secret key") .help("Set the node id and secret key")
.long_help("To specify both node id and secret key on the command line, use a ID:SECRET syntax with a colon, like:\n zsVXz5aTU98vZxwTcDmvpcnO5g1B2jRO3wpdNiDrRgw:gJzQLmzuBvA-dFvEmLcYvLoO5bh7hzCWFzfpJHapZKg\nIf no colon is used, the node id is specified, and a prompt appears to enter the secret key interactively.") .long_help("To specify both node id and secret key on the command line, use a ID:SECRET syntax with a colon, like:\n zsVXz5aTU98vZxwTcDmvpcnO5g1B2jRO3wpdNiDrRgw:gJzQLmzuBvA-dFvEmLcYvLoO5bh7hzCWFzfpJHapZKg\nIf no colon is used, the node id is specified, and a prompt appears to enter the secret key interactively.")
) )
.arg( .arg(
Arg::new("delete-protected-store") Arg::new("delete-protected-store")
.long("delete-protected-store") .long("delete-protected-store")
.help("Delete the entire contents of the protected store (DANGER, NO UNDO!)"), .help("Delete the entire contents of the protected store (DANGER, NO UNDO!)"),
) )
.arg( .arg(
Arg::new("delete-table-store") Arg::new("delete-table-store")
.long("delete-table-store") .long("delete-table-store")
.help("Delete the entire contents of the table store (DANGER, NO UNDO!)"), .help("Delete the entire contents of the table store (DANGER, NO UNDO!)"),
) )
.arg( .arg(
Arg::new("delete-block-store") Arg::new("delete-block-store")
.long("delete-block-store") .long("delete-block-store")
.help("Delete the entire contents of the block store (DANGER, NO UNDO!)"), .help("Delete the entire contents of the block store (DANGER, NO UNDO!)"),
) )
.arg( .arg(
Arg::new("dump-config") Arg::new("dump-config")
.long("dump-config") .long("dump-config")
.help("Instead of running the server, print the configuration it would use to the console"), .help("Instead of running the server, print the configuration it would use to the console"),
) )
.arg( .arg(
Arg::new("dump-txt-record") Arg::new("dump-txt-record")
.long("dump-txt-record") .long("dump-txt-record")
.help("Prints the bootstrap TXT record for this node and then quits") .help("Prints the bootstrap TXT record for this node and then quits")
) )
.arg( .arg(
Arg::new("bootstrap") Arg::new("bootstrap")
.long("bootstrap") .long("bootstrap")
.takes_value(true) .takes_value(true)
.value_name("BOOTSTRAP_LIST") .value_name("BOOTSTRAP_LIST")
.help("Specify a list of bootstrap hostnames to use") .help("Specify a list of bootstrap hostnames to use")
) )
.arg( .arg(
Arg::new("bootstrap-nodes") Arg::new("bootstrap-nodes")
.conflicts_with("bootstrap") .conflicts_with("bootstrap")
.long("bootstrap-nodes") .long("bootstrap-nodes")
.takes_value(true) .takes_value(true)
.value_name("BOOTSTRAP_NODE_LIST") .value_name("BOOTSTRAP_NODE_LIST")
.help("Specify a list of bootstrap node dialinfos to use"), .help("Specify a list of bootstrap node dialinfos to use"),
) )
.arg( .arg(
Arg::new("local") Arg::new("local")
.long("local") .long("local")
.help("Enable local peer scope") .help("Enable local peer scope")
); );
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
let matches = matches.arg( let matches = matches.arg(
Arg::new("wait-for-debug") Arg::new("wait-for-debug")
.long("wait-for-debug") .long("wait-for-debug")
.help("Wait for debugger to attach"), .help("Wait for debugger to attach"),
); );
Ok(matches.get_matches()) Ok(matches.get_matches())
} }
pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> { pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
// Get command line options // Get command line options
let default_config_path = Settings::get_default_config_path(); let default_config_path = Settings::get_default_config_path();
let matches = do_clap_matches(default_config_path.as_os_str()) let matches = do_clap_matches(default_config_path.as_os_str())
.wrap_err("failed to parse command line: {}")?; .wrap_err("failed to parse command line: {}")?;
// Check for one-off commands // Check for one-off commands
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
if matches.occurrences_of("wait-for-debug") != 0 { if matches.occurrences_of("wait-for-debug") != 0 {
use bugsalot::debugger; use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform"); debugger::wait_until_attached(None).expect("state() not implemented on this platform");
} }
// Attempt to load configuration // Attempt to load configuration
let settings_path = if let Some(config_file) = matches.value_of_os("config-file") { let settings_path = if let Some(config_file) = matches.value_of_os("config-file") {
if Path::new(config_file).exists() { if Path::new(config_file).exists() {
Some(config_file) Some(config_file)
} else { } else {
None None
} }
} else { } else {
None None
}; };
let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?; let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?;
// write lock the settings // write lock the settings
let mut settingsrw = settings.write(); let mut settingsrw = settings.write();
// Set config from command line // Set config from command line
if matches.occurrences_of("daemon") != 0 { if matches.occurrences_of("daemon") != 0 {
settingsrw.daemon.enabled = true; settingsrw.daemon.enabled = true;
settingsrw.logging.terminal.enabled = false; settingsrw.logging.terminal.enabled = false;
} }
if matches.occurrences_of("foreground") != 0 { if matches.occurrences_of("foreground") != 0 {
settingsrw.daemon.enabled = false; settingsrw.daemon.enabled = false;
} }
if matches.occurrences_of("subnode-index") != 0 { if matches.occurrences_of("subnode-index") != 0 {
let subnode_index = match matches.value_of("subnode-index") { let subnode_index = match matches.value_of("subnode-index") {
Some(x) => x.parse().wrap_err("couldn't parse subnode index")?, Some(x) => x.parse().wrap_err("couldn't parse subnode index")?,
None => { None => {
bail!("value not specified for subnode-index"); bail!("value not specified for subnode-index");
} }
}; };
if subnode_index == 0 { if subnode_index == 0 {
bail!("value of subnode_index should be between 1 and 65535"); bail!("value of subnode_index should be between 1 and 65535");
} }
settingsrw.testing.subnode_index = subnode_index; settingsrw.testing.subnode_index = subnode_index;
} }
if matches.occurrences_of("debug") != 0 { if matches.occurrences_of("debug") != 0 {
settingsrw.logging.terminal.enabled = true; settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Debug; settingsrw.logging.terminal.level = LogLevel::Debug;
} }
if matches.occurrences_of("trace") != 0 { if matches.occurrences_of("trace") != 0 {
settingsrw.logging.terminal.enabled = true; settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Trace; settingsrw.logging.terminal.level = LogLevel::Trace;
} }
if matches.occurrences_of("otlp") != 0 { if matches.occurrences_of("otlp") != 0 {
settingsrw.logging.otlp.enabled = true; settingsrw.logging.otlp.enabled = true;
settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str( settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str(
&matches &matches
.value_of("otlp") .value_of("otlp")
.expect("should not be null because of default missing value") .expect("should not be null because of default missing value")
.to_string(), .to_string(),
) )
.wrap_err("failed to parse OTLP address")?; .wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.level = LogLevel::Trace; settingsrw.logging.otlp.level = LogLevel::Trace;
} }
if matches.is_present("attach") { if matches.is_present("attach") {
settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true")); settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true"));
} }
if matches.is_present("local") { if matches.is_present("local") {
settingsrw.core.network.enable_local_peer_scope = true; settingsrw.core.network.enable_local_peer_scope = true;
} }
if matches.occurrences_of("delete-protected-store") != 0 { if matches.occurrences_of("delete-protected-store") != 0 {
settingsrw.core.protected_store.delete = true; settingsrw.core.protected_store.delete = true;
} }
if matches.occurrences_of("delete-block-store") != 0 { if matches.occurrences_of("delete-block-store") != 0 {
settingsrw.core.block_store.delete = true; settingsrw.core.block_store.delete = true;
} }
if matches.occurrences_of("delete-table-store") != 0 { if matches.occurrences_of("delete-table-store") != 0 {
settingsrw.core.table_store.delete = true; settingsrw.core.table_store.delete = true;
} }
if matches.occurrences_of("dump-txt-record") != 0 { if matches.occurrences_of("dump-txt-record") != 0 {
// Turn off terminal logging so we can be interactive // Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false; settingsrw.logging.terminal.enabled = false;
} }
if let Some(v) = matches.value_of("set-node-id") { if let Some(v) = matches.value_of("set-node-id") {
// Turn off terminal logging so we can be interactive // Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false; settingsrw.logging.terminal.enabled = false;
// Split or get secret // Split or get secret
let (k, s) = if let Some((k, s)) = v.split_once(':') { let (k, s) = if let Some((k, s)) = v.split_once(':') {
let k = DHTKey::try_decode(k).wrap_err("failed to decode node id from command line")?; let k = DHTKey::try_decode(k).wrap_err("failed to decode node id from command line")?;
let s = DHTKeySecret::try_decode(s)?; let s = DHTKeySecret::try_decode(s)?;
(k, s) (k, s)
} else { } else {
let k = DHTKey::try_decode(v)?; let k = DHTKey::try_decode(v)?;
let buffer = rpassword::prompt_password("Enter secret key (will not echo): ") let buffer = rpassword::prompt_password("Enter secret key (will not echo): ")
.wrap_err("invalid secret key")?; .wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string(); let buffer = buffer.trim().to_string();
let s = DHTKeySecret::try_decode(&buffer)?; let s = DHTKeySecret::try_decode(&buffer)?;
(k, s) (k, s)
}; };
settingsrw.core.network.node_id = k; settingsrw.core.network.node_id = k;
settingsrw.core.network.node_id_secret = s; settingsrw.core.network.node_id_secret = s;
} }
if matches.occurrences_of("bootstrap") != 0 { if matches.occurrences_of("bootstrap") != 0 {
let bootstrap_list = match matches.value_of("bootstrap") { let bootstrap_list = match matches.value_of("bootstrap") {
Some(x) => { Some(x) => {
println!("Overriding bootstrap list with: "); println!("Overriding bootstrap list with: ");
let mut out: Vec<String> = Vec::new(); let mut out: Vec<String> = Vec::new();
for x in x.split(',') { for x in x.split(',') {
let x = x.trim().to_string(); let x = x.trim().to_string();
println!(" {}", x); println!(" {}", x);
out.push(x); out.push(x);
} }
out out
} }
None => { None => {
bail!("value not specified for bootstrap"); bail!("value not specified for bootstrap");
} }
}; };
settingsrw.core.network.bootstrap = bootstrap_list; settingsrw.core.network.bootstrap = bootstrap_list;
} }
if matches.occurrences_of("bootstrap-nodes") != 0 { if matches.occurrences_of("bootstrap-nodes") != 0 {
let bootstrap_list = match matches.value_of("bootstrap-nodes") { let bootstrap_list = match matches.value_of("bootstrap-nodes") {
Some(x) => { Some(x) => {
println!("Overriding bootstrap node list with: "); println!("Overriding bootstrap node list with: ");
let mut out: Vec<ParsedNodeDialInfo> = Vec::new(); let mut out: Vec<ParsedNodeDialInfo> = Vec::new();
for x in x.split(',') { for x in x.split(',') {
let x = x.trim(); let x = x.trim();
println!(" {}", x); println!(" {}", x);
out.push( out.push(
ParsedNodeDialInfo::from_str(x) ParsedNodeDialInfo::from_str(x)
.wrap_err("unable to parse dial info in bootstrap node list")?, .wrap_err("unable to parse dial info in bootstrap node list")?,
); );
} }
out out
} }
None => { None => {
bail!("value not specified for bootstrap node list"); bail!("value not specified for bootstrap node list");
} }
}; };
settingsrw.core.network.bootstrap_nodes = bootstrap_list; settingsrw.core.network.bootstrap_nodes = bootstrap_list;
} }
drop(settingsrw); drop(settingsrw);
// Set specific config settings // Set specific config settings
if let Some(set_configs) = matches.values_of("set-config") { if let Some(set_configs) = matches.values_of("set-config") {
for set_config in set_configs { for set_config in set_configs {
if let Some((k, v)) = set_config.split_once('=') { if let Some((k, v)) = set_config.split_once('=') {
let k = k.trim(); let k = k.trim();
let v = v.trim(); let v = v.trim();
settings.set(k, v)?; settings.set(k, v)?;
} }
} }
} }
// Apply subnode index if we're testing // Apply subnode index if we're testing
settings settings
.apply_subnode_index() .apply_subnode_index()
.wrap_err("failed to apply subnode index")?; .wrap_err("failed to apply subnode index")?;
Ok((settings, matches)) Ok((settings, matches))
} }

View File

@ -1,168 +1,168 @@
use crate::client_api; use crate::client_api;
use crate::settings::*; use crate::settings::*;
use crate::tools::*; use crate::tools::*;
use crate::veilid_logs::*; use crate::veilid_logs::*;
use crate::*; use crate::*;
use flume::{unbounded, Receiver, Sender}; use flume::{unbounded, Receiver, Sender};
use lazy_static::*; use lazy_static::*;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::*; use tracing::*;
use veilid_core::xx::SingleShotEventual; use veilid_core::xx::SingleShotEventual;
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ServerMode { pub enum ServerMode {
Normal, Normal,
ShutdownImmediate, ShutdownImmediate,
DumpTXTRecord, DumpTXTRecord,
} }
lazy_static! { lazy_static! {
static ref SHUTDOWN_SWITCH: Mutex<Option<SingleShotEventual<()>>> = static ref SHUTDOWN_SWITCH: Mutex<Option<SingleShotEventual<()>>> =
Mutex::new(Some(SingleShotEventual::new(Some(())))); Mutex::new(Some(SingleShotEventual::new(Some(()))));
} }
#[instrument] #[instrument]
pub fn shutdown() { pub fn shutdown() {
let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); let shutdown_switch = SHUTDOWN_SWITCH.lock().take();
if let Some(shutdown_switch) = shutdown_switch { if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.resolve(()); shutdown_switch.resolve(());
} }
} }
pub async fn run_veilid_server( pub async fn run_veilid_server(
settings: Settings, settings: Settings,
server_mode: ServerMode, server_mode: ServerMode,
veilid_logs: VeilidLogs, veilid_logs: VeilidLogs,
) -> EyreResult<()> { ) -> EyreResult<()> {
run_veilid_server_internal(settings, server_mode, veilid_logs).await run_veilid_server_internal(settings, server_mode, veilid_logs).await
} }
//#[instrument(err, skip_all)] //#[instrument(err, skip_all)]
pub async fn run_veilid_server_internal( pub async fn run_veilid_server_internal(
settings: Settings, settings: Settings,
server_mode: ServerMode, server_mode: ServerMode,
veilid_logs: VeilidLogs, veilid_logs: VeilidLogs,
) -> EyreResult<()> { ) -> EyreResult<()> {
trace!(?settings, ?server_mode); trace!(?settings, ?server_mode);
let settingsr = settings.read(); let settingsr = settings.read();
// Create client api state change pipe // Create client api state change pipe
let (sender, receiver): ( let (sender, receiver): (
Sender<veilid_core::VeilidUpdate>, Sender<veilid_core::VeilidUpdate>,
Receiver<veilid_core::VeilidUpdate>, Receiver<veilid_core::VeilidUpdate>,
) = unbounded(); ) = unbounded();
// Create VeilidCore setup // Create VeilidCore setup
let update_callback = Arc::new(move |change: veilid_core::VeilidUpdate| { let update_callback = Arc::new(move |change: veilid_core::VeilidUpdate| {
if sender.send(change).is_err() { if sender.send(change).is_err() {
error!("error sending veilid update callback"); error!("error sending veilid update callback");
} }
}); });
let config_callback = settings.get_core_config_callback(); let config_callback = settings.get_core_config_callback();
// Start Veilid Core and get API // Start Veilid Core and get API
let veilid_api = veilid_core::api_startup(update_callback, config_callback) let veilid_api = veilid_core::api_startup(update_callback, config_callback)
.await .await
.wrap_err("VeilidCore startup failed")?; .wrap_err("VeilidCore startup failed")?;
// Start client api if one is requested // Start client api if one is requested
let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) { let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) {
let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone()); let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone());
some_capi some_capi
.clone() .clone()
.run(settingsr.client_api.listen_address.addrs.clone()); .run(settingsr.client_api.listen_address.addrs.clone());
Some(some_capi) Some(some_capi)
} else { } else {
None None
}; };
// Drop rwlock on settings // Drop rwlock on settings
let auto_attach = settingsr.auto_attach || !matches!(server_mode, ServerMode::Normal); let auto_attach = settingsr.auto_attach || !matches!(server_mode, ServerMode::Normal);
drop(settingsr); drop(settingsr);
// Process all updates // Process all updates
let capi2 = capi.clone(); let capi2 = capi.clone();
let update_receiver_jh = spawn_local(async move { let update_receiver_jh = spawn_local(async move {
while let Ok(change) = receiver.recv_async().await { while let Ok(change) = receiver.recv_async().await {
if let Some(capi) = &capi2 { if let Some(capi) = &capi2 {
// Handle state changes on main thread for capnproto rpc // Handle state changes on main thread for capnproto rpc
capi.clone().handle_update(change); capi.clone().handle_update(change);
} }
} }
}); });
// Auto-attach if desired // Auto-attach if desired
let mut out = Ok(()); let mut out = Ok(());
if auto_attach { if auto_attach {
info!("Auto-attach to the Veilid network"); info!("Auto-attach to the Veilid network");
if let Err(e) = veilid_api.attach().await { if let Err(e) = veilid_api.attach().await {
out = Err(eyre!( out = Err(eyre!(
"Auto-attaching to the Veilid network failed: {:?}", "Auto-attaching to the Veilid network failed: {:?}",
e e
)); ));
shutdown(); shutdown();
} }
} }
// Process dump-txt-record // Process dump-txt-record
if matches!(server_mode, ServerMode::DumpTXTRecord) { if matches!(server_mode, ServerMode::DumpTXTRecord) {
let start_time = Instant::now(); let start_time = Instant::now();
while Instant::now().duration_since(start_time) < Duration::from_secs(10) { while Instant::now().duration_since(start_time) < Duration::from_secs(10) {
match veilid_api.get_state().await { match veilid_api.get_state().await {
Ok(vs) => { Ok(vs) => {
if vs.network.started { if vs.network.started {
break; break;
} }
} }
Err(e) => { Err(e) => {
out = Err(eyre!("Getting state failed: {:?}", e)); out = Err(eyre!("Getting state failed: {:?}", e));
break; break;
} }
} }
sleep(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
} }
match veilid_api.debug("txtrecord".to_string()).await { match veilid_api.debug("txtrecord".to_string()).await {
Ok(v) => { Ok(v) => {
print!("{}", v); print!("{}", v);
} }
Err(e) => { Err(e) => {
out = Err(eyre!("Getting TXT record failed: {:?}", e)); out = Err(eyre!("Getting TXT record failed: {:?}", e));
} }
}; };
shutdown(); shutdown();
} }
// Process shutdown-immediate // Process shutdown-immediate
if matches!(server_mode, ServerMode::ShutdownImmediate) { if matches!(server_mode, ServerMode::ShutdownImmediate) {
shutdown(); shutdown();
} }
// Idle while waiting to exit // Idle while waiting to exit
let shutdown_switch = { let shutdown_switch = {
let shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); let shutdown_switch_locked = SHUTDOWN_SWITCH.lock();
(*shutdown_switch_locked).as_ref().map(|ss| ss.instance()) (*shutdown_switch_locked).as_ref().map(|ss| ss.instance())
}; };
if let Some(shutdown_switch) = shutdown_switch { if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.await; shutdown_switch.await;
} }
// Stop the client api if we have one // Stop the client api if we have one
if let Some(c) = capi.as_mut().cloned() { if let Some(c) = capi.as_mut().cloned() {
c.stop().await; c.stop().await;
} }
// Shut down Veilid API to release state change sender // Shut down Veilid API to release state change sender
veilid_api.shutdown().await; veilid_api.shutdown().await;
// Wait for update receiver to exit // Wait for update receiver to exit
let _ = update_receiver_jh.await; let _ = update_receiver_jh.await;
// Finally, drop logs // Finally, drop logs
// this is explicit to ensure we don't accidentally drop them too soon via a move // this is explicit to ensure we don't accidentally drop them too soon via a move
drop(veilid_logs); drop(veilid_logs);
out out
} }

View File

@ -1,208 +1,208 @@
use crate::settings::*; use crate::settings::*;
use crate::*; use crate::*;
use cfg_if::*; use cfg_if::*;
use opentelemetry::sdk::*; use opentelemetry::sdk::*;
use opentelemetry::*; use opentelemetry::*;
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use parking_lot::*; use parking_lot::*;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::*; use std::path::*;
use std::sync::Arc; use std::sync::Arc;
use tracing_appender::*; use tracing_appender::*;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
use tracing_subscriber::*; use tracing_subscriber::*;
struct VeilidLogsInner { struct VeilidLogsInner {
_guard: Option<non_blocking::WorkerGuard>, _guard: Option<non_blocking::WorkerGuard>,
filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>, filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct VeilidLogs { pub struct VeilidLogs {
inner: Arc<Mutex<VeilidLogsInner>>, inner: Arc<Mutex<VeilidLogsInner>>,
} }
impl VeilidLogs { impl VeilidLogs {
pub fn setup(settings: Settings) -> EyreResult<VeilidLogs> { pub fn setup(settings: Settings) -> EyreResult<VeilidLogs> {
let settingsr = settings.read(); let settingsr = settings.read();
// Set up subscriber and layers // Set up subscriber and layers
let subscriber = Registry::default(); let subscriber = Registry::default();
let mut layers = Vec::new(); let mut layers = Vec::new();
let mut filters = BTreeMap::new(); let mut filters = BTreeMap::new();
// Error layer // Error layer
// XXX: Spantrace capture causes rwlock deadlocks/crashes // XXX: Spantrace capture causes rwlock deadlocks/crashes
// XXX: // XXX:
//layers.push(tracing_error::ErrorLayer::default().boxed()); //layers.push(tracing_error::ErrorLayer::default().boxed());
// Terminal logger // Terminal logger
if settingsr.logging.terminal.enabled { if settingsr.logging.terminal.enabled {
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.terminal.level), convert_loglevel(settingsr.logging.terminal.level),
None, None,
); );
let layer = fmt::Layer::new() let layer = fmt::Layer::new()
.compact() .compact()
.with_writer(std::io::stdout) .with_writer(std::io::stdout)
.with_filter(filter.clone()); .with_filter(filter.clone());
filters.insert("terminal", filter); filters.insert("terminal", filter);
layers.push(layer.boxed()); layers.push(layer.boxed());
} }
// OpenTelemetry logger // OpenTelemetry logger
if settingsr.logging.otlp.enabled { if settingsr.logging.otlp.enabled {
let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone(); let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone();
cfg_if! { cfg_if! {
if #[cfg(feature="rt-async-std")] { if #[cfg(feature="rt-async-std")] {
let exporter = opentelemetry_otlp::new_exporter() let exporter = opentelemetry_otlp::new_exporter()
.grpcio() .grpcio()
.with_endpoint(grpc_endpoint); .with_endpoint(grpc_endpoint);
let batch = opentelemetry::runtime::AsyncStd; let batch = opentelemetry::runtime::AsyncStd;
} else if #[cfg(feature="rt-tokio")] { } else if #[cfg(feature="rt-tokio")] {
let exporter = opentelemetry_otlp::new_exporter() let exporter = opentelemetry_otlp::new_exporter()
.tonic() .tonic()
.with_endpoint(format!("http://{}", grpc_endpoint)); .with_endpoint(format!("http://{}", grpc_endpoint));
let batch = opentelemetry::runtime::Tokio; let batch = opentelemetry::runtime::Tokio;
} }
} }
let tracer = opentelemetry_otlp::new_pipeline() let tracer = opentelemetry_otlp::new_pipeline()
.tracing() .tracing()
.with_exporter(exporter) .with_exporter(exporter)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource( .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new( Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME, opentelemetry_semantic_conventions::resource::SERVICE_NAME,
format!( format!(
"veilid_server:{}", "veilid_server:{}",
hostname::get() hostname::get()
.map(|s| s.to_string_lossy().into_owned()) .map(|s| s.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_owned()) .unwrap_or_else(|_| "unknown".to_owned())
), ),
)]), )]),
)) ))
.install_batch(batch) .install_batch(batch)
.wrap_err("failed to install OpenTelemetry tracer")?; .wrap_err("failed to install OpenTelemetry tracer")?;
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.otlp.level), convert_loglevel(settingsr.logging.otlp.level),
None, None,
); );
let layer = tracing_opentelemetry::layer() let layer = tracing_opentelemetry::layer()
.with_tracer(tracer) .with_tracer(tracer)
.with_filter(filter.clone()); .with_filter(filter.clone());
filters.insert("otlp", filter); filters.insert("otlp", filter);
layers.push(layer.boxed()); layers.push(layer.boxed());
} }
// File logger // File logger
let mut guard = None; let mut guard = None;
if settingsr.logging.file.enabled { if settingsr.logging.file.enabled {
let log_path = Path::new(&settingsr.logging.file.path); let log_path = Path::new(&settingsr.logging.file.path);
let full_path = std::env::current_dir() let full_path = std::env::current_dir()
.unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string())) .unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string()))
.join(log_path); .join(log_path);
let log_parent = full_path let log_parent = full_path
.parent() .parent()
.unwrap_or(Path::new(&MAIN_SEPARATOR.to_string())) .unwrap_or(Path::new(&MAIN_SEPARATOR.to_string()))
.canonicalize() .canonicalize()
.wrap_err(format!( .wrap_err(format!(
"File log path parent does not exist: {}", "File log path parent does not exist: {}",
settingsr.logging.file.path settingsr.logging.file.path
))?; ))?;
let log_filename = full_path.file_name().ok_or(eyre!( let log_filename = full_path.file_name().ok_or(eyre!(
"File log filename not specified in path: {}", "File log filename not specified in path: {}",
settingsr.logging.file.path settingsr.logging.file.path
))?; ))?;
let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename)); let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename));
let (non_blocking_appender, non_blocking_guard) = let (non_blocking_appender, non_blocking_guard) =
tracing_appender::non_blocking(appender); tracing_appender::non_blocking(appender);
guard = Some(non_blocking_guard); guard = Some(non_blocking_guard);
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.file.level), convert_loglevel(settingsr.logging.file.level),
None, None,
); );
let layer = fmt::Layer::new() let layer = fmt::Layer::new()
.compact() .compact()
.with_writer(non_blocking_appender) .with_writer(non_blocking_appender)
.with_filter(filter.clone()); .with_filter(filter.clone());
filters.insert("file", filter); filters.insert("file", filter);
layers.push(layer.boxed()); layers.push(layer.boxed());
} }
// API logger // API logger
if settingsr.logging.api.enabled { if settingsr.logging.api.enabled {
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.api.level), convert_loglevel(settingsr.logging.api.level),
None, None,
); );
let layer = veilid_core::ApiTracingLayer::get().with_filter(filter.clone()); let layer = veilid_core::ApiTracingLayer::get().with_filter(filter.clone());
filters.insert("api", filter); filters.insert("api", filter);
layers.push(layer.boxed()); layers.push(layer.boxed());
} }
// Systemd Journal logger // Systemd Journal logger
cfg_if! { cfg_if! {
if #[cfg(target_os = "linux")] { if #[cfg(target_os = "linux")] {
if settingsr.logging.system.enabled { if settingsr.logging.system.enabled {
let filter = veilid_core::VeilidLayerFilter::new( let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.system.level), convert_loglevel(settingsr.logging.system.level),
None, None,
); );
let layer =tracing_journald::layer().wrap_err("failed to set up journald logging")? let layer = tracing_journald::layer().wrap_err("failed to set up journald logging")?
.with_filter(filter.clone()); .with_filter(filter.clone());
filters.insert("system", filter); filters.insert("system", filter);
layers.push(layer.boxed()); layers.push(layer.boxed());
} }
} }
} }
let subscriber = subscriber.with(layers); let subscriber = subscriber.with(layers);
subscriber subscriber
.try_init() .try_init()
.wrap_err("failed to initialize logging")?; .wrap_err("failed to initialize logging")?;
Ok(VeilidLogs { Ok(VeilidLogs {
inner: Arc::new(Mutex::new(VeilidLogsInner { inner: Arc::new(Mutex::new(VeilidLogsInner {
_guard: guard, _guard: guard,
filters, filters,
})), })),
}) })
} }
pub fn change_log_level( pub fn change_log_level(
&self, &self,
layer: String, layer: String,
log_level: veilid_core::VeilidConfigLogLevel, log_level: veilid_core::VeilidConfigLogLevel,
) -> Result<(), veilid_core::VeilidAPIError> { ) -> Result<(), veilid_core::VeilidAPIError> {
// get layer to change level on // get layer to change level on
let layer = if layer == "all" { "".to_owned() } else { layer }; let layer = if layer == "all" { "".to_owned() } else { layer };
// change log level on appropriate layer // change log level on appropriate layer
let inner = self.inner.lock(); let inner = self.inner.lock();
if layer.is_empty() { if layer.is_empty() {
// Change all layers // Change all layers
for f in inner.filters.values() { for f in inner.filters.values() {
f.set_max_level(log_level); f.set_max_level(log_level);
} }
} else { } else {
// Change a specific layer // Change a specific layer
let f = match inner.filters.get(layer.as_str()) { let f = match inner.filters.get(layer.as_str()) {
Some(f) => f, Some(f) => f,
None => { None => {
return Err(veilid_core::VeilidAPIError::InvalidArgument { return Err(veilid_core::VeilidAPIError::InvalidArgument {
context: "change_log_level".to_owned(), context: "change_log_level".to_owned(),
argument: "layer".to_owned(), argument: "layer".to_owned(),
value: layer, value: layer,
}); });
} }
}; };
f.set_max_level(log_level); f.set_max_level(log_level);
} }
Ok(()) Ok(())
} }
} }