From b010c8a61f0ea8ffa6c0376a7810f5227bb15b8f Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Jul 2022 15:52:27 -0400 Subject: [PATCH] checkpoint --- veilid-core/proto/veilid.capnp | 8 +- veilid-core/src/rpc_processor/coders/mod.rs | 6 + .../rpc_processor/coders/operations/mod.rs | 22 ++ .../coders/operations/operation.rs | 98 ++++++++ .../coders/operations/operation_detail.rs | 214 ++++++++++++++++++ .../coders/operations/operation_find_node.rs | 66 ++++++ .../coders/operations/operation_get_value.rs | 67 ++++++ .../operations/operation_node_info_update.rs | 29 +++ .../coders/operations/operation_route.rs | 96 ++++++++ .../coders/operations/operation_status.rs | 60 +++++ .../operation_validate_dial_info.rs | 37 +++ .../coders/operations/respond_to.rs | 62 +++++ .../src/rpc_processor/coders/value_data.rs | 20 ++ .../src/rpc_processor/coders/value_key.rs | 26 +++ veilid-core/src/rpc_processor/debug.rs | 42 +++- veilid-core/src/rpc_processor/mod.rs | 159 +++++++------ veilid-core/src/veilid_api/mod.rs | 18 ++ 17 files changed, 940 insertions(+), 90 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/operations/mod.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_detail.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_route.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_status.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/respond_to.rs create mode 100644 veilid-core/src/rpc_processor/coders/value_data.rs create mode 100644 veilid-core/src/rpc_processor/coders/value_key.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index cae2421b..b7421992 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -162,10 +162,10 @@ struct ValueKey { subkey @1 :Text; # the name of the subkey (or empty if the whole key) } -struct ValueKeySeq { - key @0 :ValueKey; # the location of the value - seq @1 :ValueSeqNum; # the sequence number of the value subkey -} +# struct ValueKeySeq { +# key @0 :ValueKey; # the location of the value +# seq @1 :ValueSeqNum; # the sequence number of the value subkey +# } struct ValueData { data @0 :Data; # value or subvalue contents in CBOR format diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 9e69b872..8f6f4f7c 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -7,6 +7,7 @@ mod node_dial_info; mod node_info; mod node_status; mod nonce; +mod operations; mod peer_info; mod private_safety_route; mod protocol_set; @@ -16,6 +17,8 @@ mod signal_info; mod signature; mod signed_node_info; mod socket_address; +mod value_data; +mod value_key; pub use address::*; pub use dial_info::*; @@ -26,6 +29,7 @@ pub use node_dial_info::*; pub use node_info::*; pub use node_status::*; pub use nonce::*; +pub use operations::*; pub use peer_info::*; pub use private_safety_route::*; pub use protocol_set::*; @@ -35,5 +39,7 @@ pub use signal_info::*; pub use signature::*; pub use signed_node_info::*; pub use socket_address::*; +pub use value_data::*; +pub use value_key::*; use super::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs new file mode 100644 index 00000000..a595b9bf --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -0,0 +1,22 @@ +mod operation; +mod operation_detail; +mod operation_find_node; +mod operation_get_value; +mod operation_node_info_update; +mod operation_route; +mod operation_status; +mod operation_validate_dial_info; +mod respond_to; + +use super::*; + +pub use operation::*; +pub use operation_detail::*; +pub use operation_find_node::*; +pub use operation_get_value::*; +pub use operation_node_info_update::*; +pub use operation_route::*; +pub use operation_status::*; +pub use operation_validate_dial_info::*; + +pub use respond_to::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs new file mode 100644 index 00000000..b7f59dca --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -0,0 +1,98 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +struct RPCOperation { + op_id: u64, + // index: u32, + // is_q: bool, + // wants_answer: bool, + respond_to: RespondTo, + detail: RPCOperationDetail, +} + +impl RPCOperation { + pub fn decode( + operation_reader: &veilid_capnp::operation::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let op_id = operation_reader.get_op_id(); + + let respond_to_reader = operation_reader.get_respond_to(); + let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?; + + let detail_reader = operation_reader.get_detail(); + let detail = RPCOperationDetail::decode(&detail_reader, sender_node_id)?; + + Ok(RPCOperation { + op_id, + respond_to, + detail, + }) + } + + pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { + builder.set_op_id(self.op_id); + let rt_builder = builder.init_respond_to(); + self.respond_to.encode(&mut rt_builder)?; + let d_builder = builder.init_detail(); + self.detail.encode(&mut d_builder)?; + Ok(()) + } +} + +// let out = match which_reader { +// veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, +// }; + +// veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, +// veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, +// veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, +// veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, +// veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs b/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs new file mode 100644 index 00000000..55671bfc --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs @@ -0,0 +1,214 @@ +use super::*; +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub enum RPCOperationDetail { + StatusQ(RPCOperationStatusQ), + StatusA(RPCOperationStatusA), + ValidateDialInfo(RPCOperationValidateDialInfo), + FindNodeQ(RPCOperationFindNodeQ), + FindNodeA(RPCOperationFindNodeA), + Route(RPCOperationRoute), + NodeInfoUpdate(RPCOperationNodeInfoUpdate), + GetValueQ(RPCOperationGetValueQ), + GetValueA(RPCOperationGetValueA), + SetValueQ(RPCOperationSetValueQ), + SetValueA(RPCOperationSetValueA), + WatchValueQ(RPCOperationWatchValueQ), + WatchValueA(RPCOperationWatchValueA), + ValueChanged(RPCOperationValueChanged), + SupplyBlockQ(RPCOperationSupplyBlockQ), + SupplyBlockA(RPCOperationSupplyBlockA), + FindBlockQ(RPCOperationFindBlockQ), + FindBlockA(RPCOperationFindBlockA), + Signal(RPCOperationSignal), + ReturnReceipt(RPCOperationReturnReceipt), + StartTunnelQ(RPCOperationStartTunnelQ), + StartTunnelA(RPCOperationStartTunnelA), + CompleteTunnelQ(RPCOperationCompleteTunnelQ), + CompleteTunnelA(RPCOperationCompleteTunnelA), + CancelTunnelQ(CancelTunnelQ), + CancelTunnelA(CancelTunnelA), +} + +impl RPCOperationDetail { + pub fn decode( + reader: &veilid_capnp::operation::detail::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; + let out = match which_reader { + veilid_capnp::operation::detail::StatusQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStatusQ::decode(&op_reader)?; + RPCOperationDetail::StatusQ(out) + } + veilid_capnp::operation::detail::StatusA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStatusA::decode(&op_reader)?; + RPCOperationDetail::StatusA(out) + } + veilid_capnp::operation::detail::ValidateDialInfo(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationValidateDialInfo::decode(&op_reader)?; + RPCOperationDetail::ValidateDialInfo(out) + } + veilid_capnp::operation::detail::FindNodeQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindNodeQ::decode(&op_reader)?; + RPCOperationDetail::FindNodeQ(out) + } + veilid_capnp::operation::detail::FindNodeA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindNodeA::decode(&op_reader)?; + RPCOperationDetail::FindNodeA(out) + } + veilid_capnp::operation::detail::Route(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationRoute::decode(&op_reader)?; + RPCOperationDetail::Route(out) + } + veilid_capnp::operation::detail::NodeInfoUpdate(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationNodeInfoUpdate::decode(&op_reader, sender_node_id)?; + RPCOperationDetail::NodeInfoUpdate(out) + } + veilid_capnp::operation::detail::GetValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationGetValueQ::decode(&op_reader)?; + RPCOperationDetail::GetValueQ(out) + } + veilid_capnp::operation::detail::GetValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationGetValueA::decode(&op_reader)?; + RPCOperationDetail::GetValueA(out) + } + veilid_capnp::operation::detail::SetValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSetValueQ::decode(&op_reader)?; + RPCOperationDetail::SetValueQ(out) + } + veilid_capnp::operation::detail::SetValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSetValueA::decode(&op_reader)?; + RPCOperationDetail::SetValueA(out) + } + veilid_capnp::operation::detail::WatchValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationWatchValueQ::decode(&op_reader)?; + RPCOperationDetail::WatchValueQ(out) + } + veilid_capnp::operation::detail::WatchValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationWatchValueA::decode(&op_reader)?; + RPCOperationDetail::WatchValueA(out) + } + veilid_capnp::operation::detail::ValueChanged(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationValueChanged::decode(&op_reader)?; + RPCOperationDetail::ValueChanged(out) + } + veilid_capnp::operation::detail::SupplyBlockQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSupplyBlockQ::decode(&op_reader)?; + RPCOperationDetail::SupplyBlockQ(out) + } + veilid_capnp::operation::detail::SupplyBlockA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSupplyBlockA::decode(&op_reader)?; + RPCOperationDetail::SupplyBlockA(out) + } + veilid_capnp::operation::detail::FindBlockQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindBlockQ::decode(&op_reader)?; + RPCOperationDetail::FindBlockQ(out) + } + veilid_capnp::operation::detail::FindBlockA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindBlockA::decode(&op_reader)?; + RPCOperationDetail::FindBlockA(out) + } + veilid_capnp::operation::detail::Signal(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSignal::decode(&op_reader)?; + RPCOperationDetail::Signal(out) + } + veilid_capnp::operation::detail::ReturnReceipt(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationReturnReceipt::decode(&op_reader)?; + RPCOperationDetail::ReturnReceipt(out) + } + veilid_capnp::operation::detail::StartTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStartTunnelQ::decode(&op_reader)?; + RPCOperationDetail::StartTunnelQ(out) + } + veilid_capnp::operation::detail::StartTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStartTunnelA::decode(&op_reader)?; + RPCOperationDetail::StartTunnelA(out) + } + veilid_capnp::operation::detail::CompleteTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCompleteTunnelQ::decode(&op_reader)?; + RPCOperationDetail::CompleteTunnelQ(out) + } + veilid_capnp::operation::detail::CompleteTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCompleteTunnelA::decode(&op_reader)?; + RPCOperationDetail::CompleteTunnelA(out) + } + veilid_capnp::operation::detail::CancelTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCancelTunnelQ::decode(&op_reader)?; + RPCOperationDetail::CancelTunnelQ(out) + } + veilid_capnp::operation::detail::CancelTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCancelTunnelA::decode(&op_reader)?; + RPCOperationDetail::CancelTunnelA(out) + } + }; + Ok(out) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation::detail::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationDetail::StatusQ(d) => d.encode(&mut builder.init_status_q()), + RPCOperationDetail::StatusA(d) => d.encode(&mut builder.init_status_a()), + RPCOperationDetail::ValidateDialInfo(d) => { + d.encode(&mut builder.init_validate_dial_info()) + } + RPCOperationDetail::FindNodeQ(d) => d.encode(&mut builder.init_find_node_q()), + RPCOperationDetail::FindNodeA(d) => d.encode(&mut builder.init_find_node_a()), + RPCOperationDetail::Route(d) => d.encode(&mut builder.init_route()), + RPCOperationDetail::NodeInfoUpdate(d) => d.encode(&mut builder.init_node_info_update()), + RPCOperationDetail::GetValueQ(d) => d.encode(&mut builder.init_get_value_q()), + RPCOperationDetail::GetValueA(d) => d.encode(&mut builder.init_get_value_a()), + RPCOperationDetail::SetValueQ(d) => d.encode(&mut builder.init_set_value_q()), + RPCOperationDetail::SetValueA(d) => d.encode(&mut builder.init_set_value_a()), + RPCOperationDetail::WatchValueQ(d) => d.encode(&mut builder.init_watch_value_q()), + RPCOperationDetail::WatchValueA(d) => d.encode(&mut builder.init_watch_value_a()), + RPCOperationDetail::ValueChanged(d) => d.encode(&mut builder.init_value_changed()), + RPCOperationDetail::SupplyBlockQ(d) => d.encode(&mut builder.init_supply_block_q()), + RPCOperationDetail::SupplyBlockA(d) => d.encode(&mut builder.init_supply_block_a()), + RPCOperationDetail::FindBlockQ(d) => d.encode(&mut builder.init_find_block_q()), + RPCOperationDetail::FindBlockA(d) => d.encode(&mut builder.init_find_block_a()), + RPCOperationDetail::Signal(d) => d.encode(&mut builder.init_signal()), + RPCOperationDetail::ReturnReceipt(d) => d.encode(&mut builder.init_return_receipt()), + RPCOperationDetail::StartTunnelQ(d) => d.encode(&mut builder.init_start_tunnel_q()), + RPCOperationDetail::StartTunnelA(d) => d.encode(&mut builder.init_start_tunnel_a()), + RPCOperationDetail::CompleteTunnelQ(d) => { + d.encode(&mut builder.init_complete_tunnel_q()) + } + RPCOperationDetail::CompleteTunnelA(d) => { + d.encode(&mut builder.init_complete_tunnel_a()) + } + RPCOperationDetail::CancelTunnelQ(d) => d.encode(&mut builder.init_cancel_tunnel_q()), + RPCOperationDetail::CancelTunnelA(d) => d.encode(&mut builder.init_cancel_tunnel_a()), + } + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs new file mode 100644 index 00000000..06f30455 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -0,0 +1,66 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationFindNodeQ { + node_id: DHTKey, +} + +impl RPCOperationFindNodeQ { + pub fn decode( + reader: &veilid_capnp::operation_find_node_q::Reader, + ) -> Result { + let ni_reader = reader.get_node_id().map_err(map_error_capnp_error!())?; + let node_id = decode_public_key(&ni_reader); + Ok(RPCOperationFindNodeQ { node_id }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_find_node_q::Builder, + ) -> Result<(), RPCError> { + let ni_builder = builder.init_node_id(); + encode_public_key(&self.node_id, &mut ni_builder)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationFindNodeA { + peers: Vec, +} + +impl RPCOperationFindNodeA { + pub fn decode( + reader: &veilid_capnp::operation_find_node_a::Reader, + ) -> Result { + let peers_reader = reader.get_peers().map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationFindNodeA { peers }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_find_node_a::Builder, + ) -> Result<(), RPCError> { + let mut peers_builder = builder.init_peers( + self.peers + .len() + .try_into() + .map_err(map_error_internal!("invalid closest nodes list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs new file mode 100644 index 00000000..e9a62e2b --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -0,0 +1,67 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationGetValueQ { + key: ValueKey, +} + +impl RPCOperationGetValueQ { + pub fn decode( + reader: &veilid_capnp::operation_get_value_q::Reader, + ) -> Result { + let ni_reader = reader.get_node_id().map_err(map_error_capnp_error!())?; + let node_id = decode_public_key(&ni_reader); + Ok(RPCOperationGetValueQ { node_id }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_get_value_q::Builder, + ) -> Result<(), RPCError> { + let ni_builder = builder.init_node_id(); + encode_public_key(&self.node_id, &mut ni_builder)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationGetValueA { + Data(ValueData), + Peers(Vec), +} + +impl RPCOperationGetValueA { + pub fn decode( + reader: &veilid_capnp::operation_get_value_a::Reader, + ) -> Result { + let peers_reader = reader.get_peers().map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationGetValueA { peers }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_get_value_a::Builder, + ) -> Result<(), RPCError> { + let mut peers_builder = builder.init_peers( + self.peers + .len() + .try_into() + .map_err(map_error_internal!("invalid closest nodes list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs b/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs new file mode 100644 index 00000000..fc9d954d --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs @@ -0,0 +1,29 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationNodeInfoUpdate { + signed_node_info: SignedNodeInfo, +} + +impl RPCOperationNodeInfoUpdate { + pub fn decode( + reader: &veilid_capnp::operation_node_info_update::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let sni_reader = reader + .get_signed_node_info() + .map_err(map_error_capnp_error!())?; + let signed_node_info = decode_signed_node_info(&sni_reader, sender_node_id, true)?; + + Ok(RPCOperationNodeInfoUpdate { signed_node_info }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_node_info_update::Builder, + ) -> Result<(), RPCError> { + let sni_builder = builder.init_signed_node_info(); + encode_signed_node_info(&self.signed_node_info, &mut sni_builder)?; + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs new file mode 100644 index 00000000..33de91a6 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -0,0 +1,96 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +struct RoutedOperation { + signatures: Vec, + nonce: Nonce, + data: Vec, +} + +impl RoutedOperation { + pub fn decode( + reader: &veilid_capnp::routed_operation::Reader, + ) -> Result { + let sigs_reader = reader.get_signatures().map_err(map_error_capnp_error!())?; + let mut signatures = Vec::::with_capacity( + sigs_reader + .len() + .try_into() + .map_err(map_error_internal!("too many signatures"))?, + ); + for s in sigs_reader.iter() { + let sig = decode_signature(&s); + signatures.push(sig); + } + + let n_reader = reader.get_nonce().map_err(map_error_capnp_error!())?; + let nonce = decode_nonce(&n_reader); + let data = reader + .get_data() + .map_err(map_error_capnp_error!())? + .to_vec(); + + Ok(RoutedOperation { + signatures, + nonce, + data, + }) + } + + pub fn encode( + &self, + builder: &mut veilid_capnp::routed_operation::Builder, + ) -> Result<(), RPCError> { + let mut sigs_builder = builder.init_signatures( + self.signatures + .len() + .try_into() + .map_err(map_error_internal!("invalid signatures list length"))?, + ); + for (i, sig) in self.signatures.iter().enumerate() { + let mut sig_builder = sigs_builder.reborrow().get(i as u32); + encode_signature(sig, &mut sig_builder); + } + let n_builder = builder.init_nonce(); + encode_nonce(&self.nonce, &mut n_builder); + builder.set_data(&self.data); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationRoute { + safety_route: SafetyRoute, + operation: RoutedOperation, +} + +impl RPCOperationRoute { + pub fn decode( + reader: &veilid_capnp::operation_route::Reader, + ) -> Result { + let sr_reader = reader + .get_safety_route() + .map_err(map_error_capnp_error!())?; + let safety_route = decode_safety_route(&sr_reader)?; + + let o_reader = reader.get_operation().map_err(map_error_capnp_error!())?; + let operation = RoutedOperation::decode(&o_reader)?; + + Ok(RPCOperationRoute { + safety_route, + operation, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_route::Builder, + ) -> Result<(), RPCError> { + let sr_builder = builder.init_safety_route(); + encode_safety_route(&self.safety_route, &mut sr_builder)?; + let o_builder = builder.init_operation(); + self.operation.encode(&mut o_builder)?; + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs new file mode 100644 index 00000000..09bf7a7a --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -0,0 +1,60 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationStatusQ { + node_status: NodeStatus, +} + +impl RPCOperationStatusQ { + pub fn decode( + reader: &veilid_capnp::operation_status_q::Reader, + ) -> Result { + let ns_reader = reader.get_node_status().map_err(map_error_capnp_error!())?; + let node_status = decode_node_status(&ns_reader)?; + Ok(RPCOperationStatusQ { node_status }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_status_q::Builder, + ) -> Result<(), RPCError> { + let ns_builder = builder.init_node_status(); + encode_node_status(&self.node_status, &mut ns_builder)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationStatusA { + node_status: NodeStatus, + sender_info: SenderInfo, +} + +impl RPCOperationStatusA { + pub fn decode( + reader: &veilid_capnp::operation_status_a::Reader, + ) -> Result { + let ns_reader = reader.get_node_status().map_err(map_error_capnp_error!())?; + let node_status = decode_node_status(&ns_reader)?; + + let si_reader = reader + .get_sender_info() + .map_err(map_error_capnp_notinschema!())?; + let sender_info = decode_sender_info(&si_reader)?; + + Ok(RPCOperationStatusA { + node_status, + sender_info, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_status_a::Builder, + ) -> Result<(), RPCError> { + let ns_builder = builder.init_node_status(); + encode_node_status(&self.node_status, &mut ns_builder)?; + let si_builder = builder.init_sender_info(); + encode_sender_info(&self.sender_info, &mut si_builder)?; + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs new file mode 100644 index 00000000..6a6e8456 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs @@ -0,0 +1,37 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationValidateDialInfo { + dial_info: DialInfo, + receipt: Vec, + redirect: bool, +} + +impl RPCOperationValidateDialInfo { + pub fn decode( + reader: &veilid_capnp::operation_validate_dial_info::Reader, + ) -> Result { + let di_reader = reader.get_dial_info().map_err(map_error_capnp_error!())?; + let dial_info = decode_dial_info(&di_reader)?; + let rcpt_reader = reader.get_receipt().map_err(map_error_capnp_error!())?; + let receipt = rcpt_reader.to_vec(); + let redirect = reader.get_redirect(); + + Ok(RPCOperationValidateDialInfo { + dial_info, + receipt, + redirect, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_validate_dial_info::Builder, + ) -> Result<(), RPCError> { + let di_builder = builder.init_dial_info(); + encode_dial_info(&self.dial_info, &mut di_builder)?; + builder.set_receipt(&self.receipt); + builder.set_redirect(self.redirect); + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs new file mode 100644 index 00000000..308487a6 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs @@ -0,0 +1,62 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub enum RespondTo { + None, + Sender(Option), + PrivateRoute(PrivateRoute), +} + +impl RespondTo { + pub fn encode( + &self, + builder: &mut veilid_capnp::operation::respond_to::Builder, + ) -> Result<(), RPCError> { + match self { + Self::None => { + builder.set_none(()); + } + Self::Sender(Some(sni)) => { + let mut sni_builder = builder.reborrow().init_sender_with_info(); + encode_signed_node_info(sni, &mut sni_builder)?; + } + Self::Sender(None) => { + builder.reborrow().set_sender(()); + } + Self::PrivateRoute(pr) => { + let mut pr_builder = builder.reborrow().init_private_route(); + encode_private_route(pr, &mut pr_builder)?; + } + }; + Ok(()) + } + + pub fn decode( + reader: &veilid_capnp::operation::respond_to::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let respond_to = match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation::respond_to::None(_) => RespondTo::None, + veilid_capnp::operation::respond_to::Sender(_) => RespondTo::Sender(None), + veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => { + let sni = decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?; + RespondTo::Sender(Some(sni)) + } + veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => { + return Err(rpc_error_protocol(format!( + "invalid signed node info: {}", + e + ))) + } + veilid_capnp::operation::respond_to::PrivateRoute(Ok(pr_reader)) => { + let pr = decode_private_route(&pr_reader)?; + RespondTo::PrivateRoute(pr) + } + veilid_capnp::operation::respond_to::PrivateRoute(Err(e)) => { + return Err(rpc_error_protocol(format!("invalid private route: {}", e))); + } + }; + Ok(respond_to) + } +} diff --git a/veilid-core/src/rpc_processor/coders/value_data.rs b/veilid-core/src/rpc_processor/coders/value_data.rs new file mode 100644 index 00000000..759cb7c3 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/value_data.rs @@ -0,0 +1,20 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_value_data( + value_data: &ValueData, + builder: &mut veilid_capnp::value_data::Builder, +) -> Result<(), RPCError> { + builder.set_data(&value_data.data); + builder.set_seq(value_data.seq); + Ok(()) +} + +pub fn decode_value_data(reader: &veilid_capnp::value_data::Reader) -> Result { + let data = reader + .get_data() + .map_err(map_error_capnp_error!())? + .to_vec(); + let seq = reader.get_seq(); + Ok(ValueData { data, seq }) +} diff --git a/veilid-core/src/rpc_processor/coders/value_key.rs b/veilid-core/src/rpc_processor/coders/value_key.rs new file mode 100644 index 00000000..bd56afe6 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/value_key.rs @@ -0,0 +1,26 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_value_key( + value_key: &ValueKey, + builder: &mut veilid_capnp::value_key::Builder, +) -> Result<(), RPCError> { + let pk_builder = builder.init_public_key(); + encode_public_key(&value_key.key, &mut pk_builder)?; + if let Some(subkey) = value_key.subkey { + builder.set_subkey(&subkey); + } + Ok(()) +} + +pub fn decode_value_key(reader: &veilid_capnp::value_key::Reader) -> Result { + let pk_reader = reader.get_public_key().map_err(map_error_capnp_error!())?; + let key = decode_public_key(&pk_reader); + let subkey = if !reader.has_subkey() { + None + } else { + let subkey = reader.get_subkey().map_err(map_error_capnp_error!())?; + Some(subkey.to_owned()) + }; + Ok(ValueKey { key, subkey }) +} diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index 29c4f637..fb433950 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -108,12 +108,17 @@ impl RPCProcessor { format!( "#{} {}", op_id, - self.get_rpc_operation_detail_debug_info(&detail) + Self::get_rpc_operation_detail_debug_info(&detail) ) } - pub(super) fn get_rpc_operation_detail_debug_info( - &self, + struct RpcOperationDetailInfo { + name: &'static str, + index: u32, + is_q: bool, + } + + pub(super) fn get_rpc_operation_detail_info( detail: &veilid_capnp::operation::detail::WhichReader, ) -> String { match detail { @@ -145,4 +150,35 @@ impl RPCProcessor { veilid_capnp::operation::detail::CancelTunnelA(_) => "CancelTunnelA".to_owned(), } } + + pub(super) fn get_rpc_operation_detail_d( + let (which, is_q) = match which_reader + { + veilid_capnp::operation::detail::StatusQ(_) => (0u32, true), + veilid_capnp::operation::detail::StatusA(_) => (1u32, false), + veilid_capnp::operation::detail::ValidateDialInfo(_) => (2u32, true), + veilid_capnp::operation::detail::FindNodeQ(_) => (3u32, true), + veilid_capnp::operation::detail::FindNodeA(_) => (4u32, false), + veilid_capnp::operation::detail::Route(_) => (5u32, true), + veilid_capnp::operation::detail::NodeInfoUpdate(_) => (6u32, true), + veilid_capnp::operation::detail::GetValueQ(_) => (7u32, true), + veilid_capnp::operation::detail::GetValueA(_) => (8u32, false), + veilid_capnp::operation::detail::SetValueQ(_) => (9u32, true), + veilid_capnp::operation::detail::SetValueA(_) => (10u32, false), + veilid_capnp::operation::detail::WatchValueQ(_) => (11u32, true), + veilid_capnp::operation::detail::WatchValueA(_) => (12u32, false), + veilid_capnp::operation::detail::ValueChanged(_) => (13u32, true), + veilid_capnp::operation::detail::SupplyBlockQ(_) => (14u32, true), + veilid_capnp::operation::detail::SupplyBlockA(_) => (15u32, false), + veilid_capnp::operation::detail::FindBlockQ(_) => (16u32, true), + veilid_capnp::operation::detail::FindBlockA(_) => (17u32, false), + veilid_capnp::operation::detail::Signal(_) => (18u32, true), + veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true), + veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true), + veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false), + veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true), + veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), + veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), + veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), + }; } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 8ac1af56..75b3e04d 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -43,38 +43,6 @@ impl fmt::Display for Destination { } } -#[derive(Debug, Clone)] -pub enum RespondTo { - None, - Sender(Option), - PrivateRoute(PrivateRoute), -} - -impl RespondTo { - pub fn encode( - &self, - builder: &mut veilid_capnp::operation::respond_to::Builder, - ) -> Result<(), RPCError> { - match self { - Self::None => { - builder.set_none(()); - } - Self::Sender(Some(sni)) => { - let mut sni_builder = builder.reborrow().init_sender_with_info(); - encode_signed_node_info(sni, &mut sni_builder)?; - } - Self::Sender(None) => { - builder.reborrow().set_sender(()); - } - Self::PrivateRoute(pr) => { - let mut pr_builder = builder.reborrow().init_private_route(); - encode_private_route(pr, &mut pr_builder)?; - } - }; - Ok(()) - } -} - #[derive(Debug, Clone)] struct RPCMessageHeader { timestamp: u64, // time the message was received, not sent @@ -142,6 +110,56 @@ struct WaitableReply { send_data_kind: SendDataKind, } +struct RPCOperationInfo { + name: &'static str, + op_id: u64, + index: u32, + is_q: bool, + wants_answer: bool, + respond_to: RespondTo +} + +impl RPCOperationInfo { + pub fn parse(operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey) -> Result { + let which_reader = operation_reader.get_detail().which().expect("missing which operation"); + let op_id = operation_reader.get_op_id(); + + let respond_to_reader = operation_reader.get_respond_to(); + let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?; + + let out = match which_reader { + veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, + veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, + veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, + veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, + veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, + veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, + veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, + veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, + }; + + Ok(out) + } +} + ///////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Default)] @@ -403,6 +421,7 @@ impl RPCProcessor { } // Issue a request over the network, possibly using an anonymized route + #[instrument(level = "debug", skip(self, message, safety_route_spec), err)] async fn request( &self, dest: Destination, @@ -410,11 +429,15 @@ impl RPCProcessor { safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { - let (op_id, wants_answer) = { + let info = { let operation = message .get_root::() .map_err(map_error_internal!("invalid operation")) .map_err(logthru_rpc!(error))?; + RPCOperationInfo::parse() + } + let (op_id, wants_answer) = { + let op_id = operation.get_op_id(); let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?; @@ -549,9 +572,12 @@ impl RPCProcessor { None }; - // send question + // Log rpc receive + debug!(target: "rpc_message", dir = "send", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode()); + log_rpc!(debug "==>> REQUEST({}) -> {:?}", self.get_rpc_message_debug_info(&message), dest); + // send question let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); let send_data_kind = match self @@ -597,6 +623,7 @@ impl RPCProcessor { // Issue a reply over the network, possibly using an anonymized route // The request must want a response, or this routine fails + #[instrument(level = "debug", skip(self, request_rpcreader, reply_msg, safety_route_spec), err)] async fn reply( &self, request_rpcreader: RPCMessageReader, @@ -761,44 +788,9 @@ impl RPCProcessor { Ok(()) } - fn wants_answer(&self, operation: &veilid_capnp::operation::Reader) -> Result { - match operation - .get_respond_to() - .which() - .map_err(map_error_capnp_notinschema!())? - { - veilid_capnp::operation::respond_to::None(_) => Ok(false), - veilid_capnp::operation::respond_to::Sender(_) - | veilid_capnp::operation::respond_to::SenderWithInfo(_) - | veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(true), - } - } - - fn get_respond_to_sender_signed_node_info( - &self, - operation: &veilid_capnp::operation::Reader, - sender_node_id: &DHTKey, - ) -> Result, RPCError> { - match operation - .get_respond_to() - .which() - .map_err(map_error_capnp_notinschema!())? - { - veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => Ok(Some( - decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?, - )), - veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => Err(rpc_error_protocol( - format!("invalid sender_with_info signed node info: {}", e), - )), - veilid_capnp::operation::respond_to::None(_) - | veilid_capnp::operation::respond_to::Sender(_) - | veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(None), - } - } - ////////////////////////////////////////////////////////////////////// - async fn generate_sender_info(&self, peer_noderef: NodeRef) -> SenderInfo { + async fn generate_sender_info(peer_noderef: NodeRef) -> SenderInfo { let socket_address = peer_noderef .last_connection() .await @@ -808,7 +800,7 @@ impl RPCProcessor { async fn process_status_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { let peer_noderef = rpcreader.header.peer_noderef.clone(); - let sender_info = self.generate_sender_info(peer_noderef).await; + let sender_info = Self::generate_sender_info(peer_noderef).await; let reply_msg = { let operation = rpcreader @@ -818,19 +810,20 @@ impl RPCProcessor { .map_err(logthru_rpc!())?; // Don't bother unless we are going to answer - if !self.wants_answer(&operation)? { + if !Self::wants_answer(&operation)? { return Ok(()); } // get StatusQ reader - let iq_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::StatusQ(Ok(x))) => x, + let which_reader = operation.get_detail().which().expect("missing which operation"); + let statusq_reader = match which_reader { + veilid_capnp::operation::detail::Which::StatusQ(Ok(x)) => x, _ => panic!("invalid operation type in process_status_q"), }; // Parse out fields let node_status = decode_node_status( - &iq_reader + &statusq_reader .get_node_status() .map_err(map_error_internal!("no valid node status"))?, )?; @@ -1253,11 +1246,12 @@ impl RPCProcessor { .get_root::() .map_err(map_error_capnp_error!()) .map_err(logthru_rpc!())?; - - let (which, is_q) = match operation + let which_reader = operation .get_detail() .which() - .map_err(map_error_capnp_notinschema!())? + .map_err(map_error_capnp_notinschema!())?; + + let (which, is_q) = match which_reader { veilid_capnp::operation::detail::StatusQ(_) => (0u32, true), veilid_capnp::operation::detail::StatusA(_) => (1u32, false), @@ -1287,11 +1281,8 @@ impl RPCProcessor { veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), }; - log_rpc!(debug "<<== {}({}) <- {:?}", - if is_q { "REQUEST" } else { "REPLY" }, - self.get_rpc_message_debug_info(&reader), - msg.header.envelope.get_sender_id() - ); + // Log rpc receive + debug!(target: "rpc_message", dir = "recv", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode()); // Accounting for questions we receive if is_q { @@ -1384,6 +1375,7 @@ impl RPCProcessor { } } + #[instrument(level = "debug", skip_all, err)] pub async fn startup(&self) -> Result<(), String> { trace!("startup rpc processor"); let mut inner = self.inner.lock(); @@ -1430,6 +1422,7 @@ impl RPCProcessor { Ok(()) } + #[instrument(level = "debug", skip_all)] pub async fn shutdown(&self) { debug!("starting rpc processor shutdown"); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 5196eb33..0679a15c 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -281,6 +281,24 @@ impl ValueKey { } } +#[derive(Clone, Debug, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] +pub struct ValueData { + pub data: Vec, + pub seq: u32, +} +impl ValueData { + pub fn new(data: Vec) -> Self { + Self { data, seq: 0 } + } + pub fn new_with_seq(data: Vec, seq: u32) -> Self { + Self { data, seq } + } + pub fn change(&mut self, data: Vec) { + self.data = data; + self.seq += 1; + } +} + #[derive(Clone, Debug, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub struct BlockId { pub key: DHTKey,