// veilid/veilid-server/src/client_api.rs
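//! Client API server for veilid-server.
//!
//! Exposes the veilid-core JSON API over TCP using newline-delimited JSON
//! (NDJSON): each request is a single JSON object on its own line, and each
//! response or update comes back as a single JSON line.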

use crate::settings::*;
use crate::tools::*;
use crate::veilid_logs::VeilidLogs;
use cfg_if::*;
use futures_util::{future::try_join_all, stream::FuturesUnordered, StreamExt};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use stop_token::future::FutureExt as _;
use stop_token::*;
use tracing::*;
use veilid_core::json_api::JsonRequestProcessor;
use veilid_core::tools::*;
use veilid_core::*;
use wg::AsyncWaitGroup;

cfg_if! {
if #[cfg(feature="rt-async-std")] {
use async_std::io::prelude::BufReadExt;
use async_std::io::WriteExt;
} else if #[cfg(feature="rt-tokio")] {
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
}
}
// --- Client API Server-Side ---------------------------------
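//
// The server accepts TCP connections on the configured client API addresses
// and speaks line-oriented JSON: one `json_api::Request` per line in, one
// `json_api::RecvMessage` (a `Response` or an `Update`) per line out.
//
// An illustrative exchange (field layout is approximate; the authoritative
// schema can be fetched at runtime with the `EmitSchema` control request):
//
//   -> {"id": 1, "op": "Control", "args": ["GetServerSettings"]}
//   <- {"type": "Response", "id": 1, ...}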
type ClientApiAllFuturesJoinHandle = MustJoinHandle<std::io::Result<Vec<()>>>;
struct RequestLine {
// Request to process
line: String,
// Where to send the response
responses_tx: flume::Sender<String>,
}
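
/// Shared state behind a `ClientApi` handle: the veilid-core API, logging
/// controls, server settings, the stop source, the listener join handle, and
/// one update channel per connected client (keyed by (local, peer) address).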
struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>,
update_channels: HashMap<(SocketAddr, SocketAddr), flume::Sender<String>>,
}
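
/// Handle to the client API server.
///
/// A minimal usage sketch, assuming the caller already has a configured
/// `VeilidAPI`, `VeilidLogs`, and `Settings` (the bind address below is
/// illustrative only; the real addresses come from the settings):
///
/// ```ignore
/// // Hypothetical wiring; the real values come from veilid-server startup.
/// let client_api = ClientApi::new(veilid_api, veilid_logs, settings);
/// client_api.run(vec!["127.0.0.1:5959".parse().unwrap()]);
/// // ... later, during shutdown:
/// client_api.stop().await;
/// ```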
#[derive(Clone)]
pub struct ClientApi {
inner: Arc<Mutex<ClientApiInner>>,
}
impl ClientApi {
#[instrument(level = "trace", skip_all)]
pub fn new(
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
) -> Self {
Self {
inner: Arc::new(Mutex::new(ClientApiInner {
veilid_api,
veilid_logs,
settings,
stop: Some(StopSource::new()),
join_handle: None,
update_channels: HashMap::new(),
})),
}
}
#[instrument(level = "trace", skip_all)]
fn shutdown(&self) {
trace!("ClientApi::shutdown");
crate::server::shutdown();
}
#[instrument(level = "trace", skip_all)]
fn change_log_level(
&self,
layer: String,
log_level: VeilidConfigLogLevel,
) -> VeilidAPIResult<()> {
trace!("ClientApi::change_log_level");
let veilid_logs = self.inner.lock().veilid_logs.clone();
veilid_logs.change_log_level(layer, log_level)
}
#[instrument(level = "trace", skip(self))]
pub async fn stop(&self) {
trace!("ClientApi::stop requested");
let jh = {
let mut inner = self.inner.lock();
if inner.join_handle.is_none() {
trace!("ClientApi stop ignored");
return;
}
drop(inner.stop.take());
inner.join_handle.take().unwrap()
};
trace!("ClientApi::stop: waiting for stop");
if let Err(err) = jh.await {
eprintln!("{}", err);
}
trace!("ClientApi::stop: stopped");
}
#[instrument(level = "trace", skip(self), err)]
async fn handle_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> {
let listener = TcpListener::bind(bind_addr).await?;
debug!("Client API listening on: {:?}", bind_addr);
// Process the incoming accept stream
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let mut incoming_stream = listener.incoming();
} else if #[cfg(feature="rt-tokio")] {
let mut incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener);
}
}
// Make wait group for all incoming connections
let awg = AsyncWaitGroup::new();
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
while let Ok(Some(stream_result)) =
incoming_stream.next().timeout_at(stop_token.clone()).await
{
// Get the stream to process
let stream = stream_result?;
stream.set_nodelay(true)?;
// Increment wait group
awg.add(1);
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_connection(stream, t_awg)).detach();
}
// Wait for all connections to terminate
awg.wait().await;
Ok(())
}
// Process control messages for the server
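// Recognized control requests (dispatched below):
//   Shutdown                       - stop the whole server
//   ChangeLogLevel <layer> <level> - adjust a log layer at runtime
//   GetServerSettings              - settings JSON with secrets removed
//   EmitSchema <schema>            - return a JSON API schema by name
// These arrive as the `args` of a `Control` request line, for example
// (illustrative shape): {"id": 2, "op": "Control", "args": ["Shutdown"]}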
async fn process_control(self, args: Vec<String>) -> VeilidAPIResult<String> {
if args.is_empty() {
apibail_generic!("no control request specified");
}
if args[0] == "Shutdown" {
if args.len() != 1 {
apibail_generic!("wrong number of arguments");
}
self.shutdown();
Ok("".to_owned())
} else if args[0] == "ChangeLogLevel" {
if args.len() != 3 {
apibail_generic!("wrong number of arguments");
}
let log_level = VeilidConfigLogLevel::from_str(&args[2])?;
self.change_log_level(args[1].clone(), log_level)?;
Ok("".to_owned())
} else if args[0] == "GetServerSettings" {
if args.len() != 1 {
apibail_generic!("wrong number of arguments");
}
let settings = self.inner.lock().settings.clone();
let settings = &*settings.read();
let settings_json_string = serialize_json(settings);
let mut settings_json =
json::parse(&settings_json_string).map_err(VeilidAPIError::internal)?;
settings_json["core"]["network"].remove("node_id_secret");
settings_json["core"]["protected_store"].remove("device_encryption_key_password");
settings_json["core"]["protected_store"].remove("new_device_encryption_key_password");
let safe_settings_json = settings_json.to_string();
Ok(safe_settings_json)
} else if args[0] == "EmitSchema" {
if args.len() != 2 {
apibail_generic!("wrong number of arguments");
}
let mut schemas = HashMap::<String, String>::new();
veilid_core::json_api::emit_schemas(&mut schemas);
let Some(schema) = schemas.get(&args[1]) else {
apibail_invalid_argument!("invalid schema", "schema", args[1].clone());
};
Ok(schema.clone())
} else {
apibail_generic!("unknown control message");
}
}
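
// Deserialize one request line, dispatch it (control request vs. veilid-core
// JSON API request), and queue the serialized response line onto this
// connection's responses channel.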
async fn process_request_line(
self,
jrp: JsonRequestProcessor,
request_line: RequestLine,
) -> VeilidAPIResult<Option<RequestLine>> {
let line = request_line.line;
let responses_tx = request_line.responses_tx;
// Unmarshal NDJSON - newline => json
// (trim all whitespace around input lines just to make things more permissive for API users)
let request: json_api::Request = deserialize_json(&line)?;
// See if this is a control message or a veilid-core message
let response = if let json_api::RequestOp::Control { args } = request.op {
// Process control messages
json_api::Response {
id: request.id,
op: json_api::ResponseOp::Control {
result: json_api::to_json_api_result(self.process_control(args).await),
},
}
} else {
// Process with ndjson api
jrp.clone().process_request(request).await
};
// Marshal json + newline => NDJSON
let response_string = serialize_json(json_api::RecvMessage::Response(response)) + "\n";
if let Err(e) = responses_tx.send_async(response_string).await {
eprintln!("response not sent: {}", e)
}
VeilidAPIResult::Ok(None)
}
async fn next_request_line(
requests_rx: flume::Receiver<Option<RequestLine>>,
) -> VeilidAPIResult<Option<RequestLine>> {
Ok(requests_rx.recv_async().await.ok().flatten())
}
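
// Read newline-delimited requests from the socket's read half, registering
// this connection's responses channel for updates while it is alive, and
// enqueue each non-empty line as a RequestLine for parallel processing.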
async fn receive_requests<R: AsyncBufReadExt + Unpin>(
self,
conn_tuple: (SocketAddr, SocketAddr),
mut reader: R,
requests_tx: flume::Sender<Option<RequestLine>>,
responses_tx: flume::Sender<String>,
) -> VeilidAPIResult<Option<RequestLine>> {
// responses_tx becomes owned by recv_requests_future
// Start sending updates
self.inner
.lock()
.update_channels
.insert(conn_tuple, responses_tx.clone());
let mut linebuf = String::new();
while let Ok(size) = reader.read_line(&mut linebuf).await {
// Eof?
if size == 0 {
break;
}
// Put the processing in the async queue
let line = linebuf.trim().to_owned();
linebuf.clear();
// Ignore blank lines
if line.is_empty() {
continue;
}
// Enqueue the line for processing in parallel
let request_line = RequestLine {
line,
responses_tx: responses_tx.clone(),
};
if let Err(e) = requests_tx.send_async(Some(request_line)).await {
eprintln!("failed to enqueue request: {}", e);
break;
}
}
// Stop sending updates
// Will cause send_responses_future to stop because we drop the responses_tx
self.inner.lock().update_channels.remove(&conn_tuple);
VeilidAPIResult::Ok(None)
}
async fn send_responses<W: AsyncWriteExt + Unpin>(
self,
responses_rx: flume::Receiver<String>,
mut writer: W,
) -> VeilidAPIResult<Option<RequestLine>> {
while let Ok(resp) = responses_rx.recv_async().await {
if let Err(e) = writer.write_all(resp.as_bytes()).await {
eprintln!("failed to write response: {}", e)
}
}
VeilidAPIResult::Ok(None)
}
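
// Per-connection driver: splits the stream into a buffered reader and writer,
// then drives a FuturesUnordered set holding the request receiver, the
// response sender, and one processing future per queued request line, until
// the connection closes or a stop is requested.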
pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) {
// Get address of peer
let peer_addr = match stream.peer_addr() {
Ok(v) => v,
Err(e) => {
eprintln!("can't get peer address: {}", e);
return;
}
};
// Get local address
let local_addr = match stream.local_addr() {
Ok(v) => v,
Err(e) => {
eprintln!("can't get local address: {}", e);
return;
}
};
// Get connection tuple
let conn_tuple = (local_addr, peer_addr);
debug!(
"Accepted Client API Connection: {:?} -> {:?}",
peer_addr, local_addr
);
// Make stop token to quit when stop() is requested externally
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
// Split into reader and writer halves
// with line buffering on the reader
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures_util::AsyncReadExt;
let (reader, mut writer) = stream.split();
let reader = BufReader::new(reader);
} else if #[cfg(feature="rt-tokio")] {
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
}
}
// Make request processor for this connection
let api = self.inner.lock().veilid_api.clone();
let jrp = json_api::JsonRequestProcessor::new(api);
// Futures to process unordered
let mut unord = FuturesUnordered::new();
// Requests and responses are done serially to the socket
// but the requests are processed in parallel by the FuturesUnordered
let (requests_tx, requests_rx) = flume::unbounded();
let (responses_tx, responses_rx) = flume::unbounded();
// Request receive processor future
// Receives from socket and enqueues RequestLines
// Completes when the connection is closed or there is a failure
unord.push(system_boxed(self.clone().receive_requests(
conn_tuple,
reader,
requests_tx,
responses_tx,
)));
// Response send processor
// Sends finished response strings out the socket
// Completes when the responses channel is closed
unord.push(system_boxed(
self.clone().send_responses(responses_rx, writer),
));
// Add future to process first request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
// Send and receive until we're done or a stop is requested
while let Ok(Some(r)) = unord.next().timeout_at(stop_token.clone()).await {
// See if we got some work to do
let request_line = match r {
Ok(Some(request_line)) => {
// Add future to process next request
unord.push(system_boxed(Self::next_request_line(requests_rx.clone())));
// Socket receive future returned something to process
request_line
}
Ok(None) => {
// Non-request future finished
continue;
}
Err(e) => {
// Connection processing failure, abort
eprintln!("Connection processing failure: {}", e);
break;
}
};
// Enqueue unordered future to process request line in parallel
unord.push(system_boxed(
self.clone().process_request_line(jrp.clone(), request_line),
));
}
debug!(
"Closed Client API Connection: {:?} -> {:?}",
peer_addr, local_addr
);
awg.done();
}
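
// Fan a veilid-core update out to every connected client as one NDJSON line.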
#[instrument(level = "trace", skip(self))]
pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) {
// serialize update to NDJSON
let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n";
// Pass other updates to clients
let inner = self.inner.lock();
for ch in inner.update_channels.values() {
if let Err(e) = ch.send(veilid_update.clone()) {
eprintln!("failed to send update: {}", e);
}
}
}
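
// Start one listener per bind address and store the joined handle so that
// stop() can wait for all of them to finish.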
#[instrument(level = "trace", skip(self))]
pub fn run(&self, bind_addrs: Vec<SocketAddr>) {
let bind_futures = bind_addrs
.iter()
.map(|addr| self.clone().handle_incoming(*addr));
let bind_futures_join = try_join_all(bind_futures);
self.inner.lock().join_handle = Some(spawn(bind_futures_join));
}
}