diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 509bd3fa..8dd9db50 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -390,12 +390,12 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), ret, err)] - pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result, String> { + pub fn register_find_node_answer(&self, peers: Vec) -> Result, String> { let node_id = self.node_id(); // register nodes we'd found - let mut out = Vec::::with_capacity(fna.peers.len()); - for p in fna.peers { + let mut out = Vec::::with_capacity(peers.len()); + for p in peers { // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table if p.node_id.key == node_id { continue; @@ -436,7 +436,7 @@ impl RoutingTable { .map_err(logthru_rtab!())?; // register nodes we'd found - self.register_find_node_answer(res) + self.register_find_node_answer(res.answer) } #[instrument(level = "trace", skip(self), ret, err)] diff --git a/veilid-core/src/rpc_processor/coders/operations/answer.rs b/veilid-core/src/rpc_processor/coders/operations/answer.rs index eb727600..b179d2ee 100644 --- a/veilid-core/src/rpc_processor/coders/operations/answer.rs +++ b/veilid-core/src/rpc_processor/coders/operations/answer.rs @@ -67,52 +67,52 @@ impl RPCAnswerDetail { let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; let out = match which_reader { veilid_capnp::answer::detail::StatusA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationStatusA::decode(&op_reader)?; RPCAnswerDetail::StatusA(out) } veilid_capnp::answer::detail::FindNodeA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationFindNodeA::decode(&op_reader)?; RPCAnswerDetail::FindNodeA(out) } veilid_capnp::answer::detail::GetValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationGetValueA::decode(&op_reader)?; RPCAnswerDetail::GetValueA(out) } veilid_capnp::answer::detail::SetValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationSetValueA::decode(&op_reader)?; RPCAnswerDetail::SetValueA(out) } veilid_capnp::answer::detail::WatchValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationWatchValueA::decode(&op_reader)?; RPCAnswerDetail::WatchValueA(out) } veilid_capnp::answer::detail::SupplyBlockA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationSupplyBlockA::decode(&op_reader)?; RPCAnswerDetail::SupplyBlockA(out) } veilid_capnp::answer::detail::FindBlockA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationFindBlockA::decode(&op_reader)?; RPCAnswerDetail::FindBlockA(out) } veilid_capnp::answer::detail::StartTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationStartTunnelA::decode(&op_reader)?; RPCAnswerDetail::StartTunnelA(out) } veilid_capnp::answer::detail::CompleteTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationCompleteTunnelA::decode(&op_reader)?; RPCAnswerDetail::CompleteTunnelA(out) } veilid_capnp::answer::detail::CancelTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationCancelTunnelA::decode(&op_reader)?; RPCAnswerDetail::CancelTunnelA(out) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 06a7eae9..75ac2260 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -26,17 +26,17 @@ impl RPCOperationKind { .map_err(map_error_capnp_notinschema!())?; let out = match which_reader { veilid_capnp::operation::kind::Which::Question(r) => { - let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let q_reader = r.map_err(map_error_capnp_error!())?; let out = RPCQuestion::decode(&q_reader, sender_node_id)?; RPCOperationKind::Question(out) } veilid_capnp::operation::kind::Which::Statement(r) => { - let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let q_reader = r.map_err(map_error_capnp_error!())?; let out = RPCStatement::decode(&q_reader, sender_node_id)?; RPCOperationKind::Statement(out) } veilid_capnp::operation::kind::Which::Answer(r) => { - let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let q_reader = r.map_err(map_error_capnp_error!())?; let out = RPCAnswer::decode(&q_reader)?; RPCOperationKind::Answer(out) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index 7599484e..c7106cad 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -37,9 +37,7 @@ impl RPCOperationStatusA { let ns_reader = reader.get_node_status().map_err(map_error_capnp_error!())?; let node_status = decode_node_status(&ns_reader)?; - let si_reader = reader - .get_sender_info() - .map_err(map_error_capnp_notinschema!())?; + let si_reader = reader.get_sender_info().map_err(map_error_capnp_error!())?; let sender_info = decode_sender_info(&si_reader)?; Ok(RPCOperationStatusA { diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index 165c822b..b8b09c1f 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -80,52 +80,52 @@ impl RPCQuestionDetail { let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; let out = match which_reader { veilid_capnp::question::detail::StatusQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationStatusQ::decode(&op_reader)?; RPCQuestionDetail::StatusQ(out) } veilid_capnp::question::detail::FindNodeQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationFindNodeQ::decode(&op_reader)?; RPCQuestionDetail::FindNodeQ(out) } veilid_capnp::question::detail::GetValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationGetValueQ::decode(&op_reader)?; RPCQuestionDetail::GetValueQ(out) } veilid_capnp::question::detail::SetValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationSetValueQ::decode(&op_reader)?; RPCQuestionDetail::SetValueQ(out) } veilid_capnp::question::detail::WatchValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationWatchValueQ::decode(&op_reader)?; RPCQuestionDetail::WatchValueQ(out) } veilid_capnp::question::detail::SupplyBlockQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationSupplyBlockQ::decode(&op_reader)?; RPCQuestionDetail::SupplyBlockQ(out) } veilid_capnp::question::detail::FindBlockQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationFindBlockQ::decode(&op_reader)?; RPCQuestionDetail::FindBlockQ(out) } veilid_capnp::question::detail::StartTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationStartTunnelQ::decode(&op_reader)?; RPCQuestionDetail::StartTunnelQ(out) } veilid_capnp::question::detail::CompleteTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationCompleteTunnelQ::decode(&op_reader)?; RPCQuestionDetail::CompleteTunnelQ(out) } veilid_capnp::question::detail::CancelTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationCancelTunnelQ::decode(&op_reader)?; RPCQuestionDetail::CancelTunnelQ(out) } diff --git a/veilid-core/src/rpc_processor/coders/operations/statement.rs b/veilid-core/src/rpc_processor/coders/operations/statement.rs index 9dfffa83..a33f84a6 100644 --- a/veilid-core/src/rpc_processor/coders/operations/statement.rs +++ b/veilid-core/src/rpc_processor/coders/operations/statement.rs @@ -14,7 +14,7 @@ impl RPCStatement { pub fn detail(&self) -> &RPCStatementDetail { &self.detail } - pub fn into_detail(self) -> RPCQuestionDetail { + pub fn into_detail(self) -> RPCStatementDetail { self.detail } pub fn desc(&self) -> &'static str { @@ -62,32 +62,32 @@ impl RPCStatementDetail { let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; let out = match which_reader { veilid_capnp::statement::detail::ValidateDialInfo(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationValidateDialInfo::decode(&op_reader)?; RPCStatementDetail::ValidateDialInfo(out) } veilid_capnp::statement::detail::Route(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationRoute::decode(&op_reader)?; RPCStatementDetail::Route(out) } veilid_capnp::statement::detail::NodeInfoUpdate(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationNodeInfoUpdate::decode(&op_reader, sender_node_id)?; RPCStatementDetail::NodeInfoUpdate(out) } veilid_capnp::statement::detail::ValueChanged(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationValueChanged::decode(&op_reader)?; RPCStatementDetail::ValueChanged(out) } veilid_capnp::statement::detail::Signal(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationSignal::decode(&op_reader)?; RPCStatementDetail::Signal(out) } veilid_capnp::statement::detail::ReturnReceipt(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let op_reader = r.map_err(map_error_capnp_error!())?; let out = RPCOperationReturnReceipt::decode(&op_reader)?; RPCStatementDetail::ReturnReceipt(out) } diff --git a/veilid-core/src/rpc_processor/coders/private_safety_route.rs b/veilid-core/src/rpc_processor/coders/private_safety_route.rs index e2f36e03..4de50cc5 100644 --- a/veilid-core/src/rpc_processor/coders/private_safety_route.rs +++ b/veilid-core/src/rpc_processor/coders/private_safety_route.rs @@ -21,6 +21,16 @@ pub struct PrivateRoute { pub hops: Option, } +impl PrivateRoute { + pub fn new_stub(public_key: DHTKey) -> Self { + Self { + public_key, + hop_count: 0, + hops: None, + } + } +} + impl fmt::Display for PrivateRoute { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( diff --git a/veilid-core/src/rpc_processor/coders/tunnel.rs b/veilid-core/src/rpc_processor/coders/tunnel.rs index 90beef94..4991e46a 100644 --- a/veilid-core/src/rpc_processor/coders/tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/tunnel.rs @@ -74,9 +74,9 @@ pub fn decode_full_tunnel( let id = reader.get_id(); let timeout = reader.get_timeout(); let l_reader = reader.get_local().map_err(map_error_capnp_error!())?; - let local = decode_tunnel_endpoint(&l_reader).map_err(map_error_capnp_error!())?; + let local = decode_tunnel_endpoint(&l_reader)?; let r_reader = reader.get_remote().map_err(map_error_capnp_error!())?; - let remote = decode_tunnel_endpoint(&r_reader).map_err(map_error_capnp_error!())?; + let remote = decode_tunnel_endpoint(&r_reader)?; Ok(FullTunnel { id, @@ -103,7 +103,7 @@ pub fn decode_partial_tunnel( let id = reader.get_id(); let timeout = reader.get_timeout(); let l_reader = reader.get_local().map_err(map_error_capnp_error!())?; - let local = decode_tunnel_endpoint(&l_reader).map_err(map_error_capnp_error!())?; + let local = decode_tunnel_endpoint(&l_reader)?; Ok(PartialTunnel { id, timeout, local }) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 7bff1129..308e9b66 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -403,7 +403,7 @@ impl RPCProcessor { #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] fn render_operation( &self, - dest: &Destination, + dest: Destination, operation: &RPCOperation, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result { @@ -420,7 +420,7 @@ impl RPCProcessor { let out; // Envelope data // To where are we sending the request - match &dest { + match dest { Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => { // Send to a node without a private route // -------------------------------------- @@ -449,8 +449,7 @@ impl RPCProcessor { // No private route was specified for the request // but we are using a safety route, so we must create an empty private route let mut pr_builder = ::capnp::message::Builder::new_default(); - let private_route = - self.new_stub_private_route(node_id, &mut pr_builder)?; + let private_route = PrivateRoute::new_stub(node_id); let message_vec = builder_to_vec(msg_builder)?; // first @@ -469,14 +468,6 @@ impl RPCProcessor { Destination::PrivateRoute(private_route) => { // Send to private route // --------------------- - - // Encode the private route - let mut pr_msg_builder = ::capnp::message::Builder::new_default(); - let mut pr_builder = - pr_msg_builder.init_root::(); - encode_private_route(private_route, &mut pr_builder)?; - let pr_reader = pr_builder.into_reader(); - // Reply with 'route' operation let message_vec = builder_to_vec(msg_builder)?; out_node_id = match safety_route_spec { @@ -487,7 +478,7 @@ impl RPCProcessor { Some(rh) => rh.dial_info.node_id.key, _ => return Err(rpc_error_internal("private route has no hops")), }; - out = self.wrap_with_route(None, pr_reader, message_vec)?; + out = self.wrap_with_route(None, private_route, message_vec)?; out_node_id } Some(sr) => { @@ -500,7 +491,7 @@ impl RPCProcessor { .dial_info .node_id .key; - out = self.wrap_with_route(Some(sr), pr_reader, message_vec)?; + out = self.wrap_with_route(Some(sr), private_route, message_vec)?; out_node_id } } @@ -533,6 +524,10 @@ impl RPCProcessor { ) -> Result { // Wrap question in operation let operation = RPCOperation::new_question(question); + let op_id = operation.op_id(); + + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { @@ -540,7 +535,7 @@ impl RPCProcessor { out_node_id, out_noderef, hopcount, - } = self.render_operation(&dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation, safety_route_spec)?; // Calculate answer timeout // Timeout is number of hops times the timeout per hop @@ -561,12 +556,8 @@ impl RPCProcessor { }; // Set up op id eventual - let op_id = operation.op_id(); let eventual = self.add_op_id_waiter(op_id); - // Log rpc send - debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest); - // Send question let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); @@ -614,13 +605,16 @@ impl RPCProcessor { // Wrap statement in operation let operation = RPCOperation::new_statement(statement); + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + // Produce rendered operation let RenderedOperation { out, out_node_id, out_noderef, hopcount, - } = self.render_operation(&dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation, safety_route_spec)?; // Calculate answer timeout // Timeout is number of hops times the timeout per hop @@ -640,9 +634,6 @@ impl RPCProcessor { } }; - // Log rpc send - debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); - // Send statement let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); @@ -713,13 +704,16 @@ impl RPCProcessor { // Extract destination from respond_to let dest = self.get_respond_to_destination(&request); + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); + // Produce rendered operation let RenderedOperation { out, out_node_id, out_noderef, hopcount, - } = self.render_operation(&dest, &operation, safety_route_spec)?; + } = self.render_operation(dest, &operation, safety_route_spec)?; // If we need to resolve the first hop, do it let node_ref = match out_noderef { @@ -733,9 +727,6 @@ impl RPCProcessor { } }; - // Log rpc send - debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest); - // Send the reply let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); @@ -768,16 +759,18 @@ impl RPCProcessor { &self, encoded_msg: RPCMessageEncoded, ) -> Result<(), RPCError> { - // Make an operation reader - let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); + // Decode the operation let sender_node_id = encoded_msg.header.envelope.get_sender_id(); - let operation = reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; // Decode the RPC message - let operation = RPCOperation::decode(&operation, &sender_node_id)?; + let operation = { + let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); + let op_reader = reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + RPCOperation::decode(&op_reader, &sender_node_id)? + }; // Get the sender noderef, incorporating and 'sender node info' we have from a question let mut opt_sender_nr: Option = None; diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs index 4b242de4..7b2ea1da 100644 --- a/veilid-core/src/rpc_processor/private_route.rs +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -2,32 +2,14 @@ use super::*; impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - pub(super) fn new_stub_private_route<'a, T>( + fn compile_safety_route( &self, - dest_node_id: DHTKey, - builder: &'a mut ::capnp::message::Builder, - ) -> Result, RPCError> - where - T: capnp::message::Allocator + 'a, - { - let mut pr = builder.init_root::(); - - let mut pr_pk = pr.reborrow().init_public_key(); - encode_public_key(&dest_node_id, &mut pr_pk)?; - pr.set_hop_count(0u8); - // leave firstHop as null - Ok(pr.into_reader()) - } - - fn encode_safety_route<'a>( - &self, - safety_route: &SafetyRouteSpec, - private_route: veilid_capnp::private_route::Reader<'a>, - builder: &'a mut veilid_capnp::safety_route::Builder<'a>, - ) -> Result<(), RPCError> { + safety_route_spec: &SafetyRouteSpec, + private_route: PrivateRoute, + ) -> Result { // Ensure the total hop count isn't too long for our config - let pr_hopcount = private_route.get_hop_count() as usize; - let sr_hopcount = safety_route.hops.len(); + let pr_hopcount = private_route.hop_count as usize; + let sr_hopcount = safety_route_spec.hops.len(); let hopcount = 1 + sr_hopcount + pr_hopcount; if hopcount > self.inner.lock().max_route_hop_count { return Err(rpc_error_internal("hop count too long for route")); @@ -35,7 +17,7 @@ impl RPCProcessor { // Build the safety route let mut sr_pk = builder.reborrow().init_public_key(); - encode_public_key(&safety_route.public_key, &mut sr_pk)?; + encode_public_key(&safety_route_spec.public_key, &mut sr_pk)?; builder.set_hop_count( u8::try_from(sr_hopcount) @@ -78,7 +60,7 @@ impl RPCProcessor { let mut rh_message = ::capnp::message::Builder::new_default(); let mut rh_builder = rh_message.init_root::(); let mut di_builder = rh_builder.reborrow().init_dial_info(); - encode_node_dial_info(&safety_route.hops[h].dial_info, &mut di_builder)?; + encode_node_dial_info(&safety_route_spec.hops[h].dial_info, &mut di_builder)?; // RouteHopData let mut rhd_builder = rh_builder.init_next_hop(); // Add the nonce @@ -88,8 +70,8 @@ impl RPCProcessor { let dh_secret = self .crypto .cached_dh( - &safety_route.hops[h].dial_info.node_id.key, - &safety_route.secret_key, + &safety_route_spec.hops[h].dial_info.node_id.key, + &safety_route_spec.secret_key, ) .map_err(map_error_internal!("dh failed"))?; let enc_msg_data = @@ -114,8 +96,8 @@ impl RPCProcessor { let dh_secret = self .crypto .cached_dh( - &safety_route.hops[0].dial_info.node_id.key, - &safety_route.secret_key, + &safety_route_spec.hops[0].dial_info.node_id.key, + &safety_route_spec.secret_key, ) .map_err(map_error_internal!("dh failed"))?; let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) @@ -128,14 +110,21 @@ impl RPCProcessor { } // Wrap an operation inside a route - pub(super) fn wrap_with_route<'a>( + pub(super) fn wrap_with_route( &self, safety_route: Option<&SafetyRouteSpec>, - private_route: veilid_capnp::private_route::Reader<'a>, + private_route: PrivateRoute, message_data: Vec, ) -> Result, RPCError> { + // Encode the private route + let mut pr_msg_builder = ::capnp::message::Builder::new_default(); + let mut pr_builder = pr_msg_builder.init_root::(); + encode_private_route(&private_route, &mut pr_builder)?; + let pr_reader = pr_builder.into_reader(); + // Get stuff before we lock inner - let op_id = self.get_next_op_id(); + let op_id = intf::get_random_u64(); + // Encrypt routed operation let nonce = Crypto::get_random_nonce(); let pr_pk_reader = private_route @@ -152,6 +141,12 @@ impl RPCProcessor { .map_err(map_error_internal!("encryption failed"))?; // Prepare route operation + + let route = RPCOperationRoute { + safety_route: todo!(), + operation: todo!(), + }; + let route_msg = { let mut route_msg = ::capnp::message::Builder::new_default(); let mut route_operation = route_msg.init_root::(); diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index dbe37dfc..a61ade81 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -9,140 +9,75 @@ impl RPCProcessor { key: DHTKey, safety_route: Option<&SafetyRouteSpec>, respond_to: RespondTo, - ) -> Result { - let find_node_q_msg = { - let mut find_node_q_msg = ::capnp::message::Builder::new_default(); - let mut question = find_node_q_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to_builder = question.reborrow().init_respond_to(); - respond_to.encode(&mut respond_to_builder)?; - let detail = question.reborrow().init_detail(); - let mut fnq = detail.init_find_node_q(); - let mut node_id_builder = fnq.reborrow().init_node_id(); - encode_public_key(&key, &mut node_id_builder)?; - - find_node_q_msg.into_reader() - }; + ) -> Result>, RPCError> { + let find_node_q = RPCOperationFindNodeQ { node_id: key }; + let question = RPCQuestion::new(respond_to, RPCQuestionDetail::FindNodeQ(find_node_q)); // Send the find_node request - let waitable_reply = self - .request(dest, find_node_q_msg, safety_route) - .await? - .unwrap(); + let waitable_reply = self.question(dest, question, safety_route).await?; // Wait for reply - let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; + let (msg, latency) = self.wait_for_reply(waitable_reply).await?; - let response_operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - let find_node_a = match response_operation - .get_detail() - .which() - .map_err(map_error_capnp_notinschema!()) - .map_err(logthru_rpc!())? - { - veilid_capnp::operation::detail::FindNodeA(a) => { - a.map_err(map_error_internal!("Invalid FindNodeA"))? - } - _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), + // Get the right answer type + let find_node_a = match msg.operation.into_kind() { + RPCOperationKind::Answer(a) => match a.into_detail() { + RPCAnswerDetail::FindNodeA(a) => a, + _ => return Err(rpc_error_invalid_format("not a find_node answer")), + }, + _ => return Err(rpc_error_invalid_format("not an answer")), }; - let peers_reader = find_node_a - .get_peers() - .map_err(map_error_internal!("Missing peers"))?; - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(map_error_internal!("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p, true)?; - + // Verify peers are in the correct peer scope + for peer_info in &find_node_a.peers { if !self.filter_peer_scope(&peer_info.signed_node_info.node_info) { return Err(rpc_error_invalid_format( "find_node response has invalid peer scope", )); } - - peers.push(peer_info); } - let out = FindNodeAnswer { latency, peers }; - - Ok(out) + Ok(Answer::new(latency, find_node_a.peers)) } - pub(crate) async fn process_find_node_q(&self, rpcreader: RPCMessage) -> Result<(), RPCError> { - // - let reply_msg = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // find_node must always want an answer - if !self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format("find_node_q should want answer")); - } - - // get findNodeQ reader - let fnq_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::FindNodeQ(Ok(x))) => x, - _ => panic!("invalid operation type in process_find_node_q"), - }; - - // get the node id we want to look up - let target_node_id = decode_public_key( - &fnq_reader - .get_node_id() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?, - ); - - // add node information for the requesting node to our routing table - let routing_table = self.routing_table(); - - // find N nodes closest to the target node in our routing table - let own_peer_info = routing_table.get_own_peer_info(); - let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); - - let closest_nodes = routing_table.find_closest_nodes( - target_node_id, - // filter - Some(move |_k, v| { - RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) - }), - // transform - move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), - ); - log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); - - // Send find_node answer - let mut reply_msg = ::capnp::message::Builder::new_default(); - let mut answer = reply_msg.init_root::(); - answer.set_op_id(operation.get_op_id()); - let mut respond_to = answer.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = answer.reborrow().init_detail(); - let fna = detail.init_find_node_a(); - let mut peers_builder = fna.init_peers( - closest_nodes - .len() - .try_into() - .map_err(map_error_internal!("invalid closest nodes list length"))?, - ); - for (i, closest_node) in closest_nodes.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(closest_node, &mut pi_builder)?; - } - reply_msg.into_reader() + pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the question + let find_node_q = match msg.operation.kind() { + RPCOperationKind::Question(q) => match q.detail() { + RPCQuestionDetail::FindNodeQ(q) => q, + _ => panic!("not a status question"), + }, + _ => panic!("not a question"), }; - self.reply(rpcreader, reply_msg, None).await + // add node information for the requesting node to our routing table + let routing_table = self.routing_table(); + + // find N nodes closest to the target node in our routing table + let own_peer_info = routing_table.get_own_peer_info(); + let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); + + let closest_nodes = routing_table.find_closest_nodes( + find_node_q.node_id, + // filter + Some(move |_k, v| { + RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid) + }), + // transform + move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info), + ); + + // Make status answer + let find_node_a = RPCOperationFindNodeA { + peers: closest_nodes, + }; + + // Send status answer + self.answer( + msg, + RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)), + None, + ) + .await } } diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 70a577fc..84e7e5a2 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -4,73 +4,41 @@ impl RPCProcessor { // Sends a our node info to another node // Can be sent via all methods including relays and routes pub async fn rpc_call_node_info_update( - &self, + self, dest: Destination, safety_route: Option<&SafetyRouteSpec>, ) -> Result<(), RPCError> { - let sni_msg = { - let mut sni_msg = ::capnp::message::Builder::new_default(); - let mut question = sni_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let niu_builder = detail.init_node_info_update(); - let mut sni_builder = niu_builder.init_signed_node_info(); - let sni = self.routing_table().get_own_signed_node_info(); - encode_signed_node_info(&sni, &mut sni_builder)?; - - sni_msg.into_reader() - }; + let signed_node_info = self.routing_table().get_own_signed_node_info(); + let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; + let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); // Send the node_info_update request - self.request(dest, sni_msg, safety_route).await?; + self.statement(dest, statement, safety_route).await?; Ok(()) } - pub(crate) async fn process_node_info_update( - &self, - rpcreader: RPCMessage, - ) -> Result<(), RPCError> { - // - let sender_node_id = rpcreader.header.envelope.get_sender_id(); - let signed_node_info = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; + pub(crate) async fn process_node_info_update(&self, msg: RPCMessage) -> Result<(), RPCError> { + let sender_node_id = msg.header.envelope.get_sender_id(); - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format( - "node_info_update should not want answer", - )); - } - - // get nodeInfoUpdate reader - let niumsg_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::NodeInfoUpdate(Ok(x))) => x, - _ => panic!("invalid operation type in process_node_info_update"), - }; - - // Parse out fields - let sni_reader = niumsg_reader - .get_signed_node_info() - .map_err(map_error_internal!("no valid signed node info"))?; - decode_signed_node_info(&sni_reader, &sender_node_id, true)? + // Get the statement + let node_info_update = match msg.operation.into_kind() { + RPCOperationKind::Statement(s) => match s.into_detail() { + RPCStatementDetail::NodeInfoUpdate(s) => s, + _ => panic!("not a node info update"), + }, + _ => panic!("not a statement"), }; // Update our routing table with signed node info - if !self.filter_peer_scope(&signed_node_info.node_info) { + if !self.filter_peer_scope(&node_info_update.signed_node_info.node_info) { return Err(rpc_error_invalid_format( "node_info_update has invalid peer scope", )); } let _ = self .routing_table() - .register_node_with_signed_node_info(sender_node_id, signed_node_info) + .register_node_with_signed_node_info(sender_node_id, node_info_update.signed_node_info) .map_err(RPCError::Internal)?; Ok(()) diff --git a/veilid-core/src/rpc_processor/rpc_return_receipt.rs b/veilid-core/src/rpc_processor/rpc_return_receipt.rs index 51d47889..2166f4e8 100644 --- a/veilid-core/src/rpc_processor/rpc_return_receipt.rs +++ b/veilid-core/src/rpc_processor/rpc_return_receipt.rs @@ -4,72 +4,36 @@ impl RPCProcessor { // Sends a unidirectional in-band return receipt // Can be sent via all methods including relays and routes pub async fn rpc_call_return_receipt>( - &self, + self, dest: Destination, safety_route: Option<&SafetyRouteSpec>, receipt: D, ) -> Result<(), RPCError> { - let receipt = receipt.as_ref(); + let receipt = receipt.as_ref().to_vec(); - let rr_msg = { - let mut rr_msg = ::capnp::message::Builder::new_default(); - let mut question = rr_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let rr_builder = detail.init_return_receipt(); - let r_builder = rr_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in return receipt"), - )?); - r_builder.copy_from_slice(receipt); + let return_receipt = RPCOperationReturnReceipt { receipt }; + let statement = RPCStatement::new(RPCStatementDetail::ReturnReceipt(return_receipt)); - rr_msg.into_reader() - }; - - // Send the return receipt request - self.request(dest, rr_msg, safety_route).await?; + // Send the return_receipt request + self.statement(dest, statement, safety_route).await?; Ok(()) } - pub(crate) async fn process_return_receipt( - &self, - rpcreader: RPCMessage, - ) -> Result<(), RPCError> { - let receipt = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format( - "return receipt should not want answer", - )); - } - - // get returnReceipt reader - let rr_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::ReturnReceipt(Ok(x))) => x, - _ => panic!("invalid operation type in process_return_receipt"), - }; - - // Get receipt - rr_reader - .get_receipt() - .map_err(map_error_internal!( - "no valid receipt in process_return_receipt" - ))? - .to_vec() + pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the statement + let RPCOperationReturnReceipt { receipt } = match msg.operation.into_kind() { + RPCOperationKind::Statement(s) => match s.into_detail() { + RPCStatementDetail::ReturnReceipt(s) => s, + _ => panic!("not a return receipt"), + }, + _ => panic!("not a statement"), }; // Handle it let network_manager = self.network_manager(); network_manager - .handle_in_band_receipt(receipt, rpcreader.header.peer_noderef) + .handle_in_band_receipt(receipt, msg.header.peer_noderef) .await .map_err(map_error_string!()) } diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 172e9912..c764b621 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -4,57 +4,35 @@ impl RPCProcessor { // Sends a unidirectional signal to a node // Can be sent via all methods including relays and routes pub async fn rpc_call_signal( - &self, + self, dest: Destination, safety_route: Option<&SafetyRouteSpec>, signal_info: SignalInfo, ) -> Result<(), RPCError> { - let sig_msg = { - let mut sig_msg = ::capnp::message::Builder::new_default(); - let mut question = sig_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut sig_builder = detail.init_signal(); - encode_signal_info(&signal_info, &mut sig_builder)?; - - sig_msg.into_reader() - }; + //let signed_node_info = self.routing_table().get_own_signed_node_info(); + let signal = RPCOperationSignal { signal_info }; + let statement = RPCStatement::new(RPCStatementDetail::Signal(signal)); // Send the signal request - self.request(dest, sig_msg, safety_route).await?; + self.statement(dest, statement, safety_route).await?; Ok(()) } - pub(crate) async fn process_signal(&self, rpcreader: RPCMessage) -> Result<(), RPCError> { - let signal_info = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format("signal should not want answer")); - } - - // get signal reader - let sig_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::Signal(Ok(x))) => x, - _ => panic!("invalid operation type in process_signal"), - }; - - // Get signal info - decode_signal_info(&sig_reader)? + pub(crate) async fn process_signal(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the statement + let signal = match msg.operation.into_kind() { + RPCOperationKind::Statement(s) => match s.into_detail() { + RPCStatementDetail::Signal(s) => s, + _ => panic!("not a node info update"), + }, + _ => panic!("not a statement"), }; // Handle it let network_manager = self.network_manager(); network_manager - .handle_signal(signal_info) + .handle_signal(signal.signal_info) .await .map_err(map_error_string!()) } diff --git a/veilid-core/src/rpc_processor/rpc_status.rs b/veilid-core/src/rpc_processor/rpc_status.rs index c3f5492c..37cc49a5 100644 --- a/veilid-core/src/rpc_processor/rpc_status.rs +++ b/veilid-core/src/rpc_processor/rpc_status.rs @@ -3,10 +3,7 @@ use super::*; impl RPCProcessor { // Send StatusQ RPC request, receive StatusA answer // Can be sent via relays, but not via routes - pub async fn rpc_call_status( - self, - peer: NodeRef, - ) -> Result, RPCError> { + pub async fn rpc_call_status(self, peer: NodeRef) -> Result, RPCError> { let node_status = self.network_manager().generate_node_status(); let status_q = RPCOperationStatusQ { node_status }; let respond_to = self.make_respond_to_sender(peer.clone()); @@ -56,7 +53,7 @@ impl RPCProcessor { } } - Ok(Answer::new(latency, status_a)) + Ok(Answer::new(latency, status_a.sender_info)) } pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> Result<(), RPCError> { diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 4003c735..6fb56e6a 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -3,7 +3,7 @@ use super::*; impl RPCProcessor { // Can only be sent directly, not via relays or routes pub async fn rpc_call_validate_dial_info( - &self, + self, peer: NodeRef, dial_info: DialInfo, redirect: bool, @@ -16,38 +16,24 @@ impl RPCProcessor { .dht .validate_dial_info_receipt_time_ms, ); - // - let (vdi_msg, eventual_value) = { - let mut vdi_msg = ::capnp::message::Builder::new_default(); - let mut question = vdi_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut vdi_builder = detail.init_validate_dial_info(); - // Generate receipt and waitable eventual so we can see if we get the receipt back - let (receipt, eventual_value) = network_manager - .generate_single_shot_receipt(receipt_time, []) - .map_err(map_error_string!())?; + // Generate receipt and waitable eventual so we can see if we get the receipt back + let (receipt, eventual_value) = network_manager + .generate_single_shot_receipt(receipt_time, []) + .map_err(map_error_string!())?; - vdi_builder.set_redirect(redirect); - let mut di_builder = vdi_builder.reborrow().init_dial_info(); - encode_dial_info(&dial_info, &mut di_builder)?; - let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in validate dial info"), - )?); - r_builder.copy_from_slice(&receipt); - - (vdi_msg.into_reader(), eventual_value) + let validate_dial_info = RPCOperationValidateDialInfo { + dial_info, + receipt, + redirect, }; + let statement = RPCStatement::new(RPCStatementDetail::ValidateDialInfo(validate_dial_info)); // Send the validate_dial_info request // This can only be sent directly, as relays can not validate dial info - self.request(Destination::Direct(peer), vdi_msg, None) + self.statement(Destination::Direct(peer), statement, None) .await?; - log_net!(debug "waiting for validate_dial_info receipt"); // Wait for receipt match eventual_value.await.take_value().unwrap() { ReceiptEvent::ReturnedInBand { inbound_noderef: _ } => Err(rpc_error_internal( @@ -67,44 +53,18 @@ impl RPCProcessor { } } - pub(crate) async fn process_validate_dial_info( - &self, - rpcreader: RPCMessage, - ) -> Result<(), RPCError> { - // - let (redirect, dial_info, receipt) = { - let operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - - // This should never want an answer - if self.wants_answer(&operation)? { - return Err(rpc_error_invalid_format( - "validate dial info should not want answer", - )); - } - - // get validateDialInfo reader - let vdi_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::ValidateDialInfo(Ok(x))) => x, - _ => panic!("invalid operation type in process_validate_dial_info"), - }; - - // Parse out fields - let redirect = vdi_reader.get_redirect(); - let dial_info = decode_dial_info(&vdi_reader.get_dial_info().map_err( - map_error_internal!("no valid dial info in process_validate_dial_info"), - )?)?; - let receipt = vdi_reader - .get_receipt() - .map_err(map_error_internal!( - "no valid receipt in process_validate_dial_info" - ))? - .to_vec(); - - (redirect, dial_info, receipt) + pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> Result<(), RPCError> { + // Get the statement + let RPCOperationValidateDialInfo { + dial_info, + receipt, + redirect, + } = match msg.operation.into_kind() { + RPCOperationKind::Statement(s) => match s.into_detail() { + RPCStatementDetail::ValidateDialInfo(s) => s, + _ => panic!("not a validate dial info"), + }, + _ => panic!("not a statement"), }; // Redirect this request if we are asked to @@ -115,19 +75,19 @@ impl RPCProcessor { // an ipv6 address let routing_table = self.routing_table(); let filter = DialInfoFilter::global().with_address_type(dial_info.address_type()); - let sender_id = rpcreader.header.envelope.get_sender_id(); + let sender_id = msg.header.envelope.get_sender_id(); let node_count = { let c = self.config.get(); c.network.dht.max_find_node_count as usize }; - let mut peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter); + let peers = routing_table.find_fast_public_nodes_filtered(node_count, &filter); if peers.is_empty() { return Err(rpc_error_internal(format!( "no peers matching filter '{:?}'", filter ))); } - for peer in &mut peers { + for mut peer in peers { // Ensure the peer is not the one asking for the validation if peer.node_id() == sender_id { continue; @@ -166,26 +126,17 @@ impl RPCProcessor { } // Make a copy of the request, without the redirect flag - let vdi_msg_reader = { - let mut vdi_msg = ::capnp::message::Builder::new_default(); - let mut question = vdi_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_none(()); - let detail = question.reborrow().init_detail(); - let mut vdi_builder = detail.init_validate_dial_info(); - vdi_builder.set_redirect(false); - let mut di_builder = vdi_builder.reborrow().init_dial_info(); - encode_dial_info(&dial_info, &mut di_builder)?; - let r_builder = vdi_builder.init_receipt(receipt.len().try_into().map_err( - map_error_protocol!("invalid receipt length in process_validate_dial_info"), - )?); - r_builder.copy_from_slice(&receipt); - vdi_msg.into_reader() + let validate_dial_info = RPCOperationValidateDialInfo { + dial_info: dial_info.clone(), + receipt: receipt.clone(), + redirect: false, }; + let statement = + RPCStatement::new(RPCStatementDetail::ValidateDialInfo(validate_dial_info)); - // Send the validate_dial_info request until we succeed - self.request(Destination::Direct(peer.clone()), vdi_msg_reader, None) + // Send the validate_dial_info request + // This can only be sent directly, as relays can not validate dial info + self.statement(Destination::Direct(peer), statement, None) .await?; } return Ok(()); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 73b04650..f1b7a9cf 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -21,12 +21,11 @@ pub use intf::ProtectedStore; pub use intf::TableStore; pub use network_manager::NetworkManager; pub use routing_table::RoutingTable; -pub use rpc_processor::StatusAnswer; +//pub use rpc_processor::RPCProcessor; use core::fmt; use core_context::{api_shutdown, VeilidCoreContext}; use enumset::*; -use rpc_processor::{RPCError, RPCProcessor}; use serde::*; use xx::*; @@ -116,26 +115,26 @@ impl fmt::Display for VeilidAPIError { } } -fn convert_rpc_error(x: RPCError) -> VeilidAPIError { - match x { - RPCError::Timeout => VeilidAPIError::Timeout, - RPCError::Unreachable(n) => VeilidAPIError::NodeNotFound { - node_id: NodeId::new(n), - }, - RPCError::Unimplemented(s) => VeilidAPIError::Unimplemented { message: s }, - RPCError::Internal(s) => VeilidAPIError::Internal { message: s }, - RPCError::Protocol(s) => VeilidAPIError::Internal { message: s }, - RPCError::InvalidFormat(s) => VeilidAPIError::Internal { - message: format!("Invalid RPC format: {}", s), - }, - } -} +// fn convert_rpc_error(x: RPCError) -> VeilidAPIError { +// match x { +// RPCError::Timeout => VeilidAPIError::Timeout, +// RPCError::Unreachable(n) => VeilidAPIError::NodeNotFound { +// node_id: NodeId::new(n), +// }, +// RPCError::Unimplemented(s) => VeilidAPIError::Unimplemented { message: s }, +// RPCError::Internal(s) => VeilidAPIError::Internal { message: s }, +// RPCError::Protocol(s) => VeilidAPIError::Internal { message: s }, +// RPCError::InvalidFormat(s) => VeilidAPIError::Internal { +// message: format!("Invalid RPC format: {}", s), +// }, +// } +// } -macro_rules! map_rpc_error { - () => { - |x| convert_rpc_error(x) - }; -} +// macro_rules! map_rpc_error { +// () => { +// |x| convert_rpc_error(x) +// }; +// } macro_rules! parse_error { ($msg:expr, $val:expr) => { @@ -1643,6 +1642,17 @@ pub struct PrivateRouteSpec { pub hops: Vec, } +impl PrivateRouteSpec { + pub fn new() -> Self { + let (pk, sk) = generate_secret(); + PrivateRouteSpec { + public_key: pk, + secret_key: sk, + hops: Vec::new(), + } + } +} + #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct SafetyRouteSpec { pub public_key: DHTKey, @@ -1837,13 +1847,14 @@ impl VeilidAPI { } Err(VeilidAPIError::NotInitialized) } - pub fn rpc_processor(&self) -> Result { - let inner = self.inner.lock(); - if let Some(context) = &inner.context { - return Ok(context.attachment_manager.network_manager().rpc_processor()); - } - Err(VeilidAPIError::NotInitialized) - } + + // pub fn rpc_processor(&self) -> Result { + // let inner = self.inner.lock(); + // if let Some(context) = &inner.context { + // return Ok(context.attachment_manager.network_manager().rpc_processor()); + // } + // Err(VeilidAPIError::NotInitialized) + // } //////////////////////////////////////////////////////////////// // Attach/Detach