diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index a116c478..2200d600 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -2,7 +2,6 @@ use crate::command_processor::*; use crate::veilid_client_capnp::*; use veilid_core::xx::*; -use async_std::prelude::*; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, Disconnector, RpcSystem}; use futures::AsyncReadExt; @@ -87,6 +86,19 @@ impl ClientApiConnection { })), } } + async fn process_veilid_state<'a>( + &'a mut self, + state: veilid_state::Reader<'a>, + ) -> Result<(), String> { + let mut inner = self.inner.borrow_mut(); + + // Process attachment state + let attachment = state.reborrow().get_attachment().map_err(map_to_string)?; + let state = attachment.get_state().map_err(map_to_string)?; + inner.comproc.update_attachment(state); + + Ok(()) + } async fn handle_connection(&mut self) -> Result<(), String> { trace!("ClientApiConnection::handle_connection"); @@ -140,11 +152,20 @@ impl ClientApiConnection { )); } - // Don't drop the registration - rpc_system - .try_join(request.send().promise) - .await - .map_err(map_to_string)?; + // Send the request and get the state object and the registration object + if let Ok(response) = request.send().promise.await { + if let Ok(response) = response.get() { + if let Ok(_registration) = response.get_registration() { + if let Ok(state) = response.get_state() { + // Set up our state for the first time + if self.process_veilid_state(state).await.is_ok() { + // Don't drop the registration + rpc_system.await.map_err(map_to_string)?; + } + } + } + } + } // Drop the server and disconnector too (if we still have it) let mut inner = self.inner.borrow_mut(); @@ -169,7 +190,7 @@ impl ClientApiConnection { inner .server .as_ref() - .ok_or("Not connected, ignoring attach request".to_owned())? + .ok_or_else(|| "Not connected, ignoring attach request".to_owned())? .clone() }; let request = server.borrow().attach_request(); @@ -184,7 +205,7 @@ impl ClientApiConnection { inner .server .as_ref() - .ok_or("Not connected, ignoring detach request".to_owned())? + .ok_or_else(|| "Not connected, ignoring detach request".to_owned())? .clone() }; let request = server.borrow().detach_request(); @@ -199,7 +220,7 @@ impl ClientApiConnection { inner .server .as_ref() - .ok_or("Not connected, ignoring attach request".to_owned())? + .ok_or_else(|| "Not connected, ignoring attach request".to_owned())? .clone() }; let request = server.borrow().shutdown_request(); @@ -214,7 +235,7 @@ impl ClientApiConnection { inner .server .as_ref() - .ok_or("Not connected, ignoring attach request".to_owned())? + .ok_or_else(|| "Not connected, ignoring attach request".to_owned())? .clone() }; let mut request = server.borrow().debug_request(); diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 544246ae..bc5e939f 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -333,11 +333,6 @@ impl AttachmentManager { } } - pub async fn send_state_update(&self) { - let attachment_machine = self.inner.lock().attachment_machine.clone(); - attachment_machine.send_state_update().await; - } - pub async fn request_attach(&self) { if !self.is_detached() { trace!("attach request ignored"); @@ -395,10 +390,8 @@ impl AttachmentManager { } else { return false; } - } else { - if eventual.await == state { - break; - } + } else if eventual.await == state { + break; } } true diff --git a/veilid-core/src/callback_state_machine.rs b/veilid-core/src/callback_state_machine.rs index db03d9b2..f6d04505 100644 --- a/veilid-core/src/callback_state_machine.rs +++ b/veilid-core/src/callback_state_machine.rs @@ -68,24 +68,12 @@ where // pub fn clear_state_change_callback(&self) { // self.inner.lock().callback = None; // } + pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture) { let inner = self.inner.lock(); (inner.state, inner.eventual.instance()) } - pub async fn send_state_update(&self) { - let (state, callback, eventual) = { - let mut inner = self.inner.lock(); - let eventual = - core::mem::replace(&mut inner.eventual, EventualValueClone::::new()); - (inner.state, inner.callback.clone(), eventual) - }; - if let Some(cb) = callback { - cb(state, state).await; - } - eventual.resolve(state).await; - } - pub async fn consume(&self, input: &T::Input) -> Result, ()> { let current_state = self.inner.lock().state; diff --git a/veilid-server/proto/veilid-client.capnp b/veilid-server/proto/veilid-client.capnp index facac702..a8a1b93f 100644 --- a/veilid-server/proto/veilid-client.capnp +++ b/veilid-server/proto/veilid-client.capnp @@ -30,7 +30,7 @@ interface Registration {} interface VeilidServer { - register @0 (veilidClient: VeilidClient) -> (registration: Registration); + register @0 (veilidClient: VeilidClient) -> (registration: Registration, state: VeilidState); debug @1 (what: Text) -> (output: Text); attach @2 (); diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 5984dae7..bfccdd20 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -121,6 +121,7 @@ impl veilid_server::Server for VeilidServerImpl { mut results: veilid_server::RegisterResults, ) -> Promise<(), ::capnp::Error> { trace!("VeilidServerImpl::register"); + self.registration_map.borrow_mut().registrations.insert( self.next_id, RegistrationHandle { @@ -129,16 +130,26 @@ impl veilid_server::Server for VeilidServerImpl { }, ); - results - .get() - .set_registration(capnp_rpc::new_client(RegistrationImpl::new( - self.next_id, - self.registration_map.clone(), - ))); - + let veilid_api = self.veilid_api.clone(); + let registration = capnp_rpc::new_client(RegistrationImpl::new( + self.next_id, + self.registration_map.clone(), + )); self.next_id += 1; - Promise::ok(()) + Promise::from_future(async move { + let state = veilid_api + .get_state() + .await + .map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?; + + let mut res = results.get(); + res.set_registration(registration); + let rpc_state = res.init_state(); + convert_state(&state, rpc_state); + + Ok(()) + }) } fn debug(