private route loopbacks

This commit is contained in:
John Smith 2022-11-24 16:23:33 -05:00
parent 0b2ecd53c7
commit 4d573a966f
4 changed files with 175 additions and 138 deletions

View File

@ -434,15 +434,6 @@ impl DiscoveryContext {
return Ok(true);
}
// XXX: is this necessary?
// Redo our external_1 dial info detection because a failed port mapping attempt
// may cause it to become invalid
// Get our external address from some fast node, call it node 1
// if !self.protocol_get_external_address_1().await {
// // If we couldn't get an external address, then we should just try the whole network class detection again later
// return Ok(false);
// }
// Get the external dial info for our use here
let (node_1, external_1_dial_info, external_1_address, protocol_type, address_type) = {
let inner = self.inner.lock();

View File

@ -864,7 +864,20 @@ impl RouteSpecStore {
safety_selection,
}
} else {
let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone();
// let target = rsd.hop_node_refs[rsd.hops.len() - 2].clone();
// let safety_spec = SafetySpec {
// preferred_route: Some(key.clone()),
// hop_count,
// stability,
// sequencing,
// };
// let safety_selection = SafetySelection::Safe(safety_spec);
// Destination::Direct {
// target,
// safety_selection,
// }
let safety_spec = SafetySpec {
preferred_route: Some(key.clone()),
hop_count,
@ -873,38 +886,22 @@ impl RouteSpecStore {
};
let safety_selection = SafetySelection::Safe(safety_spec);
Destination::Direct {
target,
Destination::PrivateRoute {
private_route,
safety_selection,
}
}
};
// Test with ping to end
let cur_ts = intf::get_timestamp();
let res = match rpc_processor.rpc_call_status(dest).await? {
// Test with double-round trip ping to self
let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
// // Do route stats for single hop route test because it
// // won't get stats for the route since it's done Direct
// if matches!(safety_selection, SafetySelection::Unsafe(_)) {
// self.with_route_stats(cur_ts, &key, |s| s.record_question_lost());
// }
// Did not error, but did not come back, just return false
return Ok(false);
}
};
// // Do route stats for single hop route test because it
// // won't get stats for the route since it's done Direct
// if matches!(safety_selection, SafetySelection::Unsafe(_)) {
// self.with_route_stats(cur_ts, &key, |s| {
// s.record_tested(cur_ts);
// s.record_latency(res.latency);
// });
// }
Ok(true)
}
@ -1065,6 +1062,11 @@ impl RouteSpecStore {
bail!("compiled private route should have first hop");
};
// If the safety route requested is also the private route, this is a loopback test, just accept it
let sr_pubkey = if safety_spec.preferred_route == Some(private_route.public_key) {
// Private route is also safety route during loopback test
private_route.public_key
} else {
// Get the safety route to use from the spec
let avoid_node_id = match &pr_first_hop.node {
RouteNode::NodeId(n) => n.key,
@ -1074,6 +1076,8 @@ impl RouteSpecStore {
// No safety route could be found for this spec
return Ok(None);
};
sr_pubkey
};
let safety_rsd = Self::detail_mut(inner, &sr_pubkey).unwrap();
// xxx implement caching here!

View File

@ -228,7 +228,16 @@ impl RPCProcessor {
)))
}
SafetySelection::Safe(safety_spec) => {
// Sent directly but with a safety route, respond to private route
// Sent to a private route via a safety route, respond to private route
// Check for loopback test
let pr_key = if safety_spec.preferred_route
== Some(private_route.public_key)
{
// Private route is also safety route during loopback test
private_route.public_key
} else {
// Get the privat route to respond to that matches the safety route spec we sent the request with
let avoid_node_id = match &pr_first_hop.node {
RouteNode::NodeId(n) => n.key,
RouteNode::PeerInfo(p) => p.node_id.key,
@ -239,6 +248,8 @@ impl RPCProcessor {
.map_err(RPCError::internal)? else {
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
};
pr_key
};
// Get the assembled route for response
let private_route = rss

View File

@ -4,16 +4,17 @@ impl RPCProcessor {
#[instrument(level = "trace", skip_all, err)]
async fn process_route_safety_route_hop(
&self,
route: RPCOperationRoute,
routed_operation: RoutedOperation,
route_hop: RouteHop,
safety_route: SafetyRoute,
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
if safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Ok(NetworkResult::invalid_message(
"Safety route hop count too high to process",
));
}
if route.safety_route.hop_count == 0 {
if safety_route.hop_count == 0 {
return Ok(NetworkResult::invalid_message(
"Safety route hop count should not be zero if there are more hops",
));
@ -55,11 +56,11 @@ impl RPCProcessor {
// Pass along the route
let next_hop_route = RPCOperationRoute {
safety_route: SafetyRoute {
public_key: route.safety_route.public_key,
hop_count: route.safety_route.hop_count - 1,
public_key: safety_route.public_key,
hop_count: safety_route.hop_count - 1,
hops: SafetyRouteHops::Data(route_hop.next_hop.unwrap()),
},
operation: route.operation,
operation: routed_operation,
};
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
@ -135,7 +136,7 @@ impl RPCProcessor {
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
remote_safety_route: &SafetyRoute,
remote_sr_pubkey: DHTKey,
) -> Result<NetworkResult<()>, RPCError> {
// Get sequencing preference
let sequencing = if detail
@ -153,7 +154,7 @@ impl RPCProcessor {
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&remote_safety_route.public_key, &node_id_secret)
.cached_dh(&remote_sr_pubkey, &node_id_secret)
.map_err(RPCError::protocol)?;
let body = match Crypto::decrypt_aead(
&routed_operation.data,
@ -168,7 +169,7 @@ impl RPCProcessor {
};
// Pass message to RPC system
self.enqueue_safety_routed_message(remote_safety_route.public_key, sequencing, body)
self.enqueue_safety_routed_message(remote_sr_pubkey, sequencing, body)
.map_err(RPCError::internal)?;
Ok(NetworkResult::value(()))
@ -180,8 +181,8 @@ impl RPCProcessor {
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
remote_safety_route: &SafetyRoute,
private_route: &PrivateRoute,
remote_sr_pubkey: DHTKey,
pr_pubkey: DHTKey,
) -> Result<NetworkResult<()>, RPCError> {
// Get sender id
let sender_id = detail.envelope.get_sender_id();
@ -190,7 +191,7 @@ impl RPCProcessor {
let rss = self.routing_table.route_spec_store();
let Some((secret_key, safety_spec)) = rss
.validate_signatures(
&private_route.public_key,
&pr_pubkey,
&routed_operation.signatures,
&routed_operation.data,
sender_id,
@ -204,7 +205,7 @@ impl RPCProcessor {
// xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes?
let dh_secret = self
.crypto
.cached_dh(&remote_safety_route.public_key, &secret_key)
.cached_dh(&remote_sr_pubkey, &secret_key)
.map_err(RPCError::protocol)?;
let body = Crypto::decrypt_aead(
&routed_operation.data,
@ -217,7 +218,7 @@ impl RPCProcessor {
))?;
// Pass message to RPC system
self.enqueue_private_routed_message(remote_safety_route.public_key, private_route.public_key, safety_spec, body)
self.enqueue_private_routed_message(remote_sr_pubkey, pr_pubkey, safety_spec, body)
.map_err(RPCError::internal)?;
Ok(NetworkResult::value(()))
@ -228,65 +229,123 @@ impl RPCProcessor {
&self,
detail: RPCMessageHeaderDetailDirect,
routed_operation: RoutedOperation,
safety_route: &SafetyRoute,
private_route: &PrivateRoute,
remote_sr_pubkey: DHTKey,
pr_pubkey: DHTKey,
) -> Result<NetworkResult<()>, RPCError> {
// Make sure hop count makes sense
if safety_route.hop_count != 0 {
return Ok(NetworkResult::invalid_message(
"Safety hop count should be zero if switched to private route",
));
}
if private_route.hop_count != 0 {
return Ok(NetworkResult::invalid_message(
"Private route hop count should be zero if we are at the end",
));
}
// If the private route public key is our node id, then this was sent via safety route to our node directly
// so there will be no signatures to validate
if private_route.public_key == self.routing_table.node_id() {
if pr_pubkey == self.routing_table.node_id() {
// The private route was a stub
self.process_safety_routed_operation(detail, routed_operation, safety_route)
self.process_safety_routed_operation(detail, routed_operation, remote_sr_pubkey)
} else {
// Both safety and private routes used, should reply with a safety route
self.process_private_routed_operation(
detail,
routed_operation,
safety_route,
private_route,
remote_sr_pubkey,
pr_pubkey,
)
}
}
#[instrument(level = "trace", skip_all, err)]
pub(crate) async fn process_private_route_first_hop(
&self,
operation: RoutedOperation,
mut routed_operation: RoutedOperation,
sr_pubkey: DHTKey,
private_route: &PrivateRoute,
mut private_route: PrivateRoute,
) -> Result<NetworkResult<()>, RPCError> {
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else {
let Some(pr_first_hop) = private_route.pop_first_hop() else {
return Ok(NetworkResult::invalid_message("switching from safety route to private route requires first hop"));
};
// Switching to private route from safety route
self.process_route_private_route_hop(
operation,
pr_first_hop.node.clone(),
// Check for loopback test where private route is the same as safety route
if sr_pubkey == private_route.public_key {
// If so, we're going to turn this thing right around without transiting the network
let PrivateRouteHops::Data(route_hop_data) = private_route.hops else {
return Ok(NetworkResult::invalid_message("Loopback test requires hops"));
};
// Decrypt route hop data
let route_hop = network_result_try!(self.decrypt_private_route_hop_data(&route_hop_data, &private_route.public_key, &mut routed_operation)?);
// Ensure hop count > 0
if private_route.hop_count == 0 {
return Ok(NetworkResult::invalid_message(
"route should not be at the end",
));
}
// Make next PrivateRoute and pass it on
return self.process_route_private_route_hop(
routed_operation,
route_hop.node,
sr_pubkey,
PrivateRoute {
public_key: private_route.public_key,
hop_count: private_route.hop_count - 1,
hops: pr_first_hop
hops: route_hop
.next_hop
.clone()
.map(|rhd| PrivateRouteHops::Data(rhd))
.unwrap_or(PrivateRouteHops::Empty),
},
)
.await;
}
// Switching to private route from safety route
self.process_route_private_route_hop(
routed_operation,
pr_first_hop,
sr_pubkey,
private_route,
)
.await
}
/// Decrypt route hop data and sign routed operation
pub(crate) fn decrypt_private_route_hop_data(&self, route_hop_data: &RouteHopData, pr_pubkey: &DHTKey, route_operation: &mut RoutedOperation) -> Result<NetworkResult<RouteHop>, RPCError>
{
// Decrypt the blob with DEC(nonce, DH(the PR's public key, this hop's secret)
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&pr_pubkey, &node_id_secret)
.map_err(RPCError::protocol)?;
let dec_blob_data = match Crypto::decrypt_aead(
&route_hop_data.blob,
&route_hop_data.nonce,
&dh_secret,
None,
) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!("unable to decrypt private route hop data: {}", e)));
}
};
let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
// Decode next RouteHop
let route_hop = {
let rh_reader = dec_blob_reader
.get_root::<veilid_capnp::route_hop::Reader>()
.map_err(RPCError::protocol)?;
decode_route_hop(&rh_reader)?
};
// Sign the operation if this is not our last hop
// as the last hop is already signed by the envelope
if route_hop.next_hop.is_some() {
let node_id = self.routing_table.node_id();
let node_id_secret = self.routing_table.node_id_secret();
let sig = sign(&node_id, &node_id_secret, &route_operation.data)
.map_err(RPCError::internal)?;
route_operation.signatures.push(sig);
}
Ok(NetworkResult::value(route_hop))
}
#[instrument(level = "trace", skip(self, msg), ret, err)]
pub(crate) async fn process_route(
&self,
@ -322,14 +381,14 @@ impl RPCProcessor {
// See what kind of safety route we have going on here
match route.safety_route.hops {
// There is a safety route hop
SafetyRouteHops::Data(ref d) => {
SafetyRouteHops::Data(ref route_hop_data) => {
// Decrypt the blob with DEC(nonce, DH(the SR's public key, this hop's secret)
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&route.safety_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let mut dec_blob_data = Crypto::decrypt_aead(&d.blob, &d.nonce, &dh_secret, None)
let mut dec_blob_data = Crypto::decrypt_aead(&route_hop_data.blob, &route_hop_data.nonce, &dh_secret, None)
.map_err(RPCError::protocol)?;
// See if this is last hop in safety route, if so, we're decoding a PrivateRoute not a RouteHop
@ -353,7 +412,7 @@ impl RPCProcessor {
network_result_try!(self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
&private_route,
private_route,
)
.await?);
} else if dec_blob_tag == 0 {
@ -366,16 +425,16 @@ impl RPCProcessor {
};
// Continue the full safety route with another hop
network_result_try!(self.process_route_safety_route_hop(route, route_hop)
network_result_try!(self.process_route_safety_route_hop(route.operation, route_hop, route.safety_route)
.await?);
} else {
return Ok(NetworkResult::invalid_message("invalid blob tag"));
}
}
// No safety route left, now doing private route
SafetyRouteHops::Private(ref private_route) => {
SafetyRouteHops::Private(private_route) => {
// See if we have a hop, if not, we are at the end of the private route
match &private_route.hops {
match private_route.hops {
PrivateRouteHops::FirstHop(_) => {
// Safety route was a stub, start with the beginning of the private route
network_result_try!(self.process_private_route_first_hop(
@ -386,32 +445,9 @@ impl RPCProcessor {
.await?);
}
PrivateRouteHops::Data(route_hop_data) => {
// Decrypt the blob with DEC(nonce, DH(the PR's public key, this hop's secret)
let node_id_secret = self.routing_table.node_id_secret();
let dh_secret = self
.crypto
.cached_dh(&private_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?;
let dec_blob_data = match Crypto::decrypt_aead(
&route_hop_data.blob,
&route_hop_data.nonce,
&dh_secret,
None,
) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!("unable to decrypt private route hop data: {}", e)));
}
};
let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
// Decode next RouteHop
let route_hop = {
let rh_reader = dec_blob_reader
.get_root::<veilid_capnp::route_hop::Reader>()
.map_err(RPCError::protocol)?;
decode_route_hop(&rh_reader)?
};
// Decrypt route hop data
let route_hop = network_result_try!(self.decrypt_private_route_hop_data(&route_hop_data, &private_route.public_key, &mut route.operation)?);
// Ensure hop count > 0
if private_route.hop_count == 0 {
@ -420,16 +456,6 @@ impl RPCProcessor {
));
}
// Sign the operation if this is not our last hop
// as the last hop is already signed by the envelope
if route_hop.next_hop.is_some() {
let node_id = self.routing_table.node_id();
let node_id_secret = self.routing_table.node_id_secret();
let sig = sign(&node_id, &node_id_secret, &route.operation.data)
.map_err(RPCError::internal)?;
route.operation.signatures.push(sig);
}
// Make next PrivateRoute and pass it on
network_result_try!(self.process_route_private_route_hop(
route.operation,
@ -453,13 +479,18 @@ impl RPCProcessor {
"route should be at the end",
));
}
if route.safety_route.hop_count != 0 {
return Ok(NetworkResult::invalid_message(
"Safety hop count should be zero if switched to private route",
));
}
// No hops left, time to process the routed operation
network_result_try!(self.process_routed_operation(
detail,
route.operation,
&route.safety_route,
private_route,
route.safety_route.public_key,
private_route.public_key,
)?);
}
}