checkpoint

This commit is contained in:
John Smith 2023-06-06 19:09:29 -04:00
parent 771a7c9e7e
commit 88db69c28f
6 changed files with 63 additions and 364 deletions

3
Cargo.lock generated
View File

@ -6329,9 +6329,6 @@ dependencies = [
"async-tungstenite 0.22.2", "async-tungstenite 0.22.2",
"backtrace", "backtrace",
"bugsalot", "bugsalot",
"capnp",
"capnp-rpc",
"capnpc",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"clap 3.2.25", "clap 3.2.25",
"color-eyre", "color-eyre",

View File

@ -3,7 +3,6 @@ name = "veilid-server"
version = "0.1.0" version = "0.1.0"
authors = ["John Smith <jsmith@example.com>"] authors = ["John Smith <jsmith@example.com>"]
edition = "2021" edition = "2021"
build = "build.rs"
license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)"
[[bin]] [[bin]]
@ -39,9 +38,7 @@ color-eyre = { version = "^0", default-features = false }
backtrace = "^0" backtrace = "^0"
clap = "^3" clap = "^3"
directories = "^4" directories = "^4"
capnp = "^0"
parking_lot = "^0" parking_lot = "^0"
capnp-rpc = "^0"
config = { version = "^0", features = ["yaml"] } config = { version = "^0", features = ["yaml"] }
cfg-if = "^1" cfg-if = "^1"
serde = "^1" serde = "^1"
@ -74,6 +71,3 @@ tracing-journald = "^0"
[dev-dependencies] [dev-dependencies]
serial_test = "^0" serial_test = "^0"
[build-dependencies]
capnpc = "^0"

View File

@ -1,6 +0,0 @@
fn main() {
::capnpc::CompilerCommand::new()
.file("proto/veilid-client.capnp")
.run()
.expect("compiling schema");
}

View File

@ -1,25 +0,0 @@
@0xd29582d26b2fb073;
struct ApiResult @0x8111724bdb812929 {
union {
ok @0 :Text;
err @1 :Text;
}
}
interface Registration @0xdd45f30a7c22e391 {}
interface VeilidServer @0xcb2c699f14537f94 {
register @0 (veilidClient :VeilidClient) -> (registration :Registration, state :Text, settings :Text);
debug @1 (command :Text) -> (result :ApiResult);
attach @2 () -> (result :ApiResult);
detach @3 () -> (result :ApiResult);
shutdown @4 ();
getState @5 () -> (result :ApiResult);
changeLogLevel @6 (layer :Text, logLevel :Text) -> (result :ApiResult);
appCallReply @7 (id :UInt64, message :Data) -> (result :ApiResult);
}
interface VeilidClient @0xbfcea60fb2ba4736 {
update @0 (veilidUpdate :Text);
}

View File

