checkpoint
This commit is contained in:
@@ -15,9 +15,12 @@ impl RPCAnswer {
|
||||
pub fn desc(&self) -> &'static str {
|
||||
self.detail.desc()
|
||||
}
|
||||
pub fn decode(reader: &veilid_capnp::answer::Reader) -> Result<RPCAnswer, RPCError> {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::answer::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCAnswer, RPCError> {
|
||||
let d_reader = reader.get_detail();
|
||||
let detail = RPCAnswerDetail::decode(&d_reader)?;
|
||||
let detail = RPCAnswerDetail::decode(&d_reader, crypto)?;
|
||||
Ok(RPCAnswer { detail })
|
||||
}
|
||||
pub fn encode(&self, builder: &mut veilid_capnp::answer::Builder) -> Result<(), RPCError> {
|
||||
@@ -60,6 +63,7 @@ impl RPCAnswerDetail {
|
||||
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::answer::detail::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCAnswerDetail, RPCError> {
|
||||
let which_reader = reader.which().map_err(RPCError::protocol)?;
|
||||
let out = match which_reader {
|
||||
@@ -70,7 +74,7 @@ impl RPCAnswerDetail {
|
||||
}
|
||||
veilid_capnp::answer::detail::FindNodeA(r) => {
|
||||
let op_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCOperationFindNodeA::decode(&op_reader)?;
|
||||
let out = RPCOperationFindNodeA::decode(&op_reader, crypto)?;
|
||||
RPCAnswerDetail::FindNodeA(out)
|
||||
}
|
||||
veilid_capnp::answer::detail::AppCallA(r) => {
|
||||
@@ -100,7 +104,7 @@ impl RPCAnswerDetail {
|
||||
}
|
||||
veilid_capnp::answer::detail::FindBlockA(r) => {
|
||||
let op_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCOperationFindBlockA::decode(&op_reader)?;
|
||||
let out = RPCOperationFindBlockA::decode(&op_reader, crypto)?;
|
||||
RPCAnswerDetail::FindBlockA(out)
|
||||
}
|
||||
veilid_capnp::answer::detail::StartTunnelA(r) => {
|
||||
|
@@ -29,12 +29,12 @@ impl RPCOperationKind {
|
||||
}
|
||||
veilid_capnp::operation::kind::Which::Statement(r) => {
|
||||
let q_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCStatement::decode(&q_reader)?;
|
||||
let out = RPCStatement::decode(&q_reader, crypto)?;
|
||||
RPCOperationKind::Statement(out)
|
||||
}
|
||||
veilid_capnp::operation::kind::Which::Answer(r) => {
|
||||
let q_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCAnswer::decode(&q_reader)?;
|
||||
let out = RPCAnswer::decode(&q_reader, crypto)?;
|
||||
RPCOperationKind::Answer(out)
|
||||
}
|
||||
};
|
||||
|
@@ -35,6 +35,7 @@ pub struct RPCOperationFindBlockA {
|
||||
impl RPCOperationFindBlockA {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::operation_find_block_a::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCOperationFindBlockA, RPCError> {
|
||||
let data = reader.get_data().map_err(RPCError::protocol)?.to_vec();
|
||||
|
||||
@@ -46,7 +47,7 @@ impl RPCOperationFindBlockA {
|
||||
.map_err(RPCError::map_internal("too many suppliers"))?,
|
||||
);
|
||||
for s in suppliers_reader.iter() {
|
||||
let peer_info = decode_peer_info(&s)?;
|
||||
let peer_info = decode_peer_info(&s, crypto.clone())?;
|
||||
suppliers.push(peer_info);
|
||||
}
|
||||
|
||||
@@ -58,7 +59,7 @@ impl RPCOperationFindBlockA {
|
||||
.map_err(RPCError::map_internal("too many peers"))?,
|
||||
);
|
||||
for p in peers_reader.iter() {
|
||||
let peer_info = decode_peer_info(&p)?;
|
||||
let peer_info = decode_peer_info(&p, crypto.clone())?;
|
||||
peers.push(peer_info);
|
||||
}
|
||||
|
||||
|
@@ -10,7 +10,7 @@ impl RPCOperationFindNodeQ {
|
||||
reader: &veilid_capnp::operation_find_node_q::Reader,
|
||||
) -> Result<RPCOperationFindNodeQ, RPCError> {
|
||||
let ni_reader = reader.get_node_id().map_err(RPCError::protocol)?;
|
||||
let node_id = decode_key256(&ni_reader);
|
||||
let node_id = decode_typed_key(&ni_reader)?;
|
||||
Ok(RPCOperationFindNodeQ { node_id })
|
||||
}
|
||||
pub fn encode(
|
||||
@@ -18,7 +18,7 @@ impl RPCOperationFindNodeQ {
|
||||
builder: &mut veilid_capnp::operation_find_node_q::Builder,
|
||||
) -> Result<(), RPCError> {
|
||||
let mut ni_builder = builder.reborrow().init_node_id();
|
||||
encode_key256(&self.node_id, &mut ni_builder)?;
|
||||
encode_typed_key(&self.node_id, &mut ni_builder);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ pub struct RPCOperationFindNodeA {
|
||||
impl RPCOperationFindNodeA {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::operation_find_node_a::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCOperationFindNodeA, RPCError> {
|
||||
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
|
||||
let mut peers = Vec::<PeerInfo>::with_capacity(
|
||||
@@ -40,7 +41,7 @@ impl RPCOperationFindNodeA {
|
||||
.map_err(RPCError::map_internal("too many peers"))?,
|
||||
);
|
||||
for p in peers_reader.iter() {
|
||||
let peer_info = decode_peer_info(&p)?;
|
||||
let peer_info = decode_peer_info(&p, crypto.clone())?;
|
||||
peers.push(peer_info);
|
||||
}
|
||||
|
||||
|
@@ -29,7 +29,7 @@ impl RoutedOperation {
|
||||
.map_err(RPCError::map_internal("too many signatures"))?,
|
||||
);
|
||||
for s in sigs_reader.iter() {
|
||||
let sig = decode_typed_signature(&s);
|
||||
let sig = decode_typed_signature(&s)?;
|
||||
signatures.push(sig);
|
||||
}
|
||||
|
||||
@@ -80,9 +80,10 @@ pub struct RPCOperationRoute {
|
||||
impl RPCOperationRoute {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::operation_route::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCOperationRoute, RPCError> {
|
||||
let sr_reader = reader.get_safety_route().map_err(RPCError::protocol)?;
|
||||
let safety_route = decode_safety_route(&sr_reader)?;
|
||||
let safety_route = decode_safety_route(&sr_reader, crypto)?;
|
||||
|
||||
let o_reader = reader.get_operation().map_err(RPCError::protocol)?;
|
||||
let operation = RoutedOperation::decode(&o_reader)?;
|
||||
|
@@ -8,8 +8,9 @@ pub struct RPCOperationSignal {
|
||||
impl RPCOperationSignal {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::operation_signal::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCOperationSignal, RPCError> {
|
||||
let signal_info = decode_signal_info(reader)?;
|
||||
let signal_info = decode_signal_info(reader, crypto)?;
|
||||
Ok(RPCOperationSignal { signal_info })
|
||||
}
|
||||
pub fn encode(
|
||||
|
@@ -18,9 +18,12 @@ impl RPCStatement {
|
||||
pub fn desc(&self) -> &'static str {
|
||||
self.detail.desc()
|
||||
}
|
||||
pub fn decode(reader: &veilid_capnp::statement::Reader) -> Result<RPCStatement, RPCError> {
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::statement::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCStatement, RPCError> {
|
||||
let d_reader = reader.get_detail();
|
||||
let detail = RPCStatementDetail::decode(&d_reader)?;
|
||||
let detail = RPCStatementDetail::decode(&d_reader, crypto)?;
|
||||
Ok(RPCStatement { detail })
|
||||
}
|
||||
pub fn encode(&self, builder: &mut veilid_capnp::statement::Builder) -> Result<(), RPCError> {
|
||||
@@ -52,6 +55,7 @@ impl RPCStatementDetail {
|
||||
}
|
||||
pub fn decode(
|
||||
reader: &veilid_capnp::statement::detail::Reader,
|
||||
crypto: Crypto,
|
||||
) -> Result<RPCStatementDetail, RPCError> {
|
||||
let which_reader = reader.which().map_err(RPCError::protocol)?;
|
||||
let out = match which_reader {
|
||||
@@ -62,7 +66,7 @@ impl RPCStatementDetail {
|
||||
}
|
||||
veilid_capnp::statement::detail::Route(r) => {
|
||||
let op_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCOperationRoute::decode(&op_reader)?;
|
||||
let out = RPCOperationRoute::decode(&op_reader, crypto)?;
|
||||
RPCStatementDetail::Route(out)
|
||||
}
|
||||
veilid_capnp::statement::detail::ValueChanged(r) => {
|
||||
@@ -72,7 +76,7 @@ impl RPCStatementDetail {
|
||||
}
|
||||
veilid_capnp::statement::detail::Signal(r) => {
|
||||
let op_reader = r.map_err(RPCError::protocol)?;
|
||||
let out = RPCOperationSignal::decode(&op_reader)?;
|
||||
let out = RPCOperationSignal::decode(&op_reader, crypto)?;
|
||||
RPCStatementDetail::Signal(out)
|
||||
}
|
||||
veilid_capnp::statement::detail::ReturnReceipt(r) => {
|
||||
|
@@ -164,7 +164,7 @@ impl RPCProcessor {
|
||||
// Sent directly but with a safety route, respond to private route
|
||||
let ck = target.best_node_id().kind;
|
||||
let Some(pr_key) = rss
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &target.node_ids().keys())
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &target.node_ids())
|
||||
.map_err(RPCError::internal)? else {
|
||||
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||
};
|
||||
@@ -193,7 +193,7 @@ impl RPCProcessor {
|
||||
let mut avoid_nodes = relay.node_ids();
|
||||
avoid_nodes.add_all(&target.node_ids());
|
||||
let Some(pr_key) = rss
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &avoid_nodes.keys())
|
||||
.get_private_route_for_safety_spec(ck, safety_spec, &avoid_nodes)
|
||||
.map_err(RPCError::internal)? else {
|
||||
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||
};
|
||||
|
@@ -87,7 +87,16 @@ struct RPCMessageHeader {
|
||||
detail: RPCMessageHeaderDetail,
|
||||
}
|
||||
|
||||
impl RPCMessageHeader {}
|
||||
impl RPCMessageHeader {
|
||||
/// The crypto kind used on the RPC
|
||||
pub fn crypto_kind(&self) -> CryptoKind {
|
||||
match &self.detail {
|
||||
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_crypto_kind(),
|
||||
RPCMessageHeaderDetail::SafetyRouted(s) => todo!(),
|
||||
RPCMessageHeaderDetail::PrivateRouted(p) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RPCMessageData {
|
||||
|
@@ -44,6 +44,9 @@ impl RPCProcessor {
|
||||
&self,
|
||||
msg: RPCMessage,
|
||||
) -> Result<NetworkResult<()>, RPCError> {
|
||||
// Get the crypto kind used to send this question
|
||||
let crypto_kind = msg.header.crypto_kind();
|
||||
|
||||
// Get the question
|
||||
let app_call_q = match msg.operation.kind() {
|
||||
RPCOperationKind::Question(q) => match q.detail() {
|
||||
@@ -61,7 +64,7 @@ impl RPCProcessor {
|
||||
let sender = msg
|
||||
.opt_sender_nr
|
||||
.as_ref()
|
||||
.map(|nr| NodeId::new(nr.node_id()));
|
||||
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().key);
|
||||
let message = app_call_q.message.clone();
|
||||
(self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall {
|
||||
sender,
|
||||
|
@@ -30,8 +30,16 @@ impl RPCProcessor {
|
||||
_ => panic!("not a statement"),
|
||||
};
|
||||
|
||||
// Get the crypto kind used to send this question
|
||||
let crypto_kind = msg.header.crypto_kind();
|
||||
|
||||
// Get the sender node id this came from
|
||||
let sender = msg
|
||||
.opt_sender_nr
|
||||
.as_ref()
|
||||
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().key);
|
||||
|
||||
// Pass the message up through the update callback
|
||||
let sender = msg.opt_sender_nr.map(|nr| NodeId::new(nr.node_id()));
|
||||
let message = app_message.message;
|
||||
(self.unlocked_inner.update_callback)(VeilidUpdate::AppMessage(VeilidAppMessage {
|
||||
sender,
|
||||
|
@@ -11,7 +11,7 @@ impl RPCProcessor {
|
||||
pub async fn rpc_call_find_node(
|
||||
self,
|
||||
dest: Destination,
|
||||
key: PublicKey,
|
||||
node_id: TypedKey,
|
||||
) -> Result<NetworkResult<Answer<Vec<PeerInfo>>>, RPCError> {
|
||||
// Ensure destination never has a private route
|
||||
if matches!(
|
||||
@@ -26,8 +26,7 @@ impl RPCProcessor {
|
||||
));
|
||||
}
|
||||
|
||||
let find_node_q_detail =
|
||||
RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key });
|
||||
let find_node_q_detail = RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id });
|
||||
let find_node_q = RPCQuestion::new(
|
||||
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||
find_node_q_detail,
|
||||
@@ -90,6 +89,13 @@ impl RPCProcessor {
|
||||
_ => panic!("not a question"),
|
||||
};
|
||||
|
||||
// Get the crypto kinds the requesting node is capable of
|
||||
let crypto_kinds = if let Some(sender_nr) = msg.opt_sender_nr {
|
||||
sender_nr.node_ids().kinds()
|
||||
} else {
|
||||
vec![msg.header.crypto_kind()]
|
||||
};
|
||||
|
||||
// add node information for the requesting node to our routing table
|
||||
let routing_table = self.routing_table();
|
||||
let Some(own_peer_info) = routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else {
|
||||
@@ -98,10 +104,20 @@ impl RPCProcessor {
|
||||
};
|
||||
|
||||
// find N nodes closest to the target node in our routing table
|
||||
|
||||
let filter = Box::new(
|
||||
move |rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| {
|
||||
rti.filter_has_valid_signed_node_info(RoutingDomain::PublicInternet, true, entry)
|
||||
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
|
||||
// ensure the returned nodes have at least the crypto kind used to send the findnodeq
|
||||
if let Some(entry) = opt_entry {
|
||||
if !entry.with(rti, |_rti, e| e.crypto_kinds().contains(&crypto_kind)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// Ensure only things that are valid/signed in the PublicInternet domain are returned
|
||||
rti.filter_has_valid_signed_node_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
true,
|
||||
opt_entry,
|
||||
)
|
||||
},
|
||||
) as RoutingTableEntryFilter;
|
||||
let filters = VecDeque::from([filter]);
|
||||
|
@@ -42,7 +42,6 @@ impl RPCProcessor {
|
||||
target,
|
||||
safety_selection: _,
|
||||
} => {
|
||||
let opt_target_nr = self.routing_table.lookup_node_ref(*target);
|
||||
let routing_domain = match relay.best_routing_domain() {
|
||||
Some(rd) => rd,
|
||||
None => {
|
||||
@@ -51,7 +50,7 @@ impl RPCProcessor {
|
||||
))
|
||||
}
|
||||
};
|
||||
(opt_target_nr, routing_domain)
|
||||
(Some(target.clone()), routing_domain)
|
||||
}
|
||||
Destination::PrivateRoute {
|
||||
private_route: _,
|
||||
|
@@ -88,7 +88,10 @@ impl RPCProcessor {
|
||||
// Use the address type though, to ensure we reach an ipv6 capable node if this is
|
||||
// an ipv6 address
|
||||
let routing_table = self.routing_table();
|
||||
let sender_id = detail.envelope.get_sender_id();
|
||||
let sender_node_id = TypedKey::new(
|
||||
detail.envelope.get_crypto_kind(),
|
||||
detail.envelope.get_sender_id(),
|
||||
);
|
||||
let routing_domain = detail.routing_domain;
|
||||
let node_count = {
|
||||
let c = self.config.get();
|
||||
@@ -102,7 +105,7 @@ impl RPCProcessor {
|
||||
dial_info.clone(),
|
||||
);
|
||||
let will_validate_dial_info_filter = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: TypedKey, v: Option<Arc<BucketEntry>>| {
|
||||
move |rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
entry.with(rti, move |_rti, e| {
|
||||
if let Some(status) = &e.node_status(routing_domain) {
|
||||
@@ -129,7 +132,7 @@ impl RPCProcessor {
|
||||
}
|
||||
for peer in peers {
|
||||
// Ensure the peer is not the one asking for the validation
|
||||
if peer.node_id() == sender_id {
|
||||
if peer.node_ids().contains(&sender_node_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user