adjust routes
This commit is contained in:
		| @@ -187,8 +187,8 @@ pub struct RouteSpecDetail { | ||||
|     directions: DirectionSet, | ||||
|     /// Stability preference (prefer reliable nodes over faster) | ||||
|     stability: Stability, | ||||
|     /// Sequencing preference (connection oriented protocols vs datagram) | ||||
|     sequencing: Sequencing, | ||||
|     /// Sequencing capability (connection oriented protocols vs datagram) | ||||
|     can_do_sequenced: bool, | ||||
|     /// Stats | ||||
|     stats: RouteStats, | ||||
| } | ||||
| @@ -206,6 +206,21 @@ impl RouteSpecDetail { | ||||
|     pub fn hop_count(&self) -> usize { | ||||
|         self.hops.len() | ||||
|     } | ||||
|     pub fn get_secret_key(&self) -> DHTKeySecret { | ||||
|         self.secret_key | ||||
|     } | ||||
|     pub fn get_stability(&self) -> Stability { | ||||
|         self.stability | ||||
|     } | ||||
|     pub fn is_sequencing_match(&self, sequencing: Sequencing) -> bool { | ||||
|         match sequencing { | ||||
|             Sequencing::NoPreference => true, | ||||
|             Sequencing::PreferOrdered => true, | ||||
|             Sequencing::EnsureOrdered => { | ||||
|                 self.can_do_sequenced | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// The core representation of the RouteSpecStore that can be serialized | ||||
| @@ -336,8 +351,8 @@ fn _get_route_permutation_count(hop_count: usize) -> usize { | ||||
|     // hop_count = 4 -> 3! -> 6 | ||||
|     (3..hop_count).into_iter().fold(2usize, |acc, x| acc * x) | ||||
| } | ||||
|  | ||||
| type PermFunc<'t> = Box<dyn Fn(&[usize]) -> Option<(Vec<usize>, Vec<u8>)> + Send + 't>; | ||||
| type PermReturnType = (Vec<usize>, Vec<u8>, bool); | ||||
| type PermFunc<'t> = Box<dyn Fn(&[usize]) -> Option<PermReturnType> + Send + 't>; | ||||
|  | ||||
| /// get the route permutation at particular 'perm' index, starting at the 'start' index | ||||
| /// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum | ||||
| @@ -347,7 +362,7 @@ fn with_route_permutations( | ||||
|     hop_count: usize, | ||||
|     start: usize, | ||||
|     f: &PermFunc, | ||||
| ) -> Option<(Vec<usize>, Vec<u8>)> { | ||||
| ) -> Option<PermReturnType> { | ||||
|     if hop_count == 0 { | ||||
|         unreachable!(); | ||||
|     } | ||||
| @@ -366,7 +381,7 @@ fn with_route_permutations( | ||||
|         permutation: &mut [usize], | ||||
|         size: usize, | ||||
|         f: &PermFunc, | ||||
|     ) -> Option<(Vec<usize>, Vec<u8>)> { | ||||
|     ) -> Option<PermReturnType> { | ||||
|         if size == 1 { | ||||
|             return f(&permutation); | ||||
|         } | ||||
| @@ -759,6 +774,24 @@ impl RouteSpecStore { | ||||
|                 return cmp_used; | ||||
|             } | ||||
|  | ||||
|             // apply sequencing preference | ||||
|             // ensureordered will be taken care of by filter | ||||
|             // and nopreference doesn't care | ||||
|             if matches!(sequencing, Sequencing::PreferOrdered) { | ||||
|                 let cmp_seq = v1.1.as_ref().unwrap().with(rti, |rti, e1| { | ||||
|                     v2.1.as_ref() | ||||
|                         .unwrap() | ||||
|                         .with(rti, |_rti, e2| { | ||||
|                             let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); | ||||
|                             let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); | ||||
|                             e2_can_do_ordered.cmp(&e1_can_do_ordered) | ||||
|                         }) | ||||
|                 }); | ||||
|                 if !matches!(cmp_seq, Ordering::Equal) { | ||||
|                     return cmp_seq; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // always prioritize reliable nodes, but sort by oldest or fastest | ||||
|             let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { | ||||
|                 v2.1.as_ref() | ||||
| @@ -825,6 +858,7 @@ impl RouteSpecStore { | ||||
|             } | ||||
|  | ||||
|             // Ensure this route is viable by checking that each node can contact the next one | ||||
|             let mut can_do_sequenced = true; | ||||
|             if directions.contains(Direction::Outbound) { | ||||
|                 let mut previous_node = &our_peer_info; | ||||
|                 let mut reachable = true; | ||||
| @@ -841,6 +875,21 @@ impl RouteSpecStore { | ||||
|                         reachable = false; | ||||
|                         break; | ||||
|                     } | ||||
|  | ||||
|                     // Check if we can do sequenced specifically | ||||
|                     if can_do_sequenced { | ||||
|                         let cm = rti.get_contact_method( | ||||
|                             RoutingDomain::PublicInternet, | ||||
|                             previous_node, | ||||
|                             current_node, | ||||
|                             DialInfoFilter::all(), | ||||
|                             Sequencing::EnsureOrdered, | ||||
|                         ); | ||||
|                         if matches!(cm, ContactMethod::Unreachable) { | ||||
|                             can_do_sequenced = false; | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
|                     previous_node = current_node; | ||||
|                 } | ||||
|                 if !reachable { | ||||
| @@ -863,6 +912,20 @@ impl RouteSpecStore { | ||||
|                         reachable = false; | ||||
|                         break; | ||||
|                     } | ||||
|  | ||||
|                     // Check if we can do sequenced specifically | ||||
|                     if can_do_sequenced { | ||||
|                         let cm = rti.get_contact_method( | ||||
|                             RoutingDomain::PublicInternet, | ||||
|                             next_node, | ||||
|                             current_node, | ||||
|                             DialInfoFilter::all(), | ||||
|                             Sequencing::EnsureOrdered, | ||||
|                         ); | ||||
|                         if matches!(cm, ContactMethod::Unreachable) { | ||||
|                             can_do_sequenced = false; | ||||
|                         } | ||||
|                     } | ||||
|                     next_node = current_node; | ||||
|                 } | ||||
|                 if !reachable { | ||||
| @@ -871,17 +934,19 @@ impl RouteSpecStore { | ||||
|             } | ||||
|             // Keep this route | ||||
|             let route_nodes = permutation.to_vec(); | ||||
|             Some((route_nodes, cache_key)) | ||||
|             Some((route_nodes, cache_key, can_do_sequenced)) | ||||
|         }) as PermFunc; | ||||
|  | ||||
|         let mut route_nodes: Vec<usize> = Vec::new(); | ||||
|         let mut cache_key: Vec<u8> = Vec::new(); | ||||
|         let mut can_do_sequenced: bool = true; | ||||
|  | ||||
|         for start in 0..(nodes.len() - hop_count) { | ||||
|             // Try the permutations available starting with 'start' | ||||
|             if let Some((rn, ck)) = with_route_permutations(hop_count, start, &perm_func) { | ||||
|             if let Some((rn, ck, cds)) = with_route_permutations(hop_count, start, &perm_func) { | ||||
|                 route_nodes = rn; | ||||
|                 cache_key = ck; | ||||
|                 can_do_sequenced = cds; | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
| @@ -902,6 +967,8 @@ impl RouteSpecStore { | ||||
|  | ||||
|         let (public_key, secret_key) = generate_secret(); | ||||
|  | ||||
|          | ||||
|  | ||||
|         let rsd = RouteSpecDetail { | ||||
|             secret_key, | ||||
|             hops, | ||||
| @@ -909,7 +976,7 @@ impl RouteSpecStore { | ||||
|             published: false, | ||||
|             directions, | ||||
|             stability, | ||||
|             sequencing, | ||||
|             can_do_sequenced, | ||||
|             stats: RouteStats::new(cur_ts), | ||||
|         }; | ||||
|  | ||||
| @@ -924,14 +991,18 @@ impl RouteSpecStore { | ||||
|         Ok(Some(public_key)) | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self, data), ret)] | ||||
|     pub fn validate_signatures( | ||||
|     #[instrument(level = "trace", skip(self, data, callback), ret)] | ||||
|     pub fn with_signature_validated_route<F,R>( | ||||
|         &self, | ||||
|         public_key: &DHTKey, | ||||
|         signatures: &[DHTSignature], | ||||
|         data: &[u8], | ||||
|         last_hop_id: DHTKey, | ||||
|     ) -> Option<(DHTKeySecret, SafetySpec)> { | ||||
|         callback: F, | ||||
|     ) -> Option<R>  | ||||
|     where F: FnOnce(&RouteSpecDetail) -> R,  | ||||
|     R: fmt::Debug, | ||||
|     { | ||||
|         let inner = &*self.inner.lock(); | ||||
|         let Some(rsd) = Self::detail(inner, &public_key) else { | ||||
|             log_rpc!(debug "route does not exist: {:?}", public_key); | ||||
| @@ -962,16 +1033,8 @@ impl RouteSpecStore { | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         // We got the correct signatures, return a key and response safety spec | ||||
|         Some(( | ||||
|             rsd.secret_key, | ||||
|             SafetySpec { | ||||
|                 preferred_route: Some(*public_key), | ||||
|                 hop_count: rsd.hops.len(), | ||||
|                 stability: rsd.stability, | ||||
|                 sequencing: rsd.sequencing, | ||||
|             }, | ||||
|         )) | ||||
|         // We got the correct signatures, return a key and response safety spec      | ||||
|         Some(callback(rsd)) | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
| @@ -982,9 +1045,13 @@ impl RouteSpecStore { | ||||
|  | ||||
|             let inner = &mut *self.inner.lock(); | ||||
|             let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; | ||||
|  | ||||
|             // Match the private route's hop length for safety route length | ||||
|             let hop_count = rsd.hops.len(); | ||||
|             let stability = rsd.stability; | ||||
|             let sequencing = rsd.sequencing; | ||||
|             // Always test routes with safety routes that are more likely to succeed | ||||
|             let stability = Stability::Reliable; | ||||
|             // Routes can test with whatever sequencing they were allocated with | ||||
|             let sequencing = Sequencing::NoPreference; | ||||
|  | ||||
|             let safety_spec = SafetySpec { | ||||
|                 preferred_route: Some(key.clone()), | ||||
| @@ -1157,7 +1224,7 @@ impl RouteSpecStore { | ||||
|         // but definitely prefer routes that have been recently tested | ||||
|         for detail in &inner.content.details { | ||||
|             if detail.1.stability >= stability | ||||
|                 && detail.1.sequencing >= sequencing | ||||
|                 && detail.1.is_sequencing_match(sequencing) | ||||
|                 && detail.1.hops.len() >= min_hop_count | ||||
|                 && detail.1.hops.len() <= max_hop_count | ||||
|                 && detail.1.directions.is_superset(directions) | ||||
|   | ||||
| @@ -14,6 +14,7 @@ mod peer_info; | ||||
| mod private_safety_route; | ||||
| mod protocol_type_set; | ||||
| mod sender_info; | ||||
| mod sequencing; | ||||
| mod signal_info; | ||||
| mod signed_direct_node_info; | ||||
| mod signed_node_info; | ||||
| @@ -39,6 +40,7 @@ pub use peer_info::*; | ||||
| pub use private_safety_route::*; | ||||
| pub use protocol_type_set::*; | ||||
| pub use sender_info::*; | ||||
| pub use sequencing::*; | ||||
| pub use signal_info::*; | ||||
| pub use signed_direct_node_info::*; | ||||
| pub use signed_node_info::*; | ||||
|   | ||||
| @@ -3,15 +3,17 @@ use super::*; | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct RoutedOperation { | ||||
|     pub version: u8, | ||||
|     pub sequencing: Sequencing, | ||||
|     pub signatures: Vec<DHTSignature>, | ||||
|     pub nonce: Nonce, | ||||
|     pub data: Vec<u8>, | ||||
| } | ||||
|  | ||||
| impl RoutedOperation { | ||||
|     pub fn new(version: u8, nonce: Nonce, data: Vec<u8>) -> Self { | ||||
|     pub fn new(version: u8, sequencing: Sequencing, nonce: Nonce, data: Vec<u8>) -> Self { | ||||
|         Self { | ||||
|             version, | ||||
|             sequencing, | ||||
|             signatures: Vec::new(), | ||||
|             nonce, | ||||
|             data, | ||||
| @@ -34,12 +36,14 @@ impl RoutedOperation { | ||||
|         } | ||||
|  | ||||
|         let version = reader.get_version(); | ||||
|         let sequencing = decode_sequencing(reader.get_sequencing().map_err(RPCError::protocol)?); | ||||
|         let n_reader = reader.get_nonce().map_err(RPCError::protocol)?; | ||||
|         let nonce = decode_nonce(&n_reader); | ||||
|         let data = reader.get_data().map_err(RPCError::protocol)?.to_vec(); | ||||
|  | ||||
|         Ok(RoutedOperation { | ||||
|             version, | ||||
|             sequencing, | ||||
|             signatures, | ||||
|             nonce, | ||||
|             data, | ||||
| @@ -51,6 +55,9 @@ impl RoutedOperation { | ||||
|         builder: &mut veilid_capnp::routed_operation::Builder, | ||||
|     ) -> Result<(), RPCError> { | ||||
|         builder.reborrow().set_version(self.version); | ||||
|         builder | ||||
|             .reborrow() | ||||
|             .set_sequencing(encode_sequencing(self.sequencing)); | ||||
|         let mut sigs_builder = builder.reborrow().init_signatures( | ||||
|             self.signatures | ||||
|                 .len() | ||||
|   | ||||
| @@ -520,7 +520,12 @@ impl RPCProcessor { | ||||
|  | ||||
|         // Make the routed operation | ||||
|         // xxx: replace MAX_CRYPTO_VERSION with the version from the factory | ||||
|         let operation = RoutedOperation::new(MAX_CRYPTO_VERSION, nonce, enc_msg_data); | ||||
|         let operation = RoutedOperation::new( | ||||
|             MAX_CRYPTO_VERSION, | ||||
|             safety_selection.get_sequencing(), | ||||
|             nonce, | ||||
|             enc_msg_data, | ||||
|         ); | ||||
|  | ||||
|         // Prepare route operation | ||||
|         let sr_hop_count = compiled_route.safety_route.hop_count; | ||||
|   | ||||
| @@ -121,6 +121,7 @@ where | ||||
|         Ok(res | ||||
|             .on_timeout(|| { | ||||
|                 log_rpc!(debug "op wait timed out: {}", handle.op_id); | ||||
|                 log_rpc!(debug "backtrace: {}", debug_backtrace()); | ||||
|                 self.cancel_op_waiter(handle.op_id); | ||||
|             }) | ||||
|             .map(|res| { | ||||
|   | ||||
| @@ -26,7 +26,7 @@ impl RPCProcessor { | ||||
|         } | ||||
|  | ||||
|         // Get next hop node ref | ||||
|         let next_hop_nr = match route_hop.node { | ||||
|         let mut next_hop_nr = match route_hop.node { | ||||
|             RouteNode::NodeId(id) => { | ||||
|                 // | ||||
|                 let Some(nr) = self.routing_table.lookup_node_ref(id.key) else { | ||||
| @@ -53,6 +53,9 @@ impl RPCProcessor { | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         // Apply sequencing preference | ||||
|         next_hop_nr.set_sequencing(routed_operation.sequencing); | ||||
|  | ||||
|         // Pass along the route | ||||
|         let next_hop_route = RPCOperationRoute { | ||||
|             safety_route: SafetyRoute { | ||||
| @@ -85,7 +88,7 @@ impl RPCProcessor { | ||||
|         } | ||||
|  | ||||
|         // Get next hop node ref | ||||
|         let next_hop_nr = match &next_route_node { | ||||
|         let mut next_hop_nr = match &next_route_node { | ||||
|             RouteNode::NodeId(id) => { | ||||
|                 // | ||||
|                 self.routing_table | ||||
| @@ -110,6 +113,9 @@ impl RPCProcessor { | ||||
|             } | ||||
|         }?; | ||||
|  | ||||
|         // Apply sequencing preference | ||||
|         next_hop_nr.set_sequencing(routed_operation.sequencing); | ||||
|  | ||||
|         // Pass along the route | ||||
|         let next_hop_route = RPCOperationRoute { | ||||
|             safety_route: SafetyRoute { | ||||
| @@ -134,20 +140,10 @@ impl RPCProcessor { | ||||
|     #[instrument(level = "trace", skip_all, err)] | ||||
|     fn process_safety_routed_operation( | ||||
|         &self, | ||||
|         detail: RPCMessageHeaderDetailDirect, | ||||
|         _detail: RPCMessageHeaderDetailDirect, | ||||
|         routed_operation: RoutedOperation, | ||||
|         remote_sr_pubkey: DHTKey, | ||||
|     ) -> Result<NetworkResult<()>, RPCError> { | ||||
|         // Get sequencing preference | ||||
|         let sequencing = if detail | ||||
|             .connection_descriptor | ||||
|             .protocol_type() | ||||
|             .is_connection_oriented() | ||||
|         { | ||||
|             Sequencing::EnsureOrdered | ||||
|         } else { | ||||
|             Sequencing::NoPreference | ||||
|         }; | ||||
|  | ||||
|         // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) | ||||
|         // xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes? | ||||
| @@ -169,7 +165,7 @@ impl RPCProcessor { | ||||
|         }; | ||||
|          | ||||
|         // Pass message to RPC system | ||||
|         self.enqueue_safety_routed_message(remote_sr_pubkey, sequencing, body) | ||||
|         self.enqueue_safety_routed_message(remote_sr_pubkey, routed_operation.sequencing, body) | ||||
|             .map_err(RPCError::internal)?; | ||||
|  | ||||
|         Ok(NetworkResult::value(())) | ||||
| @@ -188,18 +184,31 @@ impl RPCProcessor { | ||||
|         let sender_id = detail.envelope.get_sender_id(); | ||||
|  | ||||
|         // Look up the private route and ensure it's one in our spec store | ||||
|         // Ensure the route is validated, and construct a return safetyspec that matches the inbound preferences | ||||
|         let rss = self.routing_table.route_spec_store(); | ||||
|         let Some((secret_key, safety_spec)) = rss | ||||
|             .validate_signatures( | ||||
|             .with_signature_validated_route( | ||||
|                 &pr_pubkey, | ||||
|                 &routed_operation.signatures, | ||||
|                 &routed_operation.data, | ||||
|                 sender_id, | ||||
|                 |rsd| { | ||||
|                     ( | ||||
|                         rsd.get_secret_key(), | ||||
|                         SafetySpec { | ||||
|                             preferred_route: Some(pr_pubkey), | ||||
|                             hop_count: rsd.hop_count(), | ||||
|                             stability: rsd.get_stability(), | ||||
|                             sequencing: routed_operation.sequencing, | ||||
|                         }, | ||||
|                     ) | ||||
|                 } | ||||
|             ) | ||||
|             else { | ||||
|                 return Ok(NetworkResult::invalid_message("signatures did not validate for private route")); | ||||
|             }; | ||||
|  | ||||
|  | ||||
|         // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) | ||||
|         // xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes? | ||||
|         let dh_secret = self | ||||
| @@ -231,7 +240,7 @@ impl RPCProcessor { | ||||
|         remote_sr_pubkey: DHTKey, | ||||
|         pr_pubkey: DHTKey, | ||||
|     ) -> Result<NetworkResult<()>, RPCError> { | ||||
|         | ||||
|  | ||||
|         // If the private route public key is our node id, then this was sent via safety route to our node directly | ||||
|         // so there will be no signatures to validate | ||||
|         if pr_pubkey == self.routing_table.node_id() { | ||||
|   | ||||
| @@ -462,6 +462,15 @@ pub enum SafetySelection { | ||||
|     Safe(SafetySpec), | ||||
| } | ||||
|  | ||||
| impl SafetySelection { | ||||
|     pub fn get_sequencing(&self) -> Sequencing { | ||||
|         match self { | ||||
|             SafetySelection::Unsafe(seq) => *seq, | ||||
|             SafetySelection::Safe(ss) => ss.sequencing, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Options for safety routes (sender privacy) | ||||
| #[derive( | ||||
|     Copy, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user