From dc1a27c0a058840c30229c0217f09aca707ed24c Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 19 Dec 2022 15:40:41 -0500 Subject: [PATCH] adjust routes --- Cargo.lock | 1 + veilid-core/proto/veilid.capnp | 13 +- .../src/routing_table/route_spec_store.rs | 117 ++++++++++++++---- veilid-core/src/rpc_processor/coders/mod.rs | 2 + .../coders/operations/operation_route.rs | 9 +- veilid-core/src/rpc_processor/mod.rs | 7 +- .../src/rpc_processor/operation_waiter.rs | 1 + veilid-core/src/rpc_processor/rpc_route.rs | 41 +++--- veilid-core/src/veilid_api/types.rs | 9 ++ veilid-tools/Cargo.toml | 1 + veilid-tools/src/tools.rs | 5 + 11 files changed, 160 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f20d10b..a52f79f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6051,6 +6051,7 @@ dependencies = [ "async-lock", "async-std", "async_executors", + "backtrace", "cfg-if 1.0.0", "console_error_panic_hook", "eyre", diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index a0d13643..93df9af5 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -188,6 +188,12 @@ enum DialInfoClass @0x880005edfdd38b1e { portRestrictedNAT @5; # P = Device without portmap behind address-and-port restricted NAT } +enum Sequencing { + noPreference @0; + preferOrdered @1; + ensureOrdered @2; +} + struct DialInfoDetail @0x96423aa1d67b74d8 { dialInfo @0 :DialInfo; class @1 :DialInfoClass; @@ -266,9 +272,10 @@ struct PeerInfo @0xfe2d722d5d3c4bcb { struct RoutedOperation @0xcbcb8535b839e9dd { version @0 :UInt8; # crypto version in use for the data - signatures @1 :List(Signature); # signatures from nodes that have handled the private route - nonce @2 :Nonce; # nonce Xmsg - data @3 :Data; # operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) + sequencing @1 :Sequencing; # sequencing preference to use to pass the message along + signatures @2 :List(Signature); # signatures from nodes that have handled the private route + nonce @3 :Nonce; # nonce Xmsg + data @4 :Data; # operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) } struct OperationStatusQ @0x865d80cea70d884a { diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index 87b0d283..77832685 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -187,8 +187,8 @@ pub struct RouteSpecDetail { directions: DirectionSet, /// Stability preference (prefer reliable nodes over faster) stability: Stability, - /// Sequencing preference (connection oriented protocols vs datagram) - sequencing: Sequencing, + /// Sequencing capability (connection oriented protocols vs datagram) + can_do_sequenced: bool, /// Stats stats: RouteStats, } @@ -206,6 +206,21 @@ impl RouteSpecDetail { pub fn hop_count(&self) -> usize { self.hops.len() } + pub fn get_secret_key(&self) -> DHTKeySecret { + self.secret_key + } + pub fn get_stability(&self) -> Stability { + self.stability + } + pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool { + match sequencing { + Sequencing::NoPreference => true, + Sequencing::PreferOrdered => true, + Sequencing::EnsureOrdered => { + self.can_do_sequenced + } + } + } } /// The core representation of the RouteSpecStore that can be serialized @@ -336,8 +351,8 @@ fn _get_route_permutation_count(hop_count: usize) -> usize { // hop_count = 4 -> 3! -> 6 (3..hop_count).into_iter().fold(2usize, |acc, x| acc * x) } - -type PermFunc<'t> = Box Option<(Vec, Vec)> + Send + 't>; +type PermReturnType = (Vec, Vec, bool); +type PermFunc<'t> = Box Option + Send + 't>; /// get the route permutation at particular 'perm' index, starting at the 'start' index /// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum @@ -347,7 +362,7 @@ fn with_route_permutations( hop_count: usize, start: usize, f: &PermFunc, -) -> Option<(Vec, Vec)> { +) -> Option { if hop_count == 0 { unreachable!(); } @@ -366,7 +381,7 @@ fn with_route_permutations( permutation: &mut [usize], size: usize, f: &PermFunc, - ) -> Option<(Vec, Vec)> { + ) -> Option { if size == 1 { return f(&permutation); } @@ -759,6 +774,24 @@ impl RouteSpecStore { return cmp_used; } + // apply sequencing preference + // ensureordered will be taken care of by filter + // and nopreference doesn't care + if matches!(sequencing, Sequencing::PreferOrdered) { + let cmp_seq = v1.1.as_ref().unwrap().with(rti, |rti, e1| { + v2.1.as_ref() + .unwrap() + .with(rti, |_rti, e2| { + let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); + let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); + e2_can_do_ordered.cmp(&e1_can_do_ordered) + }) + }); + if !matches!(cmp_seq, Ordering::Equal) { + return cmp_seq; + } + } + // always prioritize reliable nodes, but sort by oldest or fastest let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { v2.1.as_ref() @@ -825,6 +858,7 @@ impl RouteSpecStore { } // Ensure this route is viable by checking that each node can contact the next one + let mut can_do_sequenced = true; if directions.contains(Direction::Outbound) { let mut previous_node = &our_peer_info; let mut reachable = true; @@ -841,6 +875,21 @@ impl RouteSpecStore { reachable = false; break; } + + // Check if we can do sequenced specifically + if can_do_sequenced { + let cm = rti.get_contact_method( + RoutingDomain::PublicInternet, + previous_node, + current_node, + DialInfoFilter::all(), + Sequencing::EnsureOrdered, + ); + if matches!(cm, ContactMethod::Unreachable) { + can_do_sequenced = false; + } + } + previous_node = current_node; } if !reachable { @@ -863,6 +912,20 @@ impl RouteSpecStore { reachable = false; break; } + + // Check if we can do sequenced specifically + if can_do_sequenced { + let cm = rti.get_contact_method( + RoutingDomain::PublicInternet, + next_node, + current_node, + DialInfoFilter::all(), + Sequencing::EnsureOrdered, + ); + if matches!(cm, ContactMethod::Unreachable) { + can_do_sequenced = false; + } + } next_node = current_node; } if !reachable { @@ -871,17 +934,19 @@ impl RouteSpecStore { } // Keep this route let route_nodes = permutation.to_vec(); - Some((route_nodes, cache_key)) + Some((route_nodes, cache_key, can_do_sequenced)) }) as PermFunc; let mut route_nodes: Vec = Vec::new(); let mut cache_key: Vec = Vec::new(); + let mut can_do_sequenced: bool = true; for start in 0..(nodes.len() - hop_count) { // Try the permutations available starting with 'start' - if let Some((rn, ck)) = with_route_permutations(hop_count, start, &perm_func) { + if let Some((rn, ck, cds)) = with_route_permutations(hop_count, start, &perm_func) { route_nodes = rn; cache_key = ck; + can_do_sequenced = cds; break; } } @@ -902,6 +967,8 @@ impl RouteSpecStore { let (public_key, secret_key) = generate_secret(); + + let rsd = RouteSpecDetail { secret_key, hops, @@ -909,7 +976,7 @@ impl RouteSpecStore { published: false, directions, stability, - sequencing, + can_do_sequenced, stats: RouteStats::new(cur_ts), }; @@ -924,14 +991,18 @@ impl RouteSpecStore { Ok(Some(public_key)) } - #[instrument(level = "trace", skip(self, data), ret)] - pub fn validate_signatures( + #[instrument(level = "trace", skip(self, data, callback), ret)] + pub fn with_signature_validated_route( &self, public_key: &DHTKey, signatures: &[DHTSignature], data: &[u8], last_hop_id: DHTKey, - ) -> Option<(DHTKeySecret, SafetySpec)> { + callback: F, + ) -> Option + where F: FnOnce(&RouteSpecDetail) -> R, + R: fmt::Debug, + { let inner = &*self.inner.lock(); let Some(rsd) = Self::detail(inner, &public_key) else { log_rpc!(debug "route does not exist: {:?}", public_key); @@ -962,16 +1033,8 @@ impl RouteSpecStore { } } } - // We got the correct signatures, return a key and response safety spec - Some(( - rsd.secret_key, - SafetySpec { - preferred_route: Some(*public_key), - hop_count: rsd.hops.len(), - stability: rsd.stability, - sequencing: rsd.sequencing, - }, - )) + // We got the correct signatures, return a key and response safety spec + Some(callback(rsd)) } #[instrument(level = "trace", skip(self), ret, err)] @@ -982,9 +1045,13 @@ impl RouteSpecStore { let inner = &mut *self.inner.lock(); let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; + + // Match the private route's hop length for safety route length let hop_count = rsd.hops.len(); - let stability = rsd.stability; - let sequencing = rsd.sequencing; + // Always test routes with safety routes that are more likely to succeed + let stability = Stability::Reliable; + // Routes can test with whatever sequencing they were allocated with + let sequencing = Sequencing::NoPreference; let safety_spec = SafetySpec { preferred_route: Some(key.clone()), @@ -1157,7 +1224,7 @@ impl RouteSpecStore { // but definitely prefer routes that have been recently tested for detail in &inner.content.details { if detail.1.stability >= stability - && detail.1.sequencing >= sequencing + && detail.1.is_sequencing_match(sequencing) && detail.1.hops.len() >= min_hop_count && detail.1.hops.len() <= max_hop_count && detail.1.directions.is_superset(directions) diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index b5c3c709..dafc01f2 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -14,6 +14,7 @@ mod peer_info; mod private_safety_route; mod protocol_type_set; mod sender_info; +mod sequencing; mod signal_info; mod signed_direct_node_info; mod signed_node_info; @@ -39,6 +40,7 @@ pub use peer_info::*; pub use private_safety_route::*; pub use protocol_type_set::*; pub use sender_info::*; +pub use sequencing::*; pub use signal_info::*; pub use signed_direct_node_info::*; pub use signed_node_info::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs index 29e56035..88d65fe9 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -3,15 +3,17 @@ use super::*; #[derive(Debug, Clone)] pub struct RoutedOperation { pub version: u8, + pub sequencing: Sequencing, pub signatures: Vec, pub nonce: Nonce, pub data: Vec, } impl RoutedOperation { - pub fn new(version: u8, nonce: Nonce, data: Vec) -> Self { + pub fn new(version: u8, sequencing: Sequencing, nonce: Nonce, data: Vec) -> Self { Self { version, + sequencing, signatures: Vec::new(), nonce, data, @@ -34,12 +36,14 @@ impl RoutedOperation { } let version = reader.get_version(); + let sequencing = decode_sequencing(reader.get_sequencing().map_err(RPCError::protocol)?); let n_reader = reader.get_nonce().map_err(RPCError::protocol)?; let nonce = decode_nonce(&n_reader); let data = reader.get_data().map_err(RPCError::protocol)?.to_vec(); Ok(RoutedOperation { version, + sequencing, signatures, nonce, data, @@ -51,6 +55,9 @@ impl RoutedOperation { builder: &mut veilid_capnp::routed_operation::Builder, ) -> Result<(), RPCError> { builder.reborrow().set_version(self.version); + builder + .reborrow() + .set_sequencing(encode_sequencing(self.sequencing)); let mut sigs_builder = builder.reborrow().init_signatures( self.signatures .len() diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6ae3d54d..eebcd017 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -520,7 +520,12 @@ impl RPCProcessor { // Make the routed operation // xxx: replace MAX_CRYPTO_VERSION with the version from the factory - let operation = RoutedOperation::new(MAX_CRYPTO_VERSION, nonce, enc_msg_data); + let operation = RoutedOperation::new( + MAX_CRYPTO_VERSION, + safety_selection.get_sequencing(), + nonce, + enc_msg_data, + ); // Prepare route operation let sr_hop_count = compiled_route.safety_route.hop_count; diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 95044b06..32206ded 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -121,6 +121,7 @@ where Ok(res .on_timeout(|| { log_rpc!(debug "op wait timed out: {}", handle.op_id); + log_rpc!(debug "backtrace: {}", debug_backtrace()); self.cancel_op_waiter(handle.op_id); }) .map(|res| { diff --git a/veilid-core/src/rpc_processor/rpc_route.rs b/veilid-core/src/rpc_processor/rpc_route.rs index 78a12696..ad5c8164 100644 --- a/veilid-core/src/rpc_processor/rpc_route.rs +++ b/veilid-core/src/rpc_processor/rpc_route.rs @@ -26,7 +26,7 @@ impl RPCProcessor { } // Get next hop node ref - let next_hop_nr = match route_hop.node { + let mut next_hop_nr = match route_hop.node { RouteNode::NodeId(id) => { // let Some(nr) = self.routing_table.lookup_node_ref(id.key) else { @@ -53,6 +53,9 @@ impl RPCProcessor { } }; + // Apply sequencing preference + next_hop_nr.set_sequencing(routed_operation.sequencing); + // Pass along the route let next_hop_route = RPCOperationRoute { safety_route: SafetyRoute { @@ -85,7 +88,7 @@ impl RPCProcessor { } // Get next hop node ref - let next_hop_nr = match &next_route_node { + let mut next_hop_nr = match &next_route_node { RouteNode::NodeId(id) => { // self.routing_table @@ -110,6 +113,9 @@ impl RPCProcessor { } }?; + // Apply sequencing preference + next_hop_nr.set_sequencing(routed_operation.sequencing); + // Pass along the route let next_hop_route = RPCOperationRoute { safety_route: SafetyRoute { @@ -134,20 +140,10 @@ impl RPCProcessor { #[instrument(level = "trace", skip_all, err)] fn process_safety_routed_operation( &self, - detail: RPCMessageHeaderDetailDirect, + _detail: RPCMessageHeaderDetailDirect, routed_operation: RoutedOperation, remote_sr_pubkey: DHTKey, ) -> Result, RPCError> { - // Get sequencing preference - let sequencing = if detail - .connection_descriptor - .protocol_type() - .is_connection_oriented() - { - Sequencing::EnsureOrdered - } else { - Sequencing::NoPreference - }; // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) // xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes? @@ -169,7 +165,7 @@ impl RPCProcessor { }; // Pass message to RPC system - self.enqueue_safety_routed_message(remote_sr_pubkey, sequencing, body) + self.enqueue_safety_routed_message(remote_sr_pubkey, routed_operation.sequencing, body) .map_err(RPCError::internal)?; Ok(NetworkResult::value(())) @@ -188,18 +184,31 @@ impl RPCProcessor { let sender_id = detail.envelope.get_sender_id(); // Look up the private route and ensure it's one in our spec store + // Ensure the route is validated, and construct a return safetyspec that matches the inbound preferences let rss = self.routing_table.route_spec_store(); let Some((secret_key, safety_spec)) = rss - .validate_signatures( + .with_signature_validated_route( &pr_pubkey, &routed_operation.signatures, &routed_operation.data, sender_id, + |rsd| { + ( + rsd.get_secret_key(), + SafetySpec { + preferred_route: Some(pr_pubkey), + hop_count: rsd.hop_count(), + stability: rsd.get_stability(), + sequencing: routed_operation.sequencing, + }, + ) + } ) else { return Ok(NetworkResult::invalid_message("signatures did not validate for private route")); }; + // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) // xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes? let dh_secret = self @@ -231,7 +240,7 @@ impl RPCProcessor { remote_sr_pubkey: DHTKey, pr_pubkey: DHTKey, ) -> Result, RPCError> { - + // If the private route public key is our node id, then this was sent via safety route to our node directly // so there will be no signatures to validate if pr_pubkey == self.routing_table.node_id() { diff --git a/veilid-core/src/veilid_api/types.rs b/veilid-core/src/veilid_api/types.rs index 922b77cf..493f5939 100644 --- a/veilid-core/src/veilid_api/types.rs +++ b/veilid-core/src/veilid_api/types.rs @@ -462,6 +462,15 @@ pub enum SafetySelection { Safe(SafetySpec), } +impl SafetySelection { + pub fn get_sequencing(&self) -> Sequencing { + match self { + SafetySelection::Unsafe(seq) => *seq, + SafetySelection::Safe(ss) => ss.sequencing, + } + } +} + /// Options for safety routes (sender privacy) #[derive( Copy, diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index daea35b8..e7f976b6 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -33,6 +33,7 @@ owo-colors = "^3" stop-token = { version = "^0", default-features = false } rand = "^0.7" rust-fsm = "^0" +backtrace = "^0" # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 1d388121..7ef635d1 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -298,3 +298,8 @@ pub unsafe fn aligned_8_u8_vec_uninit(n_bytes: usize) -> Vec { cap_units * mem::size_of::(), ) } + +pub fn debug_backtrace() -> String { + let bt = backtrace::Backtrace::new(); + format!("{:?}", bt) +}