refactor for tracing and api logging

This commit is contained in:
John Smith 2022-06-08 09:33:41 -04:00
parent 1d8c63786a
commit bfe0315af1
9 changed files with 139 additions and 238 deletions

View File

@ -8,6 +8,7 @@ use std::cell::RefCell;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use veilid_core::xx::*; use veilid_core::xx::*;
use veilid_core::*;
macro_rules! capnp_failed { macro_rules! capnp_failed {
($ex:expr) => {{ ($ex:expr) => {{
@ -17,6 +18,17 @@ macro_rules! capnp_failed {
}}; }};
} }
macro_rules! pry_result {
($ex:expr) => {
match $ex {
Ok(v) => v,
Err(e) => {
return capnp_failed!(e);
}
}
};
}
struct VeilidClientImpl { struct VeilidClientImpl {
comproc: CommandProcessor, comproc: CommandProcessor,
} }
@ -34,57 +46,23 @@ impl veilid_client::Server for VeilidClientImpl {
_results: veilid_client::UpdateResults, _results: veilid_client::UpdateResults,
) -> Promise<(), ::capnp::Error> { ) -> Promise<(), ::capnp::Error> {
let veilid_update = pry!(pry!(params.get()).get_veilid_update()); let veilid_update = pry!(pry!(params.get()).get_veilid_update());
let veilid_update: VeilidUpdate = pry_result!(deserialize_json(veilid_update));
let which = match veilid_update.which() { match veilid_update {
Ok(v) => v, VeilidUpdate::Log(log) => {
Err(e) => { self.comproc.update_log(log);
return capnp_failed!(format!("(missing update kind in schema: {:?})", e));
} }
}; VeilidUpdate::Attachment(attachment) => {
match which { self.comproc.update_attachment(attachment);
veilid_update::Attachment(Ok(attachment)) => {
let state = pry!(attachment.get_state());
trace!("Attachment: {}", state as u16);
self.comproc.update_attachment(state);
} }
veilid_update::Attachment(Err(e)) => { VeilidUpdate::Network(network) => {
return capnp_failed!(format!("Update Attachment Error: {}", e)); self.comproc.update_network_status(network);
}
veilid_update::Network(Ok(network)) => {
let started = network.get_started();
let bps_down = network.get_bps_down();
let bps_up = network.get_bps_up();
trace!(
"Network: started: {} bps_down: {} bps_up: {}",
started,
bps_down,
bps_up
);
self.comproc
.update_network_status(started, bps_down, bps_up);
}
veilid_update::Network(Err(e)) => {
return capnp_failed!(format!("Update Network Error: {}", e));
}
veilid_update::Shutdown(()) => {
return capnp_failed!("Should not get Shutdown here".to_owned());
} }
VeilidUpdate::Shutdown => self.comproc.update_shutdown(),
} }
Promise::ok(()) Promise::ok(())
} }
fn log_message(
&mut self,
params: veilid_client::LogMessageParams,
_results: veilid_client::LogMessageResults,
) -> Promise<(), ::capnp::Error> {
let message = pry!(pry!(params.get()).get_message());
self.comproc.add_log_message(message);
Promise::ok(())
}
} }
struct ClientApiConnectionInner { struct ClientApiConnectionInner {
@ -116,53 +94,20 @@ impl ClientApiConnection {
} }
async fn process_veilid_state<'a>( async fn process_veilid_state<'a>(
&'a mut self, &'a mut self,
veilid_state: veilid_state::Reader<'a>, veilid_state: VeilidState,
) -> Result<(), String> { ) -> Result<(), String> {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
inner.comproc.update_attachment(veilid_state.attachment);
// Process attachment state inner.comproc.update_network_status(veilid_state.network);
let attachment = veilid_state
.reborrow()
.get_attachment()
.map_err(map_to_string)?;
let attachment_state = attachment.get_state().map_err(map_to_string)?;
let network = veilid_state
.reborrow()
.get_network()
.map_err(map_to_string)?;
let started = network.get_started();
let bps_down = network.get_bps_down();
let bps_up = network.get_bps_up();
inner.comproc.update_attachment(attachment_state);
inner
.comproc
.update_network_status(started, bps_down, bps_up);
Ok(()) Ok(())
} }
async fn handle_connection(&mut self) -> Result<(), String> { async fn spawn_rpc_system(
trace!("ClientApiConnection::handle_connection"); &mut self,
let connect_addr = self.inner.borrow().connect_addr.unwrap(); connect_addr: SocketAddr,
// Connect the TCP socket mut rpc_system: RpcSystem<rpc_twoparty_capnp::Side>,
let stream = async_std::net::TcpStream::connect(connect_addr) ) -> Result<(), String> {
.await
.map_err(map_to_string)?;
// If it succeed, disable nagle algorithm
stream.set_nodelay(true).map_err(map_to_string)?;
// Create the VAT network
let (reader, writer) = stream.split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
// Create the rpc system
let mut rpc_system = RpcSystem::new(rpc_network, None);
let mut request; let mut request;
{ {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
@ -195,29 +140,72 @@ impl ClientApiConnection {
)); ));
} }
// Process the rpc system until we decide we're done let rpc_jh = AsyncStd
if let Ok(rpc_jh) = AsyncStd.spawn_handle_local(rpc_system) { .spawn_handle_local(rpc_system)
// Send the request and get the state object and the registration object .map_err(|e| format!("failed to spawn rpc system: {}", e))?;
if let Ok(response) = request.send().promise.await {
if let Ok(response) = response.get() {
if let Ok(_registration) = response.get_registration() {
if let Ok(state) = response.get_state() {
// Set up our state for the first time
if self.process_veilid_state(state).await.is_ok() {
// Don't drop the registration, doing so will remove the client
// object mapping from the server which we need for the update backchannel
// Wait until rpc system completion or disconnect was requested // Send the request and get the state object and the registration object
if let Err(e) = rpc_jh.await { let response = request
error!("Client RPC system error: {}", e); .send()
} .promise
} .await
} .map_err(|e| format!("failed to send register request: {}", e))?;
} let response = response
} .get()
.map_err(|e| format!("failed to get register response: {}", e))?;
// Get the registration object, which drops our connection when it is dropped
let _registration = response
.get_registration()
.map_err(|e| format!("failed to get registration object: {}", e))?;
// Get the initial veilid state
let veilid_state = response
.get_state()
.map_err(|e| format!("failed to get initial veilid state: {}", e))?;
// Set up our state for the first time
let veilid_state: VeilidState = deserialize_json(veilid_state)
.map_err(|e| format!("failed to get deserialize veilid state: {}", e))?;
self.process_veilid_state(veilid_state).await?;
// Don't drop the registration, doing so will remove the client
// object mapping from the server which we need for the update backchannel
// Wait until rpc system completion or disconnect was requested
rpc_jh
.await
.map_err(|e| format!("client RPC system error: {}", e))
}
async fn handle_connection(&mut self) -> Result<(), String> {
trace!("ClientApiConnection::handle_connection");
let connect_addr = self.inner.borrow().connect_addr.unwrap();
// Connect the TCP socket
let stream = async_std::net::TcpStream::connect(connect_addr)
.await
.map_err(map_to_string)?;
// If it succeed, disable nagle algorithm
stream.set_nodelay(true).map_err(map_to_string)?;
// Create the VAT network
let (reader, writer) = stream.split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
// Create the rpc system
let rpc_system = RpcSystem::new(rpc_network, None);
// Process the rpc system until we decide we're done
match self.spawn_rpc_system(connect_addr, rpc_system).await {
Ok(()) => {}
Err(e) => {
error!("Failed to spawn client RPC system: {}", e);
} }
} else {
error!("Failed to spawn client RPC system");
} }
// Drop the server and disconnector too (if we still have it) // Drop the server and disconnector too (if we still have it)

