api and log refactor

This commit is contained in:
John Smith
2022-07-01 12:13:52 -04:00
parent f409c84778
commit c106d324c8
25 changed files with 662 additions and 501 deletions

View File

@@ -36,7 +36,6 @@ capnp = "^0"
parking_lot = "^0"
capnp-rpc = "^0"
config = { version = "^0", features = ["yaml"] }
failure = "^0"
cfg-if = "^1"
serde = "^1"
serde_derive = "^1"

View File

@@ -1,20 +1,24 @@
@0xd29582d26b2fb073;
struct ApiResult {
union {
ok @0 :Text;
err @1 :Text;
}
}
interface Registration {}
interface VeilidServer {
register @0 (veilidClient: VeilidClient) -> (registration: Registration, state: Text);
debug @1 (what: Text) -> (output: Text);
attach @2 ();
detach @3 ();
debug @1 (command: Text) -> (result :ApiResult);
attach @2 () -> (result :ApiResult);
detach @3 () -> (result :ApiResult);
shutdown @4 ();
getState @5 () -> (state: Text);
getState @5 () -> (result :ApiResult);
changeLogLevel @6 (layer: Text, logLevel: Text) -> (result :ApiResult);
}
interface VeilidClient {
update @0 (veilidUpdate: Text);
}

View File

