veilid/veilid-core/src/rpc_processor/mod.rs

mod coders;
mod destination;
mod operation_waiter;
mod rpc_app_call;
mod rpc_app_message;
mod rpc_cancel_tunnel;
mod rpc_complete_tunnel;
mod rpc_error;
mod rpc_find_block;
mod rpc_find_node;
mod rpc_get_value;
mod rpc_node_info_update;
mod rpc_return_receipt;
mod rpc_route;
mod rpc_set_value;
mod rpc_signal;
mod rpc_start_tunnel;
mod rpc_status;
mod rpc_supply_block;
mod rpc_validate_dial_info;
mod rpc_value_changed;
mod rpc_watch_value;

pub use coders::*;
pub use destination::*;
pub use operation_waiter::*;
pub use rpc_error::*;
pub use rpc_status::*;

use super::*;
use crate::crypto::*;
use futures_util::StreamExt;
use network_manager::*;
use receipt_manager::*;
use routing_table::*;
use stop_token::future::FutureExt;
/////////////////////////////////////////////////////////////////////
type OperationId = u64;
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it
peer_noderef: NodeRef,
/// The connection from the peer that sent the message (not the original sender)
connection_descriptor: ConnectionDescriptor,
/// The routing domain the message was sent through
routing_domain: RoutingDomain,
}
/// Header details for rpc messages received over only a safety route but not a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailSafetyRouted {
/// Remote safety route used
remote_safety_route: DHTKey,
/// The sequencing used for this route
sequencing: Sequencing,
}
/// Header details for rpc messages received over a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailPrivateRouted {
/// Remote safety route used (or possibly the node id in the case of no safety route)
remote_safety_route: DHTKey,
/// The private route we received the rpc over
private_route: DHTKey,
/// The safety spec for replying to this private routed rpc
safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
enum RPCMessageHeaderDetail {
Direct(RPCMessageHeaderDetailDirect),
SafetyRouted(RPCMessageHeaderDetailSafetyRouted),
PrivateRouted(RPCMessageHeaderDetailPrivateRouted),
}
/// The decoded header of an RPC message
#[derive(Debug, Clone)]
struct RPCMessageHeader {
/// Time the message was received, not sent
timestamp: u64,
/// The length in bytes of the rpc message body
body_len: u64,
/// The header detail depending on which way the message was received
detail: RPCMessageHeaderDetail,
}
impl RPCMessageHeader {}
#[derive(Debug)]
pub struct RPCMessageData {
contents: Vec<u8>, // rpc messages must be a canonicalized single segment
}
impl RPCMessageData {
pub fn new(contents: Vec<u8>) -> Self {
Self { contents }
}
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
}
// impl ReaderSegments for RPCMessageData {
// fn get_segment(&self, idx: u32) -> Option<&[u8]> {
// if idx > 0 {
// None
// } else {
// Some(self.contents.as_slice())
// }
// }
// }
#[derive(Debug)]
struct RPCMessageEncoded {
header: RPCMessageHeader,
data: RPCMessageData,
}
#[derive(Debug)]
pub(crate) struct RPCMessage {
header: RPCMessageHeader,
operation: RPCOperation,
opt_sender_nr: Option<NodeRef>,
}
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder)
.map_err(RPCError::protocol)
.map_err(logthru_rpc!())?;
Ok(buffer)
// let wordvec = builder
// .into_reader()
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
}
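// Illustrative round-trip (a sketch, not an addition to the API): bytes produced by
// builder_to_vec() use capnp packed serialization, and RPCMessageData::get_reader() reads
// them back the same way:
//   let bytes = builder_to_vec(msg_builder)?;
//   let reader = RPCMessageData::new(bytes).get_reader()?;
// Both ends must agree on the packed encoding or decoding fails with RPCError::protocol.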
// fn reader_to_vec<'a, T>(reader: &capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError>
// where
// T: capnp::message::ReaderSegments + 'a,
// {
// let wordvec = reader
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
// }
#[derive(Debug)]
struct WaitableReply {
handle: OperationWaitHandle<RPCMessage>,
timeout: u64,
node_ref: NodeRef,
send_ts: u64,
send_data_kind: SendDataKind,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
reply_private_route: Option<DHTKey>,
}
/////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
pub latency: u64, // how long it took to get this answer
pub answer: T, // the answer itself
}
impl<T> Answer<T> {
pub fn new(latency: u64, answer: T) -> Self {
Self { latency, answer }
}
}
/// An operation that has been fully prepared for envelope routing
struct RenderedOperation {
/// The rendered operation bytes
message: Vec<u8>,
/// Destination node id we're sending to
node_id: DHTKey,
/// Node to send envelope to (may not be destination node id in case of relay)
node_ref: NodeRef,
/// Total safety + private route hop count + 1 hop for the initial send
hop_count: usize,
/// The safety route used to send the message
safety_route: Option<DHTKey>,
/// The private route used to send the message
remote_private_route: Option<DHTKey>,
/// The private route requested to receive the reply
reply_private_route: Option<DHTKey>,
}
/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
struct SenderSignedNodeInfo {
/// The current signed node info of the sender if required
signed_node_info: Option<SignedNodeInfo>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
target_node_info_ts: u64,
}
impl SenderSignedNodeInfo {
pub fn new_no_sni(target_node_info_ts: u64) -> Self {
Self {
signed_node_info: None,
target_node_info_ts,
}
}
pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: u64) -> Self {
Self {
signed_node_info: Some(sender_signed_node_info),
target_node_info_ts,
}
}
}
#[derive(Copy, Clone, Debug)]
enum RPCKind {
Question,
Statement,
Answer,
}
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
pub struct RPCProcessorUnlockedInner {
timeout: u64,
queue_size: u32,
concurrency: u32,
max_route_hop_count: usize,
validate_dial_info_receipt_time_ms: u32,
update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage>,
waiting_app_call_table: OperationWaiter<Vec<u8>>,
}
#[derive(Clone)]
pub struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
network_manager: NetworkManager,
routing_table: RoutingTable,
inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>,
}
impl RPCProcessor {
fn new_inner() -> RPCProcessorInner {
RPCProcessorInner {
send_channel: None,
stop_source: None,
worker_join_handles: Vec::new(),
}
}
fn new_unlocked_inner(
config: VeilidConfig,
update_callback: UpdateCallback,
) -> RPCProcessorUnlockedInner {
// make local copy of node id for easy access
let c = config.get();
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let queue_size = c.network.rpc.queue_size;
let timeout = ms_to_us(c.network.rpc.timeout_ms);
let max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
if concurrency == 0 {
concurrency = get_concurrency() / 2;
if concurrency == 0 {
concurrency = 1;
}
}
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
RPCProcessorUnlockedInner {
timeout,
queue_size,
concurrency,
max_route_hop_count,
validate_dial_info_receipt_time_ms,
update_callback,
waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(),
}
}
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
let config = network_manager.config();
Self {
crypto: network_manager.crypto(),
config: config.clone(),
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.network_manager.clone()
}
pub fn routing_table(&self) -> RoutingTable {
self.routing_table.clone()
}
//////////////////////////////////////////////////////////////////////
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
trace!("startup rpc processor");
let mut inner = self.inner.lock();
let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
// spin up N workers
trace!(
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = spawn(Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
));
inner.worker_join_handles.push(jh);
}
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
debug!("starting rpc processor shutdown");
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.stop_source.take());
}
debug!("stopping {} rpc worker tasks", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
debug!("resetting rpc processor state");
// Release the rpc processor
*self.inner.lock() = Self::new_inner();
debug!("finished rpc processor shutdown");
}
//////////////////////////////////////////////////////////////////////
/// Determine if a SignedNodeInfo can be placed into the specified routing domain
fn filter_node_info(
&self,
routing_domain: RoutingDomain,
signed_node_info: &SignedNodeInfo,
) -> bool {
let routing_table = self.routing_table();
routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, &signed_node_info)
}
//////////////////////////////////////////////////////////////////////
/// Search the DHT for the single node closest to a key, add it to the routing table, and return its node reference
/// If no node was found within the timeout, this returns None
pub async fn search_dht_single_key(
&self,
_node_id: DHTKey,
_count: u32,
_fanout: u32,
_timeout: Option<u64>,
) -> Result<Option<NodeRef>, RPCError> {
//let routing_table = self.routing_table();
// xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout
Err(RPCError::unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
}
/// Search the DHT for the 'count' closest nodes to a key, adding them to the routing table if not already present, and return their node references
pub async fn search_dht_multi_key(
&self,
_node_id: DHTKey,
_count: u32,
_fanout: u32,
_timeout: Option<u64>,
) -> Result<Vec<NodeRef>, RPCError> {
// xxx return closest nodes after the timeout
Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
}
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
/// Note: This routine can possibly be recursive, hence the SendPinBoxFuture async form
pub fn resolve_node(
&self,
node_id: DHTKey,
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
let this = self.clone();
Box::pin(async move {
let routing_table = this.routing_table();
// First see if we have the node in our routing table already
if let Some(nr) = routing_table.lookup_node_ref(node_id) {
// ensure we have some dial info for the entry already,
// if not, we should do the find_node anyway
if nr.has_any_dial_info() {
return Ok(Some(nr));
}
}
// If nobody knows where this node is, ask the DHT for it
let (count, fanout, timeout) = {
let c = this.config.get();
(
c.network.dht.resolve_node_count,
c.network.dht.resolve_node_fanout,
c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
)
};
let nr = this
.search_dht_single_key(node_id, count, fanout, timeout)
.await?;
if let Some(nr) = &nr {
if nr.node_id() != node_id {
// found a close node, but not the exact one, within our configured resolve_node timeout
return Ok(None);
}
}
Ok(nr)
})
}
#[instrument(level = "trace", skip(self, waitable_reply), err)]
async fn wait_for_reply(
&self,
waitable_reply: WaitableReply,
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
let out = self
.unlocked_inner
.waiting_rpc_table
.wait_for_op(waitable_reply.handle, waitable_reply.timeout)
.await;
match &out {
Err(_) | Ok(TimeoutOr::Timeout) => {
self.record_question_lost(
waitable_reply.send_ts,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
);
}
Ok(TimeoutOr::Value((rpcreader, _))) => {
// Reply received
let recv_ts = get_timestamp();
// Record answer received
self.record_answer_received(
waitable_reply.send_ts,
recv_ts,
rpcreader.header.body_len,
waitable_reply.node_ref.clone(),
waitable_reply.safety_route,
waitable_reply.remote_private_route,
waitable_reply.reply_private_route,
)
}
};
out
}
/// Wrap an operation with a private route inside a safety route
fn wrap_with_route(
&self,
safety_selection: SafetySelection,
remote_private_route: PrivateRoute,
reply_private_route: Option<DHTKey>,
message_data: Vec<u8>,
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();
let pr_is_stub = remote_private_route.is_stub();
let pr_hop_count = remote_private_route.hop_count;
let pr_pubkey = remote_private_route.public_key;
// Compile the safety route with the private route
let compiled_route: CompiledRoute = match rss
.compile_safety_route(safety_selection, remote_private_route)
.map_err(RPCError::internal)?
{
Some(cr) => cr,
None => {
return Ok(NetworkResult::no_connection_other(
"private route could not be compiled at this time",
))
}
};
let sr_is_stub = compiled_route.safety_route.is_stub();
let sr_pubkey = compiled_route.safety_route.public_key;
// Encrypt routed operation
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
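// Informally: the plaintext routed message Xmsg is sealed with an AEAD using a random nonce
// and a shared secret derived via cached_dh() from the remote private route's public key
// (PKapr) and the compiled safety route's secret (SKbsr), as done just below.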
// xxx use factory method, get version from somewhere...
let nonce = Crypto::get_random_nonce();
let dh_secret = self
.crypto
.cached_dh(&pr_pubkey, &compiled_route.secret)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Make the routed operation
// xxx: replace MAX_CRYPTO_VERSION with the version from the factory
let operation = RoutedOperation::new(MAX_CRYPTO_VERSION, nonce, enc_msg_data);
// Prepare route operation
let sr_hop_count = compiled_route.safety_route.hop_count;
let route_operation = RPCOperationRoute {
safety_route: compiled_route.safety_route,
operation,
};
let ssni_route =
self.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop))?;
let operation = RPCOperation::new_statement(
RPCStatement::new(RPCStatementDetail::Route(route_operation)),
ssni_route,
);
// Convert message to bytes and return it
let mut route_msg = ::capnp::message::Builder::new_default();
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
operation.encode(&mut route_operation)?;
let out_message = builder_to_vec(route_msg)?;
// Get the first hop this is going to
let out_node_id = compiled_route.first_hop.node_id();
let out_hop_count = (1 + sr_hop_count + pr_hop_count) as usize;
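// Illustrative example: a 2-hop safety route wrapping a 2-hop private route gives
// 1 + 2 + 2 = 5 hops, which also scales the per-hop answer timeout computed in question().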
let out = RenderedOperation {
message: out_message,
node_id: out_node_id,
node_ref: compiled_route.first_hop,
hop_count: out_hop_count,
safety_route: if sr_is_stub { None } else { Some(sr_pubkey) },
remote_private_route: if pr_is_stub { None } else { Some(pr_pubkey) },
reply_private_route,
};
Ok(NetworkResult::value(out))
}
/// Produce a byte buffer that represents the wire encoding of the entire
/// unencrypted envelope body for a RPC message. This incorporates
/// wrapping a private and/or safety route if they are specified.
#[instrument(level = "debug", skip(self, operation), err)]
fn render_operation(
&self,
dest: Destination,
operation: &RPCOperation,
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
let out: NetworkResult<RenderedOperation>;
// Encode message to a builder and make a message reader for it
// Then produce the message as an unencrypted byte buffer
let message = {
let mut msg_builder = ::capnp::message::Builder::new_default();
let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>();
operation.encode(&mut op_builder)?;
builder_to_vec(msg_builder)?
};
// Get reply private route if we are asking for one to be used in our 'respond to'
let reply_private_route = match operation.kind() {
RPCOperationKind::Question(q) => match q.respond_to() {
RespondTo::Sender => None,
RespondTo::PrivateRoute(pr) => Some(pr.public_key),
},
RPCOperationKind::Statement(_) | RPCOperationKind::Answer(_) => None,
};
// To where are we sending the request
match dest {
Destination::Direct {
target: ref node_ref,
safety_selection,
}
| Destination::Relay {
relay: ref node_ref,
target: _,
safety_selection,
} => {
// Send to a node without a private route
// --------------------------------------
// Get the actual destination node id accounting for relays
let (node_ref, node_id) = if let Destination::Relay {
relay: _,
target: ref dht_key,
safety_selection: _,
} = dest
{
(node_ref.clone(), dht_key.clone())
} else {
let node_id = node_ref.node_id();
(node_ref.clone(), node_id)
};
// Handle the existence of safety route
match safety_selection {
SafetySelection::Unsafe(sequencing) => {
// Apply safety selection sequencing requirement if it is more strict than the node_ref's sequencing requirement
let mut node_ref = node_ref.clone();
if sequencing > node_ref.sequencing() {
node_ref.set_sequencing(sequencing)
}
// Reply private route should be None here, even for questions
assert!(reply_private_route.is_none());
// If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing
out = NetworkResult::value(RenderedOperation {
message,
node_id,
node_ref,
hop_count: 1,
safety_route: None,
remote_private_route: None,
reply_private_route: None,
});
}
SafetySelection::Safe(_) => {
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
let peer_info = match node_ref.make_peer_info(RoutingDomain::PublicInternet)
{
None => {
return Ok(NetworkResult::no_connection_other(
"No PublicInternet peer info for stub private route",
))
}
Some(pi) => pi,
};
let private_route =
PrivateRoute::new_stub(node_id, RouteNode::PeerInfo(peer_info));
// Wrap with safety route
out = self.wrap_with_route(
safety_selection,
private_route,
reply_private_route,
message,
)?;
}
};
}
Destination::PrivateRoute {
private_route,
safety_selection,
} => {
// Send to private route
// ---------------------
// Reply with 'route' operation
out = self.wrap_with_route(
safety_selection,
private_route,
reply_private_route,
message,
)?;
}
}
Ok(out)
}
/// Get signed node info to package with RPC messages to improve
/// routing table caching when it is okay to do so
#[instrument(skip(self), ret, err)]
fn get_sender_signed_node_info(
&self,
dest: &Destination,
) -> Result<SenderSignedNodeInfo, RPCError> {
// Don't do this if the sender is to remain private
// Otherwise we would be attaching the original sender's identity to the final destination,
// thus defeating the purpose of the safety route entirely :P
match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => {}
SafetySelection::Safe(_) => {
return Ok(SenderSignedNodeInfo::default());
}
}
// Get the target we're sending to
let routing_table = self.routing_table();
let target = match dest {
Destination::Direct {
target,
safety_selection: _,
} => target.clone(),
Destination::Relay {
relay: _,
target,
safety_selection: _,
} => {
if let Some(target) = routing_table.lookup_node_ref(*target) {
target
} else {
// Target was not in our routing table
return Ok(SenderSignedNodeInfo::default());
}
}
Destination::PrivateRoute {
private_route: _,
safety_selection: _,
} => {
return Ok(SenderSignedNodeInfo::default());
}
};
let Some(routing_domain) = target.best_routing_domain() else {
// No routing domain for target?
return Err(RPCError::internal(format!("No routing domain for target: {}", target)));
};
// Get the target's node info timestamp
let target_node_info_ts = target.node_info_ts(routing_domain);
// Don't return our node info if it's not valid yet
let Some(own_peer_info) = routing_table.get_own_peer_info(routing_domain) else {
return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts));
};
// Get our node info timestamp
let our_node_info_ts = own_peer_info.signed_node_info.timestamp();
// If the target has seen our node info already don't send it again
if target.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts));
}
Ok(SenderSignedNodeInfo::new(
own_peer_info.signed_node_info,
target_node_info_ts,
))
}
/// Record failure to send to node or route
fn record_send_failure(
&self,
rpc_kind: RPCKind,
send_ts: u64,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
) {
let wants_answer = matches!(rpc_kind, RPCKind::Question);
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_failed_to_send(send_ts, wants_answer);
return;
}
// If safety route was in use, record failure to send there
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats(send_ts, sr_pubkey, |s| s.record_send_failed());
} else {
// If no safety route was in use, then it's the private route's fault if we have one
if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats(send_ts, pr_pubkey, |s| s.record_send_failed());
}
}
}
/// Record question lost to node or route
fn record_question_lost(
&self,
send_ts: u64,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
private_route: Option<DHTKey>,
) {
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_question_lost();
return;
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
// If safety route was used, record question lost there
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats(send_ts, sr_pubkey, |s| {
s.record_question_lost();
});
}
// If remote private route was used, record question lost there
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats(send_ts, rpr_pubkey, |s| {
s.record_question_lost();
});
}
// If private route was used, record question lost there
if let Some(pr_pubkey) = &private_route {
rss.with_route_stats(send_ts, pr_pubkey, |s| {
s.record_question_lost();
});
}
}
/// Record success sending to node or route
fn record_send_success(
&self,
rpc_kind: RPCKind,
send_ts: u64,
bytes: u64,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
) {
let wants_answer = matches!(rpc_kind, RPCKind::Question);
// Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_question_sent(send_ts, bytes, wants_answer);
return;
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
// If safety route was used, record send there
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats(send_ts, sr_pubkey, |s| {
s.record_sent(send_ts, bytes);
});
}
// If remote private route was used, record send there
if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats(send_ts, pr_pubkey, |s| {
s.record_sent(send_ts, bytes);
});
}
}
/// Record answer received from node or route
fn record_answer_received(
&self,
send_ts: u64,
recv_ts: u64,
bytes: u64,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
reply_private_route: Option<DHTKey>,
) {
// Record stats for remote node if this was direct
if safety_route.is_none() && remote_private_route.is_none() && reply_private_route.is_none()
{
node_ref.stats_answer_rcvd(send_ts, recv_ts, bytes);
return;
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
// Get latency for all local routes
let mut total_local_latency = 0u64;
let total_latency = recv_ts.saturating_sub(send_ts);
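// Accounting sketch: total_latency covers the whole round trip. Previously measured averages
// for our own safety route (outbound) and reply private route (inbound) are summed into
// total_local_latency, and the remainder is attributed to the remote private route; if no
// remote private route was used, the total is instead split evenly across the local routes.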
// If safety route was used, record route there
if let Some(sr_pubkey) = &safety_route {
rss.with_route_stats(send_ts, sr_pubkey, |s| {
// If we received an answer, the safety route we sent over can be considered tested
s.record_tested(recv_ts);
// If we used a safety route to send, use our last tested latency
total_local_latency += s.latency_stats().average
});
}
// If local private route was used, record route there
if let Some(pr_pubkey) = &reply_private_route {
rss.with_route_stats(send_ts, pr_pubkey, |s| {
// Record received bytes
s.record_received(recv_ts, bytes);
// If we used a private route to receive, use our last tested latency
total_local_latency += s.latency_stats().average
});
}
// If remote private route was used, record there
if let Some(rpr_pubkey) = &remote_private_route {
rss.with_route_stats(send_ts, rpr_pubkey, |s| {
// Record received bytes
s.record_received(recv_ts, bytes);
// The remote route latency is recorded using the total latency minus the total local latency
let remote_latency = total_latency.saturating_sub(total_local_latency);
s.record_latency(remote_latency);
});
// If we sent to a private route without a safety route
// We need to mark our own node info as having been seen so we can optimize sending it
if let Err(e) = rss.mark_remote_private_route_seen_our_node_info(&rpr_pubkey, recv_ts) {
log_rpc!(error "private route missing: {}", e);
}
// We can't record local route latency if a remote private route was used because
// there is no way other than the prior latency estimation to determine how much time was spent
// in the remote private route
// Instead, we rely on local route testing to give us latency numbers for our local routes
} else {
// If no remote private route was used, then record half the total latency on our local routes
// This is fine because if we sent with a local safety route,
// then we must have received with a local private route too, per the design rules
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats(send_ts, sr_pubkey, |s| {
s.record_latency(total_latency / 2);
});
}
if let Some(pr_pubkey) = &reply_private_route {
rss.with_route_stats(send_ts, pr_pubkey, |s| {
s.record_latency(total_latency / 2);
});
}
}
}
/// Record question or statement received from node or route
fn record_question_received(&self, msg: &RPCMessage) {
let recv_ts = msg.header.timestamp;
let bytes = msg.header.body_len;
// Process messages based on how they were received
match &msg.header.detail {
// Process direct messages
RPCMessageHeaderDetail::Direct(_) => {
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
sender_nr.stats_question_rcvd(recv_ts, bytes);
return;
}
}
// Process messages that arrived with no private route (private route stub)
RPCMessageHeaderDetail::SafetyRouted(d) => {
let rss = self.routing_table.route_spec_store();
// This may record nothing if the remote safety route is not also
// a remote private route that has been imported, but that's okay
rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| {
s.record_received(recv_ts, bytes);
});
}
// Process messages that arrived to our private route
RPCMessageHeaderDetail::PrivateRouted(d) => {
let rss = self.routing_table.route_spec_store();
// This may record nothing if the remote safety route is not also
// a remote private route that has been imported, but that's okay;
// it could also be a node id if no remote safety route was used,
// in which case this also does nothing
rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| {
s.record_received(recv_ts, bytes);
});
// Record for our local private route we received over
rss.with_route_stats(recv_ts, &d.private_route, |s| {
s.record_received(recv_ts, bytes);
});
}
}
}
/// Issue a question over the network, possibly using an anonymized route
#[instrument(level = "debug", skip(self, question), err)]
async fn question(
&self,
dest: Destination,
question: RPCQuestion,
) -> Result<NetworkResult<WaitableReply>, RPCError> {
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap question in operation
let operation = RPCOperation::new_question(question, ssni);
let op_id = operation.op_id();
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
message,
node_id,
node_ref,
hop_count,
safety_route,
remote_private_route,
reply_private_route,
} = network_result_try!(self.render_operation(dest.clone(), &operation)?);
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
let timeout = self.unlocked_inner.timeout * (hop_count as u64);
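// Illustrative example (not a config default): a per-hop timeout of 10_000 ms (stored as
// microseconds via ms_to_us) with hop_count = 3 yields a 30 second answer timeout.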
// Set up op id eventual
let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id);
// Send question
let bytes = message.len() as u64;
let send_ts = get_timestamp();
let send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(|e| {
// If we're returning an error, clean up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route);
}
);
// Successfully sent
self.record_send_success(
RPCKind::Question,
send_ts,
bytes,
node_ref.clone(),
safety_route,
remote_private_route,
);
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {
handle,
timeout,
node_ref,
send_ts,
send_data_kind,
safety_route,
remote_private_route,
reply_private_route,
}))
}
// Issue a statement over the network, possibly using an anonymized route
#[instrument(level = "debug", skip(self, statement), err)]
async fn statement(
&self,
dest: Destination,
statement: RPCStatement,
) -> Result<NetworkResult<()>, RPCError> {
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap statement in operation
let operation = RPCOperation::new_statement(statement, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
message,
node_id,
node_ref,
hop_count: _,
safety_route,
remote_private_route,
reply_private_route: _,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send statement
let bytes = message.len() as u64;
let send_ts = get_timestamp();
let _send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(|e| {
// If we're returning an error, clean up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
}
);
// Successfully sent
self.record_send_success(
RPCKind::Statement,
send_ts,
bytes,
node_ref,
safety_route,
remote_private_route,
);
Ok(NetworkResult::value(()))
}
// Issue a reply over the network, possibly using an anonymized route
// The request must want a response, or this routine fails
#[instrument(level = "debug", skip(self, request, answer), err)]
async fn answer(
&self,
request: RPCMessage,
answer: RPCAnswer,
) -> Result<NetworkResult<()>, RPCError> {
// Extract destination from respond_to
let dest = network_result_try!(self.get_respond_to_destination(&request));
// Get sender signed node info if we should send that
let ssni = self.get_sender_signed_node_info(&dest)?;
// Wrap answer in operation
let operation = RPCOperation::new_answer(&request.operation, answer, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
message,
node_id,
node_ref,
hop_count: _,
safety_route,
remote_private_route,
reply_private_route: _,
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send the reply
let bytes = message.len() as u64;
let send_ts = get_timestamp();
network_result_try!(self.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(|e| {
// If we're returning an error, clean up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route);
}
);
// Reply successfully sent
self.record_send_success(
RPCKind::Answer,
send_ts,
bytes,
node_ref,
safety_route,
remote_private_route,
);
Ok(NetworkResult::value(()))
}
//////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", skip(self, encoded_msg), err)]
async fn process_rpc_message(
&self,
encoded_msg: RPCMessageEncoded,
) -> Result<NetworkResult<()>, RPCError> {
// Decode operation appropriately based on header detail
let msg = match &encoded_msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
// Get the routing domain this message came over
let routing_domain = detail.routing_domain;
// Decode the operation
let sender_node_id = detail.envelope.get_sender_id();
// Decode the RPC message
let operation = {
let reader = encoded_msg.data.get_reader()?;
let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>()
.map_err(RPCError::protocol)
.map_err(logthru_rpc!())?;
RPCOperation::decode(&op_reader, Some(&sender_node_id))?
};
// Get the sender noderef, incorporating any 'sender node info'
let mut opt_sender_nr: Option<NodeRef> = None;
if let Some(sender_node_info) = operation.sender_node_info() {
// Sender NodeInfo was specified, update our routing table with it
if !self.filter_node_info(routing_domain, &sender_node_info) {
return Err(RPCError::invalid_format(
"sender signednodeinfo has invalid peer scope",
));
}
opt_sender_nr = self.routing_table().register_node_with_signed_node_info(
routing_domain,
sender_node_id,
sender_node_info.clone(),
false,
);
}
// look up sender node, in case it's different than our peer due to relaying
if opt_sender_nr.is_none() {
opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id)
}
// Update the 'seen our node info' timestamp to determine if this node needs a
// 'node info update' ping
if let Some(sender_nr) = &opt_sender_nr {
sender_nr.set_our_node_info_ts(routing_domain, operation.target_node_info_ts());
}
// Make the RPC message
RPCMessage {
header: encoded_msg.header,
operation,
opt_sender_nr,
}
}
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
// Decode the RPC message
let operation = {
let reader = encoded_msg.data.get_reader()?;
let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>()
.map_err(RPCError::protocol)
.map_err(logthru_rpc!())?;
RPCOperation::decode(&op_reader, None)?
};
// Make the RPC message
RPCMessage {
header: encoded_msg.header,
operation,
opt_sender_nr: None,
}
}
};
// Process stats for questions/statements received
let kind = match msg.operation.kind() {
RPCOperationKind::Question(_) => {
self.record_question_received(&msg);
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
}
"question"
}
RPCOperationKind::Statement(_) => {
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
}
"statement"
}
RPCOperationKind::Answer(_) => {
// Answer stats are processed in wait_for_reply
"answer"
}
};
// Log rpc receive
trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id(), desc = msg.operation.kind().desc(), header = ?msg.header);
// Process specific message kind
match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
RPCQuestionDetail::StatusQ(_) => self.process_status_q(msg).await,
RPCQuestionDetail::FindNodeQ(_) => self.process_find_node_q(msg).await,
RPCQuestionDetail::AppCallQ(_) => self.process_app_call_q(msg).await,
RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await,
RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await,
RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await,
RPCQuestionDetail::SupplyBlockQ(_) => self.process_supply_block_q(msg).await,
RPCQuestionDetail::FindBlockQ(_) => self.process_find_block_q(msg).await,
RPCQuestionDetail::StartTunnelQ(_) => self.process_start_tunnel_q(msg).await,
RPCQuestionDetail::CompleteTunnelQ(_) => self.process_complete_tunnel_q(msg).await,
RPCQuestionDetail::CancelTunnelQ(_) => self.process_cancel_tunnel_q(msg).await,
},
RPCOperationKind::Statement(s) => match s.detail() {
RPCStatementDetail::ValidateDialInfo(_) => {
self.process_validate_dial_info(msg).await
}
RPCStatementDetail::Route(_) => self.process_route(msg).await,
RPCStatementDetail::NodeInfoUpdate(_) => self.process_node_info_update(msg).await,
RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await,
RPCStatementDetail::Signal(_) => self.process_signal(msg).await,
RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await,
RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await,
},
RPCOperationKind::Answer(_) => {
self.unlocked_inner
.waiting_rpc_table
.complete_op_waiter(msg.operation.op_id(), msg)
.await?;
Ok(NetworkResult::value(()))
}
}
}
async fn rpc_worker(
self,
stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
) {
while let Ok(Ok((_span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
// xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id);
let res = match self
.process_rpc_message(msg)
.instrument(rpc_worker_span)
.await
{
Err(e) => {
log_rpc!(error "couldn't process rpc message: {}", e);
continue;
}
Ok(v) => v,
};
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
network_result_value_or_log!(warn res => {});
} else {
network_result_value_or_log!(debug res => {});
}
}
}
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_direct_message(
&self,
envelope: Envelope,
peer_noderef: NodeRef,
connection_descriptor: ConnectionDescriptor,
routing_domain: RoutingDomain,
body: Vec<u8>,
) -> EyreResult<()> {
let msg = RPCMessageEncoded {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
peer_noderef,
connection_descriptor,
routing_domain,
}),
timestamp: get_timestamp(),
body_len: body.len() as u64,
},
data: RPCMessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone()
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.wrap_err("failed to enqueue received RPC message")?;
Ok(())
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_safety_routed_message(
&self,
remote_safety_route: DHTKey,
sequencing: Sequencing,
body: Vec<u8>,
) -> EyreResult<()> {
let msg = RPCMessageEncoded {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
remote_safety_route,
sequencing,
}),
timestamp: get_timestamp(),
body_len: body.len() as u64,
},
data: RPCMessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone()
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.wrap_err("failed to enqueue received RPC message")?;
Ok(())
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_private_routed_message(
&self,
remote_safety_route: DHTKey,
private_route: DHTKey,
safety_spec: SafetySpec,
body: Vec<u8>,
) -> EyreResult<()> {
let msg = RPCMessageEncoded {
header: RPCMessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(
RPCMessageHeaderDetailPrivateRouted {
remote_safety_route,
private_route,
safety_spec,
},
),
timestamp: get_timestamp(),
body_len: body.len() as u64,
},
data: RPCMessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone()
};
let span_id = Span::current().id();
send_channel
.try_send((span_id, msg))
.wrap_err("failed to enqueue received RPC message")?;
Ok(())
}
}