From baa171494332d63d32d76e4e7c8134ff77b2a670 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 25 Sep 2022 18:04:53 -0400 Subject: [PATCH] app call/message and private routing checkpoint --- veilid-cli/src/client_api_connection.rs | 4 +- veilid-cli/src/command_processor.rs | 2 +- veilid-core/proto/veilid.capnp | 30 +- veilid-core/src/api_tracing_layer.rs | 2 +- veilid-core/src/network_manager/mod.rs | 10 +- veilid-core/src/routing_table/find_nodes.rs | 10 +- veilid-core/src/routing_table/node_ref.rs | 23 +- .../rpc_processor/coders/operations/answer.rs | 11 +- .../rpc_processor/coders/operations/mod.rs | 4 + .../coders/operations/operation_app_call.rs | 44 ++ .../operations/operation_app_message.rs | 23 + .../coders/operations/question.rs | 8 + .../coders/operations/statement.rs | 10 + .../coders/private_safety_route.rs | 74 ---- veilid-core/src/rpc_processor/mod.rs | 319 +++++++------- .../src/rpc_processor/operation_waiter.rs | 136 ++++++ .../src/rpc_processor/private_route.rs | 2 +- veilid-core/src/rpc_processor/rpc_app_call.rs | 99 +++++ .../src/rpc_processor/rpc_app_message.rs | 42 ++ veilid-core/src/rpc_processor/rpc_error.rs | 17 +- veilid-core/src/rpc_processor/rpc_signal.rs | 2 +- .../rpc_processor/rpc_validate_dial_info.rs | 8 +- veilid-core/src/veilid_api/mod.rs | 403 +++++++----------- veilid-core/src/veilid_api/privacy.rs | 124 ++++++ veilid-core/src/veilid_api/routing_context.rs | 213 +++++++++ veilid-core/src/xx/eventual_value.rs | 8 + veilid-core/src/xx/timeout_or.rs | 9 + veilid-flutter/example/lib/main.dart | 26 +- veilid-flutter/lib/veilid.dart | 14 +- 29 files changed, 1139 insertions(+), 538 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs create mode 100644 veilid-core/src/rpc_processor/operation_waiter.rs create mode 100644 veilid-core/src/rpc_processor/rpc_app_call.rs create mode 100644 veilid-core/src/rpc_processor/rpc_app_message.rs create mode 100644 veilid-core/src/veilid_api/privacy.rs create mode 100644 veilid-core/src/veilid_api/routing_context.rs diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 2ae3de6c..22828531 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -196,9 +196,9 @@ impl ClientApiConnection { // Wait until rpc system completion or disconnect was requested let res = rpc_jh.await; - #[cfg(feature="rt-tokio")] + #[cfg(feature = "rt-tokio")] let res = res.map_err(|e| format!("join error: {}", e))?; - res.map_err(|e| format!("client RPC system error: {}", e)) + res.map_err(|e| format!("client RPC system error: {}", e)) } async fn handle_connection(&mut self) -> Result<(), String> { diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 67dcaad8..1e6d4ad6 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -331,7 +331,7 @@ change_log_level - change the log level for a tracing layer ); } - pub fn update_log(&mut self, log: veilid_core::VeilidStateLog) { + pub fn update_log(&mut self, log: veilid_core::VeilidLog) { self.inner().ui.add_node_event(format!( "{}: {}{}", log.log_level, diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index a0f791a4..558686b2 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -294,6 +294,19 @@ struct OperationNodeInfoUpdate { signedNodeInfo @0 :SignedNodeInfo; # Our signed node info } + +struct OperationAppCallQ { + message @0 :Data; # Opaque request to application +} + +struct OperationAppCallA { + message @0 :Data; # Opaque response from application +} + +struct OperationAppMessage { + message @0 :Data; # Opaque message to application +} + struct OperationGetValueQ { key @0 :ValueKey; # key for value to get } @@ -445,11 +458,12 @@ struct Question { watchValueQ @6 :OperationWatchValueQ; supplyBlockQ @7 :OperationSupplyBlockQ; findBlockQ @8 :OperationFindBlockQ; + appCallQ @9 :OperationAppCallQ; # Tunnel operations - startTunnelQ @9 :OperationStartTunnelQ; - completeTunnelQ @10 :OperationCompleteTunnelQ; - cancelTunnelQ @11 :OperationCancelTunnelQ; + startTunnelQ @10 :OperationStartTunnelQ; + completeTunnelQ @11 :OperationCompleteTunnelQ; + cancelTunnelQ @12 :OperationCancelTunnelQ; } } @@ -465,6 +479,7 @@ struct Statement { valueChanged @3 :OperationValueChanged; signal @4 :OperationSignal; returnReceipt @5 :OperationReturnReceipt; + appMessage @6 :OperationAppMessage; } } @@ -480,12 +495,13 @@ struct Answer { setValueA @3 :OperationSetValueA; watchValueA @4 :OperationWatchValueA; supplyBlockA @5 :OperationSupplyBlockA; - findBlockA @6 :OperationFindBlockA; + findBlockA @6 :OperationFindBlockA; + appCallA @7 :OperationAppCallA; # Tunnel operations - startTunnelA @7 :OperationStartTunnelA; - completeTunnelA @8 :OperationCompleteTunnelA; - cancelTunnelA @9 :OperationCancelTunnelA; + startTunnelA @8 :OperationStartTunnelA; + completeTunnelA @9 :OperationCompleteTunnelA; + cancelTunnelA @10 :OperationCancelTunnelA; } } diff --git a/veilid-core/src/api_tracing_layer.rs b/veilid-core/src/api_tracing_layer.rs index 3106e2f9..dac00241 100644 --- a/veilid-core/src/api_tracing_layer.rs +++ b/veilid-core/src/api_tracing_layer.rs @@ -103,7 +103,7 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa None }; - (inner.update_callback)(VeilidUpdate::Log(VeilidStateLog { + (inner.update_callback)(VeilidUpdate::Log(VeilidLog { log_level, message, backtrace, diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index f9090069..3fe83c63 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -358,7 +358,15 @@ impl NetworkManager { self.routing_table(), connection_manager.clone(), ); - let rpc_processor = RPCProcessor::new(self.clone()); + let rpc_processor = RPCProcessor::new( + self.clone(), + self.unlocked_inner + .update_callback + .read() + .as_ref() + .unwrap() + .clone(), + ); let receipt_manager = ReceiptManager::new(self.clone()); *self.unlocked_inner.components.write() = Some(NetworkComponents { net: net.clone(), diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 581c98a4..ca9841a2 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -22,7 +22,9 @@ impl RoutingTable { move |e| { if let Some(ni) = e.node_info(routing_domain) { if ni - .first_filtered_dial_info_detail(|did| did.matches_filter(&dial_info_filter)) + .first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| { + did.matches_filter(&dial_info_filter) + }) .is_some() { return true; @@ -436,8 +438,10 @@ impl RoutingTable { let can_serve_as_relay = e .node_info(RoutingDomain::PublicInternet) .map(|n| { - let dids = - n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif)); + let dids = n.all_filtered_dial_info_details( + Some(DialInfoDetail::reliable_sort), // By default, choose reliable protocol for relay + |did| did.matches_filter(&outbound_dif), + ); for did in &dids { let pt = did.dial_info.protocol_type(); let at = did.dial_info.address_type(); diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 26db4a8b..0b64de14 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -71,6 +71,7 @@ pub struct NodeRef { node_id: DHTKey, entry: Arc, filter: Option, + reliable: bool, #[cfg(feature = "tracking")] track_id: usize, } @@ -89,6 +90,7 @@ impl NodeRef { node_id, entry, filter, + reliable: true, #[cfg(feature = "tracking")] track_id: entry.track(), } @@ -126,6 +128,10 @@ impl NodeRef { self.filter = filter } + pub fn set_reliable(&mut self) { + self.reliable = true; + } + pub fn merge_filter(&mut self, filter: NodeRefFilter) { if let Some(self_filter) = self.filter.take() { self.filter = Some(self_filter.filtered(&filter)); @@ -267,11 +273,17 @@ impl NodeRef { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); + let sort = if self.reliable { + Some(DialInfoDetail::reliable_sort) + } else { + None + }; + self.operate(|_rt, e| { for routing_domain in routing_domain_set { if let Some(ni) = e.node_info(routing_domain) { let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); - if let Some(did) = ni.first_filtered_dial_info_detail(filter) { + if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) { return Some(did); } } @@ -284,12 +296,18 @@ impl NodeRef { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); + let sort = if self.reliable { + Some(DialInfoDetail::reliable_sort) + } else { + None + }; + let mut out = Vec::new(); self.operate(|_rt, e| { for routing_domain in routing_domain_set { if let Some(ni) = e.node_info(routing_domain) { let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); - if let Some(did) = ni.first_filtered_dial_info_detail(filter) { + if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) { out.push(did); } } @@ -390,6 +408,7 @@ impl Clone for NodeRef { node_id: self.node_id, entry: self.entry.clone(), filter: self.filter.clone(), + reliable: self.reliable, #[cfg(feature = "tracking")] track_id: e.track(), } diff --git a/veilid-core/src/rpc_processor/coders/operations/answer.rs b/veilid-core/src/rpc_processor/coders/operations/answer.rs index 906132f2..e6d801a3 100644 --- a/veilid-core/src/rpc_processor/coders/operations/answer.rs +++ b/veilid-core/src/rpc_processor/coders/operations/answer.rs @@ -11,9 +11,6 @@ impl RPCAnswer { pub fn new(detail: RPCAnswerDetail) -> Self { Self { detail } } - // pub fn detail(&self) -> &RPCAnswerDetail { - // &self.detail - // } pub fn into_detail(self) -> RPCAnswerDetail { self.detail } @@ -35,6 +32,7 @@ impl RPCAnswer { pub enum RPCAnswerDetail { StatusA(RPCOperationStatusA), FindNodeA(RPCOperationFindNodeA), + AppCallA(RPCOperationAppCallA), GetValueA(RPCOperationGetValueA), SetValueA(RPCOperationSetValueA), WatchValueA(RPCOperationWatchValueA), @@ -50,6 +48,7 @@ impl RPCAnswerDetail { match self { RPCAnswerDetail::StatusA(_) => "StatusA", RPCAnswerDetail::FindNodeA(_) => "FindNodeA", + RPCAnswerDetail::AppCallA(_) => "AppCallA", RPCAnswerDetail::GetValueA(_) => "GetValueA", RPCAnswerDetail::SetValueA(_) => "SetValueA", RPCAnswerDetail::WatchValueA(_) => "WatchValueA", @@ -76,6 +75,11 @@ impl RPCAnswerDetail { let out = RPCOperationFindNodeA::decode(&op_reader)?; RPCAnswerDetail::FindNodeA(out) } + veilid_capnp::answer::detail::AppCallA(r) => { + let op_reader = r.map_err(RPCError::protocol)?; + let out = RPCOperationAppCallA::decode(&op_reader)?; + RPCAnswerDetail::AppCallA(out) + } veilid_capnp::answer::detail::GetValueA(r) => { let op_reader = r.map_err(RPCError::protocol)?; let out = RPCOperationGetValueA::decode(&op_reader)?; @@ -126,6 +130,7 @@ impl RPCAnswerDetail { match self { RPCAnswerDetail::StatusA(d) => d.encode(&mut builder.reborrow().init_status_a()), RPCAnswerDetail::FindNodeA(d) => d.encode(&mut builder.reborrow().init_find_node_a()), + RPCAnswerDetail::AppCallA(d) => d.encode(&mut builder.reborrow().init_app_call_a()), RPCAnswerDetail::GetValueA(d) => d.encode(&mut builder.reborrow().init_get_value_a()), RPCAnswerDetail::SetValueA(d) => d.encode(&mut builder.reborrow().init_set_value_a()), RPCAnswerDetail::WatchValueA(d) => { diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs index 8426ff1b..687ca68e 100644 --- a/veilid-core/src/rpc_processor/coders/operations/mod.rs +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -1,5 +1,7 @@ mod answer; mod operation; +mod operation_app_call; +mod operation_app_message; mod operation_cancel_tunnel; mod operation_complete_tunnel; mod operation_find_block; @@ -22,6 +24,8 @@ mod statement; pub use answer::*; pub use operation::*; +pub use operation_app_call::*; +pub use operation_app_message::*; pub use operation_cancel_tunnel::*; pub use operation_complete_tunnel::*; pub use operation_find_block::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs new file mode 100644 index 00000000..609999bb --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs @@ -0,0 +1,44 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationAppCallQ { + pub message: Vec, +} + +impl RPCOperationAppCallQ { + pub fn decode( + reader: &veilid_capnp::operation_app_call_q::Reader, + ) -> Result { + let message = reader.get_message().map_err(RPCError::protocol)?.to_vec(); + Ok(RPCOperationAppCallQ { message }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_app_call_q::Builder, + ) -> Result<(), RPCError> { + builder.set_message(&self.message); + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationAppCallA { + pub message: Vec, +} + +impl RPCOperationAppCallA { + pub fn decode( + reader: &veilid_capnp::operation_app_call_a::Reader, + ) -> Result { + let message = reader.get_message().map_err(RPCError::protocol)?.to_vec(); + Ok(RPCOperationAppCallA { message }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_app_call_a::Builder, + ) -> Result<(), RPCError> { + builder.set_message(&self.message); + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs new file mode 100644 index 00000000..5a844f02 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs @@ -0,0 +1,23 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationAppMessage { + pub message: Vec, +} + +impl RPCOperationAppMessage { + pub fn decode( + reader: &veilid_capnp::operation_app_message::Reader, + ) -> Result { + let message = reader.get_message().map_err(RPCError::protocol)?.to_vec(); + Ok(RPCOperationAppMessage { message }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_app_message::Builder, + ) -> Result<(), RPCError> { + builder.set_message(&self.message); + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index ae62ca72..02995b26 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -40,6 +40,7 @@ impl RPCQuestion { pub enum RPCQuestionDetail { StatusQ(RPCOperationStatusQ), FindNodeQ(RPCOperationFindNodeQ), + AppCallQ(RPCOperationAppCallQ), GetValueQ(RPCOperationGetValueQ), SetValueQ(RPCOperationSetValueQ), WatchValueQ(RPCOperationWatchValueQ), @@ -55,6 +56,7 @@ impl RPCQuestionDetail { match self { RPCQuestionDetail::StatusQ(_) => "StatusQ", RPCQuestionDetail::FindNodeQ(_) => "FindNodeQ", + RPCQuestionDetail::AppCallQ(_) => "AppCallQ", RPCQuestionDetail::GetValueQ(_) => "GetValueQ", RPCQuestionDetail::SetValueQ(_) => "SetValueQ", RPCQuestionDetail::WatchValueQ(_) => "WatchValueQ", @@ -81,6 +83,11 @@ impl RPCQuestionDetail { let out = RPCOperationFindNodeQ::decode(&op_reader)?; RPCQuestionDetail::FindNodeQ(out) } + veilid_capnp::question::detail::Which::AppCallQ(r) => { + let op_reader = r.map_err(RPCError::protocol)?; + let out = RPCOperationAppCallQ::decode(&op_reader)?; + RPCQuestionDetail::AppCallQ(out) + } veilid_capnp::question::detail::GetValueQ(r) => { let op_reader = r.map_err(RPCError::protocol)?; let out = RPCOperationGetValueQ::decode(&op_reader)?; @@ -131,6 +138,7 @@ impl RPCQuestionDetail { match self { RPCQuestionDetail::StatusQ(d) => d.encode(&mut builder.reborrow().init_status_q()), RPCQuestionDetail::FindNodeQ(d) => d.encode(&mut builder.reborrow().init_find_node_q()), + RPCQuestionDetail::AppCallQ(d) => d.encode(&mut builder.reborrow().init_app_call_q()), RPCQuestionDetail::GetValueQ(d) => d.encode(&mut builder.reborrow().init_get_value_q()), RPCQuestionDetail::SetValueQ(d) => d.encode(&mut builder.reborrow().init_set_value_q()), RPCQuestionDetail::WatchValueQ(d) => { diff --git a/veilid-core/src/rpc_processor/coders/operations/statement.rs b/veilid-core/src/rpc_processor/coders/operations/statement.rs index 4eb75f5c..bbac6455 100644 --- a/veilid-core/src/rpc_processor/coders/operations/statement.rs +++ b/veilid-core/src/rpc_processor/coders/operations/statement.rs @@ -42,6 +42,7 @@ pub enum RPCStatementDetail { ValueChanged(RPCOperationValueChanged), Signal(RPCOperationSignal), ReturnReceipt(RPCOperationReturnReceipt), + AppMessage(RPCOperationAppMessage), } impl RPCStatementDetail { @@ -53,6 +54,7 @@ impl RPCStatementDetail { RPCStatementDetail::ValueChanged(_) => "ValueChanged", RPCStatementDetail::Signal(_) => "Signal", RPCStatementDetail::ReturnReceipt(_) => "ReturnReceipt", + RPCStatementDetail::AppMessage(_) => "AppMessage", } } pub fn decode( @@ -91,6 +93,11 @@ impl RPCStatementDetail { let out = RPCOperationReturnReceipt::decode(&op_reader)?; RPCStatementDetail::ReturnReceipt(out) } + veilid_capnp::statement::detail::AppMessage(r) => { + let op_reader = r.map_err(RPCError::protocol)?; + let out = RPCOperationAppMessage::decode(&op_reader)?; + RPCStatementDetail::AppMessage(out) + } }; Ok(out) } @@ -113,6 +120,9 @@ impl RPCStatementDetail { RPCStatementDetail::ReturnReceipt(d) => { d.encode(&mut builder.reborrow().init_return_receipt()) } + RPCStatementDetail::AppMessage(d) => { + d.encode(&mut builder.reborrow().init_app_message()) + } } } } diff --git a/veilid-core/src/rpc_processor/coders/private_safety_route.rs b/veilid-core/src/rpc_processor/coders/private_safety_route.rs index 6c3c70f6..f96261cb 100644 --- a/veilid-core/src/rpc_processor/coders/private_safety_route.rs +++ b/veilid-core/src/rpc_processor/coders/private_safety_route.rs @@ -2,80 +2,6 @@ use super::*; //////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug)] -pub struct RouteHopData { - pub nonce: Nonce, - pub blob: Vec, -} - -#[derive(Clone, Debug)] -pub struct RouteHop { - pub dial_info: NodeDialInfo, - pub next_hop: Option, -} - -#[derive(Clone, Debug)] -pub struct PrivateRoute { - pub public_key: DHTKey, - pub hop_count: u8, - pub hops: Option, -} - -impl PrivateRoute { - pub fn new_stub(public_key: DHTKey) -> Self { - Self { - public_key, - hop_count: 0, - hops: None, - } - } -} - -impl fmt::Display for PrivateRoute { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "PR({:?}+{}{})", - self.public_key, - self.hop_count, - if let Some(hops) = &self.hops { - format!("->{}", hops.dial_info) - } else { - "".to_owned() - } - ) - } -} - -#[derive(Clone, Debug)] -pub enum SafetyRouteHops { - Data(RouteHopData), - Private(PrivateRoute), -} - -#[derive(Clone, Debug)] -pub struct SafetyRoute { - pub public_key: DHTKey, - pub hop_count: u8, - pub hops: SafetyRouteHops, -} - -impl fmt::Display for SafetyRoute { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "SR({:?}+{}{})", - self.public_key, - self.hop_count, - match &self.hops { - SafetyRouteHops::Data(_) => "".to_owned(), - SafetyRouteHops::Private(p) => format!("->{}", p), - } - ) - } -} -//////////////////////////////////////////////////////////////////////////////////////////////////// - pub fn encode_route_hop_data( route_hop_data: &RouteHopData, builder: &mut veilid_capnp::route_hop_data::Builder, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index b4c38203..6a6ac7ad 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,6 +1,9 @@ mod coders; mod destination; +mod operation_waiter; mod private_route; +mod rpc_app_call; +mod rpc_app_message; mod rpc_cancel_tunnel; mod rpc_complete_tunnel; mod rpc_error; @@ -20,6 +23,7 @@ mod rpc_value_changed; mod rpc_watch_value; pub use destination::*; +pub use operation_waiter::*; pub use private_route::*; pub use rpc_error::*; @@ -108,8 +112,7 @@ where #[derive(Debug)] struct WaitableReply { - op_id: OperationId, - eventual: EventualValue<(Option, RPCMessage)>, + handle: OperationWaitHandle, timeout: u64, node_ref: NodeRef, send_ts: u64, @@ -138,62 +141,162 @@ struct RenderedOperation { ///////////////////////////////////////////////////////////////////// pub struct RPCProcessorInner { - network_manager: NetworkManager, - routing_table: RoutingTable, - node_id: DHTKey, - node_id_secret: DHTKeySecret, send_channel: Option, RPCMessageEncoded)>>, - timeout: u64, - max_route_hop_count: usize, - waiting_rpc_table: BTreeMap, RPCMessage)>>, stop_source: Option, worker_join_handles: Vec>, } +pub struct RPCProcessorUnlockedInner { + node_id: DHTKey, + node_id_secret: DHTKeySecret, + timeout: u64, + queue_size: u32, + concurrency: u32, + max_route_hop_count: usize, + validate_dial_info_receipt_time_ms: u32, + update_callback: UpdateCallback, + waiting_rpc_table: OperationWaiter, + waiting_app_call_table: OperationWaiter>, +} + #[derive(Clone)] pub struct RPCProcessor { crypto: Crypto, config: VeilidConfig, + network_manager: NetworkManager, + routing_table: RoutingTable, inner: Arc>, + unlocked_inner: Arc, } impl RPCProcessor { - fn new_inner(network_manager: NetworkManager) -> RPCProcessorInner { + fn new_inner() -> RPCProcessorInner { RPCProcessorInner { - network_manager: network_manager.clone(), - routing_table: network_manager.routing_table(), - node_id: DHTKey::default(), - node_id_secret: DHTKeySecret::default(), send_channel: None, - timeout: 10000000, - max_route_hop_count: 7, - waiting_rpc_table: BTreeMap::new(), stop_source: None, worker_join_handles: Vec::new(), } } - pub fn new(network_manager: NetworkManager) -> Self { + fn new_unlocked_inner( + config: VeilidConfig, + update_callback: UpdateCallback, + ) -> RPCProcessorUnlockedInner { + // make local copy of node id for easy access + let c = config.get(); + let node_id = c.network.node_id; + let node_id_secret = c.network.node_id_secret; + + // set up channel + let mut concurrency = c.network.rpc.concurrency; + let mut queue_size = c.network.rpc.queue_size; + let mut timeout = ms_to_us(c.network.rpc.timeout_ms); + let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize; + if concurrency == 0 { + concurrency = intf::get_concurrency() / 2; + if concurrency == 0 { + concurrency = 1; + } + } + if queue_size == 0 { + queue_size = 1024; + } + if timeout == 0 { + timeout = 10000000; + } + if max_route_hop_count == 0 { + max_route_hop_count = 7usize; + } + let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; + + RPCProcessorUnlockedInner { + node_id, + node_id_secret, + timeout, + queue_size, + concurrency, + max_route_hop_count, + validate_dial_info_receipt_time_ms, + update_callback, + waiting_rpc_table: OperationWaiter::new(), + waiting_app_call_table: OperationWaiter::new(), + } + } + pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self { + let config = network_manager.config(); Self { crypto: network_manager.crypto(), - config: network_manager.config(), - inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), + config: config.clone(), + network_manager: network_manager.clone(), + routing_table: network_manager.routing_table(), + inner: Arc::new(Mutex::new(Self::new_inner())), + unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)), } } pub fn network_manager(&self) -> NetworkManager { - self.inner.lock().network_manager.clone() + self.network_manager.clone() } pub fn routing_table(&self) -> RoutingTable { - self.inner.lock().routing_table.clone() + self.routing_table.clone() } - pub fn node_id(&self) -> DHTKey { - self.inner.lock().node_id + ////////////////////////////////////////////////////////////////////// + + #[instrument(level = "debug", skip_all, err)] + pub async fn startup(&self) -> EyreResult<()> { + trace!("startup rpc processor"); + let mut inner = self.inner.lock(); + + let channel = flume::bounded(self.unlocked_inner.queue_size as usize); + inner.send_channel = Some(channel.0.clone()); + inner.stop_source = Some(StopSource::new()); + + // spin up N workers + trace!( + "Spinning up {} RPC workers", + self.unlocked_inner.concurrency + ); + for _ in 0..self.unlocked_inner.concurrency { + let this = self.clone(); + let receiver = channel.1.clone(); + let jh = intf::spawn(Self::rpc_worker( + this, + inner.stop_source.as_ref().unwrap().token(), + receiver, + )); + inner.worker_join_handles.push(jh); + } + + Ok(()) } - pub fn node_id_secret(&self) -> DHTKeySecret { - self.inner.lock().node_id_secret + #[instrument(level = "debug", skip_all)] + pub async fn shutdown(&self) { + debug!("starting rpc processor shutdown"); + + // Stop the rpc workers + let mut unord = FuturesUnordered::new(); + { + let mut inner = self.inner.lock(); + // take the join handles out + for h in inner.worker_join_handles.drain(..) { + unord.push(h); + } + // drop the stop + drop(inner.stop_source.take()); + } + debug!("stopping {} rpc worker tasks", unord.len()); + + // Wait for them to complete + while unord.next().await.is_some() {} + + debug!("resetting rpc processor state"); + + // Release the rpc processor + *self.inner.lock() = Self::new_inner(); + + debug!("finished rpc processor shutdown"); } ////////////////////////////////////////////////////////////////////// @@ -278,71 +381,18 @@ impl RPCProcessor { }) } - // set up wait for reply - fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<(Option, RPCMessage)> { - let mut inner = self.inner.lock(); - let e = EventualValue::new(); - inner.waiting_rpc_table.insert(op_id, e.clone()); - e - } - - // remove wait for reply - fn cancel_op_id_waiter(&self, op_id: OperationId) { - let mut inner = self.inner.lock(); - inner.waiting_rpc_table.remove(&op_id); - } - - // complete the reply - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] - async fn complete_op_id_waiter(&self, msg: RPCMessage) -> Result<(), RPCError> { - let op_id = msg.operation.op_id(); - let eventual = { - let mut inner = self.inner.lock(); - inner - .waiting_rpc_table - .remove(&op_id) - .ok_or_else(RPCError::else_internal(format!( - "Unmatched operation id: {:#?}", - msg - )))? - }; - eventual.resolve((Span::current().id(), msg)).await; - Ok(()) - } - - // wait for reply - async fn do_wait_for_reply( - &self, - waitable_reply: &WaitableReply, - ) -> Result, RPCError> { - let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64) - .map_err(RPCError::map_internal("invalid timeout"))?; - // wait for eventualvalue - let start_ts = intf::get_timestamp(); - let res = intf::timeout(timeout_ms, waitable_reply.eventual.instance()) - .await - .into_timeout_or(); - Ok(res.map(|res| { - let (_span_id, rpcreader) = res.take_value().unwrap(); - let end_ts = intf::get_timestamp(); - - // fixme: causes crashes? "Missing otel data span extensions"?? - //Span::current().follows_from(span_id); - - (rpcreader, end_ts - start_ts) - })) - } - #[instrument(level = "trace", skip(self, waitable_reply), err)] async fn wait_for_reply( &self, waitable_reply: WaitableReply, ) -> Result, RPCError> { - let out = self.do_wait_for_reply(&waitable_reply).await; + let out = self + .unlocked_inner + .waiting_rpc_table + .wait_for_op(waitable_reply.handle, waitable_reply.timeout) + .await; match &out { Err(_) | Ok(TimeoutOr::Timeout) => { - self.cancel_op_id_waiter(waitable_reply.op_id); - waitable_reply.node_ref.stats_question_lost(); } Ok(TimeoutOr::Value((rpcreader, _))) => { @@ -476,7 +526,7 @@ impl RPCProcessor { } // Verify hop count isn't larger than out maximum routed hop count - if out_hop_count > self.inner.lock().max_route_hop_count { + if out_hop_count > self.unlocked_inner.max_route_hop_count { return Err(RPCError::internal("hop count too long for route")) .map_err(logthru_rpc!(warn)); } @@ -574,10 +624,10 @@ impl RPCProcessor { // Calculate answer timeout // Timeout is number of hops times the timeout per hop - let timeout = self.inner.lock().timeout * (hop_count as u64); + let timeout = self.unlocked_inner.timeout * (hop_count as u64); // Set up op id eventual - let eventual = self.add_op_id_waiter(op_id); + let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id); // Send question let bytes = message.len() as u64; @@ -588,13 +638,11 @@ impl RPCProcessor { .await .map_err(|e| { // If we're returning an error, clean up - self.cancel_op_id_waiter(op_id); node_ref .stats_failed_to_send(send_ts, true); RPCError::network(e) })? => { // If we couldn't send we're still cleaning up - self.cancel_op_id_waiter(op_id); node_ref .stats_failed_to_send(send_ts, true); } @@ -605,8 +653,7 @@ impl RPCProcessor { // Pass back waitable reply completion Ok(NetworkResult::value(WaitableReply { - op_id, - eventual, + handle, timeout, node_ref, send_ts, @@ -794,7 +841,7 @@ impl RPCProcessor { let mut opt_sender_nr: Option = None; if let Some(sender_node_info) = operation.sender_node_info() { // Sender NodeInfo was specified, update our routing table with it - if !self.filter_node_info(RoutingDomain::PublicInternet, &sender_node_info.node_info) { + if !self.filter_node_info(routing_domain, &sender_node_info.node_info) { return Err(RPCError::invalid_format( "sender signednodeinfo has invalid peer scope", )); @@ -853,6 +900,7 @@ impl RPCProcessor { RPCOperationKind::Question(q) => match q.detail() { RPCQuestionDetail::StatusQ(_) => self.process_status_q(msg).await, RPCQuestionDetail::FindNodeQ(_) => self.process_find_node_q(msg).await, + RPCQuestionDetail::AppCallQ(_) => self.process_app_call_q(msg).await, RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await, RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await, RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await, @@ -871,8 +919,14 @@ impl RPCProcessor { RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await, RPCStatementDetail::Signal(_) => self.process_signal(msg).await, RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await, + RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await, }, - RPCOperationKind::Answer(_) => self.complete_op_id_waiter(msg).await, + RPCOperationKind::Answer(_) => { + self.unlocked_inner + .waiting_rpc_table + .complete_op_waiter(msg.operation.op_id(), msg) + .await + } } } @@ -908,85 +962,6 @@ impl RPCProcessor { } } - #[instrument(level = "debug", skip_all, err)] - pub async fn startup(&self) -> EyreResult<()> { - trace!("startup rpc processor"); - let mut inner = self.inner.lock(); - // make local copy of node id for easy access - let c = self.config.get(); - inner.node_id = c.network.node_id; - inner.node_id_secret = c.network.node_id_secret; - - // set up channel - let mut concurrency = c.network.rpc.concurrency; - let mut queue_size = c.network.rpc.queue_size; - let mut timeout = ms_to_us(c.network.rpc.timeout_ms); - let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize; - if concurrency == 0 { - concurrency = intf::get_concurrency() / 2; - if concurrency == 0 { - concurrency = 1; - } - } - if queue_size == 0 { - queue_size = 1024; - } - if timeout == 0 { - timeout = 10000000; - } - if max_route_hop_count == 0 { - max_route_hop_count = 7usize; - } - inner.timeout = timeout; - inner.max_route_hop_count = max_route_hop_count; - let channel = flume::bounded(queue_size as usize); - inner.send_channel = Some(channel.0.clone()); - inner.stop_source = Some(StopSource::new()); - - // spin up N workers - trace!("Spinning up {} RPC workers", concurrency); - for _ in 0..concurrency { - let this = self.clone(); - let receiver = channel.1.clone(); - let jh = intf::spawn(Self::rpc_worker( - this, - inner.stop_source.as_ref().unwrap().token(), - receiver, - )); - inner.worker_join_handles.push(jh); - } - - Ok(()) - } - - #[instrument(level = "debug", skip_all)] - pub async fn shutdown(&self) { - debug!("starting rpc processor shutdown"); - - // Stop the rpc workers - let mut unord = FuturesUnordered::new(); - { - let mut inner = self.inner.lock(); - // take the join handles out - for h in inner.worker_join_handles.drain(..) { - unord.push(h); - } - // drop the stop - drop(inner.stop_source.take()); - } - debug!("stopping {} rpc worker tasks", unord.len()); - - // Wait for them to complete - while unord.next().await.is_some() {} - - debug!("resetting rpc processor state"); - - // Release the rpc processor - *self.inner.lock() = Self::new_inner(self.network_manager()); - - debug!("finished rpc processor shutdown"); - } - #[instrument(level = "trace", skip(self, body), err)] pub fn enqueue_message( &self, diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs new file mode 100644 index 00000000..cb5d4afa --- /dev/null +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -0,0 +1,136 @@ +use super::*; + +#[derive(Debug)] +pub struct OperationWaitHandle +where + T: Unpin, +{ + waiter: OperationWaiter, + op_id: OperationId, + eventual_instance: Option, T)>>, +} + +impl Drop for OperationWaitHandle +where + T: Unpin, +{ + fn drop(&mut self) { + if self.eventual_instance.is_some() { + self.waiter.cancel_op_waiter(self.op_id); + } + } +} + +#[derive(Debug)] +pub struct OperationWaiterInner +where + T: Unpin, +{ + waiting_op_table: HashMap, T)>>, +} + +#[derive(Debug)] +pub struct OperationWaiter +where + T: Unpin, +{ + inner: Arc>>, +} + +impl Clone for OperationWaiter +where + T: Unpin, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl OperationWaiter +where + T: Unpin, +{ + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(OperationWaiterInner { + waiting_op_table: HashMap::new(), + })), + } + } + + // set up wait for op + pub fn add_op_waiter(&self, op_id: OperationId) -> OperationWaitHandle { + let mut inner = self.inner.lock(); + let e = EventualValue::new(); + if inner.waiting_op_table.insert(op_id, e.clone()).is_some() { + error!( + "add_op_waiter collision should not happen for op_id {}", + op_id + ); + } + + OperationWaitHandle { + waiter: self.clone(), + op_id, + eventual_instance: Some(e.instance()), + } + } + + // remove wait for op + fn cancel_op_waiter(&self, op_id: OperationId) { + let mut inner = self.inner.lock(); + inner.waiting_op_table.remove(&op_id); + } + + // complete the app call + #[instrument(level = "trace", skip(self, message), err)] + pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { + let eventual = { + let mut inner = self.inner.lock(); + inner + .waiting_op_table + .remove(&op_id) + .ok_or_else(RPCError::else_internal(format!( + "Unmatched app call id, possibly too late for timeout: {}", + op_id + )))? + }; + eventual.resolve((Span::current().id(), message)).await; + Ok(()) + } + + pub async fn wait_for_op( + &self, + mut handle: OperationWaitHandle, + timeout: u64, + ) -> Result, RPCError> { + let timeout_ms = u32::try_from(timeout / 1000u64) + .map_err(|e| RPCError::map_internal("invalid timeout")(e))?; + + // Take the instance + // After this, we must manually cancel since the cancel on handle drop is disabled + let eventual_instance = handle.eventual_instance.take().unwrap(); + + // wait for eventualvalue + let start_ts = intf::get_timestamp(); + let res = intf::timeout(timeout_ms, eventual_instance) + .await + .into_timeout_or(); + Ok(res + .on_timeout(|| { + log_rpc!(debug "op wait timed out: {}", handle.op_id); + self.cancel_op_waiter(handle.op_id); + }) + .map(|res| { + let (_span_id, ret) = res.take_value().unwrap(); + let end_ts = intf::get_timestamp(); + + // fixme: causes crashes? "Missing otel data span extensions"?? + //Span::current().follows_from(span_id); + + (ret, end_ts - start_ts) + })) + } +} diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs index dba50d60..b552e768 100644 --- a/veilid-core/src/rpc_processor/private_route.rs +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -11,7 +11,7 @@ impl RPCProcessor { let pr_hopcount = private_route.hop_count as usize; let sr_hopcount = safety_route_spec.hops.len(); let hopcount = 1 + sr_hopcount + pr_hopcount; - if hopcount > self.inner.lock().max_route_hop_count { + if hopcount > self.unlocked_inner.max_route_hop_count { return Err(RPCError::internal("hop count too long for route")); } diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs new file mode 100644 index 00000000..13466f4a --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -0,0 +1,99 @@ +use super::*; + +impl RPCProcessor { + // Sends a high level app request and wait for response + // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn rpc_call_app_call( + self, + dest: Destination, + message: Vec, + ) -> Result>>, RPCError> { + let app_call_q = RPCOperationAppCallQ { message }; + let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::AppCallQ(app_call_q)); + + // Send the app call question + let waitable_reply = network_result_try!(self.question(dest, question).await?); + + // Wait for reply + let (msg, latency) = match self.wait_for_reply(waitable_reply).await? { + TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), + TimeoutOr::Value(v) => v, + }; + + // Get the right answer type + let app_call_a = match msg.operation.into_kind() { + RPCOperationKind::Answer(a) => match a.into_detail() { + RPCAnswerDetail::AppCallA(a) => a, + _ => return Err(RPCError::invalid_format("not an appcall answer")), + }, + _ => return Err(RPCError::invalid_format("not an answer")), + }; + + Ok(NetworkResult::value(Answer::new( + latency, + app_call_a.message, + ))) + } + + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] + pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the question + let app_call_q = match msg.operation.kind() { + RPCOperationKind::Question(q) => match q.detail() { + RPCQuestionDetail::AppCallQ(q) => q, + _ => panic!("not an appcall question"), + }, + _ => panic!("not a question"), + }; + + // Register a waiter for this app call + let id = msg.operation.op_id(); + let handle = self.unlocked_inner.waiting_app_call_table.add_op_waiter(id); + + // Pass the call up through the update callback + let sender = msg + .opt_sender_nr + .as_ref() + .map(|nr| NodeId::new(nr.node_id())); + let message = app_call_q.message.clone(); + (self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall { + sender, + message, + id, + })); + + // Wait for an app call answer to come back from the app + let res = self + .unlocked_inner + .waiting_app_call_table + .wait_for_op(handle, self.unlocked_inner.timeout) + .await?; + let (message, _latency) = match res { + TimeoutOr::Timeout => { + // No message sent on timeout, but this isn't an error + log_rpc!(debug "App call timed out for id {}", id); + return Ok(()); + } + TimeoutOr::Value(v) => v, + }; + + // Return the appcall answer + let app_call_a = RPCOperationAppCallA { message }; + + // Send status answer + let res = self + .answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a))) + .await?; + tracing::Span::current().record("res", &tracing::field::display(res)); + Ok(()) + } + + /// Exposed to API for apps to return app call answers + pub async fn app_call_reply(&self, id: u64, message: Vec) -> Result<(), RPCError> { + self.unlocked_inner + .waiting_app_call_table + .complete_op_waiter(id, message) + .await + } +} diff --git a/veilid-core/src/rpc_processor/rpc_app_message.rs b/veilid-core/src/rpc_processor/rpc_app_message.rs new file mode 100644 index 00000000..0b069280 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_app_message.rs @@ -0,0 +1,42 @@ +use super::*; + +impl RPCProcessor { + // Sends a high level app message + // Can be sent via all methods including relays and routes + #[instrument(level = "trace", skip(self), ret, err)] + pub async fn rpc_call_app_message( + self, + dest: Destination, + message: Vec, + ) -> Result, RPCError> { + let app_message = RPCOperationAppMessage { message }; + let statement = RPCStatement::new(RPCStatementDetail::AppMessage(app_message)); + + // Send the app message request + network_result_try!(self.statement(dest, statement).await?); + + Ok(NetworkResult::value(())) + } + + #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)] + pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the statement + let app_message = match msg.operation.into_kind() { + RPCOperationKind::Statement(s) => match s.into_detail() { + RPCStatementDetail::AppMessage(s) => s, + _ => panic!("not an app message"), + }, + _ => panic!("not a statement"), + }; + + // Pass the message up through the update callback + let sender = msg.opt_sender_nr.map(|nr| NodeId::new(nr.node_id())); + let message = app_message.message; + (self.unlocked_inner.update_callback)(VeilidUpdate::AppMessage(VeilidAppMessage { + sender, + message, + })); + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/rpc_error.rs b/veilid-core/src/rpc_processor/rpc_error.rs index bf650778..89c9eb11 100644 --- a/veilid-core/src/rpc_processor/rpc_error.rs +++ b/veilid-core/src/rpc_processor/rpc_error.rs @@ -3,8 +3,6 @@ use super::*; #[derive(ThisError, Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] #[must_use] pub enum RPCError { - #[error("[RPCError: Unreachable({0})]")] - Unreachable(DHTKey), #[error("[RPCError: Unimplemented({0})]")] Unimplemented(String), #[error("[RPCError: InvalidFormat({0})]")] @@ -18,9 +16,6 @@ pub enum RPCError { } impl RPCError { - pub fn unreachable(key: DHTKey) -> Self { - Self::Unreachable(key) - } pub fn unimplemented(x: X) -> Self { Self::Unimplemented(x.to_string()) } @@ -52,3 +47,15 @@ impl RPCError { move |x| Self::Network(format!("{}: {}", message.to_string(), x.to_string())) } } + +impl From for VeilidAPIError { + fn from(e: RPCError) -> Self { + match e { + RPCError::Unimplemented(message) => VeilidAPIError::Unimplemented { message }, + RPCError::InvalidFormat(message) => VeilidAPIError::Generic { message }, + RPCError::Protocol(message) => VeilidAPIError::Generic { message }, + RPCError::Internal(message) => VeilidAPIError::Internal { message }, + RPCError::Network(message) => VeilidAPIError::Generic { message }, + } + } +} diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 3773e1bc..08cb3713 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -24,7 +24,7 @@ impl RPCProcessor { let signal = match msg.operation.into_kind() { RPCOperationKind::Statement(s) => match s.into_detail() { RPCStatementDetail::Signal(s) => s, - _ => panic!("not a node info update"), + _ => panic!("not a signal"), }, _ => panic!("not a statement"), }; diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 3ff0f693..e1cc7788 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -10,13 +10,7 @@ impl RPCProcessor { redirect: bool, ) -> Result { let network_manager = self.network_manager(); - let receipt_time = ms_to_us( - self.config - .get() - .network - .dht - .validate_dial_info_receipt_time_ms, - ); + let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms); // Generate receipt and waitable eventual so we can see if we get the receipt back let (receipt, eventual_value) = network_manager diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 79d68d3f..50835a13 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1,8 +1,13 @@ #![allow(dead_code)] mod debug; +mod privacy; +mod routing_context; mod serialize_helpers; + pub use debug::*; +pub use privacy::*; +pub use routing_context::*; pub use serialize_helpers::*; use crate::*; @@ -15,17 +20,17 @@ pub use alloc::string::ToString; pub use attachment_manager::AttachmentManager; pub use core::str::FromStr; pub use dht::Crypto; -pub use dht::{generate_secret, sign, verify, DHTKey, DHTKeySecret, DHTSignature}; +pub use dht::{generate_secret, sign, verify, DHTKey, DHTKeySecret, DHTSignature, Nonce}; pub use intf::BlockStore; pub use intf::ProtectedStore; pub use intf::TableStore; pub use network_manager::NetworkManager; pub use routing_table::RoutingTable; -//pub use rpc_processor::RPCProcessor; use core::fmt; use core_context::{api_shutdown, VeilidCoreContext}; use enumset::*; +use rpc_processor::RPCProcessor; use serde::*; use xx::*; @@ -68,8 +73,8 @@ pub enum VeilidAPIError { Shutdown, #[error("Node not found: {node_id}")] NodeNotFound { node_id: NodeId }, - #[error("No dial info: {node_id}")] - NoDialInfo { node_id: NodeId }, + #[error("No connection: {message}")] + NoConnection { message: String }, #[error("No peer info: {node_id}")] NoPeerInfo { node_id: NodeId }, #[error("Internal: {message}")] @@ -106,8 +111,8 @@ impl VeilidAPIError { pub fn node_not_found(node_id: NodeId) -> Self { Self::NodeNotFound { node_id } } - pub fn no_dial_info(node_id: NodeId) -> Self { - Self::NoDialInfo { node_id } + pub fn no_connection(message: String) -> Self { + Self::NoConnection { message } } pub fn no_peer_info(node_id: NodeId) -> Self { Self::NoPeerInfo { node_id } @@ -216,12 +221,30 @@ impl fmt::Display for VeilidLogLevel { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct VeilidStateLog { +pub struct VeilidLog { pub log_level: VeilidLogLevel, pub message: String, pub backtrace: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct VeilidAppMessage { + /// Some(sender) if the message was sent directly, None if received via a private/safety route + pub sender: Option, + /// The content of the message to deliver to the application + pub message: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct VeilidAppCall { + /// Some(sender) if the request was sent directly, None if received via a private/safety route + pub sender: Option, + /// The content of the request to deliver to the application + pub message: Vec, + /// The id to reply to + pub id: u64, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct VeilidStateAttachment { pub state: AttachmentState, @@ -247,7 +270,9 @@ pub struct VeilidStateNetwork { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] pub enum VeilidUpdate { - Log(VeilidStateLog), + Log(VeilidLog), + AppMessage(VeilidAppMessage), + AppCall(VeilidAppCall), Attachment(VeilidStateAttachment), Network(VeilidStateNetwork), Shutdown, @@ -380,6 +405,24 @@ impl MatchesDialInfoFilter for DialInfoDetail { } } +impl DialInfoDetail { + pub fn reliable_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering { + if a.class < b.class { + return core::cmp::Ordering::Less; + } + if a.class > b.class { + return core::cmp::Ordering::Greater; + } + DialInfo::reliable_sort(&a.dial_info, &b.dial_info) + } + pub const NO_SORT: std::option::Option< + for<'r, 's> fn( + &'r veilid_api::DialInfoDetail, + &'s veilid_api::DialInfoDetail, + ) -> std::cmp::Ordering, + > = None:: core::cmp::Ordering>; +} + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub enum NetworkClass { InboundCapable = 0, // I = Inbound capable without relay, may require signal @@ -471,29 +514,59 @@ pub struct NodeInfo { } impl NodeInfo { - pub fn first_filtered_dial_info_detail(&self, filter: F) -> Option + pub fn first_filtered_dial_info_detail( + &self, + sort: Option, + filter: F, + ) -> Option where + S: Fn(&DialInfoDetail, &DialInfoDetail) -> std::cmp::Ordering, F: Fn(&DialInfoDetail) -> bool, { - for did in &self.dial_info_detail_list { - if filter(did) { - return Some(did.clone()); + if let Some(sort) = sort { + let mut dids = self.dial_info_detail_list.clone(); + dids.sort_by(sort); + for did in dids { + if filter(&did) { + return Some(did); + } } - } + } else { + for did in &self.dial_info_detail_list { + if filter(did) { + return Some(did.clone()); + } + } + }; None } - pub fn all_filtered_dial_info_details(&self, filter: F) -> Vec + pub fn all_filtered_dial_info_details( + &self, + sort: Option, + filter: F, + ) -> Vec where + S: Fn(&DialInfoDetail, &DialInfoDetail) -> std::cmp::Ordering, F: Fn(&DialInfoDetail) -> bool, { let mut dial_info_detail_list = Vec::new(); - for did in &self.dial_info_detail_list { - if filter(did) { - dial_info_detail_list.push(did.clone()); + if let Some(sort) = sort { + let mut dids = self.dial_info_detail_list.clone(); + dids.sort_by(sort); + for did in dids { + if filter(&did) { + dial_info_detail_list.push(did); + } } - } + } else { + for did in &self.dial_info_detail_list { + if filter(did) { + dial_info_detail_list.push(did.clone()); + } + } + }; dial_info_detail_list } @@ -599,6 +672,38 @@ impl ProtocolType { ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => LowLevelProtocolType::TCP, } } + pub fn sort_order(&self, reliable: bool) -> usize { + match self { + ProtocolType::UDP => { + if reliable { + 3 + } else { + 0 + } + } + ProtocolType::TCP => { + if reliable { + 0 + } else { + 1 + } + } + ProtocolType::WS => { + if reliable { + 1 + } else { + 2 + } + } + ProtocolType::WSS => { + if reliable { + 2 + } else { + 3 + } + } + } + } } pub type ProtocolTypeSet = EnumSet; @@ -1372,6 +1477,24 @@ impl DialInfo { } } } + + pub fn reliable_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering { + let ca = a.protocol_type().sort_order(true); + let cb = b.protocol_type().sort_order(true); + if ca < cb { + return core::cmp::Ordering::Less; + } + if ca > cb { + return core::cmp::Ordering::Greater; + } + match (a, b) { + (DialInfo::UDP(a), DialInfo::UDP(b)) => a.cmp(b), + (DialInfo::TCP(a), DialInfo::TCP(b)) => a.cmp(b), + (DialInfo::WS(a), DialInfo::WS(b)) => a.cmp(b), + (DialInfo::WSS(a), DialInfo::WSS(b)) => a.cmp(b), + _ => unreachable!(), + } + } } impl MatchesDialInfoFilter for DialInfo { @@ -1712,128 +1835,6 @@ pub struct PartialTunnel { ///////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct RouteHopSpec { - pub dial_info: NodeDialInfo, -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct PrivateRouteSpec { - // - pub public_key: DHTKey, - pub secret_key: DHTKeySecret, - pub hops: Vec, -} - -impl PrivateRouteSpec { - pub fn new() -> Self { - let (pk, sk) = generate_secret(); - PrivateRouteSpec { - public_key: pk, - secret_key: sk, - hops: Vec::new(), - } - } -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct SafetyRouteSpec { - pub public_key: DHTKey, - pub secret_key: DHTKeySecret, - pub hops: Vec, -} - -impl SafetyRouteSpec { - pub fn new() -> Self { - let (pk, sk) = generate_secret(); - SafetyRouteSpec { - public_key: pk, - secret_key: sk, - hops: Vec::new(), - } - } -} - -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct RoutingContextOptions { - pub safety_route_spec: Option, - pub private_route_spec: Option, -} - -///////////////////////////////////////////////////////////////////////////////////////////////////// - -pub struct RoutingContextInner { - api: VeilidAPI, - options: RoutingContextOptions, -} - -impl Drop for RoutingContextInner { - fn drop(&mut self) { - // self.api - // .borrow_mut() - // .routing_contexts - // //.remove(&self.id); - } -} - -#[derive(Clone)] -pub struct RoutingContext { - inner: Arc>, -} - -impl RoutingContext { - fn new(api: VeilidAPI, options: RoutingContextOptions) -> Self { - Self { - inner: Arc::new(Mutex::new(RoutingContextInner { api, options })), - } - } - - pub fn api(&self) -> VeilidAPI { - self.inner.lock().api.clone() - } - - /////////////////////////////////// - /// - - pub async fn get_value(&self, _value_key: ValueKey) -> Result, VeilidAPIError> { - panic!("unimplemented"); - } - - pub async fn set_value( - &self, - _value_key: ValueKey, - _value: Vec, - ) -> Result { - panic!("unimplemented"); - } - - pub async fn watch_value( - &self, - _value_key: ValueKey, - _callback: ValueChangeCallback, - ) -> Result { - panic!("unimplemented"); - } - - pub async fn cancel_watch_value(&self, _value_key: ValueKey) -> Result { - panic!("unimplemented"); - } - - pub async fn find_block(&self, _block_id: BlockId) -> Result, VeilidAPIError> { - panic!("unimplemented"); - } - - pub async fn supply_block(&self, _block_id: BlockId) -> Result { - panic!("unimplemented"); - } - - pub async fn signal(&self, _data: Vec) -> Result { - panic!("unimplemented"); - } -} - -///////////////////////////////////////////////////////////////////////////////////////////////////// - struct VeilidAPIInner { context: Option, } @@ -1930,14 +1931,13 @@ impl VeilidAPI { } Err(VeilidAPIError::not_initialized()) } - - // pub fn rpc_processor(&self) -> Result { - // let inner = self.inner.lock(); - // if let Some(context) = &inner.context { - // return Ok(context.attachment_manager.network_manager().rpc_processor()); - // } - // Err(VeilidAPIError::NotInitialized) - // } + pub fn rpc_processor(&self) -> Result { + let inner = self.inner.lock(); + if let Some(context) = &inner.context { + return Ok(context.attachment_manager.network_manager().rpc_processor()); + } + Err(VeilidAPIError::NotInitialized) + } //////////////////////////////////////////////////////////////// // Attach/Detach @@ -1978,60 +1978,6 @@ impl VeilidAPI { .map_err(|e| VeilidAPIError::internal(e)) } - //////////////////////////////////////////////////////////////// - // Direct Node Access (pretty much for testing only) - - // #[instrument(level = "debug", err, skip(self))] - // pub async fn search_dht(&self, node_id: NodeId) -> Result { - // let rpc_processor = self.rpc_processor()?; - // let config = self.config()?; - // let (count, fanout, timeout) = { - // let c = config.get(); - // ( - // c.network.dht.resolve_node_count, - // c.network.dht.resolve_node_fanout, - // c.network.dht.resolve_node_timeout_ms.map(ms_to_us), - // ) - // }; - - // let node_ref = rpc_processor - // .search_dht_single_key(node_id.key, count, fanout, timeout) - // .await - // .map_err(map_rpc_error!())?; - - // let answer = node_ref.peer_info(); - // if let Some(answer) = answer { - // Ok(answer) - // } else { - // Err(VeilidAPIError::NoPeerInfo { - // node_id: NodeId::new(node_ref.node_id()), - // }) - // } - // } - - // #[instrument(level = "debug", err, skip(self))] - // pub async fn search_dht_multi(&self, node_id: NodeId) -> Result, VeilidAPIError> { - // let rpc_processor = self.rpc_processor()?; - // let config = self.config()?; - // let (count, fanout, timeout) = { - // let c = config.get(); - // ( - // c.network.dht.resolve_node_count, - // c.network.dht.resolve_node_fanout, - // c.network.dht.resolve_node_timeout_ms.map(ms_to_us), - // ) - // }; - - // let node_refs = rpc_processor - // .search_dht_multi_key(node_id.key, count, fanout, timeout) - // .await - // .map_err(map_rpc_error!())?; - - // let answer = node_refs.iter().filter_map(|x| x.peer_info()).collect(); - - // Ok(answer) - // } - //////////////////////////////////////////////////////////////// // Safety / Private Route Handling @@ -2052,54 +1998,23 @@ impl VeilidAPI { } //////////////////////////////////////////////////////////////// - // Routing Contexts - // - // Safety route specified here is for _this_ node's anonymity as a sender, used via the 'route' operation - // Private route specified here is for _this_ node's anonymity as a receiver, passed out via the 'respond_to' field for replies - - #[instrument(skip(self))] - pub async fn safe_private( - &self, - safety_route_spec: SafetyRouteSpec, - private_route_spec: PrivateRouteSpec, - ) -> RoutingContext { - self.routing_context(RoutingContextOptions { - safety_route_spec: Some(safety_route_spec), - private_route_spec: Some(private_route_spec), - }) - .await - } + // Routing Context #[instrument(level = "debug", skip(self))] - pub async fn safe_public(&self, safety_route_spec: SafetyRouteSpec) -> RoutingContext { - self.routing_context(RoutingContextOptions { - safety_route_spec: Some(safety_route_spec), - private_route_spec: None, - }) - .await + pub fn routing_context(&self) -> RoutingContext { + RoutingContext::new(self.clone()) } - #[instrument(level = "debug", skip(self))] - pub async fn unsafe_private(&self, private_route_spec: PrivateRouteSpec) -> RoutingContext { - self.routing_context(RoutingContextOptions { - safety_route_spec: None, - private_route_spec: Some(private_route_spec), - }) - .await - } + //////////////////////////////////////////////////////////////// + // App Calls #[instrument(level = "debug", skip(self))] - pub async fn unsafe_public(&self) -> RoutingContext { - self.routing_context(RoutingContextOptions { - safety_route_spec: None, - private_route_spec: None, - }) - .await - } - - #[instrument(level = "debug", skip(self))] - pub async fn routing_context(&self, options: RoutingContextOptions) -> RoutingContext { - RoutingContext::new(self.clone(), options) + pub async fn app_call_reply(&self, id: u64, message: Vec) -> Result<(), VeilidAPIError> { + let rpc_processor = self.rpc_processor()?; + rpc_processor + .app_call_reply(id, message) + .await + .map_err(|e| e.into()) } //////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/veilid_api/privacy.rs b/veilid-core/src/veilid_api/privacy.rs new file mode 100644 index 00000000..046785a9 --- /dev/null +++ b/veilid-core/src/veilid_api/privacy.rs @@ -0,0 +1,124 @@ +use super::*; + +///////////////////////////////////////////////////////////////////////////////////////////////////// +// Privacy Specs + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct RouteHopSpec { + pub dial_info: NodeDialInfo, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct PrivateRouteSpec { + // + pub public_key: DHTKey, + pub secret_key: DHTKeySecret, + pub hops: Vec, +} + +impl PrivateRouteSpec { + pub fn new() -> Self { + let (pk, sk) = generate_secret(); + PrivateRouteSpec { + public_key: pk, + secret_key: sk, + hops: Vec::new(), + } + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct SafetyRouteSpec { + pub public_key: DHTKey, + pub secret_key: DHTKeySecret, + pub hops: Vec, +} + +impl SafetyRouteSpec { + pub fn new() -> Self { + let (pk, sk) = generate_secret(); + SafetyRouteSpec { + public_key: pk, + secret_key: sk, + hops: Vec::new(), + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Compiled Privacy Objects + +#[derive(Clone, Debug)] +pub struct RouteHopData { + pub nonce: Nonce, + pub blob: Vec, +} + +#[derive(Clone, Debug)] +pub struct RouteHop { + pub dial_info: NodeDialInfo, + pub next_hop: Option, +} + +#[derive(Clone, Debug)] +pub struct PrivateRoute { + pub public_key: DHTKey, + pub hop_count: u8, + pub hops: Option, +} + +impl PrivateRoute { + pub fn new_stub(public_key: DHTKey) -> Self { + Self { + public_key, + hop_count: 0, + hops: None, + } + } +} + +impl fmt::Display for PrivateRoute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "PR({:?}+{}{})", + self.public_key, + self.hop_count, + if let Some(hops) = &self.hops { + format!("->{}", hops.dial_info) + } else { + "".to_owned() + } + ) + } +} + +#[derive(Clone, Debug)] +pub enum SafetyRouteHops { + Data(RouteHopData), + Private(PrivateRoute), +} + +#[derive(Clone, Debug)] +pub struct SafetyRoute { + pub public_key: DHTKey, + pub hop_count: u8, + pub hops: SafetyRouteHops, +} + +impl fmt::Display for SafetyRoute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "SR({:?}+{}{})", + self.public_key, + self.hop_count, + match &self.hops { + SafetyRouteHops::Data(_) => "".to_owned(), + SafetyRouteHops::Private(p) => format!("->{}", p), + } + ) + } +} + +// xxx impl to_blob and from_blob using capnp here diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs new file mode 100644 index 00000000..5cfac250 --- /dev/null +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -0,0 +1,213 @@ +use super::*; +/////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug)] +pub enum Target { + NodeId(NodeId), + PrivateRoute(PrivateRoute), +} + +pub struct RoutingContextInner {} + +pub struct RoutingContextUnlockedInner { + /// Safety route specified here is for _this_ node's anonymity as a sender, used via the 'route' operation + safety_route_spec: Option>, + /// Private route specified here is for _this_ node's anonymity as a receiver, passed out via the 'respond_to' field for replies + private_route_spec: Option>, + /// Choose reliable protocols over unreliable/faster protocols when available + reliable: bool, +} + +impl Drop for RoutingContextInner { + fn drop(&mut self) { + // self.api + // .borrow_mut() + // .routing_contexts + // //.remove(&self.id); + } +} + +#[derive(Clone)] +pub struct RoutingContext { + /// Veilid API handle + api: VeilidAPI, + inner: Arc>, + unlocked_inner: Arc, +} + +impl RoutingContext { + //////////////////////////////////////////////////////////////// + + pub(super) fn new(api: VeilidAPI) -> Self { + Self { + api, + inner: Arc::new(Mutex::new(RoutingContextInner {})), + unlocked_inner: Arc::new(RoutingContextUnlockedInner { + safety_route_spec: None, + private_route_spec: None, + reliable: false, + }), + } + } + + pub fn with_privacy( + self, + safety_route_spec: SafetyRouteSpec, + private_route_spec: PrivateRouteSpec, + ) -> Self { + Self { + api: self.api.clone(), + inner: Arc::new(Mutex::new(RoutingContextInner {})), + unlocked_inner: Arc::new(RoutingContextUnlockedInner { + safety_route_spec: Some(Arc::new(safety_route_spec)), + private_route_spec: Some(Arc::new(private_route_spec)), + reliable: self.unlocked_inner.reliable, + }), + } + } + + pub fn with_reliability(self) -> Self { + Self { + api: self.api.clone(), + inner: Arc::new(Mutex::new(RoutingContextInner {})), + unlocked_inner: Arc::new(RoutingContextUnlockedInner { + safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), + private_route_spec: self.unlocked_inner.private_route_spec.clone(), + reliable: true, + }), + } + } + + pub fn api(&self) -> VeilidAPI { + self.api.clone() + } + + async fn get_destination( + &self, + target: Target, + ) -> Result { + let rpc_processor = self.api.rpc_processor()?; + + match target { + Target::NodeId(node_id) => { + // Resolve node + let mut nr = match rpc_processor.resolve_node(node_id.key).await { + Ok(Some(nr)) => nr, + Ok(None) => return Err(VeilidAPIError::NodeNotFound { node_id }), + Err(e) => return Err(e.into()), + }; + // Apply reliability sort + if self.unlocked_inner.reliable { + nr.set_reliable(); + } + Ok(rpc_processor::Destination::Direct { + target: nr, + safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), + }) + } + Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute { + private_route: pr, + safety_route_spec: self.unlocked_inner.safety_route_spec.clone(), + }), + } + } + + //////////////////////////////////////////////////////////////// + // App-level Messaging + + #[instrument(level = "debug", err, skip(self))] + pub async fn app_call( + &self, + target: Target, + request: Vec, + ) -> Result, VeilidAPIError> { + let rpc_processor = self.api.rpc_processor()?; + + // Get destination + let dest = self.get_destination(target).await?; + + // Send app message + let answer = match rpc_processor.rpc_call_app_call(dest, request).await { + Ok(NetworkResult::Value(v)) => v, + Ok(NetworkResult::Timeout) => return Err(VeilidAPIError::Timeout), + Ok(NetworkResult::NoConnection(e)) => { + return Err(VeilidAPIError::NoConnection { + message: e.to_string(), + }) + } + Ok(NetworkResult::InvalidMessage(message)) => { + return Err(VeilidAPIError::Generic { message }) + } + Err(e) => return Err(e.into()), + }; + + Ok(answer.answer) + } + + #[instrument(level = "debug", err, skip(self))] + pub async fn app_message( + &self, + target: Target, + message: Vec, + ) -> Result<(), VeilidAPIError> { + let rpc_processor = self.api.rpc_processor()?; + + // Get destination + let dest = self.get_destination(target).await?; + + // Send app message + match rpc_processor.rpc_call_app_message(dest, message).await { + Ok(NetworkResult::Value(())) => {} + Ok(NetworkResult::Timeout) => return Err(VeilidAPIError::Timeout), + Ok(NetworkResult::NoConnection(e)) => { + return Err(VeilidAPIError::NoConnection { + message: e.to_string(), + }) + } + Ok(NetworkResult::InvalidMessage(message)) => { + return Err(VeilidAPIError::Generic { message }) + } + Err(e) => return Err(e.into()), + }; + + Ok(()) + } + + /////////////////////////////////// + /// DHT Values + + pub async fn get_value(&self, _value_key: ValueKey) -> Result, VeilidAPIError> { + panic!("unimplemented"); + } + + pub async fn set_value( + &self, + _value_key: ValueKey, + _value: Vec, + ) -> Result { + panic!("unimplemented"); + } + + pub async fn watch_value( + &self, + _value_key: ValueKey, + _callback: ValueChangeCallback, + ) -> Result { + panic!("unimplemented"); + } + + pub async fn cancel_watch_value(&self, _value_key: ValueKey) -> Result { + panic!("unimplemented"); + } + + /////////////////////////////////// + /// Block Store + + pub async fn find_block(&self, _block_id: BlockId) -> Result, VeilidAPIError> { + panic!("unimplemented"); + } + + pub async fn supply_block(&self, _block_id: BlockId) -> Result { + panic!("unimplemented"); + } +} diff --git a/veilid-core/src/xx/eventual_value.rs b/veilid-core/src/xx/eventual_value.rs index ab0825aa..2bdf2a43 100644 --- a/veilid-core/src/xx/eventual_value.rs +++ b/veilid-core/src/xx/eventual_value.rs @@ -61,6 +61,14 @@ pub struct EventualValueFuture { eventual: EventualValue, } +impl core::fmt::Debug for EventualValueFuture { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("EventualValueFuture") + .field("id", &self.id) + .finish() + } +} + impl Future for EventualValueFuture { type Output = EventualValue; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { diff --git a/veilid-core/src/xx/timeout_or.rs b/veilid-core/src/xx/timeout_or.rs index 04f57d53..62786c36 100644 --- a/veilid-core/src/xx/timeout_or.rs +++ b/veilid-core/src/xx/timeout_or.rs @@ -104,6 +104,15 @@ impl TimeoutOr { Self::Value(v) => TimeoutOr::::Value(f(v)), } } + pub fn on_timeout(self, f: F) -> Self { + match self { + Self::Timeout => { + f(); + Self::Timeout + } + Self::Value(v) => Self::Value(v), + } + } pub fn into_timeout_error(self) -> Result { match self { Self::Timeout => Err(TimeoutError {}), diff --git a/veilid-flutter/example/lib/main.dart b/veilid-flutter/example/lib/main.dart index e635f0dd..568b67e5 100644 --- a/veilid-flutter/example/lib/main.dart +++ b/veilid-flutter/example/lib/main.dart @@ -160,31 +160,31 @@ class _MyAppState extends State with UiLoggy { }); } - Future processUpdateLog(VeilidUpdateLog update) async { + Future processLog(VeilidLog log) async { StackTrace? stackTrace; Object? error; - final backtrace = update.backtrace; + final backtrace = log.backtrace; if (backtrace != null) { stackTrace = StackTrace.fromString("$backtrace\n${StackTrace.current.toString()}"); - error = 'embedded stack trace for ${update.logLevel} ${update.message}'; + error = 'embedded stack trace for ${log.logLevel} ${log.message}'; } - switch (update.logLevel) { + switch (log.logLevel) { case VeilidLogLevel.error: - loggy.error(update.message, error, stackTrace); + loggy.error(log.message, error, stackTrace); break; case VeilidLogLevel.warn: - loggy.warning(update.message, error, stackTrace); + loggy.warning(log.message, error, stackTrace); break; case VeilidLogLevel.info: - loggy.info(update.message, error, stackTrace); + loggy.info(log.message, error, stackTrace); break; case VeilidLogLevel.debug: - loggy.debug(update.message, error, stackTrace); + loggy.debug(log.message, error, stackTrace); break; case VeilidLogLevel.trace: - loggy.trace(update.message, error, stackTrace); + loggy.trace(log.message, error, stackTrace); break; } } @@ -193,8 +193,12 @@ class _MyAppState extends State with UiLoggy { var stream = _updateStream; if (stream != null) { await for (final update in stream) { - if (update is VeilidUpdateLog) { - await processUpdateLog(update); + if (update is VeilidLog) { + await processLog(update); + } else if (update is VeilidAppMessage) { + loggy.info("AppMessage: ${update.json}"); + } else if (update is VeilidAppCall) { + loggy.info("AppCall: ${update.json}"); } else { loggy.trace("Update: ${update.json}"); } diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index b859eca5..85eb8f8f 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -1232,11 +1232,19 @@ abstract class VeilidUpdate { switch (json["kind"]) { case "Log": { - return VeilidUpdateLog( + return VeilidLog( logLevel: veilidLogLevelFromJson(json["log_level"]), message: json["message"], backtrace: json["backtrace"]); } + case "AppMessage": + { + return VeilidAppMessage(); + } + case "AppCall": + { + return VeilidAppCall(); + } case "Attachment": { return VeilidUpdateAttachment( @@ -1256,12 +1264,12 @@ abstract class VeilidUpdate { Map get json; } -class VeilidUpdateLog implements VeilidUpdate { +class VeilidLog implements VeilidUpdate { final VeilidLogLevel logLevel; final String message; final String? backtrace; // - VeilidUpdateLog({ + VeilidLog({ required this.logLevel, required this.message, required this.backtrace,