View File

@ -1,7 +1,6 @@
use crate::client_api_connection::*; use crate::client_api_connection::*;
use crate::settings::Settings; use crate::settings::Settings;
use crate::ui::*; use crate::ui::*;
use crate::veilid_client_capnp::*;
use async_std::prelude::FutureExt; use async_std::prelude::FutureExt;
use log::*; use log::*;
use std::cell::*; use std::cell::*;
@ -273,18 +272,23 @@ debug - send a debugging command to the Veilid server
// called by client_api_connection // called by client_api_connection
// calls into ui // calls into ui
//////////////////////////////////////////// ////////////////////////////////////////////
pub fn update_attachment(&mut self, state: AttachmentState) { pub fn update_attachment(&mut self, attachment: veilid_core::VeilidStateAttachment) {
self.inner_mut().ui.set_attachment_state(state); self.inner_mut().ui.set_attachment_state(attachment.state);
} }
pub fn update_network_status(&mut self, started: bool, bps_down: u64, bps_up: u64) { pub fn update_network_status(&mut self, network: veilid_core::VeilidStateNetwork) {
self.inner_mut() self.inner_mut()
.ui .ui
.set_network_status(started, bps_down, bps_up); .set_network_status(network.started, network.bps_down, network.bps_up);
} }
pub fn add_log_message(&mut self, message: &str) { pub fn update_log(&mut self, log: veilid_core::VeilidStateLog) {
self.inner().ui.add_node_event(message); let message = format!("{}: {}", log.log_level, log.message);
self.inner().ui.add_node_event(&message);
}
pub fn update_shutdown(&mut self) {
// Do nothing with this, we'll process shutdown when rpc connection closes
} }
// called by client_api_connection // called by client_api_connection

