refactor checkpoint

This commit is contained in:
John Smith
2022-06-07 21:31:05 -04:00
parent 182af30b97
commit 1d8c63786a
28 changed files with 822 additions and 626 deletions

View File

@@ -7,74 +7,18 @@ use failure::*;
use futures::io::AsyncReadExt;
use futures::FutureExt as FuturesFutureExt;
use futures::StreamExt;
use log::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;
use tracing::*;
use veilid_core::xx::Eventual;
use veilid_core::*;
#[derive(Fail, Debug)]
#[fail(display = "Client API error: {}", _0)]
pub struct ClientAPIError(String);
fn convert_attachment_state(state: &veilid_core::AttachmentState) -> AttachmentState {
match state {
veilid_core::AttachmentState::Detached => AttachmentState::Detached,
veilid_core::AttachmentState::Attaching => AttachmentState::Attaching,
veilid_core::AttachmentState::AttachedWeak => AttachmentState::AttachedWeak,
veilid_core::AttachmentState::AttachedGood => AttachmentState::AttachedGood,
veilid_core::AttachmentState::AttachedStrong => AttachmentState::AttachedStrong,
veilid_core::AttachmentState::FullyAttached => AttachmentState::FullyAttached,
veilid_core::AttachmentState::OverAttached => AttachmentState::OverAttached,
veilid_core::AttachmentState::Detaching => AttachmentState::Detaching,
}
}
fn convert_update(
update: &veilid_core::VeilidUpdate,
mut rpc_update: crate::veilid_client_capnp::veilid_update::Builder,
) {
match update {
veilid_core::VeilidUpdate::Log(veilid_core::VeilidStateLog {
log_level: _,
message: _,
}) => {
panic!("Should not be logging to api in server!");
}
veilid_core::VeilidUpdate::Attachment(veilid_core::VeilidStateAttachment { state }) => {
let mut att = rpc_update.init_attachment();
att.set_state(convert_attachment_state(state));
}
veilid_core::VeilidUpdate::Network(veilid_core::VeilidStateNetwork {
started,
bps_down,
bps_up,
}) => {
let mut nb = rpc_update.init_network();
nb.set_started(*started);
nb.set_bps_down(*bps_down);
nb.set_bps_up(*bps_up);
}
veilid_core::VeilidUpdate::Shutdown => {
rpc_update.set_shutdown(());
}
}
}
fn convert_state(
state: &veilid_core::VeilidState,
mut rpc_state: crate::veilid_client_capnp::veilid_state::Builder,
) {
let mut ab = rpc_state.reborrow().init_attachment();
ab.set_state(convert_attachment_state(&state.attachment.state));
let mut nb = rpc_state.reborrow().init_network();
nb.set_started(state.network.started);
nb.set_bps_down(state.network.bps_down);
nb.set_bps_up(state.network.bps_up);
}
// --- interface Registration ---------------------------------
struct RegistrationHandle {
@@ -166,11 +110,17 @@ impl veilid_server::Server for VeilidServerImpl {
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
let state = serialize_json(state);
let mut res = results.get();
res.set_registration(registration);
let rpc_state = res.init_state();
convert_state(&state, rpc_state);
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
Ok(())
})
@@ -256,9 +206,17 @@ impl veilid_server::Server for VeilidServerImpl {
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
let state = serialize_json(state);
let res = results.get();
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
let rpc_state = results.get().init_state();
convert_state(&state, rpc_state);
Ok(())
})
}
@@ -345,7 +303,7 @@ impl ClientApi {
fn send_request_to_all_clients<F, T>(self: Rc<Self>, request: F)
where
F: Fn(u64, &mut RegistrationHandle) -> ::capnp::capability::RemotePromise<T>,
F: Fn(u64, &mut RegistrationHandle) -> Option<::capnp::capability::RemotePromise<T>>,
T: capnp::traits::Pipelined + for<'a> capnp::traits::Owned<'a> + 'static + Unpin,
{
// Send status update to each registered client
@@ -361,40 +319,44 @@ impl ClientApi {
}
registration.requests_in_flight += 1;
let request_promise = request(id, registration);
let registration_map2 = registration_map1.clone();
async_std::task::spawn_local(request_promise.promise.map(move |r| match r {
Ok(_) => {
if let Some(ref mut s) =
registration_map2.borrow_mut().registrations.get_mut(&id)
{
s.requests_in_flight -= 1;
if let Some(request_promise) = request(id, registration) {
let registration_map2 = registration_map1.clone();
async_std::task::spawn_local(request_promise.promise.map(move |r| match r {
Ok(_) => {
if let Some(ref mut s) =
registration_map2.borrow_mut().registrations.get_mut(&id)
{
s.requests_in_flight -= 1;
}
}
}
Err(e) => {
println!("Got error: {:?}. Dropping registation.", e);
registration_map2.borrow_mut().registrations.remove(&id);
}
}));
Err(e) => {
println!("Got error: {:?}. Dropping registation.", e);
registration_map2.borrow_mut().registrations.remove(&id);
}
}));
}
}
}
pub fn handle_update(self: Rc<Self>, veilid_update: veilid_core::VeilidUpdate) {
// serialize update
let veilid_update = serialize_json(veilid_update);
// Pass other updates to clients
self.send_request_to_all_clients(|_id, registration| {
let mut request = registration.client.update_request();
let rpc_veilid_update = request.get().init_veilid_update();
convert_update(&veilid_update, rpc_veilid_update);
request.send()
});
}
pub fn handle_client_log(self: Rc<Self>, message: String) {
self.send_request_to_all_clients(|_id, registration| {
let mut request = registration.client.log_message_request();
request.get().set_message(&message);
request.send()
match veilid_update
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
{
Ok(len) => {
let mut request = registration.client.update_request();
let mut rpc_veilid_update = request.get().init_veilid_update(len);
rpc_veilid_update.push_str(&veilid_update);
Some(request.send())
}
Err(_) => None,
}
});
}

View File

@@ -24,6 +24,9 @@ pub mod veilid_client_capnp {
}
fn main() -> Result<(), String> {
#[cfg(windows)]
let _ = ansi_term::enable_ansi_support();
let (settings, matches) = cmdline::process_command_line()?;
// --- Dump Config ---
@@ -55,7 +58,7 @@ fn main() -> Result<(), String> {
// Handle non-normal server modes
if !matches!(server_mode, ServerMode::Normal) {
// Init combined console/file logger
let logs = VeilidLogs::setup_normal_logs(settings.clone())?;
let logs = VeilidLogs::setup(settings.clone())?;
// run the server to set the node id and quit
return task::block_on(async { run_veilid_server(settings, logs, server_mode).await })
.map(|v| {
@@ -80,7 +83,7 @@ fn main() -> Result<(), String> {
}
// Init combined console/file logger
let logs = VeilidLogs::setup_normal_logs(settings.clone())?;
let logs = VeilidLogs::setup(settings.clone())?;
// --- Normal Startup ---
ctrlc::set_handler(move || {

View File

@@ -3,10 +3,10 @@ use crate::settings::*;
use crate::veilid_logs::*;
use flume::{bounded, Receiver, Sender};
use lazy_static::*;
use log::*;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::*;
use veilid_core::xx::SingleShotEventual;
#[derive(Copy, Clone, PartialEq, Eq)]
@@ -91,41 +91,6 @@ pub async fn run_veilid_server_internal(
}
}
});
// Handle log messages on main thread for capnproto rpc
let client_log_receiver_jh = capi.clone().and_then(|capi| {
logs.client_log_channel
.clone()
.map(|mut client_log_channel| {
async_std::task::spawn_local(async move {
// Batch messages to either 16384 chars at once or every second to minimize packets
let rate = Duration::from_secs(1);
let mut start = Instant::now();
let mut messages = String::new();
loop {
let timeout_dur =
rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
match async_std::future::timeout(timeout_dur, client_log_channel.recv())
.await
{
Ok(Ok(message)) => {
messages += &message;
if messages.len() > 16384 {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
Ok(Err(_)) => break,
Err(_) => {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
}
})
})
});
// Auto-attach if desired
let mut out = Ok(());
@@ -193,18 +158,8 @@ pub async fn run_veilid_server_internal(
// Shut down Veilid API to release state change sender
veilid_api.shutdown().await;
// Close the client api log channel if it is open to release client log sender
if let Some(client_log_channel_closer) = logs.client_log_channel_closer {
client_log_channel_closer.close();
}
// Wait for update receiver to exit
update_receiver_jh.await;
// Wait for client api log receiver to exit
if let Some(client_log_receiver_jh) = client_log_receiver_jh {
client_log_receiver_jh.await;
}
out
}

View File

@@ -1,7 +1,6 @@
#![allow(clippy::bool_assert_comparison)]
use directories::*;
use log::*;
use parking_lot::*;
use serde_derive::*;
@@ -34,7 +33,7 @@ logging:
path: ''
append: true
level: 'info'
client:
api:
enabled: false
level: 'info'
testing:
@@ -226,13 +225,13 @@ impl serde::Serialize for LogLevel {
}
}
pub fn convert_loglevel(log_level: LogLevel) -> LevelFilter {
pub fn convert_loglevel(log_level: LogLevel) -> veilid_core::VeilidLogLevel {
match log_level {
LogLevel::Error => LevelFilter::Error,
LogLevel::Warn => LevelFilter::Warn,
LogLevel::Info => LevelFilter::Info,
LogLevel::Debug => LevelFilter::Debug,
LogLevel::Trace => LevelFilter::Trace,
LogLevel::Error => veilid_core::VeilidLogLevel::Error,
LogLevel::Warn => veilid_core::VeilidLogLevel::Warn,
LogLevel::Info => veilid_core::VeilidLogLevel::Info,
LogLevel::Debug => veilid_core::VeilidLogLevel::Debug,
LogLevel::Trace => veilid_core::VeilidLogLevel::Trace,
}
}
@@ -419,7 +418,7 @@ pub struct System {
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Client {
pub struct Api {
pub enabled: bool,
pub level: LogLevel,
}
@@ -435,7 +434,7 @@ pub struct Logging {
pub system: System,
pub terminal: Terminal,
pub file: File,
pub client: Client,
pub api: Api,
}
#[derive(Debug, Deserialize, Serialize)]
@@ -862,7 +861,13 @@ impl Settings {
} else {
format!("subnode{}", inner.testing.subnode_index)
})),
"log_level" => Ok(Box::new(veilid_core::VeilidConfigLogLevel::Off)),
"api_log_level" => Ok(Box::new(if inner.logging.api.enabled {
veilid_core::VeilidConfigLogLevel::from_veilid_log_level(Some(
convert_loglevel(inner.logging.api.level),
))
} else {
veilid_core::VeilidConfigLogLevel::Off
})),
"capabilities.protocol_udp" => Ok(Box::new(true)),
"capabilities.protocol_connect_tcp" => Ok(Box::new(true)),
"capabilities.protocol_accept_tcp" => Ok(Box::new(true)),
@@ -1257,8 +1262,8 @@ mod tests {
assert_eq!(s.logging.file.path, "");
assert_eq!(s.logging.file.append, true);
assert_eq!(s.logging.file.level, LogLevel::Info);
assert_eq!(s.logging.client.enabled, false);
assert_eq!(s.logging.client.level, LogLevel::Info);
assert_eq!(s.logging.api.enabled, false);
assert_eq!(s.logging.api.level, LogLevel::Info);
assert_eq!(s.testing.subnode_index, 0);
assert_eq!(

View File

@@ -93,7 +93,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
};
// Init combined console/file logger
let logs = VeilidLogs::setup_normal_logs(settings.clone())?;
let logs = VeilidLogs::setup(settings.clone())?;
// Daemonize
daemon

View File

@@ -1,162 +1,127 @@
use crate::log_safe_channel::*;
use crate::settings::*;
use cfg_if::*;
use log::*;
use simplelog::*;
use std::fs::OpenOptions;
use std::path::Path;
use std::path::*;
use tracing::*;
use tracing_appender::*;
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
pub struct VeilidLogs {
pub client_log_channel: Option<LogSafeChannel>,
pub client_log_channel_closer: Option<LogSafeChannelCloser>,
pub guard: Option<non_blocking::WorkerGuard>,
}
cfg_if! {
if #[cfg(target_os = "linux")] {
use systemd_journal_logger::JournalLog;
pub struct SystemLogger {
level_filter: LevelFilter,
config: Config,
journal_log: JournalLog<String,String>,
}
impl SystemLogger {
pub fn new(level_filter: LevelFilter, config: Config) -> Box<Self> {
Box::new(Self {
level_filter,
config,
journal_log: JournalLog::with_extra_fields(Vec::new())
})
}
pub fn should_skip(record: &Record<'_>) -> bool {
// // If a module path and allowed list are available
// match (record.target(), &*config.filter_allow) {
// (path, allowed) if !allowed.is_empty() => {
// // Check that the module path matches at least one allow filter
// if !allowed.iter().any(|v| path.starts_with(&**v)) {
// // If not, skip any further writing
// return true;
// }
// }
// _ => {}
// }
// If a module path and ignore list are available
match (record.target(), &veilid_core::DEFAULT_LOG_IGNORE_LIST) {
(path, ignore) if !ignore.is_empty() => {
// Check that the module path does not match any ignore filters
if ignore.iter().any(|v| path.starts_with(&**v)) {
// If not, skip any further writing
return true;
}
}
_ => {}
}
false
}
}
impl Log for SystemLogger {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= self.level_filter
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) && ! Self::should_skip(record) {
self.journal_log.log(record);
}
}
fn flush(&self) {
self.journal_log.flush();
}
}
impl SharedLogger for SystemLogger {
fn level(&self) -> LevelFilter {
self.level_filter
}
fn config(&self) -> Option<&Config> {
Some(&self.config)
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
Box::new(*self)
}
fn logfilter<T: AsRef<str>, V: AsRef<[T]>>(
metadata: &Metadata,
max_level: veilid_core::VeilidLogLevel,
ignore_list: V,
) -> bool {
// Skip filtered targets
!match (metadata.target(), ignore_list.as_ref()) {
(path, ignore) if !ignore.is_empty() => {
// Check that the module path does not match any ignore filters
ignore.iter().any(|v| path.starts_with(v.as_ref()))
}
_ => false,
}
}
impl VeilidLogs {
pub fn setup_normal_logs(settings: Settings) -> Result<VeilidLogs, String> {
pub fn setup(settings: Settings) -> Result<VeilidLogs, String> {
let settingsr = settings.read();
// Set up loggers
let mut logs: Vec<Box<dyn SharedLogger>> = Vec::new();
let mut client_log_channel: Option<LogSafeChannel> = None;
let mut client_log_channel_closer: Option<LogSafeChannelCloser> = None;
let mut cb = ConfigBuilder::new();
// Set up subscriber and layers
let mut ignore_list = Vec::<String>::new();
for ig in veilid_core::DEFAULT_LOG_IGNORE_LIST {
cb.add_filter_ignore_str(ig);
ignore_list.push(ig.to_owned());
}
if settingsr.logging.terminal.enabled {
logs.push(TermLogger::new(
convert_loglevel(settingsr.logging.terminal.level),
cb.build(),
TerminalMode::Mixed,
ColorChoice::Auto,
))
}
if settingsr.logging.file.enabled {
let subscriber = Registry::default();
let subscriber = subscriber.with(
EnvFilter::builder()
.with_default_directive(level_filters::LevelFilter::INFO.into())
.from_env_lossy(),
);
let subscriber = subscriber.with(if settingsr.logging.terminal.enabled {
let terminal_max_log_level = convert_loglevel(settingsr.logging.terminal.level);
Some(
fmt::Layer::new()
.compact()
.with_writer(std::io::stdout)
.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, terminal_max_log_level, &ignore_list)
})),
)
} else {
None
});
let mut guard = None;
let subscriber = subscriber.with(if settingsr.logging.file.enabled {
let file_max_log_level = convert_loglevel(settingsr.logging.file.level);
let log_path = Path::new(&settingsr.logging.file.path);
let full_path = std::env::current_dir()
.unwrap_or(PathBuf::from(MAIN_SEPARATOR.to_string()))
.join(log_path);
let log_parent = full_path
.parent()
.unwrap_or(Path::new(&MAIN_SEPARATOR.to_string()))
.canonicalize()
.map_err(|e| {
format!(
"File log path parent does not exist: {} ({})",
settingsr.logging.file.path, e
)
})?;
let log_filename = full_path.file_name().ok_or(format!(
"File log filename not specified in path: {}",
settingsr.logging.file.path
))?;
let logfile = if settingsr.logging.file.append {
OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
} else {
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
};
logs.push(WriteLogger::new(
convert_loglevel(settingsr.logging.file.level),
cb.build(),
logfile,
))
}
if settingsr.logging.client.enabled {
let (clog, clogwriter, clogcloser) = LogSafeChannel::new();
client_log_channel = Some(clog);
client_log_channel_closer = Some(clogcloser);
logs.push(WriteLogger::new(
convert_loglevel(settingsr.logging.client.level),
cb.build(),
clogwriter,
))
}
let appender = tracing_appender::rolling::never(log_parent, Path::new(log_filename));
let (non_blocking_appender, non_blocking_guard) =
tracing_appender::non_blocking(appender);
guard = Some(non_blocking_guard);
Some(
fmt::Layer::new()
.compact()
.with_writer(non_blocking_appender)
.with_filter(filter::FilterFn::new(|metadata| {
logfilter(metadata, file_max_log_level, &ignore_list)
})),
)
} else {
None
});
let subscriber = subscriber.with(if settingsr.logging.api.enabled {
// Get layer from veilid core, filtering is done by ApiTracingLayer automatically
Some(veilid_core::ApiTracingLayer::get())
} else {
None
});
cfg_if! {
if #[cfg(target_os = "linux")] {
if settingsr.logging.system.enabled {
logs.push(SystemLogger::new(convert_loglevel(settingsr.logging.system.level), cb.build()))
}
let subscriber = subscriber.with(if settingsr.logging.system.enabled {
let system_max_log_level = convert_loglevel(settingsr.logging.system.level);
Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?.with_filter(filter::FilterFn::new(|metadata| {
logfilter(metadata, system_max_log_level, &ignore_list)
})))
} else {
None
});
}
}
CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?;
subscriber
.try_init()
.map_err(|e| format!("failed to initialize logging: {}", e))?;
Ok(VeilidLogs {
client_log_channel,
client_log_channel_closer,
})
Ok(VeilidLogs { guard })
}
}