@@ -1,12 +1,14 @@
use crate::tools::*;
use crate::veilid_client_capnp::*;
use crate::veilid_logs::VeilidLogs;
use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use cfg_if::*;
use failure::*;
use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt};
use serde::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::rc::Rc;
use stop_token::future::FutureExt;
@@ -14,9 +16,20 @@ use stop_token::*;
use tracing::*;
use veilid_core::*;
#[derive(Fail, Debug)]
#[fail(display = "Client API error: {}", _0)]
pub struct ClientAPIError(String);
// Encoding for ApiResult
fn encode_api_result<T: Serialize + fmt::Debug>(
result: &Result<T, VeilidAPIError>,
builder: &mut api_result::Builder,
) {
match result {
Ok(v) => {
builder.set_ok(&serialize_json(v));
}
Err(e) => {
builder.set_err(&serialize_json(e));
}
}
}
// --- interface Registration ---------------------------------
@@ -67,17 +80,19 @@ impl registration::Server for RegistrationImpl {}
struct VeilidServerImpl {
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
next_id: u64,
pub registration_map: Rc<RefCell<RegistrationMap>>,
}
impl VeilidServerImpl {
#[instrument(level = "trace", skip_all)]
pub fn new(veilid_api: veilid_core::VeilidAPI) -> Self {
pub fn new(veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs) -> Self {
Self {
next_id: 0,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
veilid_api,
veilid_logs,
}
}
}
@@ -115,13 +130,7 @@ impl veilid_server::Server for VeilidServerImpl {
let mut res = results.get();
res.set_registration(registration);
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
res.set_state(&state);
Ok(())
})
@@ -135,14 +144,11 @@ impl veilid_server::Server for VeilidServerImpl {
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::debug");
let veilid_api = self.veilid_api.clone();
let what = pry!(pry!(params.get()).get_what()).to_owned();
let command = pry!(pry!(params.get()).get_command()).to_owned();
Promise::from_future(async move {
let output = veilid_api
.debug(what)
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
results.get().set_output(output.as_str());
let result = veilid_api.debug(command).await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
@@ -151,15 +157,14 @@ impl veilid_server::Server for VeilidServerImpl {
fn attach(
&mut self,
_params: veilid_server::AttachParams,
mut _results: veilid_server::AttachResults,
mut results: veilid_server::AttachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::attach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
veilid_api
.attach()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
let result = veilid_api.attach().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
@@ -167,15 +172,14 @@ impl veilid_server::Server for VeilidServerImpl {
fn detach(
&mut self,
_params: veilid_server::DetachParams,
mut _results: veilid_server::DetachResults,
mut results: veilid_server::DetachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::detach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
veilid_api
.detach()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
let result = veilid_api.detach().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
@@ -208,24 +212,30 @@ impl veilid_server::Server for VeilidServerImpl {
trace!("VeilidServerImpl::get_state");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let state = veilid_api
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
let state = serialize_json(state);
let res = results.get();
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
let result = veilid_api.get_state().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn change_log_level(
&mut self,
params: veilid_server::ChangeLogLevelParams,
mut results: veilid_server::ChangeLogLevelResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::change_log_level");
let layer = pry!(pry!(params.get()).get_layer()).to_owned();
let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned();
let log_level: veilid_core::VeilidConfigLogLevel =
pry!(veilid_core::deserialize_json(&log_level_json)
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e))));
let result = self.veilid_logs.change_log_level(layer, log_level);
encode_api_result(&result, &mut results.get().init_result());
Promise::ok(())
}
}
// --- Client API Server-Side ---------------------------------
@@ -235,6 +245,7 @@ type ClientApiAllFuturesJoinHandle =
struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
registration_map: Rc<RefCell<RegistrationMap>>,
stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>,
@@ -246,10 +257,11 @@ pub struct ClientApi {
impl ClientApi {
#[instrument(level = "trace", skip_all)]
pub fn new(veilid_api: veilid_core::VeilidAPI) -> Rc<Self> {
pub fn new(veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs) -> Rc<Self> {
Rc::new(Self {
inner: RefCell::new(ClientApiInner {
veilid_api,
veilid_logs,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
stop: Some(StopSource::new()),
join_handle: None,
@@ -393,7 +405,10 @@ impl ClientApi {
#[instrument(level = "trace", skip(self))]
pub fn run(self: Rc<Self>, bind_addrs: Vec<SocketAddr>) {
// Create client api VeilidServer
let veilid_server_impl = VeilidServerImpl::new(self.inner.borrow().veilid_api.clone());
let veilid_server_impl = VeilidServerImpl::new(
self.inner.borrow().veilid_api.clone(),
self.inner.borrow().veilid_logs.clone(),
);
self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone();
// Make a client object for the server to send to each rpc client

View File

@@ -62,9 +62,9 @@ fn main() -> Result<(), String> {
// run the server to set the node id and quit
return block_on(async {
// Init combined console/file logger
let _logs = VeilidLogs::setup(settings.clone())?;
let veilid_logs = VeilidLogs::setup(settings.clone())?;
run_veilid_server(settings, server_mode).await
run_veilid_server(settings, server_mode, veilid_logs).await
})
.map(|v| {
println!("{}", success);
@@ -96,8 +96,8 @@ fn main() -> Result<(), String> {
// Run the server loop
block_on(async {
// Init combined console/file logger
let _logs = VeilidLogs::setup(settings.clone())?;
let veilid_logs = VeilidLogs::setup(settings.clone())?;
run_veilid_server(settings, server_mode).await
run_veilid_server(settings, server_mode, veilid_logs).await
})
}

View File

@@ -1,6 +1,7 @@
use crate::client_api;
use crate::settings::*;
use crate::tools::*;
use crate::veilid_logs::*;
use flume::{unbounded, Receiver, Sender};
use lazy_static::*;
use parking_lot::Mutex;
@@ -29,14 +30,19 @@ pub fn shutdown() {
}
}
pub async fn run_veilid_server(settings: Settings, server_mode: ServerMode) -> Result<(), String> {
run_veilid_server_internal(settings, server_mode).await
pub async fn run_veilid_server(
settings: Settings,
server_mode: ServerMode,
veilid_logs: VeilidLogs,
) -> Result<(), String> {
run_veilid_server_internal(settings, server_mode, veilid_logs).await
}
#[instrument(err, skip_all)]
pub async fn run_veilid_server_internal(
settings: Settings,
server_mode: ServerMode,
veilid_logs: VeilidLogs,
) -> Result<(), String> {
trace!(?settings, ?server_mode);
@@ -63,7 +69,7 @@ pub async fn run_veilid_server_internal(
// Start client api if one is requested
let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) {
let some_capi = client_api::ClientApi::new(veilid_api.clone());
let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone());
some_capi
.clone()
.run(settingsr.client_api.listen_address.addrs.clone());
@@ -156,5 +162,9 @@ pub async fn run_veilid_server_internal(
// Wait for update receiver to exit
let _ = update_receiver_jh.await;
// Finally, drop logs
// this is explicit to ensure we don't accidentally drop them too soon via a move
drop(veilid_logs);
out
}

View File

@@ -192,6 +192,7 @@ pub fn load_config(
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum LogLevel {
Off,
Error,
Warn,
Info,
@@ -205,6 +206,7 @@ impl<'de> serde::Deserialize<'de> for LogLevel {
{
let s = String::deserialize(deserializer)?;
match s.to_ascii_lowercase().as_str() {
"off" => Ok(LogLevel::Off),
"error" => Ok(LogLevel::Error),
"warn" => Ok(LogLevel::Warn),
"info" => Ok(LogLevel::Info),
@@ -223,6 +225,7 @@ impl serde::Serialize for LogLevel {
S: serde::Serializer,
{
let s = match self {
LogLevel::Off => "off",
LogLevel::Error => "error",
LogLevel::Warn => "warn",
LogLevel::Info => "info",
@@ -233,13 +236,14 @@ impl serde::Serialize for LogLevel {
}
}
pub fn convert_loglevel(log_level: LogLevel) -> veilid_core::VeilidLogLevel {
pub fn convert_loglevel(log_level: LogLevel) -> veilid_core::VeilidConfigLogLevel {
match log_level {
LogLevel::Error => veilid_core::VeilidLogLevel::Error,
LogLevel::Warn => veilid_core::VeilidLogLevel::Warn,
LogLevel::Info => veilid_core::VeilidLogLevel::Info,
LogLevel::Debug => veilid_core::VeilidLogLevel::Debug,
LogLevel::Trace => veilid_core::VeilidLogLevel::Trace,
LogLevel::Off => veilid_core::VeilidConfigLogLevel::Off,
LogLevel::Error => veilid_core::VeilidConfigLogLevel::Error,
LogLevel::Warn => veilid_core::VeilidConfigLogLevel::Warn,
LogLevel::Info => veilid_core::VeilidConfigLogLevel::Info,
LogLevel::Debug => veilid_core::VeilidConfigLogLevel::Debug,
LogLevel::Trace => veilid_core::VeilidConfigLogLevel::Trace,
}
}
@@ -1016,13 +1020,6 @@ impl Settings {
} else {
format!("subnode{}", inner.testing.subnode_index)
})),
"api_log_level" => Ok(Box::new(if inner.logging.api.enabled {
veilid_core::VeilidConfigLogLevel::from_veilid_log_level(Some(
convert_loglevel(inner.logging.api.level),
))
} else {
veilid_core::VeilidConfigLogLevel::Off
})),
"capabilities.protocol_udp" => Ok(Box::new(true)),
"capabilities.protocol_connect_tcp" => Ok(Box::new(true)),
"capabilities.protocol_accept_tcp" => Ok(Box::new(true)),

View File

@@ -98,7 +98,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
// Now, run the server
block_on(async {
// Init combined console/file logger
let _logs = VeilidLogs::setup(settings.clone())?;
let veilid_logs = VeilidLogs::setup(settings.clone())?;
// Daemonize
daemon
@@ -112,7 +112,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
let signals_task = spawn(handle_signals(signals));
let res = run_veilid_server(settings, ServerMode::Normal).await;
let res = run_veilid_server(settings, ServerMode::Normal, veilid_logs).await;
// Terminate the signal stream.
handle.close();

View File

@@ -3,25 +3,22 @@ use cfg_if::*;
use opentelemetry::sdk::*;
use opentelemetry::*;
use opentelemetry_otlp::WithExportConfig;
use parking_lot::*;
use std::collections::BTreeMap;
use std::path::*;
use tracing::*;
use std::sync::Arc;
use tracing_appender::*;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
pub struct VeilidLogs {
pub guard: Option<non_blocking::WorkerGuard>,
struct VeilidLogsInner {
_guard: Option<non_blocking::WorkerGuard>,
filters: BTreeMap<&'static str, veilid_core::VeilidLayerFilter>,
}
fn logfilter<T: AsRef<str>, V: AsRef<[T]>>(metadata: &Metadata, ignore_list: V) -> bool {
// Skip filtered targets
!match (metadata.target(), ignore_list.as_ref()) {
(path, ignore) if !ignore.is_empty() => {
// Check that the module path does not match any ignore filters
ignore.iter().any(|v| path.starts_with(v.as_ref()))
}
_ => false,
}
#[derive(Clone)]
pub struct VeilidLogs {
inner: Arc<Mutex<VeilidLogsInner>>,
}
impl VeilidLogs {
@@ -29,39 +26,26 @@ impl VeilidLogs {
let settingsr = settings.read();
// Set up subscriber and layers
let mut ignore_list = Vec::<String>::new();
for ig in veilid_core::DEFAULT_LOG_IGNORE_LIST {
ignore_list.push(ig.to_owned());
}
let subscriber = Registry::default();
let mut layers = Vec::new();
let mut filters = BTreeMap::new();
// Terminal logger
let subscriber = subscriber.with(if settingsr.logging.terminal.enabled {
let terminal_max_log_level: level_filters::LevelFilter =
convert_loglevel(settingsr.logging.terminal.level)
.to_tracing_level()
.into();
let ignore_list = ignore_list.clone();
Some(
fmt::Layer::new()
.compact()
.with_writer(std::io::stdout)
.with_filter(terminal_max_log_level)
.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, &ignore_list)
})),
)
} else {
None
});
if settingsr.logging.terminal.enabled {
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.terminal.level),
None,
);
let layer = fmt::Layer::new()
.compact()
.with_writer(std::io::stdout)
.with_filter(filter.clone());
filters.insert("terminal", filter);
layers.push(layer.boxed());
}
// OpenTelemetry logger
let subscriber = subscriber.with(if settingsr.logging.otlp.enabled {
let otlp_max_log_level: level_filters::LevelFilter =
convert_loglevel(settingsr.logging.otlp.level)
.to_tracing_level()
.into();
if settingsr.logging.otlp.enabled {
let grpc_endpoint = settingsr.logging.otlp.grpc_endpoint.name.clone();
cfg_if! {
@@ -95,27 +79,20 @@ impl VeilidLogs {
.install_batch(batch)
.map_err(|e| format!("failed to install OpenTelemetry tracer: {}", e))?;
let ignore_list = ignore_list.clone();
Some(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(otlp_max_log_level)
.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, &ignore_list)
})),
)
} else {
None
});
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.otlp.level),
None,
);
let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter.clone());
filters.insert("otlp", filter);
layers.push(layer.boxed());
}
// File logger
let mut guard = None;
let subscriber = subscriber.with(if settingsr.logging.file.enabled {
let file_max_log_level: level_filters::LevelFilter =
convert_loglevel(settingsr.logging.file.level)
.to_tracing_level()
.into();
if settingsr.logging.file.enabled {
let log_path = Path::new(&settingsr.logging.file.path);
let full_path = std::env::current_dir()
.unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string()))
@@ -140,50 +117,88 @@ impl VeilidLogs {
tracing_appender::non_blocking(appender);
guard = Some(non_blocking_guard);
let ignore_list = ignore_list.clone();
Some(
fmt::Layer::new()
.compact()
.with_writer(non_blocking_appender)
.with_filter(file_max_log_level)
.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, &ignore_list)
})),
)
} else {
None
});
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.file.level),
None,
);
let layer = fmt::Layer::new()
.compact()
.with_writer(non_blocking_appender)
.with_filter(filter.clone());
filters.insert("file", filter);
layers.push(layer.boxed());
}
// API logger
let subscriber = subscriber.with(if settingsr.logging.api.enabled {
// Get layer from veilid core, filtering is done by ApiTracingLayer automatically
Some(veilid_core::ApiTracingLayer::get())
} else {
None
});
if settingsr.logging.api.enabled {
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.api.level),
None,
);
let layer = veilid_core::ApiTracingLayer::get().with_filter(filter.clone());
filters.insert("api", filter);
layers.push(layer.boxed());
}
// Systemd Journal logger
cfg_if! {
if #[cfg(target_os = "linux")] {
let subscriber = subscriber.with(if settingsr.logging.system.enabled {
let ignore_list = ignore_list.clone();
let system_max_log_level: level_filters::LevelFilter = convert_loglevel(settingsr.logging.system.level).to_tracing_level().into();
Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?
.with_filter(system_max_log_level)
.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, &ignore_list)
}))
)
} else {
None
});
if settingsr.logging.system.enabled {
let filter = veilid_core::VeilidLayerFilter::new(
convert_loglevel(settingsr.logging.system.level),
None,
);
let layer =tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?
.with_filter(filter.clone());
filters.insert("system", filter);
layers.push(layer.boxed());
}
}
}
let subscriber = subscriber.with(layers);
subscriber
.try_init()
.map_err(|e| format!("failed to initialize logging: {}", e))?;
Ok(VeilidLogs { guard })
Ok(VeilidLogs {
inner: Arc::new(Mutex::new(VeilidLogsInner {
_guard: guard,
filters,
})),
})
}
pub fn change_log_level(
&self,
layer: String,
log_level: veilid_core::VeilidConfigLogLevel,
) -> Result<(), veilid_core::VeilidAPIError> {
// get layer to change level on
let layer = if layer == "all" { "".to_owned() } else { layer };
// change log level on appropriate layer
let inner = self.inner.lock();
if layer.is_empty() {
// Change all layers
for f in inner.filters.values() {
f.set_max_level(log_level);
}
} else {
// Change a specific layer
let f = match inner.filters.get(layer.as_str()) {
Some(f) => f,
None => {
return Err(veilid_core::VeilidAPIError::InvalidArgument {
context: "change_log_level".to_owned(),
argument: "layer".to_owned(),
value: layer,
});
}
};
f.set_max_level(log_level);
}
Ok(())
}
}