@ -2,8 +2,6 @@ use crate::settings::*;
use crate::tools::*; use crate::tools::*;
use crate::veilid_client_capnp::*; use crate::veilid_client_capnp::*;
use crate::veilid_logs::VeilidLogs; use crate::veilid_logs::VeilidLogs;
use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use cfg_if::*; use cfg_if::*;
use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt}; use futures_util::{future::try_join_all, FutureExt as FuturesFutureExt, StreamExt};
use serde::*; use serde::*;
@ -11,267 +9,72 @@ use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
use stop_token::*; use stop_token::*;
use tracing::*; use tracing::*;
use veilid_core::*; use veilid_core::*;
// Encoding for ApiResult // struct VeilidServerImpl {
fn encode_api_result<T: Serialize + fmt::Debug>( // veilid_api: veilid_core::VeilidAPI,
result: &VeilidAPIResult<T>, // veilid_logs: VeilidLogs,
builder: &mut api_result::Builder, // settings: Settings,
) { // next_id: u64,
match result { // }
Ok(v) => {
builder.set_ok(&serialize_json(v));
}
Err(e) => {
builder.set_err(&serialize_json(e));
}
}
}
// --- interface Registration --------------------------------- // impl VeilidServerImpl {
// #[instrument(level = "trace", skip_all)]
// pub fn new(
// veilid_api: veilid_core::VeilidAPI,
// veilid_logs: VeilidLogs,
// settings: Settings,
// ) -> Self {
// Self {
// next_id: 0,
// veilid_api,
// veilid_logs,
// settings,
// }
// }
struct RegistrationHandle { // #[instrument(level = "trace", skip_all)]
client: veilid_client::Client, // fn shutdown(
requests_in_flight: i32, // &mut self,
} // _params: veilid_server::ShutdownParams,
// mut _results: veilid_server::ShutdownResults,
// ) -> Promise<(), ::capnp::Error> {
// trace!("VeilidServerImpl::shutdown");
struct RegistrationMap { // cfg_if::cfg_if! {
registrations: HashMap<u64, RegistrationHandle>, // if #[cfg(windows)] {
} // assert!(false, "write me!");
// }
// else {
// crate::server::shutdown();
// }
// }
impl RegistrationMap { // Promise::ok(())
fn new() -> Self { // }
Self {
registrations: HashMap::new(),
}
}
}
struct RegistrationImpl { // #[instrument(level = "trace", skip_all)]
id: u64, // fn change_log_level(
registration_map: Rc<RefCell<RegistrationMap>>, // &mut self,
} // params: veilid_server::ChangeLogLevelParams,
// mut results: veilid_server::ChangeLogLevelResults,
// ) -> Promise<(), ::capnp::Error> {
// trace!("VeilidServerImpl::change_log_level");
impl RegistrationImpl { // let layer = pry!(pry!(params.get()).get_layer()).to_owned();
fn new(id: u64, registrations: Rc<RefCell<RegistrationMap>>) -> Self { // let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned();
Self { // let log_level: veilid_core::VeilidConfigLogLevel =
id, // pry!(veilid_core::deserialize_json(&log_level_json)
registration_map: registrations, // .map_err(|e| ::capnp::Error::failed(format!("{:?}", e))));
}
}
}
impl Drop for RegistrationImpl { // let result = self.veilid_logs.change_log_level(layer, log_level);
fn drop(&mut self) { // encode_api_result(&result, &mut results.get().init_result());
debug!("Registration dropped"); // Promise::ok(())
self.registration_map // }
.borrow_mut() // }
.registrations
.remove(&self.id);
}
}
impl registration::Server for RegistrationImpl {}
// --- interface VeilidServer ---------------------------------
struct VeilidServerImpl {
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
next_id: u64,
pub registration_map: Rc<RefCell<RegistrationMap>>,
}
impl VeilidServerImpl {
#[instrument(level = "trace", skip_all)]
pub fn new(
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
) -> Self {
Self {
next_id: 0,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
veilid_api,
veilid_logs,
settings,
}
}
}
impl veilid_server::Server for VeilidServerImpl {
#[instrument(level = "trace", skip_all)]
fn register(
&mut self,
params: veilid_server::RegisterParams,
mut results: veilid_server::RegisterResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::register");
self.registration_map.borrow_mut().registrations.insert(
self.next_id,
RegistrationHandle {
client: pry!(pry!(params.get()).get_veilid_client()),
requests_in_flight: 0,
},
);
let veilid_api = self.veilid_api.clone();
let settings = self.settings.clone();
let registration = capnp_rpc::new_client(RegistrationImpl::new(
self.next_id,
self.registration_map.clone(),
));
self.next_id += 1;
Promise::from_future(async move {
let state = veilid_api
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
let state = serialize_json(state);
let mut res = results.get();
res.set_registration(registration);
res.set_state(&state);
let settings = &*settings.read();
let settings_json_string = serialize_json(settings);
let mut settings_json = json::parse(&settings_json_string)
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
settings_json["core"]["network"].remove("node_id_secret");
let safe_settings_json = settings_json.to_string();
res.set_settings(&safe_settings_json);
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn debug(
&mut self,
params: veilid_server::DebugParams,
mut results: veilid_server::DebugResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::debug");
let veilid_api = self.veilid_api.clone();
let command = pry!(pry!(params.get()).get_command()).to_owned();
Promise::from_future(async move {
let result = veilid_api.debug(command).await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn attach(
&mut self,
_params: veilid_server::AttachParams,
mut results: veilid_server::AttachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::attach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let result = veilid_api.attach().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn detach(
&mut self,
_params: veilid_server::DetachParams,
mut results: veilid_server::DetachResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::detach");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let result = veilid_api.detach().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn shutdown(
&mut self,
_params: veilid_server::ShutdownParams,
mut _results: veilid_server::ShutdownResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::shutdown");
cfg_if::cfg_if! {
if #[cfg(windows)] {
assert!(false, "write me!");
}
else {
crate::server::shutdown();
}
}
Promise::ok(())
}
#[instrument(level = "trace", skip_all)]
fn get_state(
&mut self,
_params: veilid_server::GetStateParams,
mut results: veilid_server::GetStateResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::get_state");
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let result = veilid_api.get_state().await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
#[instrument(level = "trace", skip_all)]
fn change_log_level(
&mut self,
params: veilid_server::ChangeLogLevelParams,
mut results: veilid_server::ChangeLogLevelResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::change_log_level");
let layer = pry!(pry!(params.get()).get_layer()).to_owned();
let log_level_json = pry!(pry!(params.get()).get_log_level()).to_owned();
let log_level: veilid_core::VeilidConfigLogLevel =
pry!(veilid_core::deserialize_json(&log_level_json)
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e))));
let result = self.veilid_logs.change_log_level(layer, log_level);
encode_api_result(&result, &mut results.get().init_result());
Promise::ok(())
}
#[instrument(level = "trace", skip_all)]
fn app_call_reply(
&mut self,
params: veilid_server::AppCallReplyParams,
mut results: veilid_server::AppCallReplyResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::app_call_reply");
let id = OperationId::new(pry!(params.get()).get_id());
let message = pry!(pry!(params.get()).get_message()).to_owned();
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let result = veilid_api.app_call_reply(id, message).await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
}
// --- Client API Server-Side --------------------------------- // --- Client API Server-Side ---------------------------------
@ -282,13 +85,12 @@ struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI, veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs, veilid_logs: VeilidLogs,
settings: Settings, settings: Settings,
registration_map: Rc<RefCell<RegistrationMap>>,
stop: Option<StopSource>, stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>, join_handle: Option<ClientApiAllFuturesJoinHandle>,
} }
pub struct ClientApi { pub struct ClientApi {
inner: RefCell<ClientApiInner>, inner: Arc<Mutex<ClientApiInner>>,
} }
impl ClientApi { impl ClientApi {
@ -303,7 +105,6 @@ impl ClientApi {
veilid_api, veilid_api,
veilid_logs, veilid_logs,
settings, settings,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
stop: Some(StopSource::new()), stop: Some(StopSource::new()),
join_handle: None, join_handle: None,
}), }),
@ -329,11 +130,10 @@ impl ClientApi {
trace!("ClientApi::stop: stopped"); trace!("ClientApi::stop: stopped");
} }
#[instrument(level = "trace", skip(self, client), err)] #[instrument(level = "trace", skip(self), err)]
async fn handle_incoming( async fn handle_incoming(
self: Rc<Self>, self,
bind_addr: SocketAddr, bind_addr: SocketAddr,
client: veilid_server::Client,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(bind_addr).await?; let listener = TcpListener::bind(bind_addr).await?;
debug!("Client API listening on: {:?}", bind_addr); debug!("Client API listening on: {:?}", bind_addr);
@ -347,7 +147,7 @@ impl ClientApi {
} }
} }
let stop_token = self.inner.borrow().stop.as_ref().unwrap().token(); let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
let incoming_loop = async move { let incoming_loop = async move {
while let Ok(Some(stream_result)) = while let Ok(Some(stream_result)) =
incoming_stream.next().timeout_at(stop_token.clone()).await incoming_stream.next().timeout_at(stop_token.clone()).await
@ -365,15 +165,8 @@ impl ClientApi {
let writer = writer.compat_write(); let writer = writer.compat_write();
} }
} }
let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));
xxx spawn json_api handler
spawn_local(rpc_system.map(drop)); spawn_local(rpc_system.map(drop));
} }
Ok::<(), Box<dyn std::error::Error>>(()) Ok::<(), Box<dyn std::error::Error>>(())
@ -382,44 +175,6 @@ impl ClientApi {
incoming_loop.await incoming_loop.await
} }
#[instrument(level = "trace", skip_all)]
fn send_request_to_all_clients<F, T>(self: Rc<Self>, request: F)
where
F: Fn(u64, &mut RegistrationHandle) -> Option<::capnp::capability::RemotePromise<T>>,
T: capnp::traits::Pipelined + capnp::traits::Owned + 'static + Unpin,
{
// Send status update to each registered client
let registration_map = self.inner.borrow().registration_map.clone();
let registration_map1 = registration_map.clone();
let regs = &mut registration_map.borrow_mut().registrations;
for (&id, mut registration) in regs.iter_mut() {
if registration.requests_in_flight >= 256 {
println!(
"too many requests in flight: {}",
registration.requests_in_flight
);
}
registration.requests_in_flight += 1;
if let Some(request_promise) = request(id, registration) {
let registration_map2 = registration_map1.clone();
spawn_local(request_promise.promise.map(move |r| match r {
Ok(_) => {
if let Some(ref mut s) =
registration_map2.borrow_mut().registrations.get_mut(&id)
{
s.requests_in_flight -= 1;
}
}
Err(e) => {
println!("Got error: {:?}. Dropping registation.", e);
registration_map2.borrow_mut().registrations.remove(&id);
}
}));
}
}
}
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
pub fn handle_update(self: Rc<Self>, veilid_update: veilid_core::VeilidUpdate) { pub fn handle_update(self: Rc<Self>, veilid_update: veilid_core::VeilidUpdate) {
// serialize update // serialize update
@ -444,21 +199,10 @@ impl ClientApi {
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
pub fn run(self: Rc<Self>, bind_addrs: Vec<SocketAddr>) { pub fn run(&self, bind_addrs: Vec<SocketAddr>) {
// Create client api VeilidServer
let veilid_server_impl = VeilidServerImpl::new(
self.inner.borrow().veilid_api.clone(),
self.inner.borrow().veilid_logs.clone(),
self.inner.borrow().settings.clone(),
);
self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone();
// Make a client object for the server to send to each rpc client
let client: veilid_server::Client = capnp_rpc::new_client(veilid_server_impl);
let bind_futures = bind_addrs let bind_futures = bind_addrs
.iter() .iter()
.map(|addr| self.clone().handle_incoming(*addr, client.clone())); .map(|addr| self.clone().handle_incoming(*addr));
let bind_futures_join = try_join_all(bind_futures); let bind_futures_join = try_join_all(bind_futures);
self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join)); self.inner.borrow_mut().join_handle = Some(spawn_local(bind_futures_join));
} }

View File

@ -24,11 +24,6 @@ use tools::*;
use tracing::*; use tracing::*;
use veilid_logs::*; use veilid_logs::*;
#[allow(clippy::all)]
pub mod veilid_client_capnp {
include!(concat!(env!("OUT_DIR"), "/proto/veilid_client_capnp.rs"));
}
#[instrument(err)] #[instrument(err)]
fn main() -> EyreResult<()> { fn main() -> EyreResult<()> {
#[cfg(windows)] #[cfg(windows)]
@ -76,7 +71,7 @@ fn main() -> EyreResult<()> {
if matches.occurrences_of("emit-schema") != 0 { if matches.occurrences_of("emit-schema") != 0 {
if let Some(esstr) = matches.value_of("emit-schema") { if let Some(esstr) = matches.value_of("emit-schema") {
let mut schemas = HashMap::<String, String>::new(); let mut schemas = HashMap::<String, String>::new();
veilid_core::emit_schemas(&mut schemas); veilid_core::json_api::emit_schemas(&mut schemas);
if let Some(schema) = schemas.get(esstr) { if let Some(schema) = schemas.get(esstr) {
println!("{}", schema); println!("{}", schema);