checkpoint

This commit is contained in:
John Smith 2023-02-26 18:11:10 -05:00
parent 66db856c50
commit c330faa4fa
12 changed files with 158 additions and 114 deletions

View File

@ -769,33 +769,33 @@ impl NetworkManager {
/// Called by the RPC handler when we want to issue an RPC request or response
/// node_ref is the direct destination to which the envelope will be sent
/// If 'envelope_node_ref' is specified, it can be different than the node_ref being sent to
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
/// which will cause the envelope to be relayed
#[instrument(level = "trace", skip(self, body), ret, err)]
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
envelope_node_ref: Option<NodeRef>,
destination_node_ref: Option<NodeRef>,
body: B,
) -> EyreResult<NetworkResult<SendDataKind>> {
let end_node_ref = envelope_node_ref.as_ref().unwrap_or(&node_ref).clone();
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
if !node_ref.same_entry(&end_node_ref) {
if !node_ref.same_entry(&destination_node_ref) {
log_net!(
"sending envelope to {:?} via {:?}",
end_node_ref,
destination_node_ref,
node_ref
);
} else {
log_net!("sending envelope to {:?}", node_ref);
}
let best_node_id = end_node_ref.best_node_id();
let best_node_id = destination_node_ref.best_node_id();
// Get node's envelope versions and see if we can send to it
// and if so, get the max version we can use
let Some(envelope_version) = end_node_ref.best_envelope_version() else {
let Some(envelope_version) = destination_node_ref.best_envelope_version() else {
bail!(
"can't talk to this node {} because we dont support its envelope versions",
node_ref

View File

@ -25,6 +25,19 @@ impl RemotePrivateRouteInfo {
pub fn get_private_routes(&self) -> &[PrivateRoute] {
&self.private_routes
}
pub fn best_private_route(&self) -> Option<PrivateRoute> {
self.private_routes
.iter()
.reduce(|acc, x| {
if x.public_key < acc.public_key {
x
} else {
acc
}
})
.filter(|x| VALID_CRYPTO_KINDS.contains(&x.public_key.kind))
.cloned()
}
pub fn get_stats(&self) -> &RouteStats {
&self.stats
}

View File

@ -616,7 +616,7 @@ impl RouteSpecStore {
let safety_selection = SafetySelection::Safe(safety_spec);
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection,
}
};
@ -650,7 +650,7 @@ impl RouteSpecStore {
let safety_selection = SafetySelection::Safe(safety_spec);
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection,
}
};
@ -816,6 +816,15 @@ impl RouteSpecStore {
//////////////////////////////////////////////////////////////////////
/// Choose the best private route from a private route set to communicate with
pub fn best_remote_private_route(&self, id: &RouteId) -> Option<PrivateRoute> {
let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp();
let rpri = inner.cache.get_remote_private_route(cur_ts, id)?;
rpri.best_private_route()
}
/// Compiles a safety route to the private route, with caching
/// Returns an Err() if the parameters are wrong
/// Returns Ok(None) if no allocation could happen at this time (not an error)
@ -1313,12 +1322,12 @@ impl RouteSpecStore {
}
/// Check if a remote private route id is valid
#[instrument(level = "trace", skip(self), ret)]
pub fn is_valid_remote_private_route(&self, id: &RouteId) -> bool {
let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp();
inner.cache.peek_remote_private_route_mut(cur_ts, id).is_some()
}
// #[instrument(level = "trace", skip(self), ret)]
// pub fn is_valid_remote_private_route(&self, id: &RouteId) -> bool {
// let inner = &mut *self.inner.lock();
// let cur_ts = get_aligned_timestamp();
// inner.cache.peek_remote_private_route_mut(cur_ts, id).is_some()
// }
// /// Retrieve an imported remote private route by its public key
// pub fn get_remote_private_route(&self, id: &String) -> Option<PrivateRoute> {
@ -1339,9 +1348,41 @@ impl RouteSpecStore {
// })
// }
/// Get a route id for a route's public key
pub fn get_route_id_for_key(&self, key: &PublicKey) -> Option<RouteId>
{
let inner = &mut *self.inner.lock();
// Check for local route
if let Some(id) = inner.content.get_id_by_key(key) {
return Some(id);
}
// Check for remote route
if let Some(rrid) = inner.cache.get_remote_private_route_id_by_key(key) {
return Some(rrid);
}
None
}
/// Check to see if this remote (not ours) private route has seen our current node info yet
/// This happens when you communicate with a private route without a safety route
pub fn has_remote_private_route_seen_our_node_info(&self, id: &RouteId) -> bool {
pub fn has_remote_private_route_seen_our_node_info(&self, key: &PublicKey) -> bool {
let inner = &mut *self.inner.lock();
// Check for local route. If this is not a remote private route,
// we may be running a test and using our own local route as the destination private route.
// In that case we definitely have already seen our own node info
if let Some(_) = inner.content.get_id_by_key(key) {
return true;
}
if let Some(rrid) = inner.cache.get_remote_private_route_id_by_key(key) {
let cur_ts = get_aligned_timestamp();
if let Some(rpri) = inner.cache.peek_remote_private_route_mut(cur_ts, &rrid)
{
let our_node_info_ts = {
let rti = &*self.unlocked_inner.routing_table.inner.read();
let Some(ts) = rti.get_own_node_info_ts(RoutingDomain::PublicInternet) else {
@ -1351,12 +1392,11 @@ impl RouteSpecStore {
ts
};
let inner = &mut *self.inner.lock();
let cur_ts = get_aligned_timestamp();
let Some(rpri) = inner.cache.peek_remote_private_route_mut(cur_ts, &id) else {
return false;
};
rpri.has_seen_our_node_info_ts(our_node_info_ts)
return rpri.has_seen_our_node_info_ts(our_node_info_ts);
}
}
false
}
/// Mark a remote private route as having seen our current node info

View File

@ -29,7 +29,7 @@ impl RoutedOperation {
.map_err(RPCError::map_internal("too many signatures"))?,
);
for s in sigs_reader.iter() {
let sig = decode_signature512(&s)?;
let sig = decode_signature512(&s);
signatures.push(sig);
}

View File

@ -19,10 +19,10 @@ pub enum Destination {
/// Require safety route or not
safety_selection: SafetySelection,
},
/// Send to private route (privateroute)
/// Send to private route
PrivateRoute {
/// A private route set id to send to
private_route_id: RouteId,
/// A private route to send to
private_route: PrivateRoute,
/// Require safety route or not
safety_selection: SafetySelection,
},
@ -44,9 +44,9 @@ impl Destination {
safety_selection: SafetySelection::Unsafe(sequencing),
}
}
pub fn private_route(private_route_id: RouteId, safety_selection: SafetySelection) -> Self {
pub fn private_route(private_route: PrivateRoute, safety_selection: SafetySelection) -> Self {
Self::PrivateRoute {
private_route_id,
private_route,
safety_selection,
}
}
@ -70,10 +70,10 @@ impl Destination {
safety_selection,
},
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection: _,
} => Self::PrivateRoute {
private_route_id,
private_route,
safety_selection,
},
}
@ -91,7 +91,7 @@ impl Destination {
safety_selection,
} => safety_selection,
Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection,
} => safety_selection,
}
@ -127,7 +127,7 @@ impl fmt::Display for Destination {
write!(f, "{}@{}{}", target, relay, sr)
}
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
@ -136,7 +136,7 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}{}", private_route_id, sr)
write!(f, "{}{}", private_route.public_key, sr)
}
}
}
@ -162,9 +162,6 @@ impl RPCProcessor {
}
SafetySelection::Safe(safety_spec) => {
// Sent directly but with a safety route, respond to private route
xxx continue here. ensure crypto kind makes sense with get_private_route_for_safety_spec and then make it work.
let ck = target.best_node_id().kind;
let Some(pr_key) = rss
.get_private_route_for_safety_spec(ck, safety_spec, &target.node_ids())
@ -210,9 +207,10 @@ xxx continue here. ensure crypto kind makes sense with get_private_route_for_saf
}
},
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection,
} => {
let Some(avoid_node_id) = private_route.first_hop_node_id() else {
return Err(RPCError::internal("destination private route must have first hop"));
};
@ -224,22 +222,19 @@ xxx continue here. ensure crypto kind makes sense with get_private_route_for_saf
// Sent to a private route with no safety route, use a stub safety route for the response
// Determine if we can use optimized nodeinfo
let route_node = match rss
.has_remote_private_route_seen_our_node_info(&private_route_id)
let route_node = if rss
.has_remote_private_route_seen_our_node_info(&private_route.public_key.key)
{
true => {
if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) {
return Ok(NetworkResult::no_connection_other("Own node info must be valid to use private route"));
}
RouteNode::NodeId(routing_table.node_id(crypto_kind).key)
}
false => {
} else {
let Some(own_peer_info) =
routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else {
return Ok(NetworkResult::no_connection_other("Own peer info must be valid to use private route"));
};
RouteNode::PeerInfo(own_peer_info)
},
};
Ok(NetworkResult::value(RespondTo::PrivateRoute(
@ -250,10 +245,10 @@ xxx continue here. ensure crypto kind makes sense with get_private_route_for_saf
// Sent to a private route via a safety route, respond to private route
// Check for loopback test
let pr_key = if safety_spec.preferred_route == Some(private_route_id)
let opt_private_route_id = rss.get_route_id_for_key(&private_route.public_key.key);
let pr_key = if safety_spec.preferred_route == opt_private_route_id
{
// Private route is also safety route during loopback test
xxx build loopback routine? get_private_route_for_loopback_test?
private_route.public_key.key
} else {
// Get the private route to respond to that matches the safety route spec we sent the request with

View File

@ -172,9 +172,9 @@ impl<T> Answer<T> {
struct RenderedOperation {
/// The rendered operation bytes
message: Vec<u8>,
/// Destination node id we're sending to
node_id: PublicKey,
/// Node to send envelope to (may not be destination node id in case of relay)
/// Destination node we're sending to
destination_node_ref: NodeRef,
/// Node to send envelope to (may not be destination node in case of relay)
node_ref: NodeRef,
/// Total safety + private route hop count + 1 hop for the initial send
hop_count: usize,
@ -415,7 +415,7 @@ impl RPCProcessor {
let routing_table = this.routing_table();
// First see if we have the node in our routing table already
if let Some(nr) = routing_table.lookup_node_ref(node_id) {
if let Some(nr) = routing_table.lookup_any_node_ref(node_id) {
// ensure we have some dial info for the entry already,
// if not, we should do the find_node anyway
if nr.has_any_dial_info() {
@ -438,7 +438,7 @@ impl RPCProcessor {
.await?;
if let Some(nr) = &nr {
if nr.node_id() != node_id {
if nr.node_ids().contains_key(&node_id) {
// found a close node, but not exact within our configured resolve_node timeout
return Ok(None);
}
@ -497,9 +497,15 @@ impl RPCProcessor {
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();
// Get useful private route properties
let pr_is_stub = remote_private_route.is_stub();
let pr_hop_count = remote_private_route.hop_count;
let pr_pubkey = remote_private_route.public_key;
let pr_pubkey = remote_private_route.public_key.key;
let crypto_kind = remote_private_route.crypto_kind();
let Some(vcrypto) = self.crypto.get(crypto_kind) else {
return Err(RPCError::internal("crypto not available for selected private route"));
};
// Compile the safety route with the private route
let compiled_route: CompiledRoute = match rss
@ -514,21 +520,18 @@ impl RPCProcessor {
}
};
let sr_is_stub = compiled_route.safety_route.is_stub();
let sr_pubkey = compiled_route.safety_route.public_key;
let sr_pubkey = compiled_route.safety_route.public_key.key;
// Encrypt routed operation
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
// xxx use factory method, get version from somewhere...
let nonce = Crypto::get_random_nonce();
let dh_secret = self
.crypto
let nonce = vcrypto.random_nonce();
let dh_secret = vcrypto
.cached_dh(&pr_pubkey, &compiled_route.secret)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None)
let enc_msg_data = vcrypto.encrypt_aead(&message_data, &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Make the routed operation
// xxx: replace MAX_CRYPTO_VERSION with the version from the factory
let operation =
RoutedOperation::new(safety_selection.get_sequencing(), nonce, enc_msg_data);
@ -539,7 +542,7 @@ impl RPCProcessor {
operation,
};
let ssni_route = self
.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop.clone()))?;
.get_sender_peer_info(&Destination::direct(compiled_route.first_hop.clone()))?;
let operation = RPCOperation::new_statement(
RPCStatement::new(RPCStatementDetail::Route(route_operation)),
ssni_route,
@ -552,12 +555,11 @@ impl RPCProcessor {
let out_message = builder_to_vec(route_msg)?;
// Get the first hop this is going to
let out_node_id = compiled_route.first_hop.node_id();
let out_hop_count = (1 + sr_hop_count + pr_hop_count) as usize;
let out = RenderedOperation {
message: out_message,
node_id: out_node_id,
destination_node_ref: compiled_route.first_hop.clone(),
node_ref: compiled_route.first_hop,
hop_count: out_hop_count,
safety_route: if sr_is_stub { None } else { Some(sr_pubkey) },
@ -592,7 +594,7 @@ impl RPCProcessor {
let reply_private_route = match operation.kind() {
RPCOperationKind::Question(q) => match q.respond_to() {
RespondTo::Sender => None,
RespondTo::PrivateRoute(pr) => Some(pr.public_key),
RespondTo::PrivateRoute(pr) => Some(pr.public_key.key),
},
RPCOperationKind::Statement(_) | RPCOperationKind::Answer(_) => None,
};
@ -612,16 +614,15 @@ impl RPCProcessor {
// --------------------------------------
// Get the actual destination node id accounting for relays
let (node_ref, node_id) = if let Destination::Relay {
let (node_ref, destination_node_ref) = if let Destination::Relay {
relay: _,
target: ref dht_key,
target: ref target,
safety_selection: _,
} = dest
{
(node_ref.clone(), dht_key.clone())
(node_ref.clone(), target.clone())
} else {
let node_id = node_ref.node_id();
(node_ref.clone(), node_id)
(node_ref.clone(), node_ref.clone())
};
// Handle the existence of safety route
@ -640,7 +641,7 @@ impl RPCProcessor {
// route, we can use a direct envelope instead of routing
out = NetworkResult::value(RenderedOperation {
message,
node_id,
destination_node_ref,
node_ref,
hop_count: 1,
safety_route: None,
@ -651,7 +652,8 @@ impl RPCProcessor {
SafetySelection::Safe(_) => {
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
let peer_info = match node_ref.make_peer_info(RoutingDomain::PublicInternet)
// Destination relay is ignored for safety routed operations
let peer_info = match destination_node_ref.make_peer_info(RoutingDomain::PublicInternet)
{
None => {
return Ok(NetworkResult::no_connection_other(
@ -661,7 +663,7 @@ impl RPCProcessor {
Some(pi) => pi,
};
let private_route =
PrivateRoute::new_stub(node_id, RouteNode::PeerInfo(peer_info));
PrivateRoute::new_stub(destination_node_ref.best_node_id(), RouteNode::PeerInfo(peer_info));
// Wrap with safety route
out = self.wrap_with_route(
@ -674,7 +676,7 @@ impl RPCProcessor {
};
}
Destination::PrivateRoute {
private_route_id,
private_route,
safety_selection,
} => {
// Send to private route
@ -695,7 +697,7 @@ impl RPCProcessor {
/// Get signed node info to package with RPC messages to improve
/// routing table caching when it is okay to do so
#[instrument(level = "trace", skip(self), ret, err)]
fn get_sender_signed_node_info(&self, dest: &Destination) -> Result<SenderPeerInfo, RPCError> {
fn get_sender_peer_info(&self, dest: &Destination) -> Result<SenderPeerInfo, RPCError> {
// Don't do this if the sender is to remain private
// Otherwise we would be attaching the original sender's identity to the final destination,
// thus defeating the purpose of the safety route entirely :P
@ -717,16 +719,9 @@ impl RPCProcessor {
relay: _,
target,
safety_selection: _,
} => {
if let Some(target) = routing_table.lookup_node_ref(*target) {
target
} else {
// Target was not in our routing table
return Ok(SenderPeerInfo::default());
}
}
} => target.clone(),
Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection: _,
} => {
return Ok(SenderPeerInfo::default());
@ -755,7 +750,7 @@ impl RPCProcessor {
}
Ok(SenderPeerInfo::new(
own_peer_info.signed_node_info,
own_peer_info,
target_node_info_ts,
))
}
@ -1001,11 +996,11 @@ impl RPCProcessor {
dest: Destination,
question: RPCQuestion,
) -> Result<NetworkResult<WaitableReply>, RPCError> {
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Get sender peer info if we should send that
let spi = self.get_sender_peer_info(&dest)?;
// Wrap question in operation
let operation = RPCOperation::new_question(question, ssni);
let operation = RPCOperation::new_question(question, spi);
let op_id = operation.op_id();
// Log rpc send
@ -1014,7 +1009,7 @@ impl RPCProcessor {
// Produce rendered operation
let RenderedOperation {
message,
node_id,
destination_node_ref,
node_ref,
hop_count,
safety_route,
@ -1034,7 +1029,7 @@ impl RPCProcessor {
let send_ts = get_aligned_timestamp();
let send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.send_envelope(node_ref.clone(), Some(destination_node_ref), message)
.await
.map_err(|e| {
// If we're returning an error, clean up
@ -1076,11 +1071,11 @@ impl RPCProcessor {
dest: Destination,
statement: RPCStatement,
) -> Result<NetworkResult<()>, RPCError> {
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Get sender peer info if we should send that
let spi = self.get_sender_peer_info(&dest)?;
// Wrap statement in operation
let operation = RPCOperation::new_statement(statement, ssni);
let operation = RPCOperation::new_statement(statement, spi);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);
@ -1088,7 +1083,7 @@ impl RPCProcessor {
// Produce rendered operation
let RenderedOperation {
message,
node_id,
destination_node_ref,
node_ref,
hop_count: _,
safety_route,
@ -1101,7 +1096,7 @@ impl RPCProcessor {
let send_ts = get_aligned_timestamp();
let _send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.send_envelope(node_ref.clone(), Some(destination_node_ref), message)
.await
.map_err(|e| {
// If we're returning an error, clean up
@ -1138,10 +1133,10 @@ impl RPCProcessor {
let dest = network_result_try!(self.get_respond_to_destination(&request));
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
let spi = self.get_sender_peer_info(&dest)?;
// Wrap answer in operation
let operation = RPCOperation::new_answer(&request.operation, answer, ssni);
let operation = RPCOperation::new_answer(&request.operation, answer, spi);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);
@ -1149,7 +1144,7 @@ impl RPCProcessor {
// Produce rendered operation
let RenderedOperation {
message,
node_id,
destination_node_ref,
node_ref,
hop_count: _,
safety_route,
@ -1161,7 +1156,7 @@ impl RPCProcessor {
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp();
network_result_try!(self.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.send_envelope(node_ref.clone(), Some(destination_node_ref), message)
.await
.map_err(|e| {
// If we're returning an error, clean up

View File

@ -17,7 +17,7 @@ impl RPCProcessor {
if matches!(
dest,
Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection: _
}
) {

View File

@ -13,7 +13,7 @@ impl RPCProcessor {
if matches!(
dest,
Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection: _
}
) {

View File

@ -53,7 +53,7 @@ impl RPCProcessor {
(Some(target.clone()), routing_domain)
}
Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection: _,
} => (None, RoutingDomain::PublicInternet),
};
@ -169,7 +169,7 @@ impl RPCProcessor {
safety_selection: _,
}
| Destination::PrivateRoute {
private_route_id: _,
private_route: _,
safety_selection: _,
} => {
// sender info is irrelevant over relays and routes

View File

@ -134,14 +134,14 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
let private_route_id = dc.imported_routes.get(n)?.clone();
let rss = routing_table.route_spec_store();
if !rss.is_valid_remote_private_route(&private_route_id) {
let Some(private_route) = rss.best_remote_private_route(&private_route_id) else {
// Remove imported route
dc.imported_routes.remove(n);
info!("removed dead imported route {}", n);
return None;
};
Some(Destination::private_route(
private_route_id,
private_route,
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())),
))
} else {

View File

@ -121,12 +121,13 @@ impl RoutingContext {
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.api.routing_table()?.route_spec_store();
if !rss.is_valid_remote_private_route(&rsid) {
let Some(private_route) = rss.best_remote_private_route(&rsid) else {
apibail_invalid_target!();
};
Ok(rpc_processor::Destination::PrivateRoute {
private_route_id: rsid,
private_route,
safety_selection: self.unlocked_inner.safety_selection,
})
}