veilid/veilid-cli/src/client_api_connection.rs

378 lines
12 KiB
Rust
Raw Normal View History

2021-11-22 16:28:30 +00:00
use crate::command_processor::*;
2022-06-28 03:46:29 +00:00
use crate::tools::*;
2023-06-08 18:07:09 +00:00
use futures::stream::FuturesUnordered;
use futures::StreamExt;
2021-11-22 16:28:30 +00:00
use std::net::SocketAddr;
2023-06-08 18:07:09 +00:00
use std::time::SystemTime;
use stop_token::{future::FutureExt as _, StopSource};
2023-06-08 01:55:23 +00:00
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::io::prelude::BufReadExt;
use async_std::io::WriteExt;
use async_std::io::BufReader;
} else if #[cfg(feature="rt-tokio")] {
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
2022-07-01 16:13:52 +00:00
}
}
2021-11-22 16:28:30 +00:00
struct ClientApiConnectionInner {
comproc: CommandProcessor,
connect_addr: Option<SocketAddr>,
2023-06-08 18:07:09 +00:00
request_sender: Option<flume::Sender<String>>,
2023-06-08 01:55:23 +00:00
disconnector: Option<StopSource>,
2021-11-22 16:28:30 +00:00
disconnect_requested: bool,
2023-06-08 18:07:09 +00:00
reply_channels: HashMap<u32, flume::Sender<json::JsonValue>>,
next_req_id: u32,
2021-11-22 16:28:30 +00:00
}
#[derive(Clone)]
pub struct ClientApiConnection {
2023-06-08 18:07:09 +00:00
inner: Arc<Mutex<ClientApiConnectionInner>>,
2021-11-22 16:28:30 +00:00
}
impl ClientApiConnection {
pub fn new(comproc: CommandProcessor) -> Self {
Self {
2023-06-08 18:07:09 +00:00
inner: Arc::new(Mutex::new(ClientApiConnectionInner {
2021-11-28 02:31:01 +00:00
comproc,
2021-11-22 16:28:30 +00:00
connect_addr: None,
2023-06-08 18:07:09 +00:00
request_sender: None,
2023-06-08 01:55:23 +00:00
disconnector: None,
2021-11-22 16:28:30 +00:00
disconnect_requested: false,
2023-06-08 18:07:09 +00:00
reply_channels: HashMap::new(),
next_req_id: 0,
2021-11-22 16:28:30 +00:00
})),
}
}
2022-11-03 15:28:29 +00:00
2023-06-08 18:07:09 +00:00
pub fn cancel_all(&self) {
let mut inner = self.inner.lock();
inner.reply_channels.clear();
2022-11-03 15:28:29 +00:00
}
2023-06-09 23:08:49 +00:00
async fn process_veilid_state<'a>(&self, state: &json::JsonValue) {
let comproc = self.inner.lock().comproc.clone();
comproc.update_attachment(&state["attachment"]);
comproc.update_network_status(&state["network"]);
comproc.update_config(&state["config"]);
}
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
async fn process_response(&self, response: json::JsonValue) {
// find the operation id and send the response to the channel for it
2023-06-09 23:08:49 +00:00
let Some(id) = response["id"].as_u32() else {
2023-06-08 18:07:09 +00:00
error!("invalid id: {}", response);
return;
};
let reply_channel = {
let mut inner = self.inner.lock();
inner.reply_channels.remove(&id)
};
let Some(reply_channel) = reply_channel else {
warn!("received cancelled reply: {}", response);
return;
};
if let Err(e) = reply_channel.send_async(response).await {
error!("failed to process reply: {}", e);
return;
}
}
2023-06-09 23:08:49 +00:00
async fn process_veilid_update(&self, update: json::JsonValue) {
2023-06-08 18:07:09 +00:00
let comproc = self.inner.lock().comproc.clone();
2023-06-08 01:55:23 +00:00
let Some(kind) = update["kind"].as_str() else {
2023-06-21 02:18:59 +00:00
comproc.log_message(Level::Error, format!("missing update kind: {}", update));
2023-06-08 01:55:23 +00:00
return;
2023-03-03 15:55:31 +00:00
};
2023-06-08 01:55:23 +00:00
match kind {
"Log" => {
2023-06-09 23:08:49 +00:00
comproc.update_log(&update);
2023-06-08 01:55:23 +00:00
}
"AppMessage" => {
2023-06-09 23:08:49 +00:00
comproc.update_app_message(&update);
2023-06-08 01:55:23 +00:00
}
"AppCall" => {
2023-06-09 23:08:49 +00:00
comproc.update_app_call(&update);
2023-06-08 01:55:23 +00:00
}
"Attachment" => {
2023-06-09 23:08:49 +00:00
comproc.update_attachment(&update);
2023-06-08 01:55:23 +00:00
}
"Network" => {
2023-06-09 23:08:49 +00:00
comproc.update_network_status(&update);
2023-06-08 01:55:23 +00:00
}
"Config" => {
2023-06-09 23:08:49 +00:00
comproc.update_config(&update);
2023-06-08 01:55:23 +00:00
}
"RouteChange" => {
2023-06-09 23:08:49 +00:00
comproc.update_route(&update);
2023-06-08 01:55:23 +00:00
}
"Shutdown" => comproc.update_shutdown(),
"ValueChange" => {
2023-06-09 23:08:49 +00:00
comproc.update_value_change(&update);
2023-06-08 01:55:23 +00:00
}
_ => {
2023-06-21 02:18:59 +00:00
comproc.log_message(Level::Error, format!("unknown update kind: {}", update));
2023-06-08 01:55:23 +00:00
}
}
2022-06-08 13:33:41 +00:00
}
2023-06-08 18:07:09 +00:00
async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> {
2022-06-08 13:33:41 +00:00
trace!("ClientApiConnection::handle_connection");
2022-11-16 17:49:53 +00:00
2022-06-08 13:33:41 +00:00
// Connect the TCP socket
2022-06-28 03:46:29 +00:00
let stream = TcpStream::connect(connect_addr)
2022-06-08 13:33:41 +00:00
.await
.map_err(map_to_string)?;
2023-06-08 01:55:23 +00:00
2022-06-08 13:33:41 +00:00
// If it succeed, disable nagle algorithm
stream.set_nodelay(true).map_err(map_to_string)?;
2023-06-08 18:07:09 +00:00
// State we connected
let comproc = self.inner.lock().comproc.clone();
comproc.set_connection_state(ConnectionState::Connected(connect_addr, SystemTime::now()));
2023-06-08 01:55:23 +00:00
// Split the stream
2022-06-28 03:46:29 +00:00
cfg_if! {
if #[cfg(feature="rt-async-std")] {
2022-06-29 14:34:23 +00:00
use futures::AsyncReadExt;
2023-06-08 18:07:09 +00:00
let (reader, mut writer) = stream.split();
2023-06-08 01:55:23 +00:00
let mut reader = BufReader::new(reader);
2022-06-28 03:46:29 +00:00
} else if #[cfg(feature="rt-tokio")] {
2023-06-08 18:07:09 +00:00
let (reader, mut writer) = stream.into_split();
2023-06-08 01:55:23 +00:00
let mut reader = BufReader::new(reader);
2022-06-28 03:46:29 +00:00
}
}
2023-06-08 18:07:09 +00:00
// Requests to send
let (requests_tx, requests_rx) = flume::unbounded();
2023-06-09 23:08:49 +00:00
// Create disconnection mechanism
2023-06-08 18:07:09 +00:00
let stop_token = {
let stop_source = StopSource::new();
let token = stop_source.token();
let mut inner = self.inner.lock();
inner.connect_addr = Some(connect_addr);
inner.disconnector = Some(stop_source);
inner.request_sender = Some(requests_tx);
token
};
// Futures to process unordered
let mut unord = FuturesUnordered::new();
2023-06-08 01:55:23 +00:00
// Process lines
2023-06-08 18:07:09 +00:00
let this = self.clone();
let recv_messages_future = async move {
2023-06-09 23:08:49 +00:00
let mut linebuf = String::new();
while let Ok(size) = reader.read_line(&mut linebuf).await {
2023-06-08 18:07:09 +00:00
// Exit on EOF
if size == 0 {
2023-06-08 01:55:23 +00:00
// Disconnected
2023-06-08 18:07:09 +00:00
break;
2023-06-08 01:55:23 +00:00
}
2021-11-22 16:28:30 +00:00
2023-06-09 23:08:49 +00:00
let line = linebuf.trim().to_owned();
linebuf.clear();
2023-06-08 18:07:09 +00:00
// Unmarshal json
2023-06-09 23:08:49 +00:00
let j = match json::parse(&line) {
2023-06-08 18:07:09 +00:00
Ok(v) => v,
Err(e) => {
error!("failed to parse server response: {}", e);
continue;
}
};
if j["type"] == "Update" {
2023-06-09 23:08:49 +00:00
this.process_veilid_update(j).await;
2023-06-08 18:07:09 +00:00
} else if j["type"] == "Response" {
this.process_response(j).await;
2022-11-03 15:28:29 +00:00
}
2023-06-08 18:07:09 +00:00
}
//
let mut inner = this.inner.lock();
inner.request_sender = None;
};
unord.push(system_boxed(recv_messages_future));
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
// Requests send processor
let send_requests_future = async move {
while let Ok(req) = requests_rx.recv_async().await {
if let Err(e) = writer.write_all(req.as_bytes()).await {
error!("failed to write request: {}", e)
}
2023-06-08 01:55:23 +00:00
}
2023-06-08 18:07:09 +00:00
};
unord.push(system_boxed(send_requests_future));
2023-06-09 23:08:49 +00:00
// Request initial server state
let capi = self.clone();
spawn_detached_local(async move {
let mut req = json::JsonValue::new_object();
req["op"] = "GetState".into();
let Some(resp) = capi.perform_request(req).await else {
error!("failed to get state");
return;
};
if resp.has_key("error") {
error!("failed to get state: {}", resp["error"]);
return;
}
capi.process_veilid_state(&resp["value"]).await;
});
2023-06-08 18:07:09 +00:00
// Send and receive until we're done or a stop is requested
while let Ok(Some(())) = unord.next().timeout_at(stop_token.clone()).await {}
// // Drop the server and disconnector too (if we still have it)
let mut inner = self.inner.lock();
let disconnect_requested = inner.disconnect_requested;
inner.request_sender = None;
inner.disconnector = None;
inner.disconnect_requested = false;
inner.connect_addr = None;
2023-06-08 01:55:23 +00:00
// Connection finished
2023-06-08 18:07:09 +00:00
if disconnect_requested {
Ok(())
} else {
Err("Connection lost".to_owned())
}
}
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
async fn perform_request(&self, mut req: json::JsonValue) -> Option<json::JsonValue> {
let (sender, reply_rx) = {
let mut inner = self.inner.lock();
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
// Get the request sender
let Some(sender) = inner.request_sender.clone() else {
error!("dropping request, not connected");
return None;
};
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
// Get next id
let id = inner.next_req_id;
inner.next_req_id += 1;
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
// Add the id
req["id"] = id.into();
2022-11-03 15:28:29 +00:00
2023-06-08 18:07:09 +00:00
// Make a reply receiver
let (reply_tx, reply_rx) = flume::bounded(1);
inner.reply_channels.insert(id, reply_tx);
(sender, reply_rx)
};
// Send the request
let req_ndjson = req.dump() + "\n";
if let Err(e) = sender.send_async(req_ndjson).await {
error!("failed to send request: {}", e);
return None;
}
2023-06-08 01:55:23 +00:00
2023-06-08 18:07:09 +00:00
// Wait for the reply
let Ok(r) = reply_rx.recv_async().await else {
// Cancelled
return None;
};
Some(r)
}
pub async fn server_attach(&self) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
trace!("ClientApiConnection::server_attach");
2023-06-08 18:07:09 +00:00
let mut req = json::JsonValue::new_object();
req["op"] = "Attach".into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
2021-11-22 16:28:30 +00:00
};
2023-06-08 18:07:09 +00:00
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
2021-11-22 16:28:30 +00:00
}
2023-06-08 18:07:09 +00:00
pub async fn server_detach(&self) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
trace!("ClientApiConnection::server_detach");
2023-06-08 18:07:09 +00:00
let mut req = json::JsonValue::new_object();
req["op"] = "Detach".into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
2021-11-22 16:28:30 +00:00
};
2023-06-08 18:07:09 +00:00
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
2021-11-22 16:28:30 +00:00
}
2023-06-08 18:07:09 +00:00
pub async fn server_shutdown(&self) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
trace!("ClientApiConnection::server_shutdown");
2023-06-08 18:07:09 +00:00
let mut req = json::JsonValue::new_object();
req["op"] = "Control".into();
req["args"] = json::JsonValue::new_array();
req["args"].push("Shutdown").unwrap();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
2021-11-22 16:28:30 +00:00
};
2023-06-08 18:07:09 +00:00
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
2021-12-11 01:14:33 +00:00
}
2023-06-08 18:07:09 +00:00
pub async fn server_debug(&self, what: String) -> Result<String, String> {
2021-12-11 01:14:33 +00:00
trace!("ClientApiConnection::server_debug");
2023-06-08 18:07:09 +00:00
let mut req = json::JsonValue::new_object();
req["op"] = "Debug".into();
req["command"] = what.into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
2021-12-11 01:14:33 +00:00
};
2023-06-08 18:07:09 +00:00
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(resp["value"].to_string())
2022-07-01 16:13:52 +00:00
}
pub async fn server_change_log_level(
2023-06-08 18:07:09 +00:00
&self,
2022-07-01 16:13:52 +00:00
layer: String,
2023-06-08 18:07:09 +00:00
log_level: String,
2022-07-01 16:13:52 +00:00
) -> Result<(), String> {
trace!("ClientApiConnection::change_log_level");
2023-06-08 18:07:09 +00:00
let mut req = json::JsonValue::new_object();
req["op"] = "Control".into();
req["args"] = json::JsonValue::new_array();
req["args"].push("ChangeLogLevel").unwrap();
req["args"].push(layer).unwrap();
req["args"].push(log_level).unwrap();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
2022-07-01 16:13:52 +00:00
};
2023-06-08 18:07:09 +00:00
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
2022-10-01 02:37:55 +00:00
}
2021-11-22 16:28:30 +00:00
// Start Client API connection
2023-06-08 18:07:09 +00:00
pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> {
2021-11-22 16:28:30 +00:00
trace!("ClientApiConnection::connect");
// Save the address to connect to
2022-11-16 17:49:53 +00:00
self.handle_connection(connect_addr).await
2021-11-22 16:28:30 +00:00
}
// End Client API connection
2023-06-08 18:07:09 +00:00
pub async fn disconnect(&self) {
2021-11-22 16:28:30 +00:00
trace!("ClientApiConnection::disconnect");
2023-06-08 18:07:09 +00:00
let mut inner = self.inner.lock();
if inner.disconnector.is_some() {
inner.disconnector = None;
inner.disconnect_requested = true;
2021-11-22 16:28:30 +00:00
}
}
}