json api cli working

This commit is contained in:
John Smith
2023-06-09 19:08:49 -04:00
parent 419bfcd8ce
commit 532bcf2e2a
5 changed files with 254 additions and 257 deletions

View File

@@ -3,14 +3,14 @@ use crate::tools::*;
use crate::veilid_logs::VeilidLogs;
use cfg_if::*;
use futures_util::{future::try_join_all, stream::FuturesUnordered, StreamExt};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use stop_token::future::FutureExt;
use stop_token::future::FutureExt as _;
use stop_token::*;
use tracing::*;
use veilid_core::json_api::JsonRequestProcessor;
use veilid_core::tools::*;
use veilid_core::*;
use wg::AsyncWaitGroup;
@@ -24,34 +24,18 @@ cfg_if! {
use tokio::io::AsyncWriteExt;
}
}
// struct VeilidServerImpl {
// veilid_api: veilid_core::VeilidAPI,
// veilid_logs: VeilidLogs,
// settings: Settings,
// next_id: u64,
// }
// impl VeilidServerImpl {
// #[instrument(level = "trace", skip_all)]
// pub fn new(
// veilid_api: veilid_core::VeilidAPI,
// veilid_logs: VeilidLogs,
// settings: Settings,
// ) -> Self {
// Self {
// next_id: 0,
// veilid_api,
// veilid_logs,
// settings,
// }
// }
// }
// --- Client API Server-Side ---------------------------------
type ClientApiAllFuturesJoinHandle = MustJoinHandle<std::io::Result<Vec<()>>>;
struct RequestLine {
// Request to process
line: String,
// Where to send the response
responses_tx: flume::Sender<String>,
}
struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
@@ -177,7 +161,7 @@ impl ClientApi {
if args.len() != 3 {
apibail_generic!("wrong number of arguments");
}
let log_level: VeilidConfigLogLevel = deserialize_json(&args[2])?;
let log_level = VeilidConfigLogLevel::from_str(&args[2])?;
self.change_log_level(args[1].clone(), log_level)?;
Ok("".to_owned())
} else if args[0] == "GetServerSettings" {
@@ -212,6 +196,107 @@ impl ClientApi {
}
}
async fn process_request_line(
self,
jrp: JsonRequestProcessor,
request_line: RequestLine,
) -> VeilidAPIResult<Option<RequestLine>> {
let line = request_line.line;
let responses_tx = request_line.responses_tx;
// Unmarshal NDJSON - newline => json
// (trim all whitespace around input lines just to make things more permissive for API users)
let request: json_api::Request = deserialize_json(&line)?;
// See if this is a control message or a veilid-core message
let response = if let json_api::RequestOp::Control { args } = request.op {
// Process control messages
json_api::Response {
id: request.id,
op: json_api::ResponseOp::Control {
result: json_api::to_json_api_result(self.process_control(args).await),
},
}
} else {
// Process with ndjson api
jrp.clone().process_request(request).await
};
// Marshal json + newline => NDJSON
let response_string = serialize_json(json_api::RecvMessage::Response(response)) + "\n";
if let Err(e) = responses_tx.send_async(response_string).await {
warn!("response not sent: {}", e)
}
VeilidAPIResult::Ok(None)
}
async fn next_request_line(
requests_rx: flume::Receiver<Option<RequestLine>>,
) -> VeilidAPIResult<Option<RequestLine>> {
Ok(requests_rx.recv_async().await.ok().flatten())
}
async fn receive_requests<R: AsyncBufReadExt + Unpin>(
self,
conn_tuple: (SocketAddr, SocketAddr),
mut reader: R,
requests_tx: flume::Sender<Option<RequestLine>>,
responses_tx: flume::Sender<String>,
) -> VeilidAPIResult<Option<RequestLine>> {
// responses_tx becomes owned by recv_requests_future
// Start sending updates
self.inner
.lock()
.update_channels
.insert(conn_tuple, responses_tx.clone());
let mut linebuf = String::new();
while let Ok(size) = reader.read_line(&mut linebuf).await {
// Eof?
if size == 0 {
break;
}
// Put the processing in the async queue
let line = linebuf.trim().to_owned();
linebuf.clear();
// Ignore newlines
if line.len() == 0 {
continue;
}
// Enqueue the line for processing in parallel
let request_line = RequestLine {
line,
responses_tx: responses_tx.clone(),
};
if let Err(e) = requests_tx.send_async(Some(request_line)).await {
error!("failed to enqueue request: {}", e);
break;
}
}
// Stop sending updates
// Will cause send_responses_future to stop because we drop the responses_tx
self.inner.lock().update_channels.remove(&conn_tuple);
VeilidAPIResult::Ok(None)
}
async fn send_responses<W: AsyncWriteExt + Unpin>(
self,
responses_rx: flume::Receiver<String>,
mut writer: W,
) -> VeilidAPIResult<Option<RequestLine>> {
while let Ok(resp) = responses_rx.recv_async().await {
if let Err(e) = writer.write_all(resp.as_bytes()).await {
error!("failed to write response: {}", e)
}
}
VeilidAPIResult::Ok(None)
}
pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) {
// Get address of peer
let peer_addr = match stream.peer_addr() {
@@ -246,10 +331,10 @@ impl ClientApi {
if #[cfg(feature="rt-async-std")] {
use futures_util::AsyncReadExt;
let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
let reader = BufReader::new(reader);
} else if #[cfg(feature="rt-tokio")] {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
}
}
@@ -259,105 +344,58 @@ impl ClientApi {
// Futures to process unordered
let mut unord = FuturesUnordered::new();
let (more_futures_tx, more_futures_rx) = flume::unbounded();
// Output to serialize
// Requests and responses are done serially to the socket
// but the requests are processed in parallel by the FuturesUnordered
let (requests_tx, requests_rx) = flume::unbounded();
let (responses_tx, responses_rx) = flume::unbounded();
// Request receive processor
let this = self.clone();
let recv_requests_future = async move {
// Start sending updates
this.inner
.lock()
.update_channels
.insert(conn_tuple, responses_tx.clone());
let mut line = String::new();
while let Ok(size) = reader.read_line(&mut line).await {
// Eof?
if size == 0 {
break;
}
// Put the processing in the async queue
let jrp = jrp.clone();
let line = line.trim().to_owned();
// Ignore newlines
if line.len() == 0 {
continue;
}
let responses_tx = responses_tx.clone();
let this2 = this.clone();
let process_request = async move {
// Unmarshal NDJSON - newline => json
// (trim all whitespace around input lines just to make things more permissive for API users)
let request: json_api::Request = deserialize_json(&line)?;
// See if this is a control message or a veilid-core message
let response = if let json_api::RequestOp::Control { args } = request.op {
// Process control messages
json_api::Response {
id: request.id,
op: json_api::ResponseOp::Control {
result: json_api::to_json_api_result(
this2.process_control(args).await,
),
},
}
} else {
// Process with ndjson api
jrp.clone().process_request(request).await
};
// Marshal json + newline => NDJSON
let response_string =
serialize_json(json_api::RecvMessage::Response(response)) + "\n";
if let Err(e) = responses_tx.send_async(response_string).await {
warn!("response not sent: {}", e)
}
VeilidAPIResult::Ok(())
};
if let Err(e) = more_futures_tx
.send_async(system_boxed(process_request))
.await
{
warn!("request dropped: {}", e)
}
}
// Stop sending updates
// Will cause send_responses_future to stop because we drop the responses_tx
this.inner.lock().update_channels.remove(&conn_tuple);
VeilidAPIResult::Ok(())
};
unord.push(system_boxed(recv_requests_future));
// Request receive processor future
// Receives from socket and enqueues RequestLines
// Completes when the connection is closed or there is a failure
unord.push(system_boxed(self.clone().receive_requests(
conn_tuple,
reader,
requests_tx,
responses_tx,
)));
// Response send processor
let send_responses_future = async move {
while let Ok(resp) = responses_rx.recv_async().await {
if let Err(e) = writer.write_all(resp.as_bytes()).await {
error!("failed to write response: {}", e)
}
}
VeilidAPIResult::Ok(())
};
unord.push(system_boxed(send_responses_future));
// Sends finished response strings out the socket
// Completes when the responses channel is closed
unord.push(system_boxed(
self.clone().send_responses(responses_rx, writer),
));
// Add future to process first request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
// Send and receive until we're done or a stop is requested
while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await {
match r {
Ok(()) => {}
Err(e) => {
warn!("JSON API Failure: {}", e);
// See if we got some work to do
let request_line = match r {
Ok(Some(request_line)) => {
// Add future to process next request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
// Socket receive future returned something to process
request_line
}
}
// Add more futures if we had one that completed
// Allows processing requests in an async fashion
for fut in more_futures_rx.drain() {
unord.push(fut);
}
Ok(None) => {
// Non-request future finished
continue;
}
Err(e) => {
// Connection processing failure, abort
error!("Connection processing failure: {}", e);
break;
}
};
// Enqueue unordered future to process request line in parallel
unord.push(system_boxed(
self.clone().process_request_line(jrp.clone(), request_line),
));
}
debug!(