veilid/veilid-server/src/client_api.rs

409 lines
13 KiB
Rust
Raw Normal View History

2022-06-28 03:46:29 +00:00
use crate::tools::*;
2021-11-22 16:28:30 +00:00
use crate::veilid_client_capnp::*;
use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
2022-06-29 14:13:49 +00:00
use cfg_if::*;
2021-11-22 16:28:30 +00:00
use failure::*;
2022-06-29 14:34:23 +00:00
use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt};
2021-11-22 16:28:30 +00:00
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;
2022-06-29 14:13:49 +00:00
use stop_token::future::FutureExt;
use stop_token::*;
2022-06-08 01:31:05 +00:00
use tracing::*;
use veilid_core::*;
2021-11-22 16:28:30 +00:00
/// Error type for failures originating in the client API layer.
///
/// Wraps a single message string; its display form is that message
/// prefixed with `Client API error: `.
#[derive(Debug, Fail)]
#[fail(display = "Client API error: {}", _0)]
pub struct ClientAPIError(String);
// --- interface Registration ---------------------------------
/// Per-client bookkeeping stored in the registration map.
struct RegistrationHandle {
    /// Client capability supplied in the `register` call; update requests
    /// are created from it.
    client: veilid_client::Client,
    /// Count of update requests sent to this client that have not yet
    /// completed.
    requests_in_flight: i32,
}
/// Table of currently registered clients, keyed by registration id.
struct RegistrationMap {
    /// Registration id -> handle for that client.
    registrations: HashMap<u64, RegistrationHandle>,
}
impl RegistrationMap {
    /// Creates a map with no registrations.
    fn new() -> Self {
        let registrations = HashMap::new();
        Self { registrations }
    }
}
/// Server-side object backing a client's `Registration` capability.
///
/// Holds the id under which the client was registered and a shared handle
/// to the map that contains the corresponding entry.
struct RegistrationImpl {
    /// Key of this registration inside `registration_map`.
    id: u64,
    /// Shared registration table this object belongs to.
    registration_map: Rc<RefCell<RegistrationMap>>,
}
impl RegistrationImpl {
    /// Builds the registration object for entry `id` of the shared
    /// `registrations` table.
    fn new(id: u64, registrations: Rc<RefCell<RegistrationMap>>) -> Self {
        let registration_map = registrations;
        Self {
            id,
            registration_map,
        }
    }
}
impl Drop for RegistrationImpl {
    /// Removes this registration's entry from the shared map when the
    /// registration object goes away.
    fn drop(&mut self) {
        debug!("Registration dropped");
        let mut map = self.registration_map.borrow_mut();
        map.registrations.remove(&self.id);
    }
}
// No methods are overridden here: the registration object carries no RPC
// behavior of its own in this file; its observable effect is the `Drop`
// implementation that unregisters the client.
impl registration::Server for RegistrationImpl {}
// --- interface VeilidServer ---------------------------------
/// Implementation of the `VeilidServer` RPC interface.
struct VeilidServerImpl {
    /// Handle to the Veilid API that RPC calls are forwarded to.
    veilid_api: veilid_core::VeilidAPI,
    /// Id that will be assigned to the next registration.
    next_id: u64,
    /// Shared table of registered clients.
    pub registration_map: Rc<RefCell<RegistrationMap>>,
}
impl VeilidServerImpl {
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
pub fn new(veilid_api: veilid_core::VeilidAPI) -> Self {
Self {
next_id: 0,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
2021-11-28 02:31:01 +00:00
veilid_api,
2021-11-22 16:28:30 +00:00
}
}
}
impl veilid_server::Server for VeilidServerImpl {
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
fn register(
&mut self,
params: veilid_server::RegisterParams,
mut results: veilid_server::RegisterResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::register");
2022-01-18 23:28:22 +00:00
2021-11-22 16:28:30 +00:00
self.registration_map.borrow_mut().registrations.insert(
self.next_id,
RegistrationHandle {
client: pry!(pry!(params.get()).get_veilid_client()),
requests_in_flight: 0,
},
);
2022-01-18 23:28:22 +00:00
let veilid_api = self.veilid_api.clone();
let registration = capnp_rpc::new_client(RegistrationImpl::new(
self.next_id,
self.registration_map.clone(),
));
2021-11-22 16:28:30 +00:00
self.next_id += 1;
2022-01-18 23:28:22 +00:00
Promise::from_future(async move {
let state = veilid_api
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
2022-06-08 01:31:05 +00:00
let state = serialize_json(state);
2022-01-18 23:28:22 +00:00
let mut res = results.get();
res.set_registration(registration);
2022-06-08 01:31:05 +00:00
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
2022-01-18 23:28:22 +00:00
Ok(())
})
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
fn debug(
&mut self,
params: veilid_server::DebugParams,
mut results: veilid_server::DebugResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::debug");
2021-11-22 16:28:30 +00:00
let veilid_api = self.veilid_api.clone();
let what = pry!(pry!(params.get()).get_what()).to_owned();
2021-11-22 16:28:30 +00:00
Promise::from_future(async move {
let output = veilid_api
.debug(what)
2021-12-08 03:09:45 +00:00
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
results.get().set_output(output.as_str());
Ok(())
2021-11-22 16:28:30 +00:00
})
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
fn attach(
&mut self,
_params: veilid_server::AttachParams,
mut _results: veilid_server::AttachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::attach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
2021-12-08 03:09:45 +00:00
veilid_api
.attach()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
2021-11-22 16:28:30 +00:00
})
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
fn detach(
&mut self,
_params: veilid_server::DetachParams,
mut _results: veilid_server::DetachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::detach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
2021-12-08 03:09:45 +00:00
veilid_api
.detach()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
2021-11-22 16:28:30 +00:00
})
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
fn shutdown(
&mut self,
_params: veilid_server::ShutdownParams,
mut _results: veilid_server::ShutdownResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::shutdown");
cfg_if::cfg_if! {
if #[cfg(windows)] {
assert!(false, "write me!");
}
else {
2022-01-15 23:50:56 +00:00
crate::server::shutdown();
2021-11-22 16:28:30 +00:00
}
}
Promise::ok(())
}
2021-12-11 01:14:33 +00:00
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
fn get_state(
2021-12-11 01:14:33 +00:00
&mut self,
_params: veilid_server::GetStateParams,
mut results: veilid_server::GetStateResults,
2021-12-11 01:14:33 +00:00
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::get_state");
2021-12-11 01:14:33 +00:00
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let state = veilid_api
.get_state()
2021-12-11 01:14:33 +00:00
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
2022-06-08 01:31:05 +00:00
let state = serialize_json(state);
let res = results.get();
let mut rpc_state = res.init_state(
state
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?,
);
rpc_state.push_str(&state);
2021-12-11 01:14:33 +00:00
Ok(())
})
}
2021-11-22 16:28:30 +00:00
}
// --- Client API Server-Side ---------------------------------
2021-11-29 01:08:50 +00:00
type ClientApiAllFuturesJoinHandle =
2022-06-28 03:46:29 +00:00
JoinHandle<Result<Vec<()>, Box<(dyn std::error::Error + 'static)>>>;
2021-11-29 01:08:50 +00:00
2021-11-22 16:28:30 +00:00
struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI,
registration_map: Rc<RefCell<RegistrationMap>>,
2022-06-29 14:13:49 +00:00
stop: Option<StopSource>,
2021-11-29 01:08:50 +00:00
join_handle: Option<ClientApiAllFuturesJoinHandle>,
2021-11-22 16:28:30 +00:00
}
/// Server side of the client API: accepts RPC connections and pushes
/// updates to registered clients.
pub struct ClientApi {
    /// Interior-mutable state shared by all methods.
    inner: RefCell<ClientApiInner>,
}
impl ClientApi {
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-11-22 16:28:30 +00:00
pub fn new(veilid_api: veilid_core::VeilidAPI) -> Rc<Self> {
Rc::new(Self {
inner: RefCell::new(ClientApiInner {
2021-11-28 02:31:01 +00:00
veilid_api,
2021-11-22 16:28:30 +00:00
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
2022-06-29 14:13:49 +00:00
stop: Some(StopSource::new()),
2021-11-22 16:28:30 +00:00
join_handle: None,
}),
})
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
2021-11-22 16:28:30 +00:00
pub async fn stop(self: Rc<Self>) {
trace!("ClientApi::stop requested");
let jh = {
let mut inner = self.inner.borrow_mut();
if inner.join_handle.is_none() {
trace!("ClientApi stop ignored");
return;
}
2022-06-29 14:13:49 +00:00
drop(inner.stop.take());
2021-11-22 16:28:30 +00:00
inner.join_handle.take().unwrap()
};
trace!("ClientApi::stop: waiting for stop");
2021-12-14 14:48:33 +00:00
if let Err(err) = jh.await {
error!("{}", err);
}
2021-11-22 16:28:30 +00:00
trace!("ClientApi::stop: stopped");
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self, client), err)]
2021-11-22 16:28:30 +00:00
async fn handle_incoming(
self: Rc<Self>,
bind_addr: SocketAddr,
client: veilid_server::Client,
) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(bind_addr).await?;
debug!("Client API listening on: {:?}", bind_addr);
2022-06-28 03:46:29 +00:00
// Process the incoming accept stream
2022-06-29 14:13:49 +00:00
cfg_if! {
if #[cfg(feature="rt-async-std")] {
let mut incoming_stream = listener.incoming();
} else if #[cfg(feature="rt-tokio")] {
let mut incoming_stream = tokio_stream::wrappers::TcpListenerStream::new(listener);
}
}
let stop_token = self.inner.borrow().stop.as_ref().unwrap().token();
2021-11-22 16:28:30 +00:00
let incoming_loop = async move {
2022-06-29 14:13:49 +00:00
while let Ok(Some(stream_result)) =
incoming_stream.next().timeout_at(stop_token.clone()).await
{
2021-11-22 16:28:30 +00:00
let stream = stream_result?;
stream.set_nodelay(true)?;
2022-06-29 14:13:49 +00:00
cfg_if! {
if #[cfg(feature="rt-async-std")] {
2022-06-29 14:34:23 +00:00
use futures_util::AsyncReadExt;
2022-06-29 14:13:49 +00:00
let (reader, writer) = stream.split();
} else if #[cfg(feature="rt-tokio")] {
use tokio_util::compat::*;
let (reader, writer) = stream.into_split();
let reader = reader.compat();
let writer = writer.compat_write();
}
}
2021-11-22 16:28:30 +00:00
let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));
2022-06-28 03:46:29 +00:00
spawn_local(rpc_system.map(drop));
2021-11-22 16:28:30 +00:00
}
Ok::<(), Box<dyn std::error::Error>>(())
};
incoming_loop.await
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip_all)]
2021-12-11 01:14:33 +00:00
fn send_request_to_all_clients<F, T>(self: Rc<Self>, request: F)
where
2022-06-08 01:31:05 +00:00
F: Fn(u64, &mut RegistrationHandle) -> Option<::capnp::capability::RemotePromise<T>>,
2021-12-11 01:14:33 +00:00
T: capnp::traits::Pipelined + for<'a> capnp::traits::Owned<'a> + 'static + Unpin,
{
2021-11-22 16:28:30 +00:00
// Send status update to each registered client
let registration_map = self.inner.borrow().registration_map.clone();
let registration_map1 = registration_map.clone();
let regs = &mut registration_map.borrow_mut().registrations;
for (&id, mut registration) in regs.iter_mut() {
if registration.requests_in_flight > 5 {
2021-12-11 03:04:38 +00:00
println!(
2021-12-11 01:14:33 +00:00
"too many requests in flight: {}",
2021-11-22 16:28:30 +00:00
registration.requests_in_flight
);
}
registration.requests_in_flight += 1;
2021-12-11 01:14:33 +00:00
2022-06-08 01:31:05 +00:00
if let Some(request_promise) = request(id, registration) {
let registration_map2 = registration_map1.clone();
2022-06-28 03:46:29 +00:00
spawn_local(request_promise.promise.map(move |r| match r {
2022-06-08 01:31:05 +00:00
Ok(_) => {
if let Some(ref mut s) =
registration_map2.borrow_mut().registrations.get_mut(&id)
{
s.requests_in_flight -= 1;
}
2021-11-29 01:08:50 +00:00
}
2022-06-08 01:31:05 +00:00
Err(e) => {
println!("Got error: {:?}. Dropping registation.", e);
registration_map2.borrow_mut().registrations.remove(&id);
}
}));
}
2021-11-22 16:28:30 +00:00
}
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
pub fn handle_update(self: Rc<Self>, veilid_update: veilid_core::VeilidUpdate) {
2022-06-08 01:31:05 +00:00
// serialize update
let veilid_update = serialize_json(veilid_update);
2021-12-11 01:14:33 +00:00
2022-06-08 01:31:05 +00:00
// Pass other updates to clients
2021-12-11 01:14:33 +00:00
self.send_request_to_all_clients(|_id, registration| {
2022-06-08 01:31:05 +00:00
match veilid_update
.len()
.try_into()
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
{
Ok(len) => {
let mut request = registration.client.update_request();
let mut rpc_veilid_update = request.get().init_veilid_update(len);
rpc_veilid_update.push_str(&veilid_update);
Some(request.send())
}
Err(_) => None,
}
2021-12-11 01:14:33 +00:00
});
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
2021-11-22 16:28:30 +00:00
pub fn run(self: Rc<Self>, bind_addrs: Vec<SocketAddr>) {
// Create client api VeilidServer
let veilid_server_impl = VeilidServerImpl::new(self.inner.borrow().veilid_api.clone());
self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone();
// Make a client object for the server to send to each rpc client
let client: veilid_server::Client = capnp_rpc::new_client(veilid_server_impl);
let bind_futures = bind_addrs
.iter()
2021-11-28 02:31:01 +00:00
.map(|addr| self.clone().handle_incoming(*addr, client.clone()));
2022-06-29 14:34:23 +00:00
let bind_futures_join = try_join_all(bind_futures);
2022-06-28 03:46:29 +00:00
self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join));
2021-11-22 16:28:30 +00:00
}
}