checkpoint

This commit is contained in:
John Smith
2022-12-08 10:24:33 -05:00
parent 847623f2b4
commit 0b059e0ef9
16 changed files with 427 additions and 241 deletions

View File

@@ -58,24 +58,30 @@ impl RPCOperationKind {
pub struct RPCOperation {
op_id: u64,
sender_node_info: Option<SignedNodeInfo>,
target_node_info_ts: u64,
kind: RPCOperationKind,
}
impl RPCOperation {
pub fn new_question(question: RPCQuestion, sender_node_info: Option<SignedNodeInfo>) -> Self {
pub fn new_question(
question: RPCQuestion,
sender_signed_node_info: SenderSignedNodeInfo,
) -> Self {
Self {
op_id: get_random_u64(),
sender_node_info,
sender_node_info: sender_signed_node_info.signed_node_info,
target_node_info_ts: sender_signed_node_info.target_node_info_ts,
kind: RPCOperationKind::Question(question),
}
}
pub fn new_statement(
statement: RPCStatement,
sender_node_info: Option<SignedNodeInfo>,
sender_signed_node_info: SenderSignedNodeInfo,
) -> Self {
Self {
op_id: get_random_u64(),
sender_node_info,
sender_node_info: sender_signed_node_info.signed_node_info,
target_node_info_ts: sender_signed_node_info.target_node_info_ts,
kind: RPCOperationKind::Statement(statement),
}
}
@@ -83,11 +89,12 @@ impl RPCOperation {
pub fn new_answer(
request: &RPCOperation,
answer: RPCAnswer,
sender_node_info: Option<SignedNodeInfo>,
sender_signed_node_info: SenderSignedNodeInfo,
) -> Self {
Self {
op_id: request.op_id,
sender_node_info,
sender_node_info: sender_signed_node_info.signed_node_info,
target_node_info_ts: sender_signed_node_info.target_node_info_ts,
kind: RPCOperationKind::Answer(answer),
}
}
@@ -99,6 +106,9 @@ impl RPCOperation {
pub fn sender_node_info(&self) -> Option<&SignedNodeInfo> {
self.sender_node_info.as_ref()
}
pub fn target_node_info_ts(&self) -> u64 {
self.target_node_info_ts
}
pub fn kind(&self) -> &RPCOperationKind {
&self.kind
@@ -128,12 +138,15 @@ impl RPCOperation {
None
};
let target_node_info_ts = operation_reader.get_target_node_info_ts();
let kind_reader = operation_reader.get_kind();
let kind = RPCOperationKind::decode(&kind_reader, opt_sender_node_id)?;
Ok(RPCOperation {
op_id,
sender_node_info,
target_node_info_ts,
kind,
})
}
@@ -144,6 +157,7 @@ impl RPCOperation {
let mut si_builder = builder.reborrow().init_sender_node_info();
encode_signed_node_info(&sender_info, &mut si_builder)?;
}
builder.set_target_node_info_ts(self.target_node_info_ts);
let mut k_builder = builder.reborrow().init_kind();
self.kind.encode(&mut k_builder)?;
Ok(())

View File

@@ -217,10 +217,19 @@ impl RPCProcessor {
let route_node = match rss
.has_remote_private_route_seen_our_node_info(&private_route.public_key)
{
true => RouteNode::NodeId(NodeId::new(routing_table.node_id())),
false => RouteNode::PeerInfo(
routing_table.get_own_peer_info(RoutingDomain::PublicInternet),
),
true => {
if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) {
return Ok(NetworkResult::no_connection_other("Own node info must be valid to use private route"));
}
RouteNode::NodeId(NodeId::new(routing_table.node_id()))
}
false => {
let Some(own_peer_info) =
routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else {
return Ok(NetworkResult::no_connection_other("Own peer info must be valid to use private route"));
};
RouteNode::PeerInfo(own_peer_info)
},
};
Ok(NetworkResult::value(RespondTo::PrivateRoute(

View File

@@ -189,14 +189,45 @@ impl<T> Answer<T> {
}
}
/// An operation that has been fully prepared for envelope r
struct RenderedOperation {
message: Vec<u8>, // The rendered operation bytes
node_id: DHTKey, // Destination node id we're sending to
node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay)
hop_count: usize, // Total safety + private route hop count + 1 hop for the initial send
safety_route: Option<DHTKey>, // The safety route used to send the message
remote_private_route: Option<DHTKey>, // The private route used to send the message
reply_private_route: Option<DHTKey>, // The private route requested to receive the reply
/// The rendered operation bytes
message: Vec<u8>,
/// Destination node id we're sending to
node_id: DHTKey,
/// Node to send envelope to (may not be destination node id in case of relay)
node_ref: NodeRef,
/// Total safety + private route hop count + 1 hop for the initial send
hop_count: usize,
/// The safety route used to send the message
safety_route: Option<DHTKey>,
/// The private route used to send the message
remote_private_route: Option<DHTKey>,
/// The private route requested to receive the reply
reply_private_route: Option<DHTKey>,
}
/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
struct SenderSignedNodeInfo {
/// The current signed node info of the sender if required
signed_node_info: Option<SignedNodeInfo>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
target_node_info_ts: u64,
}
impl SenderSignedNodeInfo {
pub fn new_no_sni(target_node_info_ts: u64) -> Self {
Self {
signed_node_info: None,
target_node_info_ts,
}
}
pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: u64) -> Self {
Self {
signed_node_info: Some(sender_signed_node_info),
target_node_info_ts,
}
}
}
#[derive(Copy, Clone, Debug)]
@@ -474,11 +505,10 @@ impl RPCProcessor {
)
}
};
out
}
// Wrap an operation with a private route inside a safety route
/// Wrap an operation with a private route inside a safety route
fn wrap_with_route(
&self,
safety_selection: SafetySelection,
@@ -528,9 +558,11 @@ impl RPCProcessor {
safety_route: compiled_route.safety_route,
operation,
};
let ssni_route =
self.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop))?;
let operation = RPCOperation::new_statement(
RPCStatement::new(RPCStatementDetail::Route(route_operation)),
None,
ssni_route,
);
// Convert message to bytes and return it
@@ -680,64 +712,75 @@ impl RPCProcessor {
Ok(out)
}
// Get signed node info to package with RPC messages to improve
// routing table caching when it is okay to do so
// This is only done in the PublicInternet routing domain because
// as far as we can tell this is the only domain that will really benefit
fn get_sender_signed_node_info(&self, dest: &Destination) -> Option<SignedNodeInfo> {
/// Get signed node info to package with RPC messages to improve
/// routing table caching when it is okay to do so
#[instrument(skip(self), ret, err)]
fn get_sender_signed_node_info(
&self,
dest: &Destination,
) -> Result<SenderSignedNodeInfo, RPCError> {
// Don't do this if the sender is to remain private
// Otherwise we would be attaching the original sender's identity to the final destination,
// thus defeating the purpose of the safety route entirely :P
match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => {}
SafetySelection::Safe(_) => {
return None;
return Ok(SenderSignedNodeInfo::default());
}
}
// Don't do this if our own signed node info isn't valid yet
let routing_table = self.routing_table();
if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) {
return None;
}
match dest {
// Get the target we're sending to
let routing_table = self.routing_table();
let target = match dest {
Destination::Direct {
target,
safety_selection: _,
} => {
// If the target has seen our node info already don't do this
if target.has_seen_our_node_info(RoutingDomain::PublicInternet) {
return None;
}
Some(
routing_table
.get_own_peer_info(RoutingDomain::PublicInternet)
.signed_node_info,
)
}
} => target.clone(),
Destination::Relay {
relay: _,
target,
safety_selection: _,
} => {
if let Some(target) = routing_table.lookup_node_ref(*target) {
if target.has_seen_our_node_info(RoutingDomain::PublicInternet) {
return None;
}
Some(
routing_table
.get_own_peer_info(RoutingDomain::PublicInternet)
.signed_node_info,
)
target
} else {
None
// Target was not in our routing table
return Ok(SenderSignedNodeInfo::default());
}
}
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
} => None,
} => {
return Ok(SenderSignedNodeInfo::default());
}
};
let Some(routing_domain) = target.best_routing_domain() else {
// No routing domain for target?
return Err(RPCError::internal(format!("No routing domain for target: {}", target)));
};
// Get the target's node info timestamp
let target_node_info_ts = target.node_info_ts(routing_domain);
// Don't return our node info if it's not valid yet
let Some(own_peer_info) = routing_table.get_own_peer_info(routing_domain) else {
return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts));
};
// Get our node info timestamp
let our_node_info_ts = own_peer_info.signed_node_info.timestamp();
// If the target has seen our node info already don't send it again
if target.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts));
}
Ok(SenderSignedNodeInfo::new(
own_peer_info.signed_node_info,
target_node_info_ts,
))
}
/// Record failure to send to node or route
@@ -981,11 +1024,11 @@ impl RPCProcessor {
dest: Destination,
question: RPCQuestion,
) -> Result<NetworkResult<WaitableReply>, RPCError> {
// Get sender info if we should send that
let opt_sender_info = self.get_sender_signed_node_info(&dest);
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap question in operation
let operation = RPCOperation::new_question(question, opt_sender_info);
let operation = RPCOperation::new_question(question, ssni);
let op_id = operation.op_id();
// Log rpc send
@@ -1056,11 +1099,11 @@ impl RPCProcessor {
dest: Destination,
statement: RPCStatement,
) -> Result<NetworkResult<()>, RPCError> {
// Get sender info if we should send that
let opt_sender_info = self.get_sender_signed_node_info(&dest);
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap statement in operation
let operation = RPCOperation::new_statement(statement, opt_sender_info);
let operation = RPCOperation::new_statement(statement, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
@@ -1117,11 +1160,11 @@ impl RPCProcessor {
// Extract destination from respond_to
let dest = network_result_try!(self.get_respond_to_destination(&request));
// Get sender info if we should send that
let opt_sender_info = self.get_sender_signed_node_info(&dest);
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap answer in operation
let operation = RPCOperation::new_answer(&request.operation, answer, opt_sender_info);
let operation = RPCOperation::new_answer(&request.operation, answer, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
@@ -1213,10 +1256,10 @@ impl RPCProcessor {
opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id)
}
// Mark this sender as having seen our node info over this routing domain
// because it managed to reach us over that routing domain
// Update the 'seen our node info' timestamp to determine if this node needs a
// 'node info update' ping
if let Some(sender_nr) = &opt_sender_nr {
sender_nr.set_seen_our_node_info(routing_domain);
sender_nr.set_our_node_info_ts(routing_domain, operation.target_node_info_ts());
}
// Make the RPC message

View File

@@ -92,9 +92,8 @@ impl RPCProcessor {
// add node information for the requesting node to our routing table
let routing_table = self.routing_table();
let has_valid_own_node_info =
routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet);
let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet);
let has_valid_own_node_info = own_peer_info.is_some();
// find N nodes closest to the target node in our routing table
@@ -116,7 +115,7 @@ impl RPCProcessor {
|rti, k, v| {
rti.transform_to_peer_info(
RoutingDomain::PublicInternet,
own_peer_info.clone(),
own_peer_info.as_ref().unwrap().clone(),
k,
v,
)