checkpoint

This commit is contained in:
John Smith 2023-06-08 14:07:09 -04:00
parent 59c14f3b22
commit 419bfcd8ce
19 changed files with 563 additions and 602 deletions

1
Cargo.lock generated
View File

@ -6158,6 +6158,7 @@ dependencies = [
"cursive-flexi-logger-view",
"cursive_buffered_backend",
"cursive_table_view",
"data-encoding",
"directories",
"flexi_logger",
"flume",

View File

@ -41,10 +41,11 @@ flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] }
thiserror = "^1"
crossbeam-channel = "^0"
hex = "^0"
veilid-tools = { path = "../veilid-tools", default-features = false }
veilid-tools = { path = "../veilid-tools" }
json = "^0"
stop-token = { version = "^0", default-features = false }
flume = { version = "^0", features = ["async"] }
data-encoding = { version = "^2" }
[dev-dependencies]
serial_test = "^0"

View File

@ -1,12 +1,12 @@
use crate::command_processor::*;
use crate::tools::*;
use serde::de::DeserializeOwned;
use std::cell::RefCell;
use core::str::FromStr;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::net::SocketAddr;
use std::rc::Rc;
use stop_token::{future::FutureExt as _, StopSource, StopToken};
use std::time::SystemTime;
use stop_token::{future::FutureExt as _, StopSource};
use veilid_tools::*;
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::io::prelude::BufReadExt;
@ -19,80 +19,41 @@ cfg_if! {
}
}
// fn map_to_internal_error<T: ToString>(e: T) -> VeilidAPIError {
// VeilidAPIError::Internal {
// message: e.to_string(),
// }
// }
// fn decode_api_result<T: DeserializeOwned + fmt::Debug>(
// reader: &api_result::Reader,
// ) -> VeilidAPIResult<T> {
// match reader.which().map_err(map_to_internal_error)? {
// api_result::Which::Ok(v) => {
// let ok_val = v.map_err(map_to_internal_error)?;
// let res: T = veilid_core::deserialize_json(ok_val).map_err(map_to_internal_error)?;
// Ok(res)
// }
// api_result::Which::Err(e) => {
// let err_val = e.map_err(map_to_internal_error)?;
// let res: VeilidAPIError =
// veilid_core::deserialize_json(err_val).map_err(map_to_internal_error)?;
// Err(res)
// }
// }
// }
// struct VeilidClientImpl {
// comproc: CommandProcessor,
// }
// impl VeilidClientImpl {
// pub fn new(comproc: CommandProcessor) -> Self {
// Self { comproc }
// }
// }
// }
struct ClientApiConnectionInner {
comproc: CommandProcessor,
connect_addr: Option<SocketAddr>,
server: Option<flume::Sender<String>>,
request_sender: Option<flume::Sender<String>>,
server_settings: Option<String>,
disconnector: Option<StopSource>,
disconnect_requested: bool,
cancel_eventual: Eventual,
reply_channels: HashMap<u32, flume::Sender<json::JsonValue>>,
next_req_id: u32,
}
type Handle<T> = Rc<RefCell<T>>;
#[derive(Clone)]
pub struct ClientApiConnection {
inner: Handle<ClientApiConnectionInner>,
inner: Arc<Mutex<ClientApiConnectionInner>>,
}
impl ClientApiConnection {
pub fn new(comproc: CommandProcessor) -> Self {
Self {
inner: Rc::new(RefCell::new(ClientApiConnectionInner {
inner: Arc::new(Mutex::new(ClientApiConnectionInner {
comproc,
connect_addr: None,
server: None,
request_sender: None,
server_settings: None,
disconnector: None,
disconnect_requested: false,
cancel_eventual: Eventual::new(),
reply_channels: HashMap::new(),
next_req_id: 0,
})),
}
}
pub fn cancel(&self) {
let eventual = {
let inner = self.inner.borrow();
inner.cancel_eventual.clone()
};
eventual.resolve(); // don't need to await this
pub fn cancel_all(&self) {
let mut inner = self.inner.lock();
inner.reply_channels.clear();
}
// async fn process_veilid_state<'a>(
@ -106,8 +67,33 @@ impl ClientApiConnection {
// Ok(())
// }
async fn process_response(&self, response: json::JsonValue) {
// find the operation id and send the response to the channel for it
let Some(id_str) = response["id"].as_str() else {
error!("missing id: {}", response);
return;
};
let Ok(id) = u32::from_str(id_str) else {
error!("invalid id: {}", response);
return;
};
let reply_channel = {
let mut inner = self.inner.lock();
inner.reply_channels.remove(&id)
};
let Some(reply_channel) = reply_channel else {
warn!("received cancelled reply: {}", response);
return;
};
if let Err(e) = reply_channel.send_async(response).await {
error!("failed to process reply: {}", e);
return;
}
}
async fn process_update(&self, update: json::JsonValue) {
let comproc = self.inner.borrow().comproc.clone();
let comproc = self.inner.lock().comproc.clone();
let Some(kind) = update["kind"].as_str() else {
comproc.log_message(format!("missing update kind: {}", update));
return;
@ -145,7 +131,7 @@ impl ClientApiConnection {
}
// async fn spawn_rpc_system(
// &mut self,
// &self,
// connect_addr: SocketAddr,
// mut rpc_system: RpcSystem<rpc_twoparty_capnp::Side>,
// ) -> Result<(), String> {
@ -235,18 +221,9 @@ impl ClientApiConnection {
// res.map_err(|e| format!("client RPC system error: {}", e))
// }
async fn handle_connection(&mut self, connect_addr: SocketAddr) -> Result<(), String> {
async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::handle_connection");
let stop_token = {
let stop_source = StopSource::new();
let token = stop_source.token();
let mut inner = self.inner.borrow_mut();
inner.connect_addr = Some(connect_addr);
inner.disconnector = Some(stop_source);
token
};
// Connect the TCP socket
let stream = TcpStream::connect(connect_addr)
.await
@ -255,283 +232,245 @@ impl ClientApiConnection {
// If it succeed, disable nagle algorithm
stream.set_nodelay(true).map_err(map_to_string)?;
// State we connected
let comproc = self.inner.lock().comproc.clone();
comproc.set_connection_state(ConnectionState::Connected(connect_addr, SystemTime::now()));
// Split the stream
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures::AsyncReadExt;
let (reader, writer) = stream.split();
let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
} else if #[cfg(feature="rt-tokio")] {
let (reader, writer) = stream.into_split();
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
}
}
// Requests to send
let (requests_tx, requests_rx) = flume::unbounded();
let stop_token = {
let stop_source = StopSource::new();
let token = stop_source.token();
let mut inner = self.inner.lock();
inner.connect_addr = Some(connect_addr);
inner.disconnector = Some(stop_source);
inner.request_sender = Some(requests_tx);
token
};
// Futures to process unordered
let mut unord = FuturesUnordered::new();
// Process lines
let mut line = String::new();
while let Ok(r) = reader
.read_line(&mut line)
.timeout_at(stop_token.clone())
.await
{
match r {
Ok(size) => {
// Exit on EOF
if size == 0 {
// Disconnected
return Err("Connection closed".to_owned());
}
}
Err(e) => {
let this = self.clone();
let recv_messages_future = async move {
let mut line = String::new();
while let Ok(size) = reader.read_line(&mut line).await {
// Exit on EOF
if size == 0 {
// Disconnected
return Err("Connection lost".to_owned());
break;
}
// Unmarshal json
let j = match json::parse(line.trim()) {
Ok(v) => v,
Err(e) => {
error!("failed to parse server response: {}", e);
continue;
}
};
if j["type"] == "Update" {
this.process_update(j).await;
} else if j["type"] == "Response" {
this.process_response(j).await;
}
}
//
let mut inner = this.inner.lock();
inner.request_sender = None;
};
unord.push(system_boxed(recv_messages_future));
// Unmarshal json
let j = match json::parse(line.trim()) {
Ok(v) => v,
Err(e) => {
error!("failed to parse server response: {}", e);
continue;
// Requests send processor
let send_requests_future = async move {
while let Ok(req) = requests_rx.recv_async().await {
if let Err(e) = writer.write_all(req.as_bytes()).await {
error!("failed to write request: {}", e)
}
};
if j["type"] == "Update" {
self.process_update(j).await;
}
}
};
unord.push(system_boxed(send_requests_future));
// Connection finished
Ok(())
// let rpc_network = Box::new(twoparty::VatNetwork::new(
// reader,
// writer,
// rpc_twoparty_capnp::Side::Client,
// Default::default(),
// ));
// // Create the rpc system
// let rpc_system = RpcSystem::new(rpc_network, None);
// // Process the rpc system until we decide we're done
// match self.spawn_rpc_system(connect_addr, rpc_system).await {
// Ok(()) => {}
// Err(e) => {
// error!("Failed to spawn client RPC system: {}", e);
// }
// }
// Send and receive until we're done or a stop is requested
while let Ok(Some(())) = unord.next().timeout_at(stop_token.clone()).await {}
// // Drop the server and disconnector too (if we still have it)
// let mut inner = self.inner.borrow_mut();
// let disconnect_requested = inner.disconnect_requested;
// inner.server_settings = None;
// inner.server = None;
// inner.disconnector = None;
// inner.disconnect_requested = false;
// inner.connect_addr = None;
let mut inner = self.inner.lock();
let disconnect_requested = inner.disconnect_requested;
inner.server_settings = None;
inner.request_sender = None;
inner.disconnector = None;
inner.disconnect_requested = false;
inner.connect_addr = None;
// Connection finished
if disconnect_requested {
Ok(())
} else {
Err("Connection lost".to_owned())
}
}
// pub fn cancellable<T>(&mut self, p: Promise<T, capnp::Error>) -> Promise<T, capnp::Error>
// where
// T: 'static,
// {
// let (mut cancel_instance, cancel_eventual) = {
// let inner = self.inner.borrow();
// (
// inner.cancel_eventual.instance_empty().fuse(),
// inner.cancel_eventual.clone(),
// )
// };
// let mut p = p.fuse();
async fn perform_request(&self, mut req: json::JsonValue) -> Option<json::JsonValue> {
let (sender, reply_rx) = {
let mut inner = self.inner.lock();
// Promise::from_future(async move {
// let out = select! {
// a = p => {
// a
// },
// _ = cancel_instance => {
// Err(capnp::Error::failed("cancelled".into()))
// }
// };
// drop(cancel_instance);
// cancel_eventual.reset();
// out
// })
// }
// Get the request sender
let Some(sender) = inner.request_sender.clone() else {
error!("dropping request, not connected");
return None;
};
pub async fn server_attach(&mut self) -> Result<(), String> {
// Get next id
let id = inner.next_req_id;
inner.next_req_id += 1;
// Add the id
req["id"] = id.into();
// Make a reply receiver
let (reply_tx, reply_rx) = flume::bounded(1);
inner.reply_channels.insert(id, reply_tx);
(sender, reply_rx)
};
// Send the request
let req_ndjson = req.dump() + "\n";
if let Err(e) = sender.send_async(req_ndjson).await {
error!("failed to send request: {}", e);
return None;
}
// Wait for the reply
let Ok(r) = reply_rx.recv_async().await else {
// Cancelled
return None;
};
Some(r)
}
pub async fn server_attach(&self) -> Result<(), String> {
trace!("ClientApiConnection::server_attach");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring attach request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "Attach".into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let request = server.borrow().attach_request();
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: VeilidAPIResult<()> = decode_api_result(&reader);
res.map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
pub async fn server_detach(&mut self) -> Result<(), String> {
pub async fn server_detach(&self) -> Result<(), String> {
trace!("ClientApiConnection::server_detach");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring detach request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "Detach".into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let request = server.borrow().detach_request();
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: VeilidAPIResult<()> = decode_api_result(&reader);
res.map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
pub async fn server_shutdown(&mut self) -> Result<(), String> {
pub async fn server_shutdown(&self) -> Result<(), String> {
trace!("ClientApiConnection::server_shutdown");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring attach request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "Control".into();
req["args"] = json::JsonValue::new_array();
req["args"].push("Shutdown").unwrap();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let request = server.borrow().shutdown_request();
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
response.get().map(drop).map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
pub async fn server_debug(&mut self, what: String) -> Result<String, String> {
pub async fn server_debug(&self, what: String) -> Result<String, String> {
trace!("ClientApiConnection::server_debug");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring debug request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "Debug".into();
req["command"] = what.into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let mut request = server.borrow().debug_request();
request.get().set_command(&what);
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: VeilidAPIResult<String> = decode_api_result(&reader);
res.map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(resp["value"].to_string())
}
pub async fn server_change_log_level(
&mut self,
&self,
layer: String,
log_level: VeilidConfigLogLevel,
log_level: String,
) -> Result<(), String> {
trace!("ClientApiConnection::change_log_level");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "Control".into();
req["args"] = json::JsonValue::new_array();
req["args"].push("ChangeLogLevel").unwrap();
req["args"].push(layer).unwrap();
req["args"].push(log_level).unwrap();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let mut request = server.borrow().change_log_level_request();
request.get().set_layer(&layer);
let log_level_json = veilid_core::serialize_json(&log_level);
request.get().set_log_level(&log_level_json);
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: VeilidAPIResult<()> = decode_api_result(&reader);
res.map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
pub async fn server_appcall_reply(
&mut self,
id: OperationId,
msg: Vec<u8>,
) -> Result<(), String> {
pub async fn server_appcall_reply(&self, id: u64, msg: Vec<u8>) -> Result<(), String> {
trace!("ClientApiConnection::appcall_reply");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())?
.clone()
let mut req = json::JsonValue::new_object();
req["op"] = "AppCallReply".into();
req["call_id"] = id.to_string().into();
req["message"] = data_encoding::BASE64URL_NOPAD.encode(&msg).into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
let mut request = server.borrow().app_call_reply_request();
request.get().set_id(id.as_u64());
request.get().set_message(&msg);
let response = self
.cancellable(request.send().promise)
.await
.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: VeilidAPIResult<()> = decode_api_result(&reader);
res.map_err(map_to_string)
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
// Start Client API connection
pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> {
pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::connect");
// Save the address to connect to
self.handle_connection(connect_addr).await
}
// End Client API connection
pub async fn disconnect(&mut self) {
pub async fn disconnect(&self) {
trace!("ClientApiConnection::disconnect");
let disconnector = self.inner.borrow_mut().disconnector.take();
match disconnector {
Some(d) => {
self.inner.borrow_mut().disconnect_requested = true;
d.await.unwrap();
}
None => {
debug!("disconnector doesn't exist");
}
let mut inner = self.inner.lock();
if inner.disconnector.is_some() {
inner.disconnector = None;
inner.disconnect_requested = true;
}
}
}

View File

@ -1,20 +1,19 @@
use crate::client_api_connection::*;
use crate::settings::Settings;
use crate::tools::*;
use crate::ui::*;
use std::cell::*;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::SystemTime;
use veilid_tools::*;
pub fn convert_loglevel(s: &str) -> Result<VeilidConfigLogLevel, String> {
pub fn convert_loglevel(s: &str) -> Result<String, String> {
match s.to_ascii_lowercase().as_str() {
"off" => Ok(VeilidConfigLogLevel::Off),
"error" => Ok(VeilidConfigLogLevel::Error),
"warn" => Ok(VeilidConfigLogLevel::Warn),
"info" => Ok(VeilidConfigLogLevel::Info),
"debug" => Ok(VeilidConfigLogLevel::Debug),
"trace" => Ok(VeilidConfigLogLevel::Trace),
"off" => Ok("Off".to_owned()),
"error" => Ok("Error".to_owned()),
"warn" => Ok("Warn".to_owned()),
"info" => Ok("Info".to_owned()),
"debug" => Ok("Debug".to_owned()),
"trace" => Ok("Trace".to_owned()),
_ => Err(format!("Invalid log level: {}", s)),
}
}
@ -38,7 +37,7 @@ impl ConnectionState {
}
struct CommandProcessorInner {
ui: UI,
ui_sender: UISender,
capi: Option<ClientApiConnection>,
reconnect: bool,
finished: bool,
@ -46,21 +45,19 @@ struct CommandProcessorInner {
autoreconnect: bool,
server_addr: Option<SocketAddr>,
connection_waker: Eventual,
last_call_id: Option<OperationId>,
last_call_id: Option<u64>,
}
type Handle<T> = Rc<RefCell<T>>;
#[derive(Clone)]
pub struct CommandProcessor {
inner: Handle<CommandProcessorInner>,
inner: Arc<Mutex<CommandProcessorInner>>,
}
impl CommandProcessor {
pub fn new(ui: UI, settings: &Settings) -> Self {
pub fn new(ui_sender: UISender, settings: &Settings) -> Self {
Self {
inner: Rc::new(RefCell::new(CommandProcessorInner {
ui,
inner: Arc::new(Mutex::new(CommandProcessorInner {
ui_sender,
capi: None,
reconnect: settings.autoreconnect,
finished: false,
@ -72,20 +69,20 @@ impl CommandProcessor {
})),
}
}
pub fn set_client_api_connection(&mut self, capi: ClientApiConnection) {
self.inner.borrow_mut().capi = Some(capi);
pub fn set_client_api_connection(&self, capi: ClientApiConnection) {
self.inner.lock().capi = Some(capi);
}
fn inner(&self) -> Ref<CommandProcessorInner> {
self.inner.borrow()
fn inner(&self) -> MutexGuard<CommandProcessorInner> {
self.inner.lock()
}
fn inner_mut(&self) -> RefMut<CommandProcessorInner> {
self.inner.borrow_mut()
fn inner_mut(&self) -> MutexGuard<CommandProcessorInner> {
self.inner.lock()
}
fn ui(&self) -> UI {
self.inner.borrow().ui.clone()
fn ui_sender(&self) -> UISender {
self.inner.lock().ui_sender.clone()
}
fn capi(&self) -> ClientApiConnection {
self.inner.borrow().capi.as_ref().unwrap().clone()
self.inner.lock().capi.as_ref().unwrap().clone()
}
fn word_split(line: &str) -> (String, Option<String>) {
@ -102,12 +99,12 @@ impl CommandProcessor {
pub fn cancel_command(&self) {
trace!("CommandProcessor::cancel_command");
let capi = self.capi();
capi.cancel();
capi.cancel_all();
}
pub fn cmd_help(&self, _rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_help");
self.ui().add_node_event(
self.ui_sender().add_node_event(
r#"Commands:
exit/quit - exit the client
disconnect - disconnect the client from the Veilid node
@ -120,14 +117,14 @@ reply - reply to an AppCall not handled directly by the server
"#
.to_owned(),
);
let ui = self.ui();
let ui = self.ui_sender();
ui.send_callback(callback);
Ok(())
}
pub fn cmd_exit(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_exit");
let ui = self.ui();
let ui = self.ui_sender();
ui.send_callback(callback);
ui.quit();
Ok(())
@ -135,8 +132,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_shutdown(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_shutdown");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
if let Err(e) = capi.server_shutdown().await {
error!("Server command 'shutdown' failed to execute: {}", e);
@ -148,8 +145,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_attach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_attach");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed: {}", e);
@ -161,8 +158,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_detach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_detach");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed: {}", e);
@ -174,8 +171,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_disconnect(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_disconnect");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
capi.disconnect().await;
ui.send_callback(callback);
@ -185,8 +182,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_debug(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_debug");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
match capi.server_debug(rest.unwrap_or_default()).await {
Ok(output) => ui.display_string_dialog("Debug Output", output, callback),
@ -202,8 +199,8 @@ reply - reply to an AppCall not handled directly by the server
callback: UICallback,
) -> Result<(), String> {
trace!("CommandProcessor::cmd_change_log_level");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
let (layer, rest) = Self::word_split(&rest.unwrap_or_default());
let log_level = match convert_loglevel(&rest.unwrap_or_default()) {
@ -234,8 +231,8 @@ reply - reply to an AppCall not handled directly by the server
pub fn cmd_reply(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_reply");
let mut capi = self.capi();
let ui = self.ui();
let capi = self.capi();
let ui = self.ui_sender();
let some_last_id = self.inner_mut().last_call_id.take();
spawn_detached_local(async move {
let (first, second) = Self::word_split(&rest.clone().unwrap_or_default());
@ -248,7 +245,7 @@ reply - reply to an AppCall not handled directly by the server
}
Ok(v) => v,
};
(OperationId::new(id), second)
(id, second)
} else {
let id = match some_last_id {
None => {
@ -306,14 +303,14 @@ reply - reply to an AppCall not handled directly by the server
"change_log_level" => self.cmd_change_log_level(rest, callback),
"reply" => self.cmd_reply(rest, callback),
_ => {
let ui = self.ui();
let ui = self.ui_sender();
ui.send_callback(callback);
Err(format!("Invalid command: {}", cmd))
}
}
}
pub async fn connection_manager(&mut self) {
pub async fn connection_manager(&self) {
// Connect until we're done
while !self.inner_mut().finished {
// Wait for connection request
@ -341,7 +338,7 @@ reply - reply to an AppCall not handled directly by the server
} else {
debug!("Retrying connection to {}", server_addr);
}
let mut capi = self.capi();
let capi = self.capi();
let res = capi.connect(server_addr).await;
if res.is_ok() {
info!(
@ -376,7 +373,7 @@ reply - reply to an AppCall not handled directly by the server
// called by ui
////////////////////////////////////////////
pub fn set_server_address(&mut self, server_addr: Option<SocketAddr>) {
pub fn set_server_address(&self, server_addr: Option<SocketAddr>) {
self.inner_mut().server_addr = server_addr;
}
pub fn get_server_address(&self) -> Option<SocketAddr> {
@ -386,54 +383,61 @@ reply - reply to an AppCall not handled directly by the server
// calls into ui
////////////////////////////////////////////
pub fn log_message(&mut self, message: String) {
self.inner().ui.add_node_event(message);
pub fn log_message(&self, message: String) {
self.inner().ui_sender.add_node_event(message);
}
pub fn update_attachment(&mut self, attachment: json::JsonValue) {
self.inner_mut().ui.set_attachment_state(
attachment.state,
attachment.public_internet_ready,
attachment.local_network_ready,
pub fn update_attachment(&self, attachment: json::JsonValue) {
self.inner_mut().ui_sender.set_attachment_state(
attachment["state"].as_str().unwrap_or_default().to_owned(),
attachment["public_internet_ready"]
.as_bool()
.unwrap_or_default(),
attachment["local_network_ready"]
.as_bool()
.unwrap_or_default(),
);
}
pub fn update_network_status(&mut self, network: veilid_core::VeilidStateNetwork) {
self.inner_mut().ui.set_network_status(
network.started,
network.bps_down.as_u64(),
network.bps_up.as_u64(),
network.peers,
pub fn update_network_status(&self, network: json::JsonValue) {
self.inner_mut().ui_sender.set_network_status(
network["started"].as_bool().unwrap_or_default(),
json_str_u64(&network["bps_down"]),
json_str_u64(&network["bps_up"]),
network["peers"]
.members()
.cloned()
.collect::<Vec<json::JsonValue>>(),
);
}
pub fn update_config(&mut self, config: veilid_core::VeilidStateConfig) {
self.inner_mut().ui.set_config(config.config)
pub fn update_config(&self, config: json::JsonValue) {
self.inner_mut().ui_sender.set_config(&config["config"])
}
pub fn update_route(&mut self, route: veilid_core::VeilidRouteChange) {
pub fn update_route(&self, route: json::JsonValue) {
let mut out = String::new();
if !route.dead_routes.is_empty() {
out.push_str(&format!("Dead routes: {:?}", route.dead_routes));
if route["dead_routes"].len() != 0 {
out.push_str(&format!("Dead routes: {:?}", route["dead_routes"]));
}
if !route.dead_remote_routes.is_empty() {
if route["dead_routes"].len() != 0 {
if !out.is_empty() {
out.push_str("\n");
}
out.push_str(&format!(
"Dead remote routes: {:?}",
route.dead_remote_routes
route["dead_remote_routes"]
));
}
if !out.is_empty() {
self.inner().ui.add_node_event(out);
self.inner().ui_sender.add_node_event(out);
}
}
pub fn update_value_change(&mut self, value_change: json::JsonValue) {
pub fn update_value_change(&self, value_change: json::JsonValue) {
let out = format!("Value change: {:?}", value_change.as_str().unwrap_or("???"));
self.inner().ui.add_node_event(out);
self.inner().ui_sender.add_node_event(out);
}
pub fn update_log(&mut self, log: json::JsonValue) {
self.inner().ui.add_node_event(format!(
pub fn update_log(&self, log: json::JsonValue) {
self.inner().ui_sender.add_node_event(format!(
"{}: {}{}",
log["log_level"].as_str().unwrap_or("???"),
log["message"].as_str().unwrap_or("???"),
@ -445,79 +449,83 @@ reply - reply to an AppCall not handled directly by the server
));
}
pub fn update_app_message(&mut self, msg: json::JsonValue) {
pub fn update_app_message(&self, msg: json::JsonValue) {
let message = json_str_vec_u8(&msg["message"]);
// check is message body is ascii printable
let mut printable = true;
for c in msg.message() {
for c in &message {
if *c < 32 || *c > 126 {
printable = false;
}
}
let strmsg = if printable {
String::from_utf8_lossy(msg.message()).to_string()
String::from_utf8_lossy(&message).to_string()
} else {
hex::encode(msg.message())
hex::encode(message)
};
self.inner()
.ui
.add_node_event(format!("AppMessage ({:?}): {}", msg.sender(), strmsg));
.ui_sender
.add_node_event(format!("AppMessage ({:?}): {}", msg["sender"], strmsg));
}
pub fn update_app_call(&mut self, call: veilid_core::VeilidAppCall) {
pub fn update_app_call(&self, call: json::JsonValue) {
let message = json_str_vec_u8(&call["message"]);
// check is message body is ascii printable
let mut printable = true;
for c in call.message() {
for c in &message {
if *c < 32 || *c > 126 {
printable = false;
}
}
let strmsg = if printable {
String::from_utf8_lossy(call.message()).to_string()
String::from_utf8_lossy(&message).to_string()
} else {
format!("#{}", hex::encode(call.message()))
format!("#{}", hex::encode(&message))
};
self.inner().ui.add_node_event(format!(
let id = json_str_u64(&call["id"]);
self.inner().ui_sender.add_node_event(format!(
"AppCall ({:?}) id = {:016x} : {}",
call.sender(),
call.id().as_u64(),
strmsg
call["sender"], id, strmsg
));
self.inner_mut().last_call_id = Some(call.id());
self.inner_mut().last_call_id = Some(id);
}
pub fn update_shutdown(&mut self) {
pub fn update_shutdown(&self) {
// Do nothing with this, we'll process shutdown when rpc connection closes
}
// called by client_api_connection
// calls into ui
////////////////////////////////////////////
pub fn set_connection_state(&mut self, state: ConnectionState) {
self.inner_mut().ui.set_connection_state(state);
pub fn set_connection_state(&self, state: ConnectionState) {
self.inner_mut().ui_sender.set_connection_state(state);
}
// called by ui
////////////////////////////////////////////
pub fn start_connection(&mut self) {
pub fn start_connection(&self) {
self.inner_mut().reconnect = true;
self.inner_mut().connection_waker.resolve();
}
// pub fn stop_connection(&mut self) {
// pub fn stop_connection(&self) {
// self.inner_mut().reconnect = false;
// let mut capi = self.capi().clone();
// spawn_detached(async move {
// capi.disconnect().await;
// });
// }
pub fn cancel_reconnect(&mut self) {
pub fn cancel_reconnect(&self) {
self.inner_mut().reconnect = false;
self.inner_mut().connection_waker.resolve();
}
pub fn quit(&mut self) {
pub fn quit(&self) {
self.inner_mut().finished = true;
self.inner_mut().reconnect = false;
self.inner_mut().connection_waker.resolve();
@ -526,8 +534,8 @@ reply - reply to an AppCall not handled directly by the server
// called by ui
// calls into client_api_connection
////////////////////////////////////////////
pub fn attach(&mut self) {
let mut capi = self.capi();
pub fn attach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
if let Err(e) = capi.server_attach().await {
@ -536,8 +544,8 @@ reply - reply to an AppCall not handled directly by the server
});
}
pub fn detach(&mut self) {
let mut capi = self.capi();
pub fn detach(&self) {
let capi = self.capi();
spawn_detached_local(async move {
if let Err(e) = capi.server_detach().await {

View File

@ -3,7 +3,6 @@
#![recursion_limit = "256"]
use crate::tools::*;
use veilid_tools::*;
use clap::{Arg, ColorChoice, Command};
use flexi_logger::*;
@ -92,7 +91,7 @@ fn main() -> Result<(), String> {
}
// Create UI object
let mut sivui = ui::UI::new(settings.interface.node_log.scrollback, &settings);
let (mut sivui, uisender) = ui::UI::new(settings.interface.node_log.scrollback, &settings);
// Set up loggers
{
@ -155,19 +154,19 @@ fn main() -> Result<(), String> {
// Create command processor
debug!("Creating Command Processor ");
let mut comproc = command_processor::CommandProcessor::new(sivui.clone(), &settings);
let comproc = command_processor::CommandProcessor::new(uisender, &settings);
sivui.set_command_processor(comproc.clone());
// Create client api client side
info!("Starting API connection");
let mut capi = client_api_connection::ClientApiConnection::new(comproc.clone());
let capi = client_api_connection::ClientApiConnection::new(comproc.clone());
// Save client api in command processor
comproc.set_client_api_connection(capi.clone());
// Keep a connection to the server
comproc.set_server_address(server_addr);
let mut comproc2 = comproc.clone();
let comproc2 = comproc.clone();
let connection_future = comproc.connection_manager();
// Start async

View File

@ -23,8 +23,11 @@ pub enum PeerTableColumn {
// }
// }
fn format_ts(ts: Timestamp) -> String {
let ts = ts.as_u64();
fn format_ts(ts: &json::JsonValue) -> String {
if ts.is_null() {
return "---".to_owned();
}
let ts = json_str_u64(ts);
let secs = timestamp_to_secs(ts);
if secs >= 1.0 {
format!("{:.2}s", timestamp_to_secs(ts))
@ -33,8 +36,11 @@ fn format_ts(ts: Timestamp) -> String {
}
}
fn format_bps(bps: ByteCount) -> String {
let bps = bps.as_u64();
fn format_bps(bps: &json::JsonValue) -> String {
if bps.is_null() {
return "---".to_owned();
}
let bps = json_str_u64(bps);
if bps >= 1024u64 * 1024u64 * 1024u64 {
format!("{:.2}GB/s", (bps / (1024u64 * 1024u64)) as f64 / 1024.0)
} else if bps >= 1024u64 * 1024u64 {
@ -46,25 +52,20 @@ fn format_bps(bps: ByteCount) -> String {
}
}
impl TableViewItem<PeerTableColumn> for PeerTableData {
impl TableViewItem<PeerTableColumn> for json::JsonValue {
fn to_column(&self, column: PeerTableColumn) -> String {
match column {
PeerTableColumn::NodeId => self
.node_ids
.first()
.map(|n| n.to_string())
.unwrap_or_else(|| "???".to_owned()),
PeerTableColumn::Address => self.peer_address.clone(),
PeerTableColumn::LatencyAvg => format!(
"{}",
self.peer_stats
.latency
.as_ref()
.map(|l| format_ts(l.average))
.unwrap_or("---".to_owned())
),
PeerTableColumn::TransferDownAvg => format_bps(self.peer_stats.transfer.down.average),
PeerTableColumn::TransferUpAvg => format_bps(self.peer_stats.transfer.up.average),
PeerTableColumn::NodeId => self["node_ids"][0].to_string(),
PeerTableColumn::Address => self["peer_address"].to_string(),
PeerTableColumn::LatencyAvg => {
format!("{}", format_ts(&self["peer_stats"]["latency"]["average"]))
}
PeerTableColumn::TransferDownAvg => {
format_bps(&self["peer_stats"]["transfer"]["down"]["average"])
}
PeerTableColumn::TransferUpAvg => {
format_bps(&self["peer_stats"]["transfer"]["up"]["average"])
}
}
}
@ -75,26 +76,20 @@ impl TableViewItem<PeerTableColumn> for PeerTableData {
match column {
PeerTableColumn::NodeId => self.to_column(column).cmp(&other.to_column(column)),
PeerTableColumn::Address => self.to_column(column).cmp(&other.to_column(column)),
PeerTableColumn::LatencyAvg => self
.peer_stats
.latency
.as_ref()
.map(|l| l.average)
.cmp(&other.peer_stats.latency.as_ref().map(|l| l.average)),
PeerTableColumn::TransferDownAvg => self
.peer_stats
.transfer
.down
.average
.cmp(&other.peer_stats.transfer.down.average),
PeerTableColumn::TransferUpAvg => self
.peer_stats
.transfer
.up
.average
.cmp(&other.peer_stats.transfer.up.average),
PeerTableColumn::LatencyAvg => json_str_u64(&self["peer_stats"]["latency"]["average"])
.cmp(&json_str_u64(&other["peer_stats"]["latency"]["average"])),
PeerTableColumn::TransferDownAvg => {
json_str_u64(&self["peer_stats"]["transfer"]["down"]["average"]).cmp(&json_str_u64(
&other["peer_stats"]["transfer"]["down"]["average"],
))
}
PeerTableColumn::TransferUpAvg => {
json_str_u64(&self["peer_stats"]["transfer"]["up"]["average"]).cmp(&json_str_u64(
&other["peer_stats"]["transfer"]["up"]["average"],
))
}
}
}
}
pub type PeersTableView = TableView<PeerTableData, PeerTableColumn>;
pub type PeersTableView = TableView<json::JsonValue, PeerTableColumn>;

View File

@ -1,5 +1,10 @@
use cfg_if::*;
pub use cfg_if::*;
pub use log::*;
pub use parking_lot::*;
pub use veilid_tools::*;
use core::future::Future;
use core::str::FromStr;
cfg_if! {
if #[cfg(feature="rt-async-std")] {
@ -17,3 +22,13 @@ cfg_if! {
}
}
pub fn json_str_u64(value: &json::JsonValue) -> u64 {
u64::from_str(value.as_str().unwrap_or_default()).unwrap_or_default()
}
pub fn json_str_vec_u8(value: &json::JsonValue) -> Vec<u8> {
data_encoding::BASE64URL_NOPAD
.decode(value.as_str().unwrap_or_default().as_bytes())
.unwrap_or_default()
}

View File

@ -1,6 +1,7 @@
use crate::command_processor::*;
use crate::peers_table_view::*;
use crate::settings::Settings;
use crate::tools::*;
use crossbeam_channel::Sender;
use cursive::align::*;
use cursive::event::*;
@ -12,11 +13,8 @@ use cursive::Cursive;
use cursive::CursiveRunnable;
use cursive_flexi_logger_view::{CursiveLogWriter, FlexiLoggerView};
//use cursive_multiplex::*;
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::rc::Rc;
use thiserror::Error;
use veilid_tools::*;
//////////////////////////////////////////////////////////////
///
@ -82,19 +80,15 @@ pub struct UIInner {
ui_state: UIState,
log_colors: HashMap<Level, cursive::theme::Color>,
cmdproc: Option<CommandProcessor>,
cb_sink: Sender<Box<dyn FnOnce(&mut Cursive) + 'static + Send>>,
cmd_history: VecDeque<String>,
cmd_history_position: usize,
cmd_history_max_size: usize,
connection_dialog_state: Option<ConnectionState>,
}
type Handle<T> = Rc<RefCell<T>>;
#[derive(Clone)]
pub struct UI {
siv: Handle<CursiveRunnable>,
inner: Handle<UIInner>,
siv: CursiveRunnable,
inner: Arc<Mutex<UIInner>>,
}
#[derive(Error, Debug)]
@ -112,11 +106,11 @@ impl UI {
inner.cmdproc.as_ref().unwrap().clone()
}
fn inner(s: &mut Cursive) -> std::cell::Ref<'_, UIInner> {
s.user_data::<Handle<UIInner>>().unwrap().borrow()
fn inner(s: &mut Cursive) -> MutexGuard<'_, UIInner> {
s.user_data::<Arc<Mutex<UIInner>>>().unwrap().lock()
}
fn inner_mut(s: &mut Cursive) -> std::cell::RefMut<'_, UIInner> {
s.user_data::<Handle<UIInner>>().unwrap().borrow_mut()
fn inner_mut(s: &mut Cursive) -> MutexGuard<'_, UIInner> {
s.user_data::<Arc<Mutex<UIInner>>>().unwrap().lock()
}
fn setup_colors(siv: &mut CursiveRunnable, inner: &mut UIInner, settings: &Settings) {
@ -425,7 +419,7 @@ impl UI {
"Detaching" => None,
_ => None,
};
let mut cmdproc = Self::command_processor(s);
let cmdproc = Self::command_processor(s);
if let Some(a) = action {
if a {
cmdproc.attach();
@ -707,7 +701,7 @@ impl UI {
////////////////////////////////////////////////////////////////////////////
// Public functions
pub fn new(node_log_scrollback: usize, settings: &Settings) -> Self {
pub fn new(node_log_scrollback: usize, settings: &Settings) -> (Self, UISender) {
cursive_flexi_logger_view::resize(node_log_scrollback);
// Instantiate the cursive runnable
@ -726,9 +720,9 @@ impl UI {
let cb_sink = runnable.cb_sink().clone();
// Create the UI object
let this = Self {
siv: Rc::new(RefCell::new(runnable)),
inner: Rc::new(RefCell::new(UIInner {
let mut this = Self {
siv: runnable,
inner: Arc::new(Mutex::new(UIInner {
ui_state: UIState::new(),
log_colors: Default::default(),
cmdproc: None,
@ -740,15 +734,13 @@ impl UI {
cmd_history_position: 0,
cmd_history_max_size: settings.interface.command_line.history_size,
connection_dialog_state: None,
cb_sink,
})),
};
let mut siv = this.siv.borrow_mut();
let mut inner = this.inner.borrow_mut();
let mut inner = this.inner.lock();
// Make the inner object accessible in callbacks easily
siv.set_user_data(this.inner.clone());
this.siv.set_user_data(this.inner.clone());
// Create layouts
@ -831,87 +823,44 @@ impl UI {
.child(TextView::new(version)),
);
siv.add_fullscreen_layer(mainlayout);
this.siv.add_fullscreen_layer(mainlayout);
UI::setup_colors(&mut siv, &mut inner, settings);
UI::setup_quit_handler(&mut siv);
siv.set_global_callback(cursive::event::Event::CtrlChar('k'), UI::clear_handler);
UI::setup_colors(&mut this.siv, &mut inner, settings);
UI::setup_quit_handler(&mut this.siv);
this.siv
.set_global_callback(cursive::event::Event::CtrlChar('k'), UI::clear_handler);
drop(inner);
drop(siv);
this
let inner = this.inner.clone();
(this, UISender { inner, cb_sink })
}
pub fn cursive_flexi_logger(&self) -> Box<CursiveLogWriter> {
let mut flv =
cursive_flexi_logger_view::cursive_flexi_logger(self.siv.borrow().cb_sink().clone());
flv.set_colors(self.inner.borrow().log_colors.clone());
let mut flv = cursive_flexi_logger_view::cursive_flexi_logger(self.siv.cb_sink().clone());
flv.set_colors(self.inner.lock().log_colors.clone());
flv
}
pub fn set_command_processor(&mut self, cmdproc: CommandProcessor) {
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock();
inner.cmdproc = Some(cmdproc);
let _ = inner.cb_sink.send(Box::new(UI::update_cb));
}
pub fn set_attachment_state(
&mut self,
state: AttachmentState,
public_internet_ready: bool,
local_network_ready: bool,
) {
let mut inner = self.inner.borrow_mut();
inner.ui_state.attachment_state.set(state);
inner
.ui_state
.public_internet_ready
.set(public_internet_ready);
inner.ui_state.local_network_ready.set(local_network_ready);
let _ = inner.cb_sink.send(Box::new(UI::update_cb));
}
pub fn set_network_status(
&mut self,
started: bool,
bps_down: u64,
bps_up: u64,
peers: Vec<PeerTableData>,
) {
let mut inner = self.inner.borrow_mut();
inner.ui_state.network_started.set(started);
inner.ui_state.network_down_up.set((
((bps_down as f64) / 1000.0f64) as f32,
((bps_up as f64) / 1000.0f64) as f32,
));
inner.ui_state.peers_state.set(peers);
let _ = inner.cb_sink.send(Box::new(UI::update_cb));
}
pub fn set_config(&mut self, config: VeilidConfigInner) {
let mut inner = self.inner.borrow_mut();
inner
.ui_state
.node_id
.set(config.network.routing_table.node_id.to_string());
}
pub fn set_connection_state(&mut self, state: ConnectionState) {
let mut inner = self.inner.borrow_mut();
inner.ui_state.connection_state.set(state);
let _ = inner.cb_sink.send(Box::new(UI::update_cb));
}
pub fn add_node_event(&self, event: String) {
let inner = self.inner.borrow();
let color = *inner.log_colors.get(&Level::Info).unwrap();
let mut starting_style: Style = color.into();
for line in event.lines() {
let (spanned_string, end_style) =
cursive::utils::markup::ansi::parse_with_starting_style(starting_style, line);
cursive_flexi_logger_view::push_to_log(spanned_string);
starting_style = end_style;
}
let _ = inner.cb_sink.send(Box::new(UI::update_cb));
// Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this
pub async fn run_async(&mut self) {
self.siv.run_async().await;
}
// pub fn run(&mut self) {
// self.siv.run();
// }
}
#[derive(Clone)]
pub struct UISender {
inner: Arc<Mutex<UIInner>>,
cb_sink: Sender<Box<dyn FnOnce(&mut Cursive) + 'static + Send>>,
}
impl UISender {
pub fn display_string_dialog<T: ToString, S: ToString>(
&self,
title: T,
@ -920,31 +869,84 @@ impl UI {
) {
let title = title.to_string();
let text = text.to_string();
let inner = self.inner.borrow();
let _ = inner.cb_sink.send(Box::new(move |s| {
let _ = self.cb_sink.send(Box::new(move |s| {
UI::display_string_dialog_cb(s, title, text, close_cb)
}));
}
pub fn quit(&self) {
let inner = self.inner.borrow();
let _ = inner.cb_sink.send(Box::new(|s| {
let _ = self.cb_sink.send(Box::new(|s| {
s.quit();
}));
}
pub fn send_callback(&self, callback: UICallback) {
let inner = self.inner.borrow();
let _ = inner.cb_sink.send(Box::new(move |s| callback(s)));
let _ = self.cb_sink.send(Box::new(move |s| callback(s)));
}
pub fn set_attachment_state(
&mut self,
state: String,
public_internet_ready: bool,
local_network_ready: bool,
) {
{
let mut inner = self.inner.lock();
inner.ui_state.attachment_state.set(state);
inner
.ui_state
.public_internet_ready
.set(public_internet_ready);
inner.ui_state.local_network_ready.set(local_network_ready);
}
let _ = self.cb_sink.send(Box::new(UI::update_cb));
}
pub fn set_network_status(
&mut self,
started: bool,
bps_down: u64,
bps_up: u64,
peers: Vec<json::JsonValue>,
) {
{
let mut inner = self.inner.lock();
inner.ui_state.network_started.set(started);
inner.ui_state.network_down_up.set((
((bps_down as f64) / 1000.0f64) as f32,
((bps_up as f64) / 1000.0f64) as f32,
));
inner.ui_state.peers_state.set(peers);
}
let _ = self.cb_sink.send(Box::new(UI::update_cb));
}
pub fn set_config(&mut self, config: &json::JsonValue) {
let mut inner = self.inner.lock();
inner
.ui_state
.node_id
.set(config["network"]["routing_table"]["node_id"].to_string());
}
pub fn set_connection_state(&mut self, state: ConnectionState) {
{
let mut inner = self.inner.lock();
inner.ui_state.connection_state.set(state);
}
let _ = self.cb_sink.send(Box::new(UI::update_cb));
}
// Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this
pub async fn run_async(&mut self) {
let mut siv = self.siv.borrow_mut();
siv.run_async().await;
pub fn add_node_event(&self, event: String) {
{
let inner = self.inner.lock();
let color = *inner.log_colors.get(&Level::Info).unwrap();
let mut starting_style: Style = color.into();
for line in event.lines() {
let (spanned_string, end_style) =
cursive::utils::markup::ansi::parse_with_starting_style(starting_style, line);
cursive_flexi_logger_view::push_to_log(spanned_string);
starting_style = end_style;
}
}
let _ = self.cb_sink.send(Box::new(UI::update_cb));
}
// pub fn run(&mut self) {
// let mut siv = self.siv.borrow_mut();
// siv.run();
// }
}

View File

@ -69,7 +69,7 @@ impl ProtectedStore {
));
// Ensure permissions are correct
ensure_file_private_owner(&insecure_keyring_file)?;
ensure_file_private_owner(&insecure_keyring_file).map_err(|e| eyre!("{}", e))?;
// Open the insecure keyring
inner.keyring_manager = Some(

View File

@ -41,15 +41,6 @@ pub use self::veilid_config::*;
pub use self::veilid_layer_filter::*;
pub use veilid_tools as tools;
use enumset::*;
use rkyv::{
bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip,
Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>;
use schemars::{schema_for, JsonSchema};
use serde::*;
pub mod veilid_capnp {
include!(concat!(env!("OUT_DIR"), "/proto/veilid_capnp.rs"));
}
@ -94,4 +85,20 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [
"attohttpc",
];
use cfg_if::*;
use enumset::*;
use eyre::{bail, eyre, Report as EyreReport, Result as EyreResult, WrapErr};
use parking_lot::*;
use rkyv::{
bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip,
Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
use tracing::*;
use veilid_tools::*;
type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>;
use futures_util::stream::FuturesUnordered;
use owo_colors::OwoColorize;
use schemars::{schema_for, JsonSchema};
use serde::*;
use stop_token::*;
use thiserror::Error as ThisError;

View File

@ -212,7 +212,8 @@ impl Network {
} else {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let sockaddrs = listen_address_to_socket_addrs(&listen_address)?;
let sockaddrs =
listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?;
if sockaddrs.is_empty() {
bail!("No valid listen address: {}", listen_address);
}
@ -236,7 +237,8 @@ impl Network {
} else {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let sockaddrs = listen_address_to_socket_addrs(&listen_address)?;
let sockaddrs =
listen_address_to_socket_addrs(&listen_address).map_err(|e| eyre!("{}", e))?;
if sockaddrs.is_empty() {
bail!("No valid listen address: {}", listen_address);
}

View File

@ -167,20 +167,20 @@ impl ClientApi {
if args.len() == 0 {
apibail_generic!("no control request specified");
}
if args[0] == "shutdown" {
if args[0] == "Shutdown" {
if args.len() != 1 {
apibail_generic!("wrong number of arguments");
}
self.shutdown();
Ok("".to_owned())
} else if args[0] == "change_log_level" {
} else if args[0] == "ChangeLogLevel" {
if args.len() != 3 {
apibail_generic!("wrong number of arguments");
}
let log_level: VeilidConfigLogLevel = deserialize_json(&args[2])?;
self.change_log_level(args[1].clone(), log_level)?;
Ok("".to_owned())
} else if args[0] == "get_server_settings" {
} else if args[0] == "GetServerSettings" {
if args.len() != 1 {
apibail_generic!("wrong number of arguments");
}
@ -194,7 +194,7 @@ impl ClientApi {
settings_json["core"]["protected_store"].remove("new_device_encryption_key_password");
let safe_settings_json = settings_json.to_string();
Ok(safe_settings_json)
} else if args[0] == "emit_schema" {
} else if args[0] == "EmitSchema" {
if args.len() != 2 {
apibail_generic!("wrong number of arguments");
}

View File

@ -14,14 +14,10 @@ mod veilid_logs;
#[cfg(windows)]
mod windows;
use cfg_if::*;
#[allow(unused_imports)]
use color_eyre::eyre::{bail, ensure, eyre, Result as EyreResult, WrapErr};
use server::*;
use std::collections::HashMap;
use std::str::FromStr;
use tools::*;
use tracing::*;
use veilid_logs::*;
#[instrument(err)]

View File

@ -1,5 +1,6 @@
use crate::client_api;
use crate::settings::*;
use crate::tools::*;
use crate::veilid_logs::*;
use flume::{unbounded, Receiver, Sender};
use futures_util::select;

View File

@ -2,6 +2,7 @@
use directories::*;
use crate::tools::*;
use serde_derive::*;
use std::ffi::OsStr;
use std::net::SocketAddr;

View File

@ -1,5 +1,8 @@
use cfg_if::*;
use core::future::Future;
pub use cfg_if::*;
pub use color_eyre::eyre::{bail, ensure, eyre, Result as EyreResult, WrapErr};
pub use core::future::Future;
pub use parking_lot::*;
pub use tracing::*;
cfg_if! {
if #[cfg(feature="rt-async-std")] {

View File

@ -28,27 +28,6 @@ mod tools;
#[cfg(target_arch = "wasm32")]
mod wasm;
pub use cfg_if::*;
#[allow(unused_imports)]
pub use eyre::{bail, ensure, eyre, Report as EyreReport, Result as EyreResult, WrapErr};
pub use futures_util::future::{select, Either};
pub use futures_util::select;
pub use futures_util::stream::FuturesUnordered;
pub use futures_util::{AsyncRead, AsyncWrite};
pub use log_thru::*;
pub use owo_colors::OwoColorize;
pub use parking_lot::*;
pub use split_url::*;
pub use static_assertions::*;
pub use stop_token::*;
pub use thiserror::Error as ThisError;
cfg_if! {
if #[cfg(feature = "tracing")] {
pub use tracing::*;
} else {
pub use log::*;
}
}
pub type PinBox<T> = Pin<Box<T>>;
pub type PinBoxFuture<T> = PinBox<dyn Future<Output = T> + 'static>;
pub type PinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + 'a>;
@ -120,6 +99,7 @@ pub use eventual_value_clone::*;
pub use interval::*;
pub use ip_addr_port::*;
pub use ip_extra::*;
pub use log_thru::*;
pub use must_join_handle::*;
pub use must_join_single_future::*;
pub use mutable_future::*;
@ -128,17 +108,32 @@ pub use random::*;
pub use single_shot_eventual::*;
pub use sleep::*;
pub use spawn::*;
pub use split_url::*;
pub use tick_task::*;
pub use timeout::*;
pub use timeout_or::*;
pub use timestamp::*;
pub use tools::*;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;
// Tests must be public for wasm-pack tests
pub mod tests;
cfg_if! {
if #[cfg(feature = "tracing")] {
use tracing::*;
} else {
use log::*;
}
}
use cfg_if::*;
use futures_util::{AsyncRead, AsyncWrite};
use parking_lot::*;
use stop_token::*;
use thiserror::Error as ThisError;
// For iOS tests
#[no_mangle]
pub extern "C" fn main_rs() {}

View File

@ -337,19 +337,13 @@ macro_rules! network_result_value_or_log {
($r: expr => $f:tt) => {
match $r {
NetworkResult::Timeout => {
log_network_result!(
"{} at {}@{}:{}",
"Timeout".cyan(),
file!(),
line!(),
column!()
);
log_network_result!("{} at {}@{}:{}", "Timeout", file!(), line!(), column!());
$f
}
NetworkResult::ServiceUnavailable => {
log_network_result!(
"{} at {}@{}:{}",
"ServiceUnavailable".cyan(),
"ServiceUnavailable",
file!(),
line!(),
column!()
@ -359,7 +353,7 @@ macro_rules! network_result_value_or_log {
NetworkResult::NoConnection(e) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"No connection".cyan(),
"No connection",
e.to_string(),
file!(),
line!(),
@ -370,7 +364,7 @@ macro_rules! network_result_value_or_log {
NetworkResult::AlreadyExists(e) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"Already exists".cyan(),
"Already exists",
e.to_string(),
file!(),
line!(),
@ -381,7 +375,7 @@ macro_rules! network_result_value_or_log {
NetworkResult::InvalidMessage(s) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"Invalid message".cyan(),
"Invalid message",
s,
file!(),
line!(),

View File

@ -97,11 +97,13 @@ cfg_if! {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn split_port(name: &str) -> EyreResult<(String, Option<u16>)> {
pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
if let Some(split) = name.rfind(':') {
let hoststr = &name[0..split];
let portstr = &name[split + 1..];
let port: u16 = portstr.parse::<u16>().wrap_err("invalid port")?;
let port: u16 = portstr
.parse::<u16>()
.map_err(|e| format!("invalid port: {}", e))?;
Ok((hoststr.to_string(), Some(port)))
} else {
@ -130,8 +132,8 @@ pub fn ms_to_us(ms: u32) -> u64 {
(ms as u64) * 1000u64
}
pub fn us_to_ms(us: u64) -> EyreResult<u32> {
u32::try_from(us / 1000u64).wrap_err("could not convert microseconds")
pub fn us_to_ms(us: u64) -> Result<u32, String> {
u32::try_from(us / 1000u64).map_err(|e| format!("could not convert microseconds: {}", e))
}
// Calculate retry attempt with logarhythmic falloff
@ -224,7 +226,7 @@ pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAdd
}
}
pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult<Vec<SocketAddr>> {
pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let ip_addrs = vec![
@ -235,7 +237,7 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult<Vec<So
Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
let port = portstr
.parse::<u16>()
.wrap_err("Invalid port format in udp listen address")?;
.map_err(|e| format!("Invalid port format in udp listen address: {}", e))?;
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
} else if let Ok(port) = listen_address.parse::<u16>() {
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
@ -243,11 +245,11 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult<Vec<So
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use core::str::FromStr;
vec![SocketAddr::from_str(listen_address).map_err(|e| io_error_other!(e)).wrap_err("Unable to parse address")?]
vec![SocketAddr::from_str(listen_address).map_err(|e| format!("Unable to parse address: {}",e))?]
} else {
listen_address
.to_socket_addrs()
.wrap_err("Unable to resolve address")?
.map_err(|e| format!("Unable to resolve address: {}", e))?
.collect()
}
}
@ -277,7 +279,7 @@ cfg_if::cfg_if! {
use std::os::unix::prelude::PermissionsExt;
use nix::unistd::{Uid, Gid};
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> EyreResult<()>
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
{
let path = path.as_ref();
if !path.exists() {
@ -286,13 +288,13 @@ cfg_if::cfg_if! {
let uid = Uid::effective();
let gid = Gid::effective();
let meta = std::fs::metadata(path).wrap_err("unable to get metadata for path")?;
let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
if meta.mode() != 0o600 {
std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).wrap_err("unable to set correct permissions on path")?;
std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
}
if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
bail!("path has incorrect owner/group");
return Err("path has incorrect owner/group".to_owned());
}
Ok(())
}
@ -300,7 +302,7 @@ cfg_if::cfg_if! {
//use std::os::windows::fs::MetadataExt;
//use windows_permissions::*;
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> EyreResult<()>
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
{
let path = path.as_ref();
if !path.exists() {