clean up veilid state processing

This commit is contained in:
John Smith 2022-01-18 18:28:22 -05:00
parent 1b6864bf22
commit 205a6a8fd1
5 changed files with 54 additions and 41 deletions

View File

@ -2,7 +2,6 @@ use crate::command_processor::*;
use crate::veilid_client_capnp::*; use crate::veilid_client_capnp::*;
use veilid_core::xx::*; use veilid_core::xx::*;
use async_std::prelude::*;
use capnp::capability::Promise; use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, Disconnector, RpcSystem}; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, Disconnector, RpcSystem};
use futures::AsyncReadExt; use futures::AsyncReadExt;
@ -87,6 +86,19 @@ impl ClientApiConnection {
})), })),
} }
} }
async fn process_veilid_state<'a>(
&'a mut self,
state: veilid_state::Reader<'a>,
) -> Result<(), String> {
let mut inner = self.inner.borrow_mut();
// Process attachment state
let attachment = state.reborrow().get_attachment().map_err(map_to_string)?;
let state = attachment.get_state().map_err(map_to_string)?;
inner.comproc.update_attachment(state);
Ok(())
}
async fn handle_connection(&mut self) -> Result<(), String> { async fn handle_connection(&mut self) -> Result<(), String> {
trace!("ClientApiConnection::handle_connection"); trace!("ClientApiConnection::handle_connection");
@ -140,11 +152,20 @@ impl ClientApiConnection {
)); ));
} }
// Don't drop the registration // Send the request and get the state object and the registration object
rpc_system if let Ok(response) = request.send().promise.await {
.try_join(request.send().promise) if let Ok(response) = response.get() {
.await if let Ok(_registration) = response.get_registration() {
.map_err(map_to_string)?; if let Ok(state) = response.get_state() {
// Set up our state for the first time
if self.process_veilid_state(state).await.is_ok() {
// Don't drop the registration
rpc_system.await.map_err(map_to_string)?;
}
}
}
}
}
// Drop the server and disconnector too (if we still have it) // Drop the server and disconnector too (if we still have it)
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
@ -169,7 +190,7 @@ impl ClientApiConnection {
inner inner
.server .server
.as_ref() .as_ref()
.ok_or("Not connected, ignoring attach request".to_owned())? .ok_or_else(|| "Not connected, ignoring attach request".to_owned())?
.clone() .clone()
}; };
let request = server.borrow().attach_request(); let request = server.borrow().attach_request();
@ -184,7 +205,7 @@ impl ClientApiConnection {
inner inner
.server .server
.as_ref() .as_ref()
.ok_or("Not connected, ignoring detach request".to_owned())? .ok_or_else(|| "Not connected, ignoring detach request".to_owned())?
.clone() .clone()
}; };
let request = server.borrow().detach_request(); let request = server.borrow().detach_request();
@ -199,7 +220,7 @@ impl ClientApiConnection {
inner inner
.server .server
.as_ref() .as_ref()
.ok_or("Not connected, ignoring attach request".to_owned())? .ok_or_else(|| "Not connected, ignoring attach request".to_owned())?
.clone() .clone()
}; };
let request = server.borrow().shutdown_request(); let request = server.borrow().shutdown_request();
@ -214,7 +235,7 @@ impl ClientApiConnection {
inner inner
.server .server
.as_ref() .as_ref()
.ok_or("Not connected, ignoring attach request".to_owned())? .ok_or_else(|| "Not connected, ignoring attach request".to_owned())?
.clone() .clone()
}; };
let mut request = server.borrow().debug_request(); let mut request = server.borrow().debug_request();

View File

@ -333,11 +333,6 @@ impl AttachmentManager {
} }
} }
pub async fn send_state_update(&self) {
let attachment_machine = self.inner.lock().attachment_machine.clone();
attachment_machine.send_state_update().await;
}
pub async fn request_attach(&self) { pub async fn request_attach(&self) {
if !self.is_detached() { if !self.is_detached() {
trace!("attach request ignored"); trace!("attach request ignored");
@ -395,10 +390,8 @@ impl AttachmentManager {
} else { } else {
return false; return false;
} }
} else { } else if eventual.await == state {
if eventual.await == state { break;
break;
}
} }
} }
true true

View File

@ -68,24 +68,12 @@ where
// pub fn clear_state_change_callback(&self) { // pub fn clear_state_change_callback(&self) {
// self.inner.lock().callback = None; // self.inner.lock().callback = None;
// } // }
pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture<T::State>) { pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture<T::State>) {
let inner = self.inner.lock(); let inner = self.inner.lock();
(inner.state, inner.eventual.instance()) (inner.state, inner.eventual.instance())
} }
pub async fn send_state_update(&self) {
let (state, callback, eventual) = {
let mut inner = self.inner.lock();
let eventual =
core::mem::replace(&mut inner.eventual, EventualValueClone::<T::State>::new());
(inner.state, inner.callback.clone(), eventual)
};
if let Some(cb) = callback {
cb(state, state).await;
}
eventual.resolve(state).await;
}
pub async fn consume(&self, input: &T::Input) -> Result<Option<T::Output>, ()> { pub async fn consume(&self, input: &T::Input) -> Result<Option<T::Output>, ()> {
let current_state = self.inner.lock().state; let current_state = self.inner.lock().state;

View File

@ -30,7 +30,7 @@ interface Registration {}
interface VeilidServer { interface VeilidServer {
register @0 (veilidClient: VeilidClient) -> (registration: Registration); register @0 (veilidClient: VeilidClient) -> (registration: Registration, state: VeilidState);
debug @1 (what: Text) -> (output: Text); debug @1 (what: Text) -> (output: Text);
attach @2 (); attach @2 ();

View File

@ -121,6 +121,7 @@ impl veilid_server::Server for VeilidServerImpl {
mut results: veilid_server::RegisterResults, mut results: veilid_server::RegisterResults,
) -> Promise<(), ::capnp::Error> { ) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::register"); trace!("VeilidServerImpl::register");
self.registration_map.borrow_mut().registrations.insert( self.registration_map.borrow_mut().registrations.insert(
self.next_id, self.next_id,
RegistrationHandle { RegistrationHandle {
@ -129,16 +130,26 @@ impl veilid_server::Server for VeilidServerImpl {
}, },
); );
results let veilid_api = self.veilid_api.clone();
.get() let registration = capnp_rpc::new_client(RegistrationImpl::new(
.set_registration(capnp_rpc::new_client(RegistrationImpl::new( self.next_id,
self.next_id, self.registration_map.clone(),
self.registration_map.clone(), ));
)));
self.next_id += 1; self.next_id += 1;
Promise::ok(()) Promise::from_future(async move {
let state = veilid_api
.get_state()
.await
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
let mut res = results.get();
res.set_registration(registration);
let rpc_state = res.init_state();
convert_state(&state, rpc_state);
Ok(())
})
} }
fn debug( fn debug(