View File

@ -1,6 +1,5 @@
use crate::command_processor::*; use crate::command_processor::*;
use crate::settings::Settings; use crate::settings::Settings;
use crate::veilid_client_capnp::*;
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use cursive::align::*; use cursive::align::*;
use cursive::event::*; use cursive::event::*;
@ -15,7 +14,7 @@ use log::*;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::rc::Rc; use std::rc::Rc;
// use thiserror::Error; use veilid_core::*;
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
/// ///
@ -280,9 +279,8 @@ impl UI {
.button("Close", move |s| { .button("Close", move |s| {
s.pop_layer(); s.pop_layer();
close_cb(s); close_cb(s);
}) }), //.wrap_with(CircularFocus::new)
//.wrap_with(CircularFocus::new) //.wrap_tab(),
//.wrap_tab(),
); );
s.set_global_callback(cursive::event::Event::Key(Key::Esc), move |s| { s.set_global_callback(cursive::event::Event::Key(Key::Esc), move |s| {
s.set_global_callback(cursive::event::Event::Key(Key::Esc), UI::quit_handler); s.set_global_callback(cursive::event::Event::Key(Key::Esc), UI::quit_handler);

View File

@ -179,6 +179,19 @@ impl VeilidLogLevel {
} }
} }
impl fmt::Display for VeilidLogLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let text = match self {
Self::Error => "ERROR",
Self::Warn => "WARN",
Self::Info => "INFO",
Self::Debug => "DEBUG",
Self::Trace => "TRACE",
};
write!(f, "{}", text)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VeilidStateLog { pub struct VeilidStateLog {
pub log_level: VeilidLogLevel, pub log_level: VeilidLogLevel,

View File

@ -1,104 +0,0 @@
use parking_lot::Mutex;
use std::sync::Arc;
// Must use async_std channel to send to main thread from blocking thread
use async_std::channel::bounded as async_bounded;
use async_std::channel::Receiver as AsyncReceiver;
pub use async_std::channel::RecvError;
// Must use std mpsc so no logs are generated by async code
use std::sync::mpsc::sync_channel as std_sync_channel;
use std::sync::mpsc::SyncSender as StdSender;
use std::sync::mpsc::TrySendError as StdTrySendError;
//////////////////////////////////////////
#[derive(Clone)]
pub struct LogSafeChannelCloser {
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
impl LogSafeChannelCloser {
pub fn close(&self) {
// Drop the sender
self.sender.lock().take();
}
}
//////////////////////////////////////////
pub struct LogSafeChannelWriterShim {
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
impl std::io::Write for LogSafeChannelWriterShim {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let bufstr = String::from_utf8_lossy(buf).to_string();
let sender = self.sender.lock();
if let Some(sender) = &*sender {
if let Err(e) = sender.try_send(bufstr) {
match e {
StdTrySendError::Full(_) => {
Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))
}
StdTrySendError::Disconnected(_) => {
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
}
} else {
Ok(buf.len())
}
} else {
Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
pub type LogSafeChannelWriter = std::io::LineWriter<LogSafeChannelWriterShim>;
//////////////////////////////////////////
#[derive(Clone)]
pub struct LogSafeChannel {
async_receiver: AsyncReceiver<String>,
}
impl LogSafeChannel {
pub fn new() -> (Self, LogSafeChannelWriter, LogSafeChannelCloser) {
let (async_sender, async_receiver) = async_bounded(1024);
let (std_sender, std_receiver) = std_sync_channel(1024);
let shared_std_sender = Arc::new(Mutex::new(Some(std_sender)));
// Spawn a processing thread for the blocking std sender
async_std::task::spawn(async move {
#[allow(clippy::while_let_loop)]
loop {
let message = match std_receiver.recv() {
Ok(v) => v,
Err(_) => break,
};
if async_sender.send(message).await.is_err() {
break;
}
}
});
(
Self { async_receiver },
LogSafeChannelWriter::with_capacity(
65536,
LogSafeChannelWriterShim {
sender: shared_std_sender.clone(),
},
),
LogSafeChannelCloser {
sender: shared_std_sender,
},
)
}
pub async fn recv(&mut self) -> Result<String, RecvError> {
self.async_receiver.recv().await
}
}

View File

@ -4,7 +4,6 @@
mod client_api; mod client_api;
mod cmdline; mod cmdline;
mod log_safe_channel;
mod server; mod server;
mod settings; mod settings;
#[cfg(unix)] #[cfg(unix)]
@ -58,9 +57,9 @@ fn main() -> Result<(), String> {
// Handle non-normal server modes // Handle non-normal server modes
if !matches!(server_mode, ServerMode::Normal) { if !matches!(server_mode, ServerMode::Normal) {
// Init combined console/file logger // Init combined console/file logger
let logs = VeilidLogs::setup(settings.clone())?; let _logs = VeilidLogs::setup(settings.clone())?;
// run the server to set the node id and quit // run the server to set the node id and quit
return task::block_on(async { run_veilid_server(settings, logs, server_mode).await }) return task::block_on(async { run_veilid_server(settings, server_mode).await })
.map(|v| { .map(|v| {
println!("{}", success); println!("{}", success);
v v
@ -83,7 +82,7 @@ fn main() -> Result<(), String> {
} }
// Init combined console/file logger // Init combined console/file logger
let logs = VeilidLogs::setup(settings.clone())?; let _logs = VeilidLogs::setup(settings.clone())?;
// --- Normal Startup --- // --- Normal Startup ---
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
@ -92,5 +91,5 @@ fn main() -> Result<(), String> {
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
// Run the server loop // Run the server loop
task::block_on(async { run_veilid_server(settings, logs, server_mode).await }) task::block_on(async { run_veilid_server(settings, server_mode).await })
} }

View File

@ -1,6 +1,5 @@
use crate::client_api; use crate::client_api;
use crate::settings::*; use crate::settings::*;
use crate::veilid_logs::*;
use flume::{bounded, Receiver, Sender}; use flume::{bounded, Receiver, Sender};
use lazy_static::*; use lazy_static::*;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -28,12 +27,8 @@ pub fn shutdown() {
} }
} }
pub async fn run_veilid_server( pub async fn run_veilid_server(settings: Settings, server_mode: ServerMode) -> Result<(), String> {
settings: Settings, run_veilid_server_internal(settings, server_mode)
logs: VeilidLogs,
server_mode: ServerMode,
) -> Result<(), String> {
run_veilid_server_internal(settings, logs, server_mode)
.await .await
.map_err(|e| { .map_err(|e| {
error!("{}", e); error!("{}", e);
@ -42,7 +37,6 @@ pub async fn run_veilid_server(
} }
pub async fn run_veilid_server_internal( pub async fn run_veilid_server_internal(
settings: Settings, settings: Settings,
logs: VeilidLogs,
server_mode: ServerMode, server_mode: ServerMode,
) -> Result<(), String> { ) -> Result<(), String> {
let settingsr = settings.read(); let settingsr = settings.read();

View File

@ -93,7 +93,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
}; };
// Init combined console/file logger // Init combined console/file logger
let logs = VeilidLogs::setup(settings.clone())?; let _logs = VeilidLogs::setup(settings.clone())?;
// Daemonize // Daemonize
daemon daemon
@ -109,7 +109,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String
let signals_task = async_std::task::spawn(handle_signals(signals)); let signals_task = async_std::task::spawn(handle_signals(signals));
let res = run_veilid_server(settings, logs, ServerMode::Normal).await; let res = run_veilid_server(settings, ServerMode::Normal).await;
// Terminate the signal stream. // Terminate the signal stream.
handle.close(); handle.close();

View File

@ -15,6 +15,12 @@ fn logfilter<T: AsRef<str>, V: AsRef<[T]>>(
max_level: veilid_core::VeilidLogLevel, max_level: veilid_core::VeilidLogLevel,
ignore_list: V, ignore_list: V,
) -> bool { ) -> bool {
// Skip things out of level
let log_level = veilid_core::VeilidLogLevel::from_tracing_level(*metadata.level());
if log_level <= max_level {
return true;
}
// Skip filtered targets // Skip filtered targets
!match (metadata.target(), ignore_list.as_ref()) { !match (metadata.target(), ignore_list.as_ref()) {
(path, ignore) if !ignore.is_empty() => { (path, ignore) if !ignore.is_empty() => {
@ -46,6 +52,7 @@ impl VeilidLogs {
let subscriber = subscriber.with(if settingsr.logging.terminal.enabled { let subscriber = subscriber.with(if settingsr.logging.terminal.enabled {
let terminal_max_log_level = convert_loglevel(settingsr.logging.terminal.level); let terminal_max_log_level = convert_loglevel(settingsr.logging.terminal.level);
let ignore_list = ignore_list.clone();
Some( Some(
fmt::Layer::new() fmt::Layer::new()
.compact() .compact()
@ -86,11 +93,12 @@ impl VeilidLogs {
tracing_appender::non_blocking(appender); tracing_appender::non_blocking(appender);
guard = Some(non_blocking_guard); guard = Some(non_blocking_guard);
let ignore_list = ignore_list.clone();
Some( Some(
fmt::Layer::new() fmt::Layer::new()
.compact() .compact()
.with_writer(non_blocking_appender) .with_writer(non_blocking_appender)
.with_filter(filter::FilterFn::new(|metadata| { .with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, file_max_log_level, &ignore_list) logfilter(metadata, file_max_log_level, &ignore_list)
})), })),
) )
@ -108,8 +116,9 @@ impl VeilidLogs {
cfg_if! { cfg_if! {
if #[cfg(target_os = "linux")] { if #[cfg(target_os = "linux")] {
let subscriber = subscriber.with(if settingsr.logging.system.enabled { let subscriber = subscriber.with(if settingsr.logging.system.enabled {
let ignore_list = ignore_list.clone();
let system_max_log_level = convert_loglevel(settingsr.logging.system.level); let system_max_log_level = convert_loglevel(settingsr.logging.system.level);
Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?.with_filter(filter::FilterFn::new(|metadata| { Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?.with_filter(filter::FilterFn::new(move |metadata| {
logfilter(metadata, system_max_log_level, &ignore_list) logfilter(metadata, system_max_log_level, &ignore_list)
}))) })))
} else { } else {