diff --git a/Cargo.lock b/Cargo.lock index f841b887..eddd7050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6158,6 +6158,7 @@ dependencies = [ "cursive-flexi-logger-view", "cursive_buffered_backend", "cursive_table_view", + "data-encoding", "directories", "flexi_logger", "flume", diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index 021275d2..1701aa1e 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -41,10 +41,11 @@ flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } thiserror = "^1" crossbeam-channel = "^0" hex = "^0" -veilid-tools = { path = "../veilid-tools", default-features = false } +veilid-tools = { path = "../veilid-tools" } json = "^0" stop-token = { version = "^0", default-features = false } flume = { version = "^0", features = ["async"] } +data-encoding = { version = "^2" } [dev-dependencies] serial_test = "^0" diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 21d4ba5c..b7e12670 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -1,12 +1,12 @@ use crate::command_processor::*; use crate::tools::*; -use serde::de::DeserializeOwned; -use std::cell::RefCell; +use core::str::FromStr; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use std::net::SocketAddr; -use std::rc::Rc; -use stop_token::{future::FutureExt as _, StopSource, StopToken}; +use std::time::SystemTime; +use stop_token::{future::FutureExt as _, StopSource}; -use veilid_tools::*; cfg_if! { if #[cfg(feature="rt-async-std")] { use async_std::io::prelude::BufReadExt; @@ -19,80 +19,41 @@ cfg_if! { } } -// fn map_to_internal_error(e: T) -> VeilidAPIError { -// VeilidAPIError::Internal { -// message: e.to_string(), -// } -// } - -// fn decode_api_result( -// reader: &api_result::Reader, -// ) -> VeilidAPIResult { -// match reader.which().map_err(map_to_internal_error)? { -// api_result::Which::Ok(v) => { -// let ok_val = v.map_err(map_to_internal_error)?; -// let res: T = veilid_core::deserialize_json(ok_val).map_err(map_to_internal_error)?; -// Ok(res) -// } -// api_result::Which::Err(e) => { -// let err_val = e.map_err(map_to_internal_error)?; -// let res: VeilidAPIError = -// veilid_core::deserialize_json(err_val).map_err(map_to_internal_error)?; -// Err(res) -// } -// } -// } - -// struct VeilidClientImpl { -// comproc: CommandProcessor, -// } - -// impl VeilidClientImpl { -// pub fn new(comproc: CommandProcessor) -> Self { -// Self { comproc } -// } -// } - -// } - struct ClientApiConnectionInner { comproc: CommandProcessor, connect_addr: Option, - server: Option>, + request_sender: Option>, server_settings: Option, disconnector: Option, disconnect_requested: bool, - cancel_eventual: Eventual, + reply_channels: HashMap>, + next_req_id: u32, } -type Handle = Rc>; - #[derive(Clone)] pub struct ClientApiConnection { - inner: Handle, + inner: Arc>, } impl ClientApiConnection { pub fn new(comproc: CommandProcessor) -> Self { Self { - inner: Rc::new(RefCell::new(ClientApiConnectionInner { + inner: Arc::new(Mutex::new(ClientApiConnectionInner { comproc, connect_addr: None, - server: None, + request_sender: None, server_settings: None, disconnector: None, disconnect_requested: false, - cancel_eventual: Eventual::new(), + reply_channels: HashMap::new(), + next_req_id: 0, })), } } - pub fn cancel(&self) { - let eventual = { - let inner = self.inner.borrow(); - inner.cancel_eventual.clone() - }; - eventual.resolve(); // don't need to await this + pub fn cancel_all(&self) { + let mut inner = self.inner.lock(); + inner.reply_channels.clear(); } // async fn process_veilid_state<'a>( @@ -106,8 +67,33 @@ impl ClientApiConnection { // Ok(()) // } + async fn process_response(&self, response: json::JsonValue) { + // find the operation id and send the response to the channel for it + let Some(id_str) = response["id"].as_str() else { + error!("missing id: {}", response); + return; + }; + let Ok(id) = u32::from_str(id_str) else { + error!("invalid id: {}", response); + return; + }; + + let reply_channel = { + let mut inner = self.inner.lock(); + inner.reply_channels.remove(&id) + }; + let Some(reply_channel) = reply_channel else { + warn!("received cancelled reply: {}", response); + return; + }; + if let Err(e) = reply_channel.send_async(response).await { + error!("failed to process reply: {}", e); + return; + } + } + async fn process_update(&self, update: json::JsonValue) { - let comproc = self.inner.borrow().comproc.clone(); + let comproc = self.inner.lock().comproc.clone(); let Some(kind) = update["kind"].as_str() else { comproc.log_message(format!("missing update kind: {}", update)); return; @@ -145,7 +131,7 @@ impl ClientApiConnection { } // async fn spawn_rpc_system( - // &mut self, + // &self, // connect_addr: SocketAddr, // mut rpc_system: RpcSystem, // ) -> Result<(), String> { @@ -235,18 +221,9 @@ impl ClientApiConnection { // res.map_err(|e| format!("client RPC system error: {}", e)) // } - async fn handle_connection(&mut self, connect_addr: SocketAddr) -> Result<(), String> { + async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> { trace!("ClientApiConnection::handle_connection"); - let stop_token = { - let stop_source = StopSource::new(); - let token = stop_source.token(); - let mut inner = self.inner.borrow_mut(); - inner.connect_addr = Some(connect_addr); - inner.disconnector = Some(stop_source); - token - }; - // Connect the TCP socket let stream = TcpStream::connect(connect_addr) .await @@ -255,283 +232,245 @@ impl ClientApiConnection { // If it succeed, disable nagle algorithm stream.set_nodelay(true).map_err(map_to_string)?; + // State we connected + let comproc = self.inner.lock().comproc.clone(); + comproc.set_connection_state(ConnectionState::Connected(connect_addr, SystemTime::now())); + // Split the stream cfg_if! { if #[cfg(feature="rt-async-std")] { use futures::AsyncReadExt; - let (reader, writer) = stream.split(); + let (reader, mut writer) = stream.split(); let mut reader = BufReader::new(reader); } else if #[cfg(feature="rt-tokio")] { - let (reader, writer) = stream.into_split(); + let (reader, mut writer) = stream.into_split(); let mut reader = BufReader::new(reader); } } + // Requests to send + let (requests_tx, requests_rx) = flume::unbounded(); + + let stop_token = { + let stop_source = StopSource::new(); + let token = stop_source.token(); + let mut inner = self.inner.lock(); + inner.connect_addr = Some(connect_addr); + inner.disconnector = Some(stop_source); + inner.request_sender = Some(requests_tx); + token + }; + + // Futures to process unordered + let mut unord = FuturesUnordered::new(); + // Process lines - let mut line = String::new(); - while let Ok(r) = reader - .read_line(&mut line) - .timeout_at(stop_token.clone()) - .await - { - match r { - Ok(size) => { - // Exit on EOF - if size == 0 { - // Disconnected - return Err("Connection closed".to_owned()); - } - } - Err(e) => { + let this = self.clone(); + let recv_messages_future = async move { + let mut line = String::new(); + while let Ok(size) = reader.read_line(&mut line).await { + // Exit on EOF + if size == 0 { // Disconnected - return Err("Connection lost".to_owned()); + break; + } + + // Unmarshal json + let j = match json::parse(line.trim()) { + Ok(v) => v, + Err(e) => { + error!("failed to parse server response: {}", e); + continue; + } + }; + + if j["type"] == "Update" { + this.process_update(j).await; + } else if j["type"] == "Response" { + this.process_response(j).await; } } + // + let mut inner = this.inner.lock(); + inner.request_sender = None; + }; + unord.push(system_boxed(recv_messages_future)); - // Unmarshal json - let j = match json::parse(line.trim()) { - Ok(v) => v, - Err(e) => { - error!("failed to parse server response: {}", e); - continue; + // Requests send processor + let send_requests_future = async move { + while let Ok(req) = requests_rx.recv_async().await { + if let Err(e) = writer.write_all(req.as_bytes()).await { + error!("failed to write request: {}", e) } - }; - - if j["type"] == "Update" { - self.process_update(j).await; } - } + }; + unord.push(system_boxed(send_requests_future)); - // Connection finished - Ok(()) - - // let rpc_network = Box::new(twoparty::VatNetwork::new( - // reader, - // writer, - // rpc_twoparty_capnp::Side::Client, - // Default::default(), - // )); - - // // Create the rpc system - // let rpc_system = RpcSystem::new(rpc_network, None); - - // // Process the rpc system until we decide we're done - // match self.spawn_rpc_system(connect_addr, rpc_system).await { - // Ok(()) => {} - // Err(e) => { - // error!("Failed to spawn client RPC system: {}", e); - // } - // } + // Send and receive until we're done or a stop is requested + while let Ok(Some(())) = unord.next().timeout_at(stop_token.clone()).await {} // // Drop the server and disconnector too (if we still have it) - // let mut inner = self.inner.borrow_mut(); - // let disconnect_requested = inner.disconnect_requested; - // inner.server_settings = None; - // inner.server = None; - // inner.disconnector = None; - // inner.disconnect_requested = false; - // inner.connect_addr = None; + let mut inner = self.inner.lock(); + let disconnect_requested = inner.disconnect_requested; + inner.server_settings = None; + inner.request_sender = None; + inner.disconnector = None; + inner.disconnect_requested = false; + inner.connect_addr = None; + + // Connection finished + if disconnect_requested { + Ok(()) + } else { + Err("Connection lost".to_owned()) + } } - // pub fn cancellable(&mut self, p: Promise) -> Promise - // where - // T: 'static, - // { - // let (mut cancel_instance, cancel_eventual) = { - // let inner = self.inner.borrow(); - // ( - // inner.cancel_eventual.instance_empty().fuse(), - // inner.cancel_eventual.clone(), - // ) - // }; - // let mut p = p.fuse(); + async fn perform_request(&self, mut req: json::JsonValue) -> Option { + let (sender, reply_rx) = { + let mut inner = self.inner.lock(); - // Promise::from_future(async move { - // let out = select! { - // a = p => { - // a - // }, - // _ = cancel_instance => { - // Err(capnp::Error::failed("cancelled".into())) - // } - // }; - // drop(cancel_instance); - // cancel_eventual.reset(); - // out - // }) - // } + // Get the request sender + let Some(sender) = inner.request_sender.clone() else { + error!("dropping request, not connected"); + return None; + }; - pub async fn server_attach(&mut self) -> Result<(), String> { + // Get next id + let id = inner.next_req_id; + inner.next_req_id += 1; + + // Add the id + req["id"] = id.into(); + + // Make a reply receiver + let (reply_tx, reply_rx) = flume::bounded(1); + inner.reply_channels.insert(id, reply_tx); + (sender, reply_rx) + }; + + // Send the request + let req_ndjson = req.dump() + "\n"; + if let Err(e) = sender.send_async(req_ndjson).await { + error!("failed to send request: {}", e); + return None; + } + + // Wait for the reply + let Ok(r) = reply_rx.recv_async().await else { + // Cancelled + return None; + }; + + Some(r) + } + + pub async fn server_attach(&self) -> Result<(), String> { trace!("ClientApiConnection::server_attach"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring attach request".to_owned())? - .clone() + + let mut req = json::JsonValue::new_object(); + req["op"] = "Attach".into(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let request = server.borrow().attach_request(); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - let reader = response - .get() - .map_err(map_to_string)? - .get_result() - .map_err(map_to_string)?; - let res: VeilidAPIResult<()> = decode_api_result(&reader); - res.map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(()) } - pub async fn server_detach(&mut self) -> Result<(), String> { + pub async fn server_detach(&self) -> Result<(), String> { trace!("ClientApiConnection::server_detach"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring detach request".to_owned())? - .clone() + let mut req = json::JsonValue::new_object(); + req["op"] = "Detach".into(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let request = server.borrow().detach_request(); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - let reader = response - .get() - .map_err(map_to_string)? - .get_result() - .map_err(map_to_string)?; - let res: VeilidAPIResult<()> = decode_api_result(&reader); - res.map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(()) } - pub async fn server_shutdown(&mut self) -> Result<(), String> { + pub async fn server_shutdown(&self) -> Result<(), String> { trace!("ClientApiConnection::server_shutdown"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring attach request".to_owned())? - .clone() + let mut req = json::JsonValue::new_object(); + req["op"] = "Control".into(); + req["args"] = json::JsonValue::new_array(); + req["args"].push("Shutdown").unwrap(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let request = server.borrow().shutdown_request(); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - response.get().map(drop).map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(()) } - pub async fn server_debug(&mut self, what: String) -> Result { + pub async fn server_debug(&self, what: String) -> Result { trace!("ClientApiConnection::server_debug"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring debug request".to_owned())? - .clone() + let mut req = json::JsonValue::new_object(); + req["op"] = "Debug".into(); + req["command"] = what.into(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let mut request = server.borrow().debug_request(); - request.get().set_command(&what); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - let reader = response - .get() - .map_err(map_to_string)? - .get_result() - .map_err(map_to_string)?; - let res: VeilidAPIResult = decode_api_result(&reader); - res.map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(resp["value"].to_string()) } pub async fn server_change_log_level( - &mut self, + &self, layer: String, - log_level: VeilidConfigLogLevel, + log_level: String, ) -> Result<(), String> { trace!("ClientApiConnection::change_log_level"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())? - .clone() + let mut req = json::JsonValue::new_object(); + req["op"] = "Control".into(); + req["args"] = json::JsonValue::new_array(); + req["args"].push("ChangeLogLevel").unwrap(); + req["args"].push(layer).unwrap(); + req["args"].push(log_level).unwrap(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let mut request = server.borrow().change_log_level_request(); - request.get().set_layer(&layer); - let log_level_json = veilid_core::serialize_json(&log_level); - request.get().set_log_level(&log_level_json); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - let reader = response - .get() - .map_err(map_to_string)? - .get_result() - .map_err(map_to_string)?; - let res: VeilidAPIResult<()> = decode_api_result(&reader); - res.map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(()) } - pub async fn server_appcall_reply( - &mut self, - id: OperationId, - msg: Vec, - ) -> Result<(), String> { + pub async fn server_appcall_reply(&self, id: u64, msg: Vec) -> Result<(), String> { trace!("ClientApiConnection::appcall_reply"); - let server = { - let inner = self.inner.borrow(); - inner - .server - .as_ref() - .ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())? - .clone() + let mut req = json::JsonValue::new_object(); + req["op"] = "AppCallReply".into(); + req["call_id"] = id.to_string().into(); + req["message"] = data_encoding::BASE64URL_NOPAD.encode(&msg).into(); + let Some(resp) = self.perform_request(req).await else { + return Err("Cancelled".to_owned()); }; - let mut request = server.borrow().app_call_reply_request(); - request.get().set_id(id.as_u64()); - request.get().set_message(&msg); - let response = self - .cancellable(request.send().promise) - .await - .map_err(map_to_string)?; - let reader = response - .get() - .map_err(map_to_string)? - .get_result() - .map_err(map_to_string)?; - let res: VeilidAPIResult<()> = decode_api_result(&reader); - res.map_err(map_to_string) + if resp.has_key("error") { + return Err(resp["error"].to_string()); + } + Ok(()) } // Start Client API connection - pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> { + pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> { trace!("ClientApiConnection::connect"); // Save the address to connect to self.handle_connection(connect_addr).await } // End Client API connection - pub async fn disconnect(&mut self) { + pub async fn disconnect(&self) { trace!("ClientApiConnection::disconnect"); - let disconnector = self.inner.borrow_mut().disconnector.take(); - match disconnector { - Some(d) => { - self.inner.borrow_mut().disconnect_requested = true; - d.await.unwrap(); - } - None => { - debug!("disconnector doesn't exist"); - } + let mut inner = self.inner.lock(); + if inner.disconnector.is_some() { + inner.disconnector = None; + inner.disconnect_requested = true; } } } diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index f47ca014..6cbc6331 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -1,20 +1,19 @@ use crate::client_api_connection::*; use crate::settings::Settings; +use crate::tools::*; use crate::ui::*; -use std::cell::*; use std::net::SocketAddr; -use std::rc::Rc; use std::time::SystemTime; use veilid_tools::*; -pub fn convert_loglevel(s: &str) -> Result { +pub fn convert_loglevel(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { - "off" => Ok(VeilidConfigLogLevel::Off), - "error" => Ok(VeilidConfigLogLevel::Error), - "warn" => Ok(VeilidConfigLogLevel::Warn), - "info" => Ok(VeilidConfigLogLevel::Info), - "debug" => Ok(VeilidConfigLogLevel::Debug), - "trace" => Ok(VeilidConfigLogLevel::Trace), + "off" => Ok("Off".to_owned()), + "error" => Ok("Error".to_owned()), + "warn" => Ok("Warn".to_owned()), + "info" => Ok("Info".to_owned()), + "debug" => Ok("Debug".to_owned()), + "trace" => Ok("Trace".to_owned()), _ => Err(format!("Invalid log level: {}", s)), } } @@ -38,7 +37,7 @@ impl ConnectionState { } struct CommandProcessorInner { - ui: UI, + ui_sender: UISender, capi: Option, reconnect: bool, finished: bool, @@ -46,21 +45,19 @@ struct CommandProcessorInner { autoreconnect: bool, server_addr: Option, connection_waker: Eventual, - last_call_id: Option, + last_call_id: Option, } -type Handle = Rc>; - #[derive(Clone)] pub struct CommandProcessor { - inner: Handle, + inner: Arc>, } impl CommandProcessor { - pub fn new(ui: UI, settings: &Settings) -> Self { + pub fn new(ui_sender: UISender, settings: &Settings) -> Self { Self { - inner: Rc::new(RefCell::new(CommandProcessorInner { - ui, + inner: Arc::new(Mutex::new(CommandProcessorInner { + ui_sender, capi: None, reconnect: settings.autoreconnect, finished: false, @@ -72,20 +69,20 @@ impl CommandProcessor { })), } } - pub fn set_client_api_connection(&mut self, capi: ClientApiConnection) { - self.inner.borrow_mut().capi = Some(capi); + pub fn set_client_api_connection(&self, capi: ClientApiConnection) { + self.inner.lock().capi = Some(capi); } - fn inner(&self) -> Ref { - self.inner.borrow() + fn inner(&self) -> MutexGuard { + self.inner.lock() } - fn inner_mut(&self) -> RefMut { - self.inner.borrow_mut() + fn inner_mut(&self) -> MutexGuard { + self.inner.lock() } - fn ui(&self) -> UI { - self.inner.borrow().ui.clone() + fn ui_sender(&self) -> UISender { + self.inner.lock().ui_sender.clone() } fn capi(&self) -> ClientApiConnection { - self.inner.borrow().capi.as_ref().unwrap().clone() + self.inner.lock().capi.as_ref().unwrap().clone() } fn word_split(line: &str) -> (String, Option) { @@ -102,12 +99,12 @@ impl CommandProcessor { pub fn cancel_command(&self) { trace!("CommandProcessor::cancel_command"); let capi = self.capi(); - capi.cancel(); + capi.cancel_all(); } pub fn cmd_help(&self, _rest: Option, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_help"); - self.ui().add_node_event( + self.ui_sender().add_node_event( r#"Commands: exit/quit - exit the client disconnect - disconnect the client from the Veilid node @@ -120,14 +117,14 @@ reply - reply to an AppCall not handled directly by the server "# .to_owned(), ); - let ui = self.ui(); + let ui = self.ui_sender(); ui.send_callback(callback); Ok(()) } pub fn cmd_exit(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_exit"); - let ui = self.ui(); + let ui = self.ui_sender(); ui.send_callback(callback); ui.quit(); Ok(()) @@ -135,8 +132,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_shutdown(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_shutdown"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { if let Err(e) = capi.server_shutdown().await { error!("Server command 'shutdown' failed to execute: {}", e); @@ -148,8 +145,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_attach(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_attach"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { if let Err(e) = capi.server_attach().await { error!("Server command 'attach' failed: {}", e); @@ -161,8 +158,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_detach(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_detach"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { if let Err(e) = capi.server_detach().await { error!("Server command 'detach' failed: {}", e); @@ -174,8 +171,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_disconnect(&self, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_disconnect"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { capi.disconnect().await; ui.send_callback(callback); @@ -185,8 +182,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_debug(&self, rest: Option, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_debug"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { match capi.server_debug(rest.unwrap_or_default()).await { Ok(output) => ui.display_string_dialog("Debug Output", output, callback), @@ -202,8 +199,8 @@ reply - reply to an AppCall not handled directly by the server callback: UICallback, ) -> Result<(), String> { trace!("CommandProcessor::cmd_change_log_level"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); spawn_detached_local(async move { let (layer, rest) = Self::word_split(&rest.unwrap_or_default()); let log_level = match convert_loglevel(&rest.unwrap_or_default()) { @@ -234,8 +231,8 @@ reply - reply to an AppCall not handled directly by the server pub fn cmd_reply(&self, rest: Option, callback: UICallback) -> Result<(), String> { trace!("CommandProcessor::cmd_reply"); - let mut capi = self.capi(); - let ui = self.ui(); + let capi = self.capi(); + let ui = self.ui_sender(); let some_last_id = self.inner_mut().last_call_id.take(); spawn_detached_local(async move { let (first, second) = Self::word_split(&rest.clone().unwrap_or_default()); @@ -248,7 +245,7 @@ reply - reply to an AppCall not handled directly by the server } Ok(v) => v, }; - (OperationId::new(id), second) + (id, second) } else { let id = match some_last_id { None => { @@ -306,14 +303,14 @@ reply - reply to an AppCall not handled directly by the server "change_log_level" => self.cmd_change_log_level(rest, callback), "reply" => self.cmd_reply(rest, callback), _ => { - let ui = self.ui(); + let ui = self.ui_sender(); ui.send_callback(callback); Err(format!("Invalid command: {}", cmd)) } } } - pub async fn connection_manager(&mut self) { + pub async fn connection_manager(&self) { // Connect until we're done while !self.inner_mut().finished { // Wait for connection request @@ -341,7 +338,7 @@ reply - reply to an AppCall not handled directly by the server } else { debug!("Retrying connection to {}", server_addr); } - let mut capi = self.capi(); + let capi = self.capi(); let res = capi.connect(server_addr).await; if res.is_ok() { info!( @@ -376,7 +373,7 @@ reply - reply to an AppCall not handled directly by the server // called by ui //////////////////////////////////////////// - pub fn set_server_address(&mut self, server_addr: Option) { + pub fn set_server_address(&self, server_addr: Option) { self.inner_mut().server_addr = server_addr; } pub fn get_server_address(&self) -> Option { @@ -386,54 +383,61 @@ reply - reply to an AppCall not handled directly by the server // calls into ui //////////////////////////////////////////// - pub fn log_message(&mut self, message: String) { - self.inner().ui.add_node_event(message); + pub fn log_message(&self, message: String) { + self.inner().ui_sender.add_node_event(message); } - pub fn update_attachment(&mut self, attachment: json::JsonValue) { - self.inner_mut().ui.set_attachment_state( - attachment.state, - attachment.public_internet_ready, - attachment.local_network_ready, + pub fn update_attachment(&self, attachment: json::JsonValue) { + self.inner_mut().ui_sender.set_attachment_state( + attachment["state"].as_str().unwrap_or_default().to_owned(), + attachment["public_internet_ready"] + .as_bool() + .unwrap_or_default(), + attachment["local_network_ready"] + .as_bool() + .unwrap_or_default(), ); } - pub fn update_network_status(&mut self, network: veilid_core::VeilidStateNetwork) { - self.inner_mut().ui.set_network_status( - network.started, - network.bps_down.as_u64(), - network.bps_up.as_u64(), - network.peers, + pub fn update_network_status(&self, network: json::JsonValue) { + self.inner_mut().ui_sender.set_network_status( + network["started"].as_bool().unwrap_or_default(), + json_str_u64(&network["bps_down"]), + json_str_u64(&network["bps_up"]), + network["peers"] + .members() + .cloned() + .collect::>(), ); } - pub fn update_config(&mut self, config: veilid_core::VeilidStateConfig) { - self.inner_mut().ui.set_config(config.config) + pub fn update_config(&self, config: json::JsonValue) { + self.inner_mut().ui_sender.set_config(&config["config"]) } - pub fn update_route(&mut self, route: veilid_core::VeilidRouteChange) { + pub fn update_route(&self, route: json::JsonValue) { let mut out = String::new(); - if !route.dead_routes.is_empty() { - out.push_str(&format!("Dead routes: {:?}", route.dead_routes)); + if route["dead_routes"].len() != 0 { + out.push_str(&format!("Dead routes: {:?}", route["dead_routes"])); } - if !route.dead_remote_routes.is_empty() { + if route["dead_routes"].len() != 0 { if !out.is_empty() { out.push_str("\n"); } out.push_str(&format!( "Dead remote routes: {:?}", - route.dead_remote_routes + route["dead_remote_routes"] )); } if !out.is_empty() { - self.inner().ui.add_node_event(out); + self.inner().ui_sender.add_node_event(out); } } - pub fn update_value_change(&mut self, value_change: json::JsonValue) { + pub fn update_value_change(&self, value_change: json::JsonValue) { let out = format!("Value change: {:?}", value_change.as_str().unwrap_or("???")); - self.inner().ui.add_node_event(out); + self.inner().ui_sender.add_node_event(out); } - pub fn update_log(&mut self, log: json::JsonValue) { - self.inner().ui.add_node_event(format!( + pub fn update_log(&self, log: json::JsonValue) { + self.inner().ui_sender.add_node_event(format!( "{}: {}{}", log["log_level"].as_str().unwrap_or("???"), log["message"].as_str().unwrap_or("???"), @@ -445,79 +449,83 @@ reply - reply to an AppCall not handled directly by the server )); } - pub fn update_app_message(&mut self, msg: json::JsonValue) { + pub fn update_app_message(&self, msg: json::JsonValue) { + let message = json_str_vec_u8(&msg["message"]); + // check is message body is ascii printable let mut printable = true; - for c in msg.message() { + for c in &message { if *c < 32 || *c > 126 { printable = false; } } let strmsg = if printable { - String::from_utf8_lossy(msg.message()).to_string() + String::from_utf8_lossy(&message).to_string() } else { - hex::encode(msg.message()) + hex::encode(message) }; self.inner() - .ui - .add_node_event(format!("AppMessage ({:?}): {}", msg.sender(), strmsg)); + .ui_sender + .add_node_event(format!("AppMessage ({:?}): {}", msg["sender"], strmsg)); } - pub fn update_app_call(&mut self, call: veilid_core::VeilidAppCall) { + pub fn update_app_call(&self, call: json::JsonValue) { + let message = json_str_vec_u8(&call["message"]); + // check is message body is ascii printable let mut printable = true; - for c in call.message() { + for c in &message { if *c < 32 || *c > 126 { printable = false; } } let strmsg = if printable { - String::from_utf8_lossy(call.message()).to_string() + String::from_utf8_lossy(&message).to_string() } else { - format!("#{}", hex::encode(call.message())) + format!("#{}", hex::encode(&message)) }; - self.inner().ui.add_node_event(format!( + let id = json_str_u64(&call["id"]); + + self.inner().ui_sender.add_node_event(format!( "AppCall ({:?}) id = {:016x} : {}", - call.sender(), - call.id().as_u64(), - strmsg + call["sender"], id, strmsg )); - self.inner_mut().last_call_id = Some(call.id()); + self.inner_mut().last_call_id = Some(id); } - pub fn update_shutdown(&mut self) { + pub fn update_shutdown(&self) { // Do nothing with this, we'll process shutdown when rpc connection closes } // called by client_api_connection // calls into ui //////////////////////////////////////////// - pub fn set_connection_state(&mut self, state: ConnectionState) { - self.inner_mut().ui.set_connection_state(state); + pub fn set_connection_state(&self, state: ConnectionState) { + self.inner_mut().ui_sender.set_connection_state(state); } // called by ui //////////////////////////////////////////// - pub fn start_connection(&mut self) { + pub fn start_connection(&self) { self.inner_mut().reconnect = true; self.inner_mut().connection_waker.resolve(); } - // pub fn stop_connection(&mut self) { + // pub fn stop_connection(&self) { // self.inner_mut().reconnect = false; // let mut capi = self.capi().clone(); // spawn_detached(async move { // capi.disconnect().await; // }); // } - pub fn cancel_reconnect(&mut self) { + pub fn cancel_reconnect(&self) { self.inner_mut().reconnect = false; self.inner_mut().connection_waker.resolve(); } - pub fn quit(&mut self) { + pub fn quit(&self) { self.inner_mut().finished = true; self.inner_mut().reconnect = false; self.inner_mut().connection_waker.resolve(); @@ -526,8 +534,8 @@ reply - reply to an AppCall not handled directly by the server // called by ui // calls into client_api_connection //////////////////////////////////////////// - pub fn attach(&mut self) { - let mut capi = self.capi(); + pub fn attach(&self) { + let capi = self.capi(); spawn_detached_local(async move { if let Err(e) = capi.server_attach().await { @@ -536,8 +544,8 @@ reply - reply to an AppCall not handled directly by the server }); } - pub fn detach(&mut self) { - let mut capi = self.capi(); + pub fn detach(&self) { + let capi = self.capi(); spawn_detached_local(async move { if let Err(e) = capi.server_detach().await { diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index 517a02e2..f51c58e3 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -3,7 +3,6 @@ #![recursion_limit = "256"] use crate::tools::*; -use veilid_tools::*; use clap::{Arg, ColorChoice, Command}; use flexi_logger::*; @@ -92,7 +91,7 @@ fn main() -> Result<(), String> { } // Create UI object - let mut sivui = ui::UI::new(settings.interface.node_log.scrollback, &settings); + let (mut sivui, uisender) = ui::UI::new(settings.interface.node_log.scrollback, &settings); // Set up loggers { @@ -155,19 +154,19 @@ fn main() -> Result<(), String> { // Create command processor debug!("Creating Command Processor "); - let mut comproc = command_processor::CommandProcessor::new(sivui.clone(), &settings); + let comproc = command_processor::CommandProcessor::new(uisender, &settings); sivui.set_command_processor(comproc.clone()); // Create client api client side info!("Starting API connection"); - let mut capi = client_api_connection::ClientApiConnection::new(comproc.clone()); + let capi = client_api_connection::ClientApiConnection::new(comproc.clone()); // Save client api in command processor comproc.set_client_api_connection(capi.clone()); // Keep a connection to the server comproc.set_server_address(server_addr); - let mut comproc2 = comproc.clone(); + let comproc2 = comproc.clone(); let connection_future = comproc.connection_manager(); // Start async diff --git a/veilid-cli/src/peers_table_view.rs b/veilid-cli/src/peers_table_view.rs index e51aaf62..2d158de6 100644 --- a/veilid-cli/src/peers_table_view.rs +++ b/veilid-cli/src/peers_table_view.rs @@ -23,8 +23,11 @@ pub enum PeerTableColumn { // } // } -fn format_ts(ts: Timestamp) -> String { - let ts = ts.as_u64(); +fn format_ts(ts: &json::JsonValue) -> String { + if ts.is_null() { + return "---".to_owned(); + } + let ts = json_str_u64(ts); let secs = timestamp_to_secs(ts); if secs >= 1.0 { format!("{:.2}s", timestamp_to_secs(ts)) @@ -33,8 +36,11 @@ fn format_ts(ts: Timestamp) -> String { } } -fn format_bps(bps: ByteCount) -> String { - let bps = bps.as_u64(); +fn format_bps(bps: &json::JsonValue) -> String { + if bps.is_null() { + return "---".to_owned(); + } + let bps = json_str_u64(bps); if bps >= 1024u64 * 1024u64 * 1024u64 { format!("{:.2}GB/s", (bps / (1024u64 * 1024u64)) as f64 / 1024.0) } else if bps >= 1024u64 * 1024u64 { @@ -46,25 +52,20 @@ fn format_bps(bps: ByteCount) -> String { } } -impl TableViewItem for PeerTableData { +impl TableViewItem for json::JsonValue { fn to_column(&self, column: PeerTableColumn) -> String { match column { - PeerTableColumn::NodeId => self - .node_ids - .first() - .map(|n| n.to_string()) - .unwrap_or_else(|| "???".to_owned()), - PeerTableColumn::Address => self.peer_address.clone(), - PeerTableColumn::LatencyAvg => format!( - "{}", - self.peer_stats - .latency - .as_ref() - .map(|l| format_ts(l.average)) - .unwrap_or("---".to_owned()) - ), - PeerTableColumn::TransferDownAvg => format_bps(self.peer_stats.transfer.down.average), - PeerTableColumn::TransferUpAvg => format_bps(self.peer_stats.transfer.up.average), + PeerTableColumn::NodeId => self["node_ids"][0].to_string(), + PeerTableColumn::Address => self["peer_address"].to_string(), + PeerTableColumn::LatencyAvg => { + format!("{}", format_ts(&self["peer_stats"]["latency"]["average"])) + } + PeerTableColumn::TransferDownAvg => { + format_bps(&self["peer_stats"]["transfer"]["down"]["average"]) + } + PeerTableColumn::TransferUpAvg => { + format_bps(&self["peer_stats"]["transfer"]["up"]["average"]) + } } } @@ -75,26 +76,20 @@ impl TableViewItem for PeerTableData { match column { PeerTableColumn::NodeId => self.to_column(column).cmp(&other.to_column(column)), PeerTableColumn::Address => self.to_column(column).cmp(&other.to_column(column)), - PeerTableColumn::LatencyAvg => self - .peer_stats - .latency - .as_ref() - .map(|l| l.average) - .cmp(&other.peer_stats.latency.as_ref().map(|l| l.average)), - PeerTableColumn::TransferDownAvg => self - .peer_stats - .transfer - .down - .average - .cmp(&other.peer_stats.transfer.down.average), - PeerTableColumn::TransferUpAvg => self - .peer_stats - .transfer - .up - .average - .cmp(&other.peer_stats.transfer.up.average), + PeerTableColumn::LatencyAvg => json_str_u64(&self["peer_stats"]["latency"]["average"]) + .cmp(&json_str_u64(&other["peer_stats"]["latency"]["average"])), + PeerTableColumn::TransferDownAvg => { + json_str_u64(&self["peer_stats"]["transfer"]["down"]["average"]).cmp(&json_str_u64( + &other["peer_stats"]["transfer"]["down"]["average"], + )) + } + PeerTableColumn::TransferUpAvg => { + json_str_u64(&self["peer_stats"]["transfer"]["up"]["average"]).cmp(&json_str_u64( + &other["peer_stats"]["transfer"]["up"]["average"], + )) + } } } } -pub type PeersTableView = TableView; +pub type PeersTableView = TableView; diff --git a/veilid-cli/src/tools.rs b/veilid-cli/src/tools.rs index bf58d24c..7e6fe04c 100644 --- a/veilid-cli/src/tools.rs +++ b/veilid-cli/src/tools.rs @@ -1,5 +1,10 @@ -use cfg_if::*; +pub use cfg_if::*; +pub use log::*; +pub use parking_lot::*; +pub use veilid_tools::*; + use core::future::Future; +use core::str::FromStr; cfg_if! { if #[cfg(feature="rt-async-std")] { @@ -17,3 +22,13 @@ cfg_if! { } } + +pub fn json_str_u64(value: &json::JsonValue) -> u64 { + u64::from_str(value.as_str().unwrap_or_default()).unwrap_or_default() +} + +pub fn json_str_vec_u8(value: &json::JsonValue) -> Vec { + data_encoding::BASE64URL_NOPAD + .decode(value.as_str().unwrap_or_default().as_bytes()) + .unwrap_or_default() +} diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index 9a7c8ed3..a38ff6dc 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -1,6 +1,7 @@ use crate::command_processor::*; use crate::peers_table_view::*; use crate::settings::Settings; +use crate::tools::*; use crossbeam_channel::Sender; use cursive::align::*; use cursive::event::*; @@ -12,11 +13,8 @@ use cursive::Cursive; use cursive::CursiveRunnable; use cursive_flexi_logger_view::{CursiveLogWriter, FlexiLoggerView}; //use cursive_multiplex::*; -use std::cell::RefCell; use std::collections::{HashMap, VecDeque}; -use std::rc::Rc; use thiserror::Error; -use veilid_tools::*; ////////////////////////////////////////////////////////////// /// @@ -82,19 +80,15 @@ pub struct UIInner { ui_state: UIState, log_colors: HashMap, cmdproc: Option, - cb_sink: Sender>, cmd_history: VecDeque, cmd_history_position: usize, cmd_history_max_size: usize, connection_dialog_state: Option, } -type Handle = Rc>; - -#[derive(Clone)] pub struct UI { - siv: Handle, - inner: Handle, + siv: CursiveRunnable, + inner: Arc>, } #[derive(Error, Debug)] @@ -112,11 +106,11 @@ impl UI { inner.cmdproc.as_ref().unwrap().clone() } - fn inner(s: &mut Cursive) -> std::cell::Ref<'_, UIInner> { - s.user_data::>().unwrap().borrow() + fn inner(s: &mut Cursive) -> MutexGuard<'_, UIInner> { + s.user_data::>>().unwrap().lock() } - fn inner_mut(s: &mut Cursive) -> std::cell::RefMut<'_, UIInner> { - s.user_data::>().unwrap().borrow_mut() + fn inner_mut(s: &mut Cursive) -> MutexGuard<'_, UIInner> { + s.user_data::>>().unwrap().lock() } fn setup_colors(siv: &mut CursiveRunnable, inner: &mut UIInner, settings: &Settings) { @@ -425,7 +419,7 @@ impl UI { "Detaching" => None, _ => None, }; - let mut cmdproc = Self::command_processor(s); + let cmdproc = Self::command_processor(s); if let Some(a) = action { if a { cmdproc.attach(); @@ -707,7 +701,7 @@ impl UI { //////////////////////////////////////////////////////////////////////////// // Public functions - pub fn new(node_log_scrollback: usize, settings: &Settings) -> Self { + pub fn new(node_log_scrollback: usize, settings: &Settings) -> (Self, UISender) { cursive_flexi_logger_view::resize(node_log_scrollback); // Instantiate the cursive runnable @@ -726,9 +720,9 @@ impl UI { let cb_sink = runnable.cb_sink().clone(); // Create the UI object - let this = Self { - siv: Rc::new(RefCell::new(runnable)), - inner: Rc::new(RefCell::new(UIInner { + let mut this = Self { + siv: runnable, + inner: Arc::new(Mutex::new(UIInner { ui_state: UIState::new(), log_colors: Default::default(), cmdproc: None, @@ -740,15 +734,13 @@ impl UI { cmd_history_position: 0, cmd_history_max_size: settings.interface.command_line.history_size, connection_dialog_state: None, - cb_sink, })), }; - let mut siv = this.siv.borrow_mut(); - let mut inner = this.inner.borrow_mut(); + let mut inner = this.inner.lock(); // Make the inner object accessible in callbacks easily - siv.set_user_data(this.inner.clone()); + this.siv.set_user_data(this.inner.clone()); // Create layouts @@ -831,87 +823,44 @@ impl UI { .child(TextView::new(version)), ); - siv.add_fullscreen_layer(mainlayout); + this.siv.add_fullscreen_layer(mainlayout); - UI::setup_colors(&mut siv, &mut inner, settings); - UI::setup_quit_handler(&mut siv); - siv.set_global_callback(cursive::event::Event::CtrlChar('k'), UI::clear_handler); + UI::setup_colors(&mut this.siv, &mut inner, settings); + UI::setup_quit_handler(&mut this.siv); + this.siv + .set_global_callback(cursive::event::Event::CtrlChar('k'), UI::clear_handler); drop(inner); - drop(siv); - this + let inner = this.inner.clone(); + (this, UISender { inner, cb_sink }) } pub fn cursive_flexi_logger(&self) -> Box { - let mut flv = - cursive_flexi_logger_view::cursive_flexi_logger(self.siv.borrow().cb_sink().clone()); - flv.set_colors(self.inner.borrow().log_colors.clone()); + let mut flv = cursive_flexi_logger_view::cursive_flexi_logger(self.siv.cb_sink().clone()); + flv.set_colors(self.inner.lock().log_colors.clone()); flv } pub fn set_command_processor(&mut self, cmdproc: CommandProcessor) { - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock(); inner.cmdproc = Some(cmdproc); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn set_attachment_state( - &mut self, - state: AttachmentState, - public_internet_ready: bool, - local_network_ready: bool, - ) { - let mut inner = self.inner.borrow_mut(); - inner.ui_state.attachment_state.set(state); - inner - .ui_state - .public_internet_ready - .set(public_internet_ready); - inner.ui_state.local_network_ready.set(local_network_ready); - - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn set_network_status( - &mut self, - started: bool, - bps_down: u64, - bps_up: u64, - peers: Vec, - ) { - let mut inner = self.inner.borrow_mut(); - inner.ui_state.network_started.set(started); - inner.ui_state.network_down_up.set(( - ((bps_down as f64) / 1000.0f64) as f32, - ((bps_up as f64) / 1000.0f64) as f32, - )); - inner.ui_state.peers_state.set(peers); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); - } - pub fn set_config(&mut self, config: VeilidConfigInner) { - let mut inner = self.inner.borrow_mut(); - - inner - .ui_state - .node_id - .set(config.network.routing_table.node_id.to_string()); - } - pub fn set_connection_state(&mut self, state: ConnectionState) { - let mut inner = self.inner.borrow_mut(); - inner.ui_state.connection_state.set(state); - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); } - pub fn add_node_event(&self, event: String) { - let inner = self.inner.borrow(); - let color = *inner.log_colors.get(&Level::Info).unwrap(); - let mut starting_style: Style = color.into(); - for line in event.lines() { - let (spanned_string, end_style) = - cursive::utils::markup::ansi::parse_with_starting_style(starting_style, line); - cursive_flexi_logger_view::push_to_log(spanned_string); - starting_style = end_style; - } - let _ = inner.cb_sink.send(Box::new(UI::update_cb)); + // Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this + pub async fn run_async(&mut self) { + self.siv.run_async().await; } + // pub fn run(&mut self) { + // self.siv.run(); + // } +} +#[derive(Clone)] +pub struct UISender { + inner: Arc>, + cb_sink: Sender>, +} + +impl UISender { pub fn display_string_dialog( &self, title: T, @@ -920,31 +869,84 @@ impl UI { ) { let title = title.to_string(); let text = text.to_string(); - let inner = self.inner.borrow(); - let _ = inner.cb_sink.send(Box::new(move |s| { + let _ = self.cb_sink.send(Box::new(move |s| { UI::display_string_dialog_cb(s, title, text, close_cb) })); } pub fn quit(&self) { - let inner = self.inner.borrow(); - let _ = inner.cb_sink.send(Box::new(|s| { + let _ = self.cb_sink.send(Box::new(|s| { s.quit(); })); } pub fn send_callback(&self, callback: UICallback) { - let inner = self.inner.borrow(); - let _ = inner.cb_sink.send(Box::new(move |s| callback(s))); + let _ = self.cb_sink.send(Box::new(move |s| callback(s))); + } + pub fn set_attachment_state( + &mut self, + state: String, + public_internet_ready: bool, + local_network_ready: bool, + ) { + { + let mut inner = self.inner.lock(); + inner.ui_state.attachment_state.set(state); + inner + .ui_state + .public_internet_ready + .set(public_internet_ready); + inner.ui_state.local_network_ready.set(local_network_ready); + } + + let _ = self.cb_sink.send(Box::new(UI::update_cb)); + } + pub fn set_network_status( + &mut self, + started: bool, + bps_down: u64, + bps_up: u64, + peers: Vec, + ) { + { + let mut inner = self.inner.lock(); + inner.ui_state.network_started.set(started); + inner.ui_state.network_down_up.set(( + ((bps_down as f64) / 1000.0f64) as f32, + ((bps_up as f64) / 1000.0f64) as f32, + )); + inner.ui_state.peers_state.set(peers); + } + let _ = self.cb_sink.send(Box::new(UI::update_cb)); + } + pub fn set_config(&mut self, config: &json::JsonValue) { + let mut inner = self.inner.lock(); + + inner + .ui_state + .node_id + .set(config["network"]["routing_table"]["node_id"].to_string()); + } + pub fn set_connection_state(&mut self, state: ConnectionState) { + { + let mut inner = self.inner.lock(); + inner.ui_state.connection_state.set(state); + } + let _ = self.cb_sink.send(Box::new(UI::update_cb)); } - // Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this - pub async fn run_async(&mut self) { - let mut siv = self.siv.borrow_mut(); - siv.run_async().await; + pub fn add_node_event(&self, event: String) { + { + let inner = self.inner.lock(); + let color = *inner.log_colors.get(&Level::Info).unwrap(); + let mut starting_style: Style = color.into(); + for line in event.lines() { + let (spanned_string, end_style) = + cursive::utils::markup::ansi::parse_with_starting_style(starting_style, line); + cursive_flexi_logger_view::push_to_log(spanned_string); + starting_style = end_style; + } + } + let _ = self.cb_sink.send(Box::new(UI::update_cb)); } - // pub fn run(&mut self) { - // let mut siv = self.siv.borrow_mut(); - // siv.run(); - // } } diff --git a/veilid-core/src/intf/native/protected_store.rs b/veilid-core/src/intf/native/protected_store.rs index d8e46918..380a0c34 100644 --- a/veilid-core/src/intf/native/protected_store.rs +++ b/veilid-core/src/intf/native/protected_store.rs @@ -69,7 +69,7 @@ impl ProtectedStore { )); // Ensure permissions are correct - ensure_file_private_owner(&insecure_keyring_file)?; + ensure_file_private_owner(&insecure_keyring_file).map_err(|e| eyre!("{}", e))?; // Open the insecure keyring inner.keyring_manager = Some( diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index ff7a311d..f49417d9 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -41,15 +41,6 @@ pub use self::veilid_config::*; pub use self::veilid_layer_filter::*; pub use veilid_tools as tools; -use enumset::*; -use rkyv::{ - bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip, - Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize, -}; -type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>; -use schemars::{schema_for, JsonSchema}; -use serde::*; - pub mod veilid_capnp { include!(concat!(env!("OUT_DIR"), "/proto/veilid_capnp.rs")); } @@ -94,4 +85,20 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [ "attohttpc", ]; +use cfg_if::*; +use enumset::*; +use eyre::{bail, eyre, Report as EyreReport, Result as EyreResult, WrapErr}; +use parking_lot::*; +use rkyv::{ + bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip, + Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize, +}; +use tracing::*; use veilid_tools::*; +type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>; +use futures_util::stream::FuturesUnordered; +use owo_colors::OwoColorize; +use schemars::{schema_for, JsonSchema}; +use serde::*; +use stop_token::*; +use thiserror::Error as ThisError; diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index 36f285e4..76cdaf68 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -212,7 +212,8 @@ impl Network { } else { // If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If the address is specified, only use the specified port and fail otherwise - let sockaddrs = listen_address_to_socket_addrs(&listen_address)?; + let sockaddrs = + listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?; if sockaddrs.is_empty() { bail!("No valid listen address: {}", listen_address); } @@ -236,7 +237,8 @@ impl Network { } else { // If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If the address is specified, only use the specified port and fail otherwise - let sockaddrs = listen_address_to_socket_addrs(&listen_address)?; + let sockaddrs = + listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?; if sockaddrs.is_empty() { bail!("No valid listen address: {}", listen_address); } diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index ededea30..e63b5e19 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -167,20 +167,20 @@ impl ClientApi { if args.len() == 0 { apibail_generic!("no control request specified"); } - if args[0] == "shutdown" { + if args[0] == "Shutdown" { if args.len() != 1 { apibail_generic!("wrong number of arguments"); } self.shutdown(); Ok("".to_owned()) - } else if args[0] == "change_log_level" { + } else if args[0] == "ChangeLogLevel" { if args.len() != 3 { apibail_generic!("wrong number of arguments"); } let log_level: VeilidConfigLogLevel = deserialize_json(&args[2])?; self.change_log_level(args[1].clone(), log_level)?; Ok("".to_owned()) - } else if args[0] == "get_server_settings" { + } else if args[0] == "GetServerSettings" { if args.len() != 1 { apibail_generic!("wrong number of arguments"); } @@ -194,7 +194,7 @@ impl ClientApi { settings_json["core"]["protected_store"].remove("new_device_encryption_key_password"); let safe_settings_json = settings_json.to_string(); Ok(safe_settings_json) - } else if args[0] == "emit_schema" { + } else if args[0] == "EmitSchema" { if args.len() != 2 { apibail_generic!("wrong number of arguments"); } diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index b228a68e..e359855a 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -14,14 +14,10 @@ mod veilid_logs; #[cfg(windows)] mod windows; -use cfg_if::*; -#[allow(unused_imports)] -use color_eyre::eyre::{bail, ensure, eyre, Result as EyreResult, WrapErr}; use server::*; use std::collections::HashMap; use std::str::FromStr; use tools::*; -use tracing::*; use veilid_logs::*; #[instrument(err)] diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 2a16f91e..e918fc2f 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -1,5 +1,6 @@ use crate::client_api; use crate::settings::*; +use crate::tools::*; use crate::veilid_logs::*; use flume::{unbounded, Receiver, Sender}; use futures_util::select; diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index ab7e36ef..d6ee6f3b 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -2,6 +2,7 @@ use directories::*; +use crate::tools::*; use serde_derive::*; use std::ffi::OsStr; use std::net::SocketAddr; diff --git a/veilid-server/src/tools.rs b/veilid-server/src/tools.rs index 06d87e1d..65f296d5 100644 --- a/veilid-server/src/tools.rs +++ b/veilid-server/src/tools.rs @@ -1,5 +1,8 @@ -use cfg_if::*; -use core::future::Future; +pub use cfg_if::*; +pub use color_eyre::eyre::{bail, ensure, eyre, Result as EyreResult, WrapErr}; +pub use core::future::Future; +pub use parking_lot::*; +pub use tracing::*; cfg_if! { if #[cfg(feature="rt-async-std")] { diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 788e143d..253e6c8b 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -28,27 +28,6 @@ mod tools; #[cfg(target_arch = "wasm32")] mod wasm; -pub use cfg_if::*; -#[allow(unused_imports)] -pub use eyre::{bail, ensure, eyre, Report as EyreReport, Result as EyreResult, WrapErr}; -pub use futures_util::future::{select, Either}; -pub use futures_util::select; -pub use futures_util::stream::FuturesUnordered; -pub use futures_util::{AsyncRead, AsyncWrite}; -pub use log_thru::*; -pub use owo_colors::OwoColorize; -pub use parking_lot::*; -pub use split_url::*; -pub use static_assertions::*; -pub use stop_token::*; -pub use thiserror::Error as ThisError; -cfg_if! { - if #[cfg(feature = "tracing")] { - pub use tracing::*; - } else { - pub use log::*; - } -} pub type PinBox = Pin>; pub type PinBoxFuture = PinBox + 'static>; pub type PinBoxFutureLifetime<'a, T> = PinBox + 'a>; @@ -120,6 +99,7 @@ pub use eventual_value_clone::*; pub use interval::*; pub use ip_addr_port::*; pub use ip_extra::*; +pub use log_thru::*; pub use must_join_handle::*; pub use must_join_single_future::*; pub use mutable_future::*; @@ -128,17 +108,32 @@ pub use random::*; pub use single_shot_eventual::*; pub use sleep::*; pub use spawn::*; +pub use split_url::*; pub use tick_task::*; pub use timeout::*; pub use timeout_or::*; pub use timestamp::*; pub use tools::*; + #[cfg(target_arch = "wasm32")] pub use wasm::*; // Tests must be public for wasm-pack tests pub mod tests; +cfg_if! { + if #[cfg(feature = "tracing")] { + use tracing::*; + } else { + use log::*; + } +} +use cfg_if::*; +use futures_util::{AsyncRead, AsyncWrite}; +use parking_lot::*; +use stop_token::*; +use thiserror::Error as ThisError; + // For iOS tests #[no_mangle] pub extern "C" fn main_rs() {} diff --git a/veilid-tools/src/network_result.rs b/veilid-tools/src/network_result.rs index 0ddb3973..437000bc 100644 --- a/veilid-tools/src/network_result.rs +++ b/veilid-tools/src/network_result.rs @@ -337,19 +337,13 @@ macro_rules! network_result_value_or_log { ($r: expr => $f:tt) => { match $r { NetworkResult::Timeout => { - log_network_result!( - "{} at {}@{}:{}", - "Timeout".cyan(), - file!(), - line!(), - column!() - ); + log_network_result!("{} at {}@{}:{}", "Timeout", file!(), line!(), column!()); $f } NetworkResult::ServiceUnavailable => { log_network_result!( "{} at {}@{}:{}", - "ServiceUnavailable".cyan(), + "ServiceUnavailable", file!(), line!(), column!() @@ -359,7 +353,7 @@ macro_rules! network_result_value_or_log { NetworkResult::NoConnection(e) => { log_network_result!( "{}({}) at {}@{}:{}", - "No connection".cyan(), + "No connection", e.to_string(), file!(), line!(), @@ -370,7 +364,7 @@ macro_rules! network_result_value_or_log { NetworkResult::AlreadyExists(e) => { log_network_result!( "{}({}) at {}@{}:{}", - "Already exists".cyan(), + "Already exists", e.to_string(), file!(), line!(), @@ -381,7 +375,7 @@ macro_rules! network_result_value_or_log { NetworkResult::InvalidMessage(s) => { log_network_result!( "{}({}) at {}@{}:{}", - "Invalid message".cyan(), + "Invalid message", s, file!(), line!(), diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 4aba1f00..92ce1444 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -97,11 +97,13 @@ cfg_if! { ////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub fn split_port(name: &str) -> EyreResult<(String, Option)> { +pub fn split_port(name: &str) -> Result<(String, Option), String> { if let Some(split) = name.rfind(':') { let hoststr = &name[0..split]; let portstr = &name[split + 1..]; - let port: u16 = portstr.parse::().wrap_err("invalid port")?; + let port: u16 = portstr + .parse::() + .map_err(|e| format!("invalid port: {}", e))?; Ok((hoststr.to_string(), Some(port))) } else { @@ -130,8 +132,8 @@ pub fn ms_to_us(ms: u32) -> u64 { (ms as u64) * 1000u64 } -pub fn us_to_ms(us: u64) -> EyreResult { - u32::try_from(us / 1000u64).wrap_err("could not convert microseconds") +pub fn us_to_ms(us: u64) -> Result { + u32::try_from(us / 1000u64).map_err(|e| format!("could not convert microseconds: {}", e)) } // Calculate retry attempt with logarhythmic falloff @@ -224,7 +226,7 @@ pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAdd } } -pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult> { +pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result, String> { // If no address is specified, but the port is, use ipv4 and ipv6 unspecified // If the address is specified, only use the specified port and fail otherwise let ip_addrs = vec![ @@ -235,7 +237,7 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult() - .wrap_err("Invalid port format in udp listen address")?; + .map_err(|e| format!("Invalid port format in udp listen address: {}", e))?; ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect() } else if let Ok(port) = listen_address.parse::() { ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect() @@ -243,11 +245,11 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult>(path: P) -> EyreResult<()> + pub fn ensure_file_private_owner>(path: P) -> Result<(), String> { let path = path.as_ref(); if !path.exists() { @@ -286,13 +288,13 @@ cfg_if::cfg_if! { let uid = Uid::effective(); let gid = Gid::effective(); - let meta = std::fs::metadata(path).wrap_err("unable to get metadata for path")?; + let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?; if meta.mode() != 0o600 { - std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).wrap_err("unable to set correct permissions on path")?; + std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?; } if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() { - bail!("path has incorrect owner/group"); + return Err("path has incorrect owner/group".to_owned()); } Ok(()) } @@ -300,7 +302,7 @@ cfg_if::cfg_if! { //use std::os::windows::fs::MetadataExt; //use windows_permissions::*; - pub fn ensure_file_private_owner>(path: P) -> EyreResult<()> + pub fn ensure_file_private_owner>(path: P) -> Result<(), String> { let path = path.as_ref(); if !path.exists() {