This commit is contained in:
John Smith
2023-04-22 20:48:24 -04:00
parent 438553a1ca
commit cb4abaefd7
29 changed files with 334 additions and 284 deletions
+3 -8
View File
@@ -56,11 +56,6 @@ pub use typed_signature::*;
use super::*;
#[derive(Debug, Clone)]
pub struct DecodeContext {
config: VeilidConfig,
}
#[derive(Debug, Clone)]
pub enum QuestionContext {
GetValue(ValidateGetValueContext),
@@ -69,7 +64,7 @@ pub enum QuestionContext {
#[derive(Clone)]
pub struct RPCValidateContext {
crypto: Crypto,
rpc_processor: RPCProcessor,
question_context: Option<QuestionContext>,
pub crypto: Crypto,
pub rpc_processor: RPCProcessor,
pub question_context: Option<QuestionContext>,
}
@@ -133,10 +133,7 @@ impl RPCOperation {
)
}
pub fn decode(
context: &DecodeContext,
operation_reader: &veilid_capnp::operation::Reader,
) -> Result<Self, RPCError> {
pub fn decode(operation_reader: &veilid_capnp::operation::Reader) -> Result<Self, RPCError> {
let op_id = OperationId::new(operation_reader.get_op_id());
let sender_peer_info = if operation_reader.has_sender_peer_info() {
@@ -5,9 +5,9 @@ const MAX_GET_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]
pub struct ValidateGetValueContext {
last_descriptor: Option<SignedValueDescriptor>,
subkey: ValueSubkey,
vcrypto: CryptoSystemVersion,
pub last_descriptor: Option<SignedValueDescriptor>,
pub subkey: ValueSubkey,
pub vcrypto: CryptoSystemVersion,
}
impl fmt::Debug for ValidateGetValueContext {
@@ -6,7 +6,7 @@ pub struct RPCOperationReturnReceipt {
}
impl RPCOperationReturnReceipt {
pub fn new(receipt: &[u8]) -> Result<Self, RPCError> {
pub fn new(receipt: Vec<u8>) -> Result<Self, RPCError> {
if receipt.len() < MIN_RECEIPT_SIZE {
return Err(RPCError::protocol("ReturnReceipt receipt too short to set"));
}
@@ -14,9 +14,7 @@ impl RPCOperationReturnReceipt {
return Err(RPCError::protocol("ReturnReceipt receipt too long to set"));
}
Ok(Self {
receipt: receipt.to_vec(),
})
Ok(Self { receipt })
}
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
Ok(())
@@ -44,8 +44,5 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<Peer
if node_ids.len() == 0 {
return Err(RPCError::protocol("no verified node ids"));
}
Ok(PeerInfo {
node_ids,
signed_node_info,
})
Ok(PeerInfo::new(node_ids, signed_node_info))
}
+9 -12
View File
@@ -312,7 +312,7 @@ impl RPCProcessor {
}
pub fn set_storage_manager(&self, opt_storage_manager: Option<StorageManager>) {
let inner = self.inner.lock();
let mut inner = self.inner.lock();
inner.opt_storage_manager = opt_storage_manager
}
@@ -555,10 +555,7 @@ impl RPCProcessor {
// Prepare route operation
let sr_hop_count = compiled_route.safety_route.hop_count;
let route_operation = RPCOperationRoute {
safety_route: compiled_route.safety_route,
operation,
};
let route_operation = RPCOperationRoute::new(compiled_route.safety_route, operation);
let ssni_route =
self.get_sender_peer_info(&Destination::direct(compiled_route.first_hop.clone()));
let operation = RPCOperation::new_statement(
@@ -1216,18 +1213,15 @@ impl RPCProcessor {
.get_root::<veilid_capnp::operation::Reader>()
.map_err(RPCError::protocol)
.map_err(logthru_rpc!())?;
let decode_context = DecodeContext {
config: self.config.clone(),
};
let operation = RPCOperation::decode(&decode_context, &op_reader)?;
let mut operation = RPCOperation::decode(&op_reader)?;
// Validate the RPC message
self.validate_rpc_operation(&operation)?;
self.validate_rpc_operation(&mut operation)?;
Ok(operation)
}
fn validate_rpc_operation(&self, operation: &RPCOperation) -> Result<(), RPCError> {
fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> {
// If this is an answer, get the question context for this answer
// If we received an answer for a question we did not ask, this will return an error
let question_context = if let RPCOperationKind::Answer(_) = operation.kind() {
@@ -1260,7 +1254,10 @@ impl RPCProcessor {
let msg = match &encoded_msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
// Decode and validate the RPC operation
let operation = self.decode_rpc_operation(&encoded_msg)?;
let operation = match self.decode_rpc_operation(&encoded_msg) {
Ok(v) => v,
Err(e) => return Ok(NetworkResult::invalid_message(e)),
};
// Get the routing domain this message came over
let routing_domain = detail.routing_domain;
@@ -100,7 +100,7 @@ where
/// Get operation context
pub fn get_op_context(&self, op_id: OperationId) -> Result<C, RPCError> {
let mut inner = self.inner.lock();
let inner = self.inner.lock();
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
return Err(RPCError::internal("Missing operation id getting op context"));
};
@@ -9,7 +9,7 @@ impl RPCProcessor {
dest: Destination,
message: Vec<u8>,
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
let app_call_q = RPCOperationAppCallQ::new(&message)?;
let app_call_q = RPCOperationAppCallQ::new(message)?;
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
RPCQuestionDetail::AppCallQ(app_call_q),
@@ -91,7 +91,7 @@ impl RPCProcessor {
};
// Return the appcall answer
let app_call_a = RPCOperationAppCallA::new(&message_a)?;
let app_call_a = RPCOperationAppCallA::new(message_a)?;
// Send status answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
@@ -9,7 +9,7 @@ impl RPCProcessor {
dest: Destination,
message: Vec<u8>,
) -> Result<NetworkResult<()>, RPCError> {
let app_message = RPCOperationAppMessage { message };
let app_message = RPCOperationAppMessage::new(message)?;
let statement = RPCStatement::new(RPCStatementDetail::AppMessage(app_message));
// Send the app message request
@@ -22,7 +22,7 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the statement
let (op_id, _, _, kind) = msg.operation.destructure();
let (_, _, _, kind) = msg.operation.destructure();
let app_message = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::AppMessage(s) => s,
+19 -8
View File
@@ -1,8 +1,12 @@
use super::*;
use crate::storage_manager::SignedValueDescriptor;
use crate::storage_manager::{SignedValueData, SignedValueDescriptor};
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub enum GetValueAnswer {}
#[derive(Clone, Debug)]
pub struct GetValueAnswer {
pub value: Option<SignedValueData>,
pub peers: Vec<PeerInfo>,
pub descriptor: Option<SignedValueDescriptor>,
}
impl RPCProcessor {
// Sends a get value request and wait for response
@@ -44,17 +48,24 @@ impl RPCProcessor {
// Get the right answer type
let (_, _, _, kind) = msg.operation.destructure();
let app_call_a = match kind {
let get_value_a = match kind {
RPCOperationKind::Answer(a) => match a.destructure() {
RPCAnswerDetail::AppCallA(a) => a,
_ => return Err(RPCError::invalid_format("not an appcall answer")),
RPCAnswerDetail::GetValueA(a) => a,
_ => return Err(RPCError::invalid_format("not a getvalue answer")),
},
_ => return Err(RPCError::invalid_format("not an answer")),
};
let a_message = app_call_a.destructure();
let (value, peers, descriptor) = get_value_a.destructure();
Ok(NetworkResult::value(Answer::new(latency, a_message)))
Ok(NetworkResult::value(Answer::new(
latency,
GetValueAnswer {
value,
peers,
descriptor,
},
)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
@@ -11,7 +11,7 @@ impl RPCProcessor {
) -> Result<NetworkResult<()>, RPCError> {
let receipt = receipt.as_ref().to_vec();
let return_receipt = RPCOperationReturnReceipt { receipt };
let return_receipt = RPCOperationReturnReceipt::new(receipt)?;
let statement = RPCStatement::new(RPCStatementDetail::ReturnReceipt(return_receipt));
// Send the return_receipt request
@@ -27,9 +27,9 @@ impl RPCProcessor {
) -> Result<NetworkResult<()>, RPCError> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();
let RPCOperationReturnReceipt { receipt } = match kind {
let receipt = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::ReturnReceipt(s) => s,
RPCStatementDetail::ReturnReceipt(s) => s.destructure(),
_ => panic!("not a return receipt"),
},
_ => panic!("not a statement"),
+37 -36
View File
@@ -34,17 +34,17 @@ impl RPCProcessor {
};
// Apply sequencing preference
next_hop_nr.set_sequencing(routed_operation.sequencing);
next_hop_nr.set_sequencing(routed_operation.sequencing());
// Pass along the route
let next_hop_route = RPCOperationRoute {
safety_route: SafetyRoute {
let next_hop_route = RPCOperationRoute::new(
SafetyRoute {
public_key: safety_route.public_key,
hop_count: safety_route.hop_count - 1,
hops: SafetyRouteHops::Data(route_hop.next_hop.unwrap()),
},
operation: routed_operation,
};
routed_operation,
);
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
@@ -76,17 +76,17 @@ impl RPCProcessor {
};
// Apply sequencing preference
next_hop_nr.set_sequencing(routed_operation.sequencing);
next_hop_nr.set_sequencing(routed_operation.sequencing());
// Pass along the route
let next_hop_route = RPCOperationRoute {
safety_route: SafetyRoute {
let next_hop_route = RPCOperationRoute::new(
SafetyRoute {
public_key: safety_route_public_key,
hop_count: 0,
hops: SafetyRouteHops::Private(next_private_route),
},
operation: routed_operation,
};
routed_operation,
);
let next_hop_route_stmt = RPCStatement::new(RPCStatementDetail::Route(next_hop_route));
// Send the next route statement
@@ -114,8 +114,8 @@ impl RPCProcessor {
.cached_dh(&remote_sr_pubkey.value, &node_id_secret)
.map_err(RPCError::protocol)?;
let body = match vcrypto.decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
routed_operation.data(),
routed_operation.nonce(),
&dh_secret,
None,
) {
@@ -132,7 +132,7 @@ impl RPCProcessor {
self.enqueue_safety_routed_message(
detail,
remote_sr_pubkey.value,
routed_operation.sequencing,
routed_operation.sequencing(),
body,
)
.map_err(RPCError::internal)?;
@@ -162,8 +162,8 @@ impl RPCProcessor {
let Some((secret_key, safety_spec)) = rss
.with_signature_validated_route(
&pr_pubkey,
&routed_operation.signatures,
&routed_operation.data,
routed_operation.signatures(),
routed_operation.data(),
sender_id.value,
|rssd, rsd| {
(
@@ -172,7 +172,7 @@ impl RPCProcessor {
preferred_route,
hop_count: rssd.hop_count(),
stability: rssd.get_stability(),
sequencing: routed_operation.sequencing,
sequencing: routed_operation.sequencing(),
},
)
}
@@ -188,8 +188,8 @@ impl RPCProcessor {
.map_err(RPCError::protocol)?;
let body = vcrypto
.decrypt_aead(
&routed_operation.data,
&routed_operation.nonce,
routed_operation.data(),
routed_operation.nonce(),
&dh_secret,
None,
)
@@ -353,9 +353,9 @@ impl RPCProcessor {
let node_id = self.routing_table.node_id(crypto_kind);
let node_id_secret = self.routing_table.node_id_secret_key(crypto_kind);
let sig = vcrypto
.sign(&node_id.value, &node_id_secret, &route_operation.data)
.sign(&node_id.value, &node_id_secret, route_operation.data())
.map_err(RPCError::internal)?;
route_operation.signatures.push(sig);
route_operation.add_signature(sig);
}
Ok(NetworkResult::value(route_hop))
@@ -378,7 +378,7 @@ impl RPCProcessor {
// Get the statement
let (_,_,_,kind) = msg.operation.destructure();
let mut route = match kind {
let route = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::Route(s) => s,
_ => panic!("not a route statement"),
@@ -387,7 +387,7 @@ impl RPCProcessor {
};
// Get crypto kind
let crypto_kind = route.safety_route.crypto_kind();
let crypto_kind = route.safety_route().crypto_kind();
let Some(vcrypto) = self.crypto.get(crypto_kind) else {
return Ok(NetworkResult::invalid_message(
"routed operation crypto is not supported",
@@ -395,13 +395,14 @@ impl RPCProcessor {
};
// See what kind of safety route we have going on here
match route.safety_route.hops {
let (safety_route, mut routed_operation) = route.destructure();
match safety_route.hops {
// There is a safety route hop
SafetyRouteHops::Data(ref route_hop_data) => {
// Decrypt the blob with DEC(nonce, DH(the SR's public key, this hop's secret)
let node_id_secret = self.routing_table.node_id_secret_key(crypto_kind);
let dh_secret = vcrypto
.cached_dh(&route.safety_route.public_key.value, &node_id_secret)
.cached_dh(&safety_route.public_key.value, &node_id_secret)
.map_err(RPCError::protocol)?;
let mut dec_blob_data = vcrypto
.decrypt_aead(
@@ -435,8 +436,8 @@ impl RPCProcessor {
// Switching from full safety route to private route first hop
network_result_try!(
self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
routed_operation,
safety_route.public_key,
private_route,
)
.await?
@@ -456,9 +457,9 @@ impl RPCProcessor {
// Continue the full safety route with another hop
network_result_try!(
self.process_route_safety_route_hop(
route.operation,
routed_operation,
route_hop,
route.safety_route
safety_route
)
.await?
);
@@ -474,8 +475,8 @@ impl RPCProcessor {
// Safety route was a stub, start with the beginning of the private route
network_result_try!(
self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
routed_operation,
safety_route.public_key,
private_route,
)
.await?
@@ -486,7 +487,7 @@ impl RPCProcessor {
let route_hop = network_result_try!(self.decrypt_private_route_hop_data(
&route_hop_data,
&private_route.public_key,
&mut route.operation
&mut routed_operation
)?);
// Ensure hop count > 0
@@ -499,9 +500,9 @@ impl RPCProcessor {
// Make next PrivateRoute and pass it on
network_result_try!(
self.process_route_private_route_hop(
route.operation,
routed_operation,
route_hop.node,
route.safety_route.public_key,
safety_route.public_key,
PrivateRoute {
public_key: private_route.public_key,
hop_count: private_route.hop_count - 1,
@@ -521,7 +522,7 @@ impl RPCProcessor {
"route should be at the end",
));
}
if route.safety_route.hop_count != 0 {
if safety_route.hop_count != 0 {
return Ok(NetworkResult::invalid_message(
"Safety hop count should be zero if switched to private route",
));
@@ -531,8 +532,8 @@ impl RPCProcessor {
network_result_try!(self.process_routed_operation(
detail,
vcrypto,
route.operation,
route.safety_route.public_key,
routed_operation,
safety_route.public_key,
private_route.public_key,
)?);
}
+1 -1
View File
@@ -22,7 +22,7 @@ impl RPCProcessor {
));
}
let signal = RPCOperationSignal { signal_info };
let signal = RPCOperationSignal::new(signal_info);
let statement = RPCStatement::new(RPCStatementDetail::Signal(signal));
// Send the signal request
+9 -11
View File
@@ -68,7 +68,7 @@ impl RPCProcessor {
}
};
let status_q = RPCOperationStatusQ { node_status };
let status_q = RPCOperationStatusQ::new(node_status);
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
RPCQuestionDetail::StatusQ(status_q),
@@ -88,8 +88,9 @@ impl RPCProcessor {
};
// Get the right answer type
let status_a = match msg.operation.into_kind() {
RPCOperationKind::Answer(a) => match a.into_detail() {
let (_, _, _, kind) = msg.operation.destructure();
let status_a = match kind {
RPCOperationKind::Answer(a) => match a.destructure() {
RPCAnswerDetail::StatusA(a) => a,
_ => return Err(RPCError::invalid_format("not a status answer")),
},
@@ -98,7 +99,7 @@ impl RPCProcessor {
// Ensure the returned node status is the kind for the routing domain we asked for
if let Some(target_nr) = opt_target_nr {
if let Some(node_status) = status_a.node_status {
if let Some(node_status) = status_a.node_status() {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
@@ -117,7 +118,7 @@ impl RPCProcessor {
}
// Update latest node status in routing table
target_nr.update_node_status(node_status);
target_nr.update_node_status(node_status.clone());
}
}
@@ -131,7 +132,7 @@ impl RPCProcessor {
safety_selection,
} => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) {
if let Some(sender_info) = status_a.sender_info {
if let Some(sender_info) = status_a.sender_info() {
match send_data_kind {
SendDataKind::Direct(connection_descriptor) => {
// Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
@@ -199,7 +200,7 @@ impl RPCProcessor {
let routing_domain = detail.routing_domain;
// Ensure the node status from the question is the kind for the routing domain we received the request in
if let Some(node_status) = &status_q.node_status {
if let Some(node_status) = status_q.node_status() {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
@@ -244,10 +245,7 @@ impl RPCProcessor {
};
// Make status answer
let status_a = RPCOperationStatusA {
node_status,
sender_info,
};
let status_a = RPCOperationStatusA::new(node_status, sender_info);
// Send status answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::StatusA(status_a)))
@@ -17,11 +17,7 @@ impl RPCProcessor {
.generate_single_shot_receipt(receipt_time, [])
.map_err(RPCError::internal)?;
let validate_dial_info = RPCOperationValidateDialInfo {
dial_info,
receipt,
redirect,
};
let validate_dial_info = RPCOperationValidateDialInfo::new(dial_info, receipt, redirect)?;
let statement = RPCStatement::new(RPCStatementDetail::ValidateDialInfo(validate_dial_info));
// Send the validate_dial_info request
@@ -134,11 +130,8 @@ impl RPCProcessor {
}
// Make a copy of the request, without the redirect flag
let validate_dial_info = RPCOperationValidateDialInfo {
dial_info: dial_info.clone(),
receipt: receipt.clone(),
redirect: false,
};
let validate_dial_info =
RPCOperationValidateDialInfo::new(dial_info.clone(), receipt.clone(), false)?;
let statement =
RPCStatement::new(RPCStatementDetail::ValidateDialInfo(validate_dial_info));