From 8b5fc96c8cc15c3bbdfb839b75ff45a246589183 Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 4 Jul 2022 22:44:04 -0400 Subject: [PATCH] checkpoint --- .../native/network_class_discovery.rs | 2 +- .../coders/operations/operation_route.rs | 10 +- .../src/rpc_processor/coders/tunnel.rs | 6 +- veilid-core/src/rpc_processor/debug.rs | 26 -- veilid-core/src/rpc_processor/mod.rs | 249 +++++++++--------- .../src/rpc_processor/private_route.rs | 152 ++++------- veilid-core/src/veilid_api/mod.rs | 4 +- 7 files changed, 197 insertions(+), 252 deletions(-) diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 9308e81f..caf5ac19 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -85,7 +85,7 @@ impl DiscoveryContext { node_ref )) .map(|sa| { - let ret = sa.sender_info.socket_address; + let ret = sa.answer.socket_address; log_net!("request_public_address: {:?}", ret); ret }) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs index 4691dd46..d5e4fffe 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -2,13 +2,21 @@ use crate::*; use rpc_processor::*; #[derive(Debug, Clone)] -struct RoutedOperation { +pub struct RoutedOperation { pub signatures: Vec, pub nonce: Nonce, pub data: Vec, } impl RoutedOperation { + pub fn new(nonce: Nonce, data: Vec) -> Self { + Self { + signatures: Vec::new(), + nonce, + data, + } + } + pub fn decode( reader: &veilid_capnp::routed_operation::Reader, ) -> Result { diff --git a/veilid-core/src/rpc_processor/coders/tunnel.rs b/veilid-core/src/rpc_processor/coders/tunnel.rs index 4991e46a..3c80f10d 100644 --- a/veilid-core/src/rpc_processor/coders/tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/tunnel.rs @@ -61,9 +61,9 @@ pub fn encode_full_tunnel( ) -> Result<(), RPCError> { builder.set_id(full_tunnel.id); builder.set_timeout(full_tunnel.timeout); - let l_builder = builder.init_local(); + let mut l_builder = builder.reborrow().init_local(); encode_tunnel_endpoint(&full_tunnel.local, &mut l_builder)?; - let r_builder = builder.init_remote(); + let mut r_builder = builder.reborrow().init_remote(); encode_tunnel_endpoint(&full_tunnel.remote, &mut r_builder)?; Ok(()) } @@ -92,7 +92,7 @@ pub fn encode_partial_tunnel( ) -> Result<(), RPCError> { builder.set_id(partial_tunnel.id); builder.set_timeout(partial_tunnel.timeout); - let l_builder = builder.init_local(); + let mut l_builder = builder.reborrow().init_local(); encode_tunnel_endpoint(&partial_tunnel.local, &mut l_builder)?; Ok(()) } diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index 36610f63..62766d5e 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -86,29 +86,3 @@ macro_rules! map_error_panic { |_| panic!("oops") }; } - -impl RPCProcessor { - pub(super) fn get_rpc_message_debug_info( - &self, - message: &capnp::message::Reader, - ) -> String { - let operation = match message.get_root::() { - Ok(v) => v, - Err(e) => { - return format!("invalid operation: {}", e); - } - }; - let op_id = operation.get_op_id(); - let detail = match operation.get_detail().which() { - Ok(v) => v, - Err(e) => { - return format!("(operation detail not in schema: {})", e); - } - }; - format!( - "#{} {}", - op_id, - Self::get_rpc_operation_detail_debug_info(&detail) - ) - } -} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 308e9b66..5a051a20 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -140,10 +140,10 @@ impl Answer { } struct RenderedOperation { - out: Vec, // The rendered operation bytes - out_node_id: DHTKey, // Node id we're sending to - out_noderef: Option, // Node to send envelope to (may not be destination node id in case of relay) - hopcount: usize, // Total safety + private route hop count + message: Vec, // The rendered operation bytes + node_id: DHTKey, // Node id we're sending to + node_ref: Option, // Node to send envelope to (may not be destination node id in case of relay) + hop_count: usize, // Total safety + private route hop count } ///////////////////////////////////////////////////////////////////// @@ -400,6 +400,9 @@ impl RPCProcessor { } } + // Produce a byte buffer that represents the wire encoding of the entire + // unencrypted envelope body for a RPC message. This incorporates + // wrapping a private and/or safety route if they are specified. #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] fn render_operation( &self, @@ -407,110 +410,108 @@ impl RPCProcessor { operation: &RPCOperation, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result { - // Encode message to a builder and make a message reader for it - let mut msg_builder = ::capnp::message::Builder::new_default(); - let mut op_builder = msg_builder.init_root::(); - operation.encode(&mut op_builder)?; - - // Create envelope data let out_node_id; // Envelope Node Id - let mut out_noderef: Option = None; // Node to send envelope to - let hopcount: usize; // Total safety + private route hop count - let out = { - let out; // Envelope data + let mut out_node_ref: Option = None; // Node to send envelope to + let out_hop_count: usize; // Total safety + private route hop count + let out_message; // Envelope data - // To where are we sending the request - match dest { - Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => { - // Send to a node without a private route - // -------------------------------------- + // Encode message to a builder and make a message reader for it + // Then produce the message as an unencrypted byte buffer + let message_vec = { + let mut msg_builder = ::capnp::message::Builder::new_default(); + let mut op_builder = msg_builder.init_root::(); + operation.encode(&mut op_builder)?; + builder_to_vec(msg_builder)? + }; - // Get the actual destination node id accounting for relays - let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest { - (node_ref.clone(), dht_key.clone()) - } else { - let node_id = node_ref.node_id(); - (node_ref.clone(), node_id) - }; + // To where are we sending the request + match dest { + Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => { + // Send to a node without a private route + // -------------------------------------- - // Handle the existence of safety route - match safety_route_spec { - None => { - // If no safety route is being used, and we're not sending to a private - // route, we can use a direct envelope instead of routing - out = builder_to_vec(msg_builder)?; + // Get the actual destination node id accounting for relays + let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest { + (node_ref.clone(), dht_key.clone()) + } else { + let node_id = node_ref.node_id(); + (node_ref.clone(), node_id) + }; - // Message goes directly to the node - out_node_id = node_id; - out_noderef = Some(node_ref); - hopcount = 1; - } - Some(sr) => { - // No private route was specified for the request - // but we are using a safety route, so we must create an empty private route - let mut pr_builder = ::capnp::message::Builder::new_default(); - let private_route = PrivateRoute::new_stub(node_id); + // Handle the existence of safety route + match safety_route_spec { + None => { + // If no safety route is being used, and we're not sending to a private + // route, we can use a direct envelope instead of routing + out_message = message_vec; - let message_vec = builder_to_vec(msg_builder)?; - // first - out_node_id = sr - .hops - .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route"))? - .dial_info - .node_id - .key; - out = self.wrap_with_route(Some(sr), private_route, message_vec)?; - hopcount = 1 + sr.hops.len(); - } - }; - } - Destination::PrivateRoute(private_route) => { - // Send to private route - // --------------------- - // Reply with 'route' operation - let message_vec = builder_to_vec(msg_builder)?; - out_node_id = match safety_route_spec { - None => { - // If no safety route, the first node is the first hop of the private route - hopcount = private_route.hop_count as usize; - let out_node_id = match &private_route.hops { - Some(rh) => rh.dial_info.node_id.key, - _ => return Err(rpc_error_internal("private route has no hops")), - }; - out = self.wrap_with_route(None, private_route, message_vec)?; - out_node_id - } - Some(sr) => { - // If safety route is in use, first node is the first hop of the safety route - hopcount = 1 + sr.hops.len() + (private_route.hop_count as usize); - let out_node_id = sr - .hops - .first() - .ok_or_else(|| rpc_error_internal("no hop in safety route"))? - .dial_info - .node_id - .key; - out = self.wrap_with_route(Some(sr), private_route, message_vec)?; - out_node_id - } + // Message goes directly to the node + out_node_id = node_id; + out_node_ref = Some(node_ref); + out_hop_count = 1; + } + Some(sr) => { + // No private route was specified for the request + // but we are using a safety route, so we must create an empty private route + let mut pr_builder = ::capnp::message::Builder::new_default(); + let private_route = PrivateRoute::new_stub(node_id); + + // first + out_node_id = sr + .hops + .first() + .ok_or_else(|| rpc_error_internal("no hop in safety route"))? + .dial_info + .node_id + .key; + out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?; + out_hop_count = 1 + sr.hops.len(); + } + }; + } + Destination::PrivateRoute(private_route) => { + // Send to private route + // --------------------- + // Reply with 'route' operation + out_node_id = match safety_route_spec { + None => { + // If no safety route, the first node is the first hop of the private route + out_hop_count = private_route.hop_count as usize; + let out_node_id = match &private_route.hops { + Some(rh) => rh.dial_info.node_id.key, + _ => return Err(rpc_error_internal("private route has no hops")), + }; + out_message = self.wrap_with_route(None, private_route, message_vec)?; + out_node_id + } + Some(sr) => { + // If safety route is in use, first node is the first hop of the safety route + out_hop_count = 1 + sr.hops.len() + (private_route.hop_count as usize); + let out_node_id = sr + .hops + .first() + .ok_or_else(|| rpc_error_internal("no hop in safety route"))? + .dial_info + .node_id + .key; + out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?; + out_node_id } } } - out - }; + } // Verify hop count isn't larger than out maximum routed hop count - if hopcount > self.inner.lock().max_route_hop_count { + if out_hop_count > self.inner.lock().max_route_hop_count { return Err(rpc_error_internal("hop count too long for route")) .map_err(logthru_rpc!(warn)); } Ok(RenderedOperation { - out, - out_node_id, - out_noderef, - hopcount, + message: out_message, + node_id: out_node_id, + node_ref: out_node_ref, + hop_count: out_hop_count, }) } @@ -531,21 +532,17 @@ impl RPCProcessor { // Produce rendered operation let RenderedOperation { - out, - out_node_id, - out_noderef, - hopcount, + message, + node_id, + node_ref, + hop_count, } = self.render_operation(dest, &operation, safety_route_spec)?; - // Calculate answer timeout - // Timeout is number of hops times the timeout per hop - let timeout = self.inner.lock().timeout * (hopcount as u64); - // If we need to resolve the first hop, do it - let node_ref = match out_noderef { + let node_ref = match node_ref { None => { // resolve node - self.resolve_node(out_node_id) + self.resolve_node(node_id) .await .map_err(logthru_rpc!(error))? } @@ -555,15 +552,19 @@ impl RPCProcessor { } }; + // Calculate answer timeout + // Timeout is number of hops times the timeout per hop + let timeout = self.inner.lock().timeout * (hop_count as u64); + // Set up op id eventual let eventual = self.add_op_id_waiter(op_id); // Send question - let bytes = out.len() as u64; + let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); let send_data_kind = match self .network_manager() - .send_envelope(node_ref.clone(), Some(out_node_id), out) + .send_envelope(node_ref.clone(), Some(node_id), message) .await .map_err(RPCError::Internal) { @@ -610,21 +611,17 @@ impl RPCProcessor { // Produce rendered operation let RenderedOperation { - out, - out_node_id, - out_noderef, - hopcount, + message, + node_id, + node_ref, + hop_count, } = self.render_operation(dest, &operation, safety_route_spec)?; - // Calculate answer timeout - // Timeout is number of hops times the timeout per hop - let timeout = self.inner.lock().timeout * (hopcount as u64); - // If we need to resolve the first hop, do it - let node_ref = match out_noderef { + let node_ref = match node_ref { None => { // resolve node - self.resolve_node(out_node_id) + self.resolve_node(node_id) .await .map_err(logthru_rpc!(error))? } @@ -634,12 +631,16 @@ impl RPCProcessor { } }; + // Calculate answer timeout + // Timeout is number of hops times the timeout per hop + let timeout = self.inner.lock().timeout * (hop_count as u64); + // Send statement - let bytes = out.len() as u64; + let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); let send_data_kind = match self .network_manager() - .send_envelope(node_ref.clone(), Some(out_node_id), out) + .send_envelope(node_ref.clone(), Some(node_id), message) .await .map_err(RPCError::Internal) { @@ -709,17 +710,17 @@ impl RPCProcessor { // Produce rendered operation let RenderedOperation { - out, - out_node_id, - out_noderef, - hopcount, + message, + node_id, + node_ref, + hop_count, } = self.render_operation(dest, &operation, safety_route_spec)?; // If we need to resolve the first hop, do it - let node_ref = match out_noderef { + let node_ref = match node_ref { None => { // resolve node - self.resolve_node(out_node_id).await? + self.resolve_node(node_id).await? } Some(nr) => { // got the node in the routing table already @@ -728,10 +729,10 @@ impl RPCProcessor { }; // Send the reply - let bytes = out.len() as u64; + let bytes = message.len() as u64; let send_ts = intf::get_timestamp(); self.network_manager() - .send_envelope(node_ref.clone(), Some(out_node_id), out) + .send_envelope(node_ref.clone(), Some(node_id), message) .await .map_err(RPCError::Internal) .map_err(|e| { diff --git a/veilid-core/src/rpc_processor/private_route.rs b/veilid-core/src/rpc_processor/private_route.rs index 7b2ea1da..089906a5 100644 --- a/veilid-core/src/rpc_processor/private_route.rs +++ b/veilid-core/src/rpc_processor/private_route.rs @@ -15,33 +15,17 @@ impl RPCProcessor { return Err(rpc_error_internal("hop count too long for route")); } - // Build the safety route - let mut sr_pk = builder.reborrow().init_public_key(); - encode_public_key(&safety_route_spec.public_key, &mut sr_pk)?; - - builder.set_hop_count( - u8::try_from(sr_hopcount) - .map_err(map_error_internal!("hop count too large for safety route"))?, - ); - - // Build all the hops in the safety route - let mut hops_builder = builder.reborrow().init_hops(); - if sr_hopcount == 0 { - hops_builder - .set_private(private_route) - .map_err(map_error_internal!( - "invalid private route while encoding safety route" - ))?; + // Create hops + let hops = if sr_hopcount == 0 { + SafetyRouteHops::Private(private_route) } else { // start last blob-to-encrypt data off as private route let mut blob_data = { let mut pr_message = ::capnp::message::Builder::new_default(); - pr_message - .set_root_canonical(private_route) - .map_err(map_error_internal!( - "invalid private route while encoding safety route" - ))?; + let mut pr_builder = pr_message.init_root::(); + encode_private_route(&private_route, &mut pr_builder)?; let mut blob_data = builder_to_vec(pr_message)?; + // append the private route tag so we know how to decode it later blob_data.push(1u8); blob_data @@ -56,16 +40,6 @@ impl RPCProcessor { for h in (1..sr_hopcount).rev() { // Get blob to encrypt for next hop blob_data = { - // RouteHop - let mut rh_message = ::capnp::message::Builder::new_default(); - let mut rh_builder = rh_message.init_root::(); - let mut di_builder = rh_builder.reborrow().init_dial_info(); - encode_node_dial_info(&safety_route_spec.hops[h].dial_info, &mut di_builder)?; - // RouteHopData - let mut rhd_builder = rh_builder.init_next_hop(); - // Add the nonce - let mut rhd_nonce = rhd_builder.reborrow().init_nonce(); - encode_nonce(&nonce, &mut rhd_nonce); // Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr)) let dh_secret = self .crypto @@ -78,21 +52,34 @@ impl RPCProcessor { Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) .map_err(map_error_internal!("encryption failed"))?; - rhd_builder.set_blob(enc_msg_data.as_slice()); + // Make route hop data + let route_hop_data = RouteHopData { + nonce, + blob: enc_msg_data, + }; + + // Make route hop + let route_hop = RouteHop { + dial_info: safety_route_spec.hops[h].dial_info.clone(), + next_hop: Some(route_hop_data), + }; + + // Make next blob from route hop + let mut rh_message = ::capnp::message::Builder::new_default(); + let mut rh_builder = rh_message.init_root::(); + encode_route_hop(&route_hop, &mut rh_builder)?; let mut blob_data = builder_to_vec(rh_message)?; - // append the route hop tag so we know how to decode it later + // Append the route hop tag so we know how to decode it later blob_data.push(0u8); blob_data }; + // Make another nonce for the next hop nonce = Crypto::get_random_nonce(); } // Encode first RouteHopData - let mut first_rhd_builder = hops_builder.init_data(); - let mut first_rhd_nonce = first_rhd_builder.reborrow().init_nonce(); - encode_nonce(&nonce, &mut first_rhd_nonce); let dh_secret = self .crypto .cached_dh( @@ -103,86 +90,61 @@ impl RPCProcessor { let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) .map_err(map_error_internal!("encryption failed"))?; - first_rhd_builder.set_blob(enc_msg_data.as_slice()); - } + let route_hop_data = RouteHopData { + nonce, + blob: enc_msg_data, + }; - Ok(()) + SafetyRouteHops::Data(route_hop_data) + }; + + // Build safety route + let safety_route = SafetyRoute { + public_key: safety_route_spec.public_key, + hop_count: safety_route_spec.hops.len() as u8, + hops, + }; + + Ok(safety_route) } // Wrap an operation inside a route pub(super) fn wrap_with_route( &self, - safety_route: Option<&SafetyRouteSpec>, + safety_route_spec: Option<&SafetyRouteSpec>, private_route: PrivateRoute, message_data: Vec, ) -> Result, RPCError> { - // Encode the private route - let mut pr_msg_builder = ::capnp::message::Builder::new_default(); - let mut pr_builder = pr_msg_builder.init_root::(); - encode_private_route(&private_route, &mut pr_builder)?; - let pr_reader = pr_builder.into_reader(); - - // Get stuff before we lock inner - let op_id = intf::get_random_u64(); - // Encrypt routed operation + // Xmsg + ENC(Xmsg, DH(PKapr, SKbsr)) let nonce = Crypto::get_random_nonce(); - let pr_pk_reader = private_route - .get_public_key() - .map_err(map_error_internal!("public key is invalid"))?; - let pr_pk = decode_public_key(&pr_pk_reader); - let stub_safety_route = SafetyRouteSpec::new(); - let sr = safety_route.unwrap_or(&stub_safety_route); + let stub_safety_route_spec = SafetyRouteSpec::new(); + let safety_route_spec = safety_route_spec.unwrap_or(&stub_safety_route_spec); let dh_secret = self .crypto - .cached_dh(&pr_pk, &sr.secret_key) + .cached_dh(&private_route.public_key, &safety_route_spec.secret_key) .map_err(map_error_internal!("dh failed"))?; let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None) .map_err(map_error_internal!("encryption failed"))?; + // Compile the safety route with the private route + let safety_route = self.compile_safety_route(safety_route_spec, private_route)?; + + // Make the routed operation + let operation = RoutedOperation::new(nonce, enc_msg_data); + // Prepare route operation - let route = RPCOperationRoute { - safety_route: todo!(), - operation: todo!(), - }; - - let route_msg = { - let mut route_msg = ::capnp::message::Builder::new_default(); - let mut route_operation = route_msg.init_root::(); - - // Doesn't matter what this op id because there's no answer - // but it shouldn't conflict with any other op id either - route_operation.set_op_id(op_id); - - // Answers don't get a 'respond' - let mut respond_to = route_operation.reborrow().init_respond_to(); - respond_to.set_none(()); - - // Set up 'route' operation - let mut route = route_operation.reborrow().init_detail().init_route(); - - // Set the safety route we've constructed - let mut msg_sr = route.reborrow().init_safety_route(); - self.encode_safety_route(sr, private_route, &mut msg_sr)?; - - // Put in the encrypted operation we're routing - let mut msg_operation = route.init_operation(); - msg_operation.reborrow().init_signatures(0); - let mut route_nonce = msg_operation.reborrow().init_nonce(); - encode_nonce(&nonce, &mut route_nonce); - let data = msg_operation.reborrow().init_data( - enc_msg_data - .len() - .try_into() - .map_err(map_error_internal!("data too large"))?, - ); - data.copy_from_slice(enc_msg_data.as_slice()); - - route_msg + safety_route, + operation, }; + let operation = + RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route))); // Convert message to bytes and return it + let mut route_msg = ::capnp::message::Builder::new_default(); + let mut route_operation = route_msg.init_root::(); + operation.encode(&mut route_operation)?; let out = builder_to_vec(route_msg)?; Ok(out) } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index f1b7a9cf..59903d7d 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1581,13 +1581,13 @@ pub enum SignalInfo { } ///////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub enum TunnelMode { Raw, Turn, } -#[derive(Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub enum TunnelError { BadId, // Tunnel ID was rejected NoEndpoint, // Endpoint was unreachable