diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 2241f6b3..20f3cce6 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -8,6 +8,7 @@ use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; use veilid_core::xx::*; +use veilid_core::*; macro_rules! capnp_failed { ($ex:expr) => {{ @@ -17,6 +18,17 @@ macro_rules! capnp_failed { }}; } +macro_rules! pry_result { + ($ex:expr) => { + match $ex { + Ok(v) => v, + Err(e) => { + return capnp_failed!(e); + } + } + }; +} + struct VeilidClientImpl { comproc: CommandProcessor, } @@ -34,57 +46,23 @@ impl veilid_client::Server for VeilidClientImpl { _results: veilid_client::UpdateResults, ) -> Promise<(), ::capnp::Error> { let veilid_update = pry!(pry!(params.get()).get_veilid_update()); + let veilid_update: VeilidUpdate = pry_result!(deserialize_json(veilid_update)); - let which = match veilid_update.which() { - Ok(v) => v, - Err(e) => { - return capnp_failed!(format!("(missing update kind in schema: {:?})", e)); + match veilid_update { + VeilidUpdate::Log(log) => { + self.comproc.update_log(log); } - }; - match which { - veilid_update::Attachment(Ok(attachment)) => { - let state = pry!(attachment.get_state()); - - trace!("Attachment: {}", state as u16); - self.comproc.update_attachment(state); + VeilidUpdate::Attachment(attachment) => { + self.comproc.update_attachment(attachment); } - veilid_update::Attachment(Err(e)) => { - return capnp_failed!(format!("Update Attachment Error: {}", e)); - } - veilid_update::Network(Ok(network)) => { - let started = network.get_started(); - let bps_down = network.get_bps_down(); - let bps_up = network.get_bps_up(); - - trace!( - "Network: started: {} bps_down: {} bps_up: {}", - started, - bps_down, - bps_up - ); - self.comproc - .update_network_status(started, bps_down, bps_up); - } - veilid_update::Network(Err(e)) => { - return capnp_failed!(format!("Update Network Error: {}", e)); - } - veilid_update::Shutdown(()) => { - return capnp_failed!("Should not get Shutdown here".to_owned()); + VeilidUpdate::Network(network) => { + self.comproc.update_network_status(network); } + VeilidUpdate::Shutdown => self.comproc.update_shutdown(), } Promise::ok(()) } - - fn log_message( - &mut self, - params: veilid_client::LogMessageParams, - _results: veilid_client::LogMessageResults, - ) -> Promise<(), ::capnp::Error> { - let message = pry!(pry!(params.get()).get_message()); - self.comproc.add_log_message(message); - Promise::ok(()) - } } struct ClientApiConnectionInner { @@ -116,53 +94,20 @@ impl ClientApiConnection { } async fn process_veilid_state<'a>( &'a mut self, - veilid_state: veilid_state::Reader<'a>, + veilid_state: VeilidState, ) -> Result<(), String> { let mut inner = self.inner.borrow_mut(); - - // Process attachment state - let attachment = veilid_state - .reborrow() - .get_attachment() - .map_err(map_to_string)?; - let attachment_state = attachment.get_state().map_err(map_to_string)?; - - let network = veilid_state - .reborrow() - .get_network() - .map_err(map_to_string)?; - let started = network.get_started(); - let bps_down = network.get_bps_down(); - let bps_up = network.get_bps_up(); - - inner.comproc.update_attachment(attachment_state); - inner - .comproc - .update_network_status(started, bps_down, bps_up); + inner.comproc.update_attachment(veilid_state.attachment); + inner.comproc.update_network_status(veilid_state.network); Ok(()) } - async fn handle_connection(&mut self) -> Result<(), String> { - trace!("ClientApiConnection::handle_connection"); - let connect_addr = self.inner.borrow().connect_addr.unwrap(); - // Connect the TCP socket - let stream = async_std::net::TcpStream::connect(connect_addr) - .await - .map_err(map_to_string)?; - // If it succeed, disable nagle algorithm - stream.set_nodelay(true).map_err(map_to_string)?; - - // Create the VAT network - let (reader, writer) = stream.split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - reader, - writer, - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - // Create the rpc system - let mut rpc_system = RpcSystem::new(rpc_network, None); + async fn spawn_rpc_system( + &mut self, + connect_addr: SocketAddr, + mut rpc_system: RpcSystem, + ) -> Result<(), String> { let mut request; { let mut inner = self.inner.borrow_mut(); @@ -195,29 +140,72 @@ impl ClientApiConnection { )); } - // Process the rpc system until we decide we're done - if let Ok(rpc_jh) = AsyncStd.spawn_handle_local(rpc_system) { - // Send the request and get the state object and the registration object - if let Ok(response) = request.send().promise.await { - if let Ok(response) = response.get() { - if let Ok(_registration) = response.get_registration() { - if let Ok(state) = response.get_state() { - // Set up our state for the first time - if self.process_veilid_state(state).await.is_ok() { - // Don't drop the registration, doing so will remove the client - // object mapping from the server which we need for the update backchannel + let rpc_jh = AsyncStd + .spawn_handle_local(rpc_system) + .map_err(|e| format!("failed to spawn rpc system: {}", e))?; - // Wait until rpc system completion or disconnect was requested - if let Err(e) = rpc_jh.await { - error!("Client RPC system error: {}", e); - } - } - } - } - } + // Send the request and get the state object and the registration object + let response = request + .send() + .promise + .await + .map_err(|e| format!("failed to send register request: {}", e))?; + let response = response + .get() + .map_err(|e| format!("failed to get register response: {}", e))?; + + // Get the registration object, which drops our connection when it is dropped + let _registration = response + .get_registration() + .map_err(|e| format!("failed to get registration object: {}", e))?; + + // Get the initial veilid state + let veilid_state = response + .get_state() + .map_err(|e| format!("failed to get initial veilid state: {}", e))?; + + // Set up our state for the first time + let veilid_state: VeilidState = deserialize_json(veilid_state) + .map_err(|e| format!("failed to get deserialize veilid state: {}", e))?; + self.process_veilid_state(veilid_state).await?; + + // Don't drop the registration, doing so will remove the client + // object mapping from the server which we need for the update backchannel + + // Wait until rpc system completion or disconnect was requested + rpc_jh + .await + .map_err(|e| format!("client RPC system error: {}", e)) + } + + async fn handle_connection(&mut self) -> Result<(), String> { + trace!("ClientApiConnection::handle_connection"); + let connect_addr = self.inner.borrow().connect_addr.unwrap(); + // Connect the TCP socket + let stream = async_std::net::TcpStream::connect(connect_addr) + .await + .map_err(map_to_string)?; + // If it succeed, disable nagle algorithm + stream.set_nodelay(true).map_err(map_to_string)?; + + // Create the VAT network + let (reader, writer) = stream.split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + reader, + writer, + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + + // Create the rpc system + let rpc_system = RpcSystem::new(rpc_network, None); + + // Process the rpc system until we decide we're done + match self.spawn_rpc_system(connect_addr, rpc_system).await { + Ok(()) => {} + Err(e) => { + error!("Failed to spawn client RPC system: {}", e); } - } else { - error!("Failed to spawn client RPC system"); } // Drop the server and disconnector too (if we still have it) diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index bfc51bc7..e05c9e11 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -1,7 +1,6 @@ use crate::client_api_connection::*; use crate::settings::Settings; use crate::ui::*; -use crate::veilid_client_capnp::*; use async_std::prelude::FutureExt; use log::*; use std::cell::*; @@ -273,18 +272,23 @@ debug - send a debugging command to the Veilid server // called by client_api_connection // calls into ui //////////////////////////////////////////// - pub fn update_attachment(&mut self, state: AttachmentState) { - self.inner_mut().ui.set_attachment_state(state); + pub fn update_attachment(&mut self, attachment: veilid_core::VeilidStateAttachment) { + self.inner_mut().ui.set_attachment_state(attachment.state); } - pub fn update_network_status(&mut self, started: bool, bps_down: u64, bps_up: u64) { + pub fn update_network_status(&mut self, network: veilid_core::VeilidStateNetwork) { self.inner_mut() .ui - .set_network_status(started, bps_down, bps_up); + .set_network_status(network.started, network.bps_down, network.bps_up); } - pub fn add_log_message(&mut self, message: &str) { - self.inner().ui.add_node_event(message); + pub fn update_log(&mut self, log: veilid_core::VeilidStateLog) { + let message = format!("{}: {}", log.log_level, log.message); + self.inner().ui.add_node_event(&message); + } + + pub fn update_shutdown(&mut self) { + // Do nothing with this, we'll process shutdown when rpc connection closes } // called by client_api_connection diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index 5227fa0f..14461a6c 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -1,6 +1,5 @@ use crate::command_processor::*; use crate::settings::Settings; -use crate::veilid_client_capnp::*; use crossbeam_channel::Sender; use cursive::align::*; use cursive::event::*; @@ -15,7 +14,7 @@ use log::*; use std::cell::RefCell; use std::collections::{HashMap, VecDeque}; use std::rc::Rc; -// use thiserror::Error; +use veilid_core::*; ////////////////////////////////////////////////////////////// /// @@ -280,9 +279,8 @@ impl UI { .button("Close", move |s| { s.pop_layer(); close_cb(s); - }) - //.wrap_with(CircularFocus::new) - //.wrap_tab(), + }), //.wrap_with(CircularFocus::new) + //.wrap_tab(), ); s.set_global_callback(cursive::event::Event::Key(Key::Esc), move |s| { s.set_global_callback(cursive::event::Event::Key(Key::Esc), UI::quit_handler); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index eaf30aed..5591b5db 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -179,6 +179,19 @@ impl VeilidLogLevel { } } +impl fmt::Display for VeilidLogLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + let text = match self { + Self::Error => "ERROR", + Self::Warn => "WARN", + Self::Info => "INFO", + Self::Debug => "DEBUG", + Self::Trace => "TRACE", + }; + write!(f, "{}", text) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VeilidStateLog { pub log_level: VeilidLogLevel, diff --git a/veilid-server/src/log_safe_channel.rs b/veilid-server/src/log_safe_channel.rs deleted file mode 100644 index a217bd47..00000000 --- a/veilid-server/src/log_safe_channel.rs +++ /dev/null @@ -1,104 +0,0 @@ -use parking_lot::Mutex; -use std::sync::Arc; - -// Must use async_std channel to send to main thread from blocking thread -use async_std::channel::bounded as async_bounded; -use async_std::channel::Receiver as AsyncReceiver; -pub use async_std::channel::RecvError; - -// Must use std mpsc so no logs are generated by async code -use std::sync::mpsc::sync_channel as std_sync_channel; -use std::sync::mpsc::SyncSender as StdSender; -use std::sync::mpsc::TrySendError as StdTrySendError; - -////////////////////////////////////////// -#[derive(Clone)] -pub struct LogSafeChannelCloser { - sender: Arc>>>, -} - -impl LogSafeChannelCloser { - pub fn close(&self) { - // Drop the sender - self.sender.lock().take(); - } -} - -////////////////////////////////////////// -pub struct LogSafeChannelWriterShim { - sender: Arc>>>, -} - -impl std::io::Write for LogSafeChannelWriterShim { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let bufstr = String::from_utf8_lossy(buf).to_string(); - let sender = self.sender.lock(); - if let Some(sender) = &*sender { - if let Err(e) = sender.try_send(bufstr) { - match e { - StdTrySendError::Full(_) => { - Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)) - } - StdTrySendError::Disconnected(_) => { - Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) - } - } - } else { - Ok(buf.len()) - } - } else { - Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)) - } - } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -pub type LogSafeChannelWriter = std::io::LineWriter; - -////////////////////////////////////////// - -#[derive(Clone)] -pub struct LogSafeChannel { - async_receiver: AsyncReceiver, -} - -impl LogSafeChannel { - pub fn new() -> (Self, LogSafeChannelWriter, LogSafeChannelCloser) { - let (async_sender, async_receiver) = async_bounded(1024); - let (std_sender, std_receiver) = std_sync_channel(1024); - let shared_std_sender = Arc::new(Mutex::new(Some(std_sender))); - - // Spawn a processing thread for the blocking std sender - async_std::task::spawn(async move { - #[allow(clippy::while_let_loop)] - loop { - let message = match std_receiver.recv() { - Ok(v) => v, - Err(_) => break, - }; - if async_sender.send(message).await.is_err() { - break; - } - } - }); - - ( - Self { async_receiver }, - LogSafeChannelWriter::with_capacity( - 65536, - LogSafeChannelWriterShim { - sender: shared_std_sender.clone(), - }, - ), - LogSafeChannelCloser { - sender: shared_std_sender, - }, - ) - } - - pub async fn recv(&mut self) -> Result { - self.async_receiver.recv().await - } -} diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index e7eca527..7de5da7d 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -4,7 +4,6 @@ mod client_api; mod cmdline; -mod log_safe_channel; mod server; mod settings; #[cfg(unix)] @@ -58,9 +57,9 @@ fn main() -> Result<(), String> { // Handle non-normal server modes if !matches!(server_mode, ServerMode::Normal) { // Init combined console/file logger - let logs = VeilidLogs::setup(settings.clone())?; + let _logs = VeilidLogs::setup(settings.clone())?; // run the server to set the node id and quit - return task::block_on(async { run_veilid_server(settings, logs, server_mode).await }) + return task::block_on(async { run_veilid_server(settings, server_mode).await }) .map(|v| { println!("{}", success); v @@ -83,7 +82,7 @@ fn main() -> Result<(), String> { } // Init combined console/file logger - let logs = VeilidLogs::setup(settings.clone())?; + let _logs = VeilidLogs::setup(settings.clone())?; // --- Normal Startup --- ctrlc::set_handler(move || { @@ -92,5 +91,5 @@ fn main() -> Result<(), String> { .expect("Error setting Ctrl-C handler"); // Run the server loop - task::block_on(async { run_veilid_server(settings, logs, server_mode).await }) + task::block_on(async { run_veilid_server(settings, server_mode).await }) } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index d81a4128..fcc63b65 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -1,6 +1,5 @@ use crate::client_api; use crate::settings::*; -use crate::veilid_logs::*; use flume::{bounded, Receiver, Sender}; use lazy_static::*; use parking_lot::Mutex; @@ -28,12 +27,8 @@ pub fn shutdown() { } } -pub async fn run_veilid_server( - settings: Settings, - logs: VeilidLogs, - server_mode: ServerMode, -) -> Result<(), String> { - run_veilid_server_internal(settings, logs, server_mode) +pub async fn run_veilid_server(settings: Settings, server_mode: ServerMode) -> Result<(), String> { + run_veilid_server_internal(settings, server_mode) .await .map_err(|e| { error!("{}", e); @@ -42,7 +37,6 @@ pub async fn run_veilid_server( } pub async fn run_veilid_server_internal( settings: Settings, - logs: VeilidLogs, server_mode: ServerMode, ) -> Result<(), String> { let settingsr = settings.read(); diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 03253ca2..96a878ac 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -93,7 +93,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String }; // Init combined console/file logger - let logs = VeilidLogs::setup(settings.clone())?; + let _logs = VeilidLogs::setup(settings.clone())?; // Daemonize daemon @@ -109,7 +109,7 @@ pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> Result<(), String let signals_task = async_std::task::spawn(handle_signals(signals)); - let res = run_veilid_server(settings, logs, ServerMode::Normal).await; + let res = run_veilid_server(settings, ServerMode::Normal).await; // Terminate the signal stream. handle.close(); diff --git a/veilid-server/src/veilid_logs.rs b/veilid-server/src/veilid_logs.rs index 55434924..64637098 100644 --- a/veilid-server/src/veilid_logs.rs +++ b/veilid-server/src/veilid_logs.rs @@ -15,6 +15,12 @@ fn logfilter, V: AsRef<[T]>>( max_level: veilid_core::VeilidLogLevel, ignore_list: V, ) -> bool { + // Skip things out of level + let log_level = veilid_core::VeilidLogLevel::from_tracing_level(*metadata.level()); + if log_level <= max_level { + return true; + } + // Skip filtered targets !match (metadata.target(), ignore_list.as_ref()) { (path, ignore) if !ignore.is_empty() => { @@ -46,6 +52,7 @@ impl VeilidLogs { let subscriber = subscriber.with(if settingsr.logging.terminal.enabled { let terminal_max_log_level = convert_loglevel(settingsr.logging.terminal.level); + let ignore_list = ignore_list.clone(); Some( fmt::Layer::new() .compact() @@ -86,11 +93,12 @@ impl VeilidLogs { tracing_appender::non_blocking(appender); guard = Some(non_blocking_guard); + let ignore_list = ignore_list.clone(); Some( fmt::Layer::new() .compact() .with_writer(non_blocking_appender) - .with_filter(filter::FilterFn::new(|metadata| { + .with_filter(filter::FilterFn::new(move |metadata| { logfilter(metadata, file_max_log_level, &ignore_list) })), ) @@ -108,8 +116,9 @@ impl VeilidLogs { cfg_if! { if #[cfg(target_os = "linux")] { let subscriber = subscriber.with(if settingsr.logging.system.enabled { + let ignore_list = ignore_list.clone(); let system_max_log_level = convert_loglevel(settingsr.logging.system.level); - Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?.with_filter(filter::FilterFn::new(|metadata| { + Some(tracing_journald::layer().map_err(|e| format!("failed to set up journald logging: {}", e))?.with_filter(filter::FilterFn::new(move |metadata| { logfilter(metadata, system_max_log_level, &ignore_list) }))) } else {