2021-11-22 16:28:30 +00:00
mod coders ;
2022-09-03 17:57:25 +00:00
mod destination ;
2022-09-25 22:04:53 +00:00
mod operation_waiter ;
mod rpc_app_call ;
mod rpc_app_message ;
2022-07-04 16:03:21 +00:00
mod rpc_cancel_tunnel ;
mod rpc_complete_tunnel ;
2022-07-10 21:36:50 +00:00
mod rpc_error ;
2022-07-04 16:03:21 +00:00
mod rpc_find_block ;
mod rpc_find_node ;
mod rpc_get_value ;
mod rpc_node_info_update ;
mod rpc_return_receipt ;
mod rpc_route ;
mod rpc_set_value ;
mod rpc_signal ;
mod rpc_start_tunnel ;
mod rpc_status ;
mod rpc_supply_block ;
mod rpc_validate_dial_info ;
mod rpc_value_changed ;
mod rpc_watch_value ;
2021-12-17 02:57:28 +00:00
2022-10-20 19:09:04 +00:00
pub use coders ::* ;
2022-09-03 17:57:25 +00:00
pub use destination ::* ;
2022-09-25 22:04:53 +00:00
pub use operation_waiter ::* ;
2022-07-10 21:36:50 +00:00
pub use rpc_error ::* ;
2022-11-02 19:36:01 +00:00
pub use rpc_status ::* ;
2021-11-22 16:28:30 +00:00
2022-07-04 16:03:21 +00:00
use super ::* ;
2022-10-30 23:29:31 +00:00
use crate ::crypto ::* ;
2021-11-22 16:28:30 +00:00
use crate ::xx ::* ;
2022-06-13 00:58:02 +00:00
use futures_util ::StreamExt ;
2021-11-22 16:28:30 +00:00
use network_manager ::* ;
use receipt_manager ::* ;
use routing_table ::* ;
2022-06-13 00:58:02 +00:00
use stop_token ::future ::FutureExt ;
2021-11-22 16:28:30 +00:00
/////////////////////////////////////////////////////////////////////
/// Identifier type for rpc operations (a 64-bit unsigned integer)
type OperationId = u64 ;
#[ derive(Debug, Clone) ]
2022-10-30 02:15:50 +00:00
struct RPCMessageHeaderDetailDirect {
2022-08-25 23:21:50 +00:00
/// The decoded header of the envelope
2022-05-31 23:54:52 +00:00
envelope : Envelope ,
2022-08-25 23:21:50 +00:00
/// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it
peer_noderef : NodeRef ,
/// The connection from the peer sent the message (not the original sender)
connection_descriptor : ConnectionDescriptor ,
2022-09-03 17:57:25 +00:00
/// The routing domain the message was sent through
routing_domain : RoutingDomain ,
2022-10-30 02:15:50 +00:00
}
2022-11-02 02:42:34 +00:00
/// Header details for rpc messages received over only a safety route but not a private route
2022-10-30 02:15:50 +00:00
#[ derive(Debug, Clone) ]
2022-11-02 02:42:34 +00:00
struct RPCMessageHeaderDetailSafetyRouted {
/// The sequencing used for this route
sequencing : Sequencing ,
}
/// Header details for rpc messages received over a private route
#[ derive(Debug, Clone) ]
struct RPCMessageHeaderDetailPrivateRouted {
2022-10-30 02:15:50 +00:00
/// The private route we received the rpc over
private_route : DHTKey ,
2022-11-02 19:36:01 +00:00
// The safety spec for replying to this private routed rpc
safety_spec : SafetySpec ,
2022-10-30 02:15:50 +00:00
}
#[ derive(Debug, Clone) ]
enum RPCMessageHeaderDetail {
Direct ( RPCMessageHeaderDetailDirect ) ,
2022-11-02 02:42:34 +00:00
SafetyRouted ( RPCMessageHeaderDetailSafetyRouted ) ,
PrivateRouted ( RPCMessageHeaderDetailPrivateRouted ) ,
2022-10-30 02:15:50 +00:00
}
/// The decoded header of an RPC message
#[ derive(Debug, Clone) ]
struct RPCMessageHeader {
/// Time the message was received, not sent
timestamp : u64 ,
/// The length in bytes of the rpc message body
body_len : u64 ,
/// The header detail depending on which way the message was received
detail : RPCMessageHeaderDetail ,
2021-11-22 16:28:30 +00:00
}
2022-10-30 23:29:31 +00:00
// Intentionally empty: no inherent methods are defined on the header here
impl RPCMessageHeader { }
2022-06-30 02:17:19 +00:00
#[ derive(Debug) ]
2022-11-04 23:29:44 +00:00
pub struct RPCMessageData {
2022-07-04 16:03:21 +00:00
contents : Vec < u8 > , // rpc messages must be a canonicalized single segment
2021-11-22 16:28:30 +00:00
}
2022-11-04 23:29:44 +00:00
impl RPCMessageData {
pub fn new ( contents : Vec < u8 > ) -> Self {
Self { contents }
}
2022-11-05 22:50:20 +00:00
pub fn get_reader (
& self ,
) -> Result < capnp ::message ::Reader < capnp ::serialize ::OwnedSegments > , RPCError > {
capnp ::serialize_packed ::read_message (
self . contents . as_slice ( ) ,
capnp ::message ::ReaderOptions ::new ( ) ,
)
. map_err ( RPCError ::protocol )
2021-11-22 16:28:30 +00:00
}
}
2022-11-05 22:50:20 +00:00
// impl ReaderSegments for RPCMessageData {
// fn get_segment(&self, idx: u32) -> Option<&[u8]> {
// if idx > 0 {
// None
// } else {
// Some(self.contents.as_slice())
// }
// }
// }
2021-11-22 16:28:30 +00:00
#[ derive(Debug) ]
2022-07-04 16:03:21 +00:00
struct RPCMessageEncoded {
2021-11-22 16:28:30 +00:00
header : RPCMessageHeader ,
data : RPCMessageData ,
}
2022-08-20 21:08:48 +00:00
#[ derive(Debug) ]
2022-07-05 03:09:15 +00:00
pub ( crate ) struct RPCMessage {
2021-11-22 16:28:30 +00:00
header : RPCMessageHeader ,
2022-07-04 16:03:21 +00:00
operation : RPCOperation ,
2022-04-03 16:58:06 +00:00
opt_sender_nr : Option < NodeRef > ,
2021-11-22 16:28:30 +00:00
}
2022-10-21 03:11:41 +00:00
pub fn builder_to_vec < ' a , T > ( builder : capnp ::message ::Builder < T > ) -> Result < Vec < u8 > , RPCError >
2021-11-22 16:28:30 +00:00
where
T : capnp ::message ::Allocator + ' a ,
{
2022-11-05 22:50:20 +00:00
let mut buffer = vec! [ ] ;
capnp ::serialize_packed ::write_message ( & mut buffer , & builder )
2022-07-10 21:36:50 +00:00
. map_err ( RPCError ::protocol )
2021-12-18 00:18:25 +00:00
. map_err ( logthru_rpc! ( ) ) ? ;
2022-11-05 22:50:20 +00:00
Ok ( buffer )
// let wordvec = builder
// .into_reader()
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
2021-11-22 16:28:30 +00:00
}
2022-07-10 21:36:50 +00:00
// fn reader_to_vec<'a, T>(reader: &capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError>
// where
// T: capnp::message::ReaderSegments + 'a,
// {
// let wordvec = reader
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
// }
2021-11-22 16:28:30 +00:00
#[ derive(Debug) ]
struct WaitableReply {
2022-09-25 22:04:53 +00:00
handle : OperationWaitHandle < RPCMessage > ,
2021-11-22 16:28:30 +00:00
timeout : u64 ,
node_ref : NodeRef ,
send_ts : u64 ,
2022-04-23 01:30:09 +00:00
send_data_kind : SendDataKind ,
2021-11-22 16:28:30 +00:00
}
/////////////////////////////////////////////////////////////////////
/// The result of a question, along with how long it took to arrive
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
    /// How long it took to get this answer
    pub latency: u64,
    /// The answer itself
    pub answer: T,
}

impl<T> Answer<T> {
    /// Bundle an answer value with its measured latency
    pub fn new(latency: u64, answer: T) -> Self {
        Answer { latency, answer }
    }
}
2022-07-04 03:20:30 +00:00
struct RenderedOperation {
2022-10-14 02:05:43 +00:00
message : Vec < u8 > , // The rendered operation bytes
node_id : DHTKey , // Destination node id we're sending to
node_ref : NodeRef , // Node to send envelope to (may not be destination node id in case of relay)
2022-10-20 19:09:04 +00:00
hop_count : usize , // Total safety + private route hop count + 1 hop for the initial send
2022-07-04 03:20:30 +00:00
}
2021-11-22 16:28:30 +00:00
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
2022-09-25 22:04:53 +00:00
send_channel : Option < flume ::Sender < ( Option < Id > , RPCMessageEncoded ) > > ,
stop_source : Option < StopSource > ,
worker_join_handles : Vec < MustJoinHandle < ( ) > > ,
}
pub struct RPCProcessorUnlockedInner {
2021-11-22 16:28:30 +00:00
timeout : u64 ,
2022-09-25 22:04:53 +00:00
queue_size : u32 ,
concurrency : u32 ,
2021-11-22 16:28:30 +00:00
max_route_hop_count : usize ,
2022-09-25 22:04:53 +00:00
validate_dial_info_receipt_time_ms : u32 ,
update_callback : UpdateCallback ,
waiting_rpc_table : OperationWaiter < RPCMessage > ,
waiting_app_call_table : OperationWaiter < Vec < u8 > > ,
2021-11-22 16:28:30 +00:00
}
#[ derive(Clone) ]
pub struct RPCProcessor {
crypto : Crypto ,
config : VeilidConfig ,
2022-09-25 22:04:53 +00:00
network_manager : NetworkManager ,
routing_table : RoutingTable ,
2021-11-22 16:28:30 +00:00
inner : Arc < Mutex < RPCProcessorInner > > ,
2022-09-25 22:04:53 +00:00
unlocked_inner : Arc < RPCProcessorUnlockedInner > ,
2021-11-22 16:28:30 +00:00
}
impl RPCProcessor {
2022-09-25 22:04:53 +00:00
fn new_inner ( ) -> RPCProcessorInner {
2021-11-22 16:28:30 +00:00
RPCProcessorInner {
2021-12-17 02:57:28 +00:00
send_channel : None ,
2022-06-13 00:58:02 +00:00
stop_source : None ,
2021-11-22 16:28:30 +00:00
worker_join_handles : Vec ::new ( ) ,
}
}
2022-09-25 22:04:53 +00:00
fn new_unlocked_inner (
config : VeilidConfig ,
update_callback : UpdateCallback ,
) -> RPCProcessorUnlockedInner {
// make local copy of node id for easy access
let c = config . get ( ) ;
// set up channel
let mut concurrency = c . network . rpc . concurrency ;
2022-10-21 03:11:41 +00:00
let queue_size = c . network . rpc . queue_size ;
let timeout = ms_to_us ( c . network . rpc . timeout_ms ) ;
let max_route_hop_count = c . network . rpc . max_route_hop_count as usize ;
2022-09-25 22:04:53 +00:00
if concurrency = = 0 {
concurrency = intf ::get_concurrency ( ) / 2 ;
if concurrency = = 0 {
concurrency = 1 ;
}
}
let validate_dial_info_receipt_time_ms = c . network . dht . validate_dial_info_receipt_time_ms ;
RPCProcessorUnlockedInner {
timeout ,
queue_size ,
concurrency ,
max_route_hop_count ,
validate_dial_info_receipt_time_ms ,
update_callback ,
waiting_rpc_table : OperationWaiter ::new ( ) ,
waiting_app_call_table : OperationWaiter ::new ( ) ,
}
}
pub fn new ( network_manager : NetworkManager , update_callback : UpdateCallback ) -> Self {
let config = network_manager . config ( ) ;
2021-11-22 16:28:30 +00:00
Self {
crypto : network_manager . crypto ( ) ,
2022-09-25 22:04:53 +00:00
config : config . clone ( ) ,
network_manager : network_manager . clone ( ) ,
routing_table : network_manager . routing_table ( ) ,
inner : Arc ::new ( Mutex ::new ( Self ::new_inner ( ) ) ) ,
unlocked_inner : Arc ::new ( Self ::new_unlocked_inner ( config , update_callback ) ) ,
2021-11-22 16:28:30 +00:00
}
}
/// Return a clone of the network manager handle held by this processor
pub fn network_manager(&self) -> NetworkManager {
    let handle = &self.network_manager;
    handle.clone()
}
/// Return a clone of the routing table handle held by this processor
pub fn routing_table(&self) -> RoutingTable {
    let handle = &self.routing_table;
    handle.clone()
}
2022-09-25 22:04:53 +00:00
//////////////////////////////////////////////////////////////////////
/// Start the rpc processor: create the bounded message queue and the stop
/// source, then spawn `concurrency` worker tasks that all share the queue's
/// receiving side. The inner lock is held for the whole setup.
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
    trace!("startup rpc processor");
    let mut inner = self.inner.lock();

    let (sender, receiver) = flume::bounded(self.unlocked_inner.queue_size as usize);
    let stop_source = StopSource::new();

    // spin up N workers
    trace!(
        "Spinning up {} RPC workers",
        self.unlocked_inner.concurrency
    );
    for _ in 0..self.unlocked_inner.concurrency {
        // Each worker gets its own processor handle, stop token and receiver
        let worker = Self::rpc_worker(self.clone(), stop_source.token(), receiver.clone());
        inner.worker_join_handles.push(intf::spawn(worker));
    }

    inner.send_channel = Some(sender);
    inner.stop_source = Some(stop_source);
    Ok(())
}
2022-09-25 22:04:53 +00:00
/// Stop the rpc processor: drop the stop source, wait for every worker task
/// to finish, then reset the inner state to its initial value.
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
    debug!("starting rpc processor shutdown");

    // Stop the rpc workers
    let mut unord = FuturesUnordered::new();
    {
        // Scope the lock so it is released before we await the workers
        let mut inner = self.inner.lock();
        // take the join handles out
        unord.extend(inner.worker_join_handles.drain(..));
        // drop the stop
        inner.stop_source = None;
    }
    debug!("stopping {} rpc worker tasks", unord.len());

    // Wait for them to complete
    loop {
        if unord.next().await.is_none() {
            break;
        }
    }

    debug!("resetting rpc processor state");
    // Release the rpc processor
    *self.inner.lock() = Self::new_inner();

    debug!("finished rpc processor shutdown");
}
//////////////////////////////////////////////////////////////////////
2022-11-10 03:27:37 +00:00
/// Determine if a SignedNodeInfo can be placed into the specified routing domain
///
/// Returns whatever the routing table's
/// `signed_node_info_is_valid_in_routing_domain` check reports.
fn filter_node_info(
    &self,
    routing_domain: RoutingDomain,
    signed_node_info: &SignedNodeInfo,
) -> bool {
    // `signed_node_info` is already a reference, and the routing table handle
    // can be used in place, so neither an extra borrow nor a clone is needed
    self.routing_table
        .signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info)
}
2021-11-22 16:28:30 +00:00
//////////////////////////////////////////////////////////////////////
2022-08-27 02:52:08 +00:00
/// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
/// If no node was found in the timeout, this returns None
2021-11-22 16:28:30 +00:00
pub async fn search_dht_single_key (
& self ,
2022-07-10 21:36:50 +00:00
_node_id : DHTKey ,
2021-11-22 16:28:30 +00:00
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
2022-07-20 13:39:38 +00:00
) -> Result < Option < NodeRef > , RPCError > {
2022-07-10 21:36:50 +00:00
//let routing_table = self.routing_table();
2021-11-22 16:28:30 +00:00
// xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout
2022-07-10 21:36:50 +00:00
Err ( RPCError ::unimplemented ( " search_dht_single_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
2022-08-27 02:52:08 +00:00
/// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
2021-11-22 16:28:30 +00:00
pub async fn search_dht_multi_key (
& self ,
2022-05-31 23:54:52 +00:00
_node_id : DHTKey ,
2021-11-22 16:28:30 +00:00
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
) -> Result < Vec < NodeRef > , RPCError > {
// xxx return closest nodes after the timeout
2022-07-10 21:36:50 +00:00
Err ( RPCError ::unimplemented ( " search_dht_multi_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
2022-08-27 02:52:08 +00:00
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
/// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
2022-07-20 13:39:38 +00:00
pub fn resolve_node (
& self ,
node_id : DHTKey ,
) -> SendPinBoxFuture < Result < Option < NodeRef > , RPCError > > {
2022-03-27 01:25:24 +00:00
let this = self . clone ( ) ;
Box ::pin ( async move {
let routing_table = this . routing_table ( ) ;
// First see if we have the node in our routing table already
if let Some ( nr ) = routing_table . lookup_node_ref ( node_id ) {
2022-04-16 15:18:54 +00:00
// ensure we have some dial info for the entry already,
2022-03-27 01:25:24 +00:00
// if not, we should do the find_node anyway
2022-04-17 17:28:39 +00:00
if nr . has_any_dial_info ( ) {
2022-07-20 13:39:38 +00:00
return Ok ( Some ( nr ) ) ;
2022-03-27 01:25:24 +00:00
}
}
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
// If nobody knows where this node is, ask the DHT for it
let ( count , fanout , timeout ) = {
let c = this . config . get ( ) ;
(
c . network . dht . resolve_node_count ,
c . network . dht . resolve_node_fanout ,
c . network . dht . resolve_node_timeout_ms . map ( ms_to_us ) ,
)
} ;
let nr = this
. search_dht_single_key ( node_id , count , fanout , timeout )
. await ? ;
2021-11-22 16:28:30 +00:00
2022-07-20 13:39:38 +00:00
if let Some ( nr ) = & nr {
if nr . node_id ( ) ! = node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Ok ( None ) ;
}
2022-03-27 01:25:24 +00:00
}
Ok ( nr )
} )
2021-11-22 16:28:30 +00:00
}
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self, waitable_reply), err) ]
2021-11-22 16:28:30 +00:00
async fn wait_for_reply (
& self ,
waitable_reply : WaitableReply ,
2022-07-20 13:39:38 +00:00
) -> Result < TimeoutOr < ( RPCMessage , u64 ) > , RPCError > {
2022-09-25 22:04:53 +00:00
let out = self
. unlocked_inner
. waiting_rpc_table
. wait_for_op ( waitable_reply . handle , waitable_reply . timeout )
. await ;
2021-11-22 16:28:30 +00:00
match & out {
2022-07-20 13:39:38 +00:00
Err ( _ ) | Ok ( TimeoutOr ::Timeout ) = > {
2022-08-31 01:21:16 +00:00
waitable_reply . node_ref . stats_question_lost ( ) ;
2021-11-22 16:28:30 +00:00
}
2022-07-20 13:39:38 +00:00
Ok ( TimeoutOr ::Value ( ( rpcreader , _ ) ) ) = > {
2021-11-22 16:28:30 +00:00
// Reply received
2022-06-28 03:46:29 +00:00
let recv_ts = intf ::get_timestamp ( ) ;
2022-08-31 01:21:16 +00:00
waitable_reply . node_ref . stats_answer_rcvd (
2022-04-18 22:49:33 +00:00
waitable_reply . send_ts ,
recv_ts ,
rpcreader . header . body_len ,
)
2021-11-22 16:28:30 +00:00
}
} ;
out
}
2022-10-14 02:05:43 +00:00
// Wrap an operation with a private route inside a safety route
2022-11-02 01:05:48 +00:00
fn wrap_with_route (
2022-10-14 02:05:43 +00:00
& self ,
2022-10-22 01:27:07 +00:00
safety_selection : SafetySelection ,
2022-10-14 02:05:43 +00:00
private_route : PrivateRoute ,
message_data : Vec < u8 > ,
2022-10-20 19:09:04 +00:00
) -> Result < NetworkResult < RenderedOperation > , RPCError > {
let routing_table = self . routing_table ( ) ;
2022-10-31 03:23:12 +00:00
let rss = routing_table . route_spec_store ( ) ;
2022-10-21 03:11:41 +00:00
let pr_hop_count = private_route . hop_count ;
let pr_pubkey = private_route . public_key ;
2022-10-31 03:23:12 +00:00
// Compile the safety route with the private route
let compiled_route : CompiledRoute = match rss
2022-11-01 02:03:05 +00:00
. compile_safety_route ( safety_selection , private_route )
2022-10-31 03:23:12 +00:00
. map_err ( RPCError ::internal ) ?
{
Some ( cr ) = > cr ,
None = > {
return Ok ( NetworkResult ::no_connection_other (
" private route could not be compiled at this time " ,
) )
}
} ;
2022-10-14 02:05:43 +00:00
// Encrypt routed operation
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
2022-10-30 23:29:31 +00:00
// xxx use factory method, get version from somewhere...
2022-10-14 02:05:43 +00:00
let nonce = Crypto ::get_random_nonce ( ) ;
let dh_secret = self
. crypto
2022-10-21 03:11:41 +00:00
. cached_dh ( & pr_pubkey , & compiled_route . secret )
2022-10-14 02:05:43 +00:00
. map_err ( RPCError ::map_internal ( " dh failed " ) ) ? ;
let enc_msg_data = Crypto ::encrypt_aead ( & message_data , & nonce , & dh_secret , None )
. map_err ( RPCError ::map_internal ( " encryption failed " ) ) ? ;
// Make the routed operation
2022-10-30 23:29:31 +00:00
// xxx: replace MAX_CRYPTO_VERSION with the version from the factory
let operation = RoutedOperation ::new ( MAX_CRYPTO_VERSION , nonce , enc_msg_data ) ;
2022-10-14 02:05:43 +00:00
// Prepare route operation
2022-10-21 03:11:41 +00:00
let sr_hop_count = compiled_route . safety_route . hop_count ;
2022-10-20 19:09:04 +00:00
let route_operation = RPCOperationRoute {
safety_route : compiled_route . safety_route ,
2022-10-14 02:05:43 +00:00
operation ,
} ;
2022-10-20 19:09:04 +00:00
let operation = RPCOperation ::new_statement (
RPCStatement ::new ( RPCStatementDetail ::Route ( route_operation ) ) ,
None ,
) ;
2022-10-14 02:05:43 +00:00
// Convert message to bytes and return it
let mut route_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut route_operation = route_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
operation . encode ( & mut route_operation ) ? ;
2022-10-20 19:09:04 +00:00
let out_message = builder_to_vec ( route_msg ) ? ;
// Get the first hop this is going to
2022-10-21 03:11:41 +00:00
let out_node_id = compiled_route . first_hop . node_id ( ) ;
let out_hop_count = ( 1 + sr_hop_count + pr_hop_count ) as usize ;
2022-10-20 19:09:04 +00:00
let out = RenderedOperation {
message : out_message ,
node_id : out_node_id ,
node_ref : compiled_route . first_hop ,
hop_count : out_hop_count ,
} ;
2022-10-14 02:05:43 +00:00
2022-10-20 19:09:04 +00:00
Ok ( NetworkResult ::value ( out ) )
2022-10-14 02:05:43 +00:00
}
2022-08-27 02:52:08 +00:00
/// Produce a byte buffer that represents the wire encoding of the entire
/// unencrypted envelope body for a RPC message. This incorporates
/// wrapping a private and/or safety route if they are specified.
2022-09-01 01:41:48 +00:00
#[ instrument(level = " debug " , skip(self, operation), err) ]
2022-07-04 16:03:21 +00:00
fn render_operation (
& self ,
2022-07-04 21:58:26 +00:00
dest : Destination ,
2022-07-04 16:03:21 +00:00
operation : & RPCOperation ,
2022-10-20 19:09:04 +00:00
) -> Result < NetworkResult < RenderedOperation > , RPCError > {
let out : NetworkResult < RenderedOperation > ;
2022-07-05 02:44:04 +00:00
2022-07-04 03:20:30 +00:00
// Encode message to a builder and make a message reader for it
2022-07-05 02:44:04 +00:00
// Then produce the message as an unencrypted byte buffer
2022-10-14 02:05:43 +00:00
let message = {
2022-07-05 02:44:04 +00:00
let mut msg_builder = ::capnp ::message ::Builder ::new_default ( ) ;
let mut op_builder = msg_builder . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
operation . encode ( & mut op_builder ) ? ;
builder_to_vec ( msg_builder ) ?
} ;
2021-11-22 16:28:30 +00:00
2022-07-05 02:44:04 +00:00
// To where are we sending the request
match dest {
2022-09-03 17:57:25 +00:00
Destination ::Direct {
2022-09-04 19:40:35 +00:00
target : ref node_ref ,
2022-10-22 01:27:07 +00:00
safety_selection ,
2022-09-03 17:57:25 +00:00
}
| Destination ::Relay {
2022-09-04 19:40:35 +00:00
relay : ref node_ref ,
2022-09-03 17:57:25 +00:00
target : _ ,
2022-10-22 01:27:07 +00:00
safety_selection ,
2022-09-03 17:57:25 +00:00
} = > {
2022-07-05 02:44:04 +00:00
// Send to a node without a private route
// --------------------------------------
// Get the actual destination node id accounting for relays
2022-09-03 17:57:25 +00:00
let ( node_ref , node_id ) = if let Destination ::Relay {
relay : _ ,
2022-09-04 19:40:35 +00:00
target : ref dht_key ,
2022-10-22 01:27:07 +00:00
safety_selection : _ ,
2022-09-03 17:57:25 +00:00
} = dest
{
2022-07-05 02:44:04 +00:00
( node_ref . clone ( ) , dht_key . clone ( ) )
} else {
let node_id = node_ref . node_id ( ) ;
( node_ref . clone ( ) , node_id )
} ;
// Handle the existence of safety route
2022-10-22 01:27:07 +00:00
match safety_selection {
SafetySelection ::Unsafe ( sequencing ) = > {
// Apply safety selection sequencing requirement if it is more strict than the node_ref's sequencing requirement
2022-11-02 01:05:48 +00:00
let mut node_ref = node_ref . clone ( ) ;
2022-10-22 01:27:07 +00:00
if sequencing > node_ref . sequencing ( ) {
node_ref . set_sequencing ( sequencing )
}
2022-07-05 02:44:04 +00:00
// If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing
2022-10-20 19:09:04 +00:00
out = NetworkResult ::value ( RenderedOperation {
2022-10-14 02:05:43 +00:00
message ,
node_id ,
node_ref ,
hop_count : 1 ,
2022-10-20 19:09:04 +00:00
} ) ;
2022-07-05 02:44:04 +00:00
}
2022-10-22 01:27:07 +00:00
SafetySelection ::Safe ( _ ) = > {
2022-07-05 02:44:04 +00:00
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
2022-10-30 02:15:50 +00:00
let peer_info = match node_ref . make_peer_info ( RoutingDomain ::PublicInternet )
{
None = > {
return Ok ( NetworkResult ::no_connection_other (
" No PublicInternet peer info for stub private route " ,
) )
}
Some ( pi ) = > pi ,
} ;
let private_route =
PrivateRoute ::new_stub ( node_id , RouteNode ::PeerInfo ( peer_info ) ) ;
2022-07-05 02:44:04 +00:00
2022-10-14 02:05:43 +00:00
// Wrap with safety route
2022-10-22 01:27:07 +00:00
out = self . wrap_with_route ( safety_selection , private_route , message ) ? ;
2022-07-05 02:44:04 +00:00
}
} ;
}
2022-09-03 17:57:25 +00:00
Destination ::PrivateRoute {
private_route ,
2022-10-22 01:27:07 +00:00
safety_selection ,
2022-09-03 17:57:25 +00:00
} = > {
2022-07-05 02:44:04 +00:00
// Send to private route
// ---------------------
// Reply with 'route' operation
2022-10-22 01:27:07 +00:00
out = self . wrap_with_route ( safety_selection , private_route , message ) ? ;
2021-11-22 16:28:30 +00:00
}
2022-07-05 02:44:04 +00:00
}
2021-11-22 16:28:30 +00:00
2022-10-14 02:05:43 +00:00
Ok ( out )
2022-07-04 03:20:30 +00:00
}
2022-09-04 18:17:28 +00:00
// Get signed node info to package with RPC messages to improve
// routing table caching when it is okay to do so
// This is only done in the PublicInternet routing domain because
// as far as we can tell this is the only domain that will really benefit
2022-11-10 03:27:37 +00:00
fn get_sender_signed_node_info ( & self , dest : & Destination ) -> Option < SignedNodeInfo > {
2022-09-04 18:17:28 +00:00
// Don't do this if the sender is to remain private
2022-10-21 03:11:41 +00:00
// Otherwise we would be attaching the original sender's identity to the final destination,
// thus defeating the purpose of the safety route entirely :P
2022-10-22 01:27:07 +00:00
match dest . get_safety_selection ( ) {
SafetySelection ::Unsafe ( _ ) = > { }
SafetySelection ::Safe ( _ ) = > {
return None ;
}
2022-09-04 18:17:28 +00:00
}
// Don't do this if our own signed node info isn't valid yet
let routing_table = self . routing_table ( ) ;
2022-10-21 03:11:41 +00:00
if ! routing_table . has_valid_own_node_info ( RoutingDomain ::PublicInternet ) {
2022-09-04 18:17:28 +00:00
return None ;
}
match dest {
Destination ::Direct {
target ,
2022-10-22 01:27:07 +00:00
safety_selection : _ ,
2022-09-04 18:17:28 +00:00
} = > {
// If the target has seen our node info already don't do this
if target . has_seen_our_node_info ( RoutingDomain ::PublicInternet ) {
return None ;
}
Some ( routing_table . get_own_signed_node_info ( RoutingDomain ::PublicInternet ) )
}
Destination ::Relay {
relay : _ ,
target ,
2022-10-22 01:27:07 +00:00
safety_selection : _ ,
2022-09-04 18:17:28 +00:00
} = > {
if let Some ( target ) = routing_table . lookup_node_ref ( * target ) {
if target . has_seen_our_node_info ( RoutingDomain ::PublicInternet ) {
return None ;
}
Some ( routing_table . get_own_signed_node_info ( RoutingDomain ::PublicInternet ) )
} else {
None
}
}
Destination ::PrivateRoute {
private_route : _ ,
2022-10-22 01:27:07 +00:00
safety_selection : _ ,
2022-09-04 18:17:28 +00:00
} = > None ,
}
}
2022-07-04 03:20:30 +00:00
// Issue a question over the network, possibly using an anonymized route
2022-09-03 17:57:25 +00:00
#[ instrument(level = " debug " , skip(self, question), err) ]
2022-07-04 03:20:30 +00:00
async fn question (
& self ,
dest : Destination ,
question : RPCQuestion ,
2022-07-20 13:39:38 +00:00
) -> Result < NetworkResult < WaitableReply > , RPCError > {
2022-09-03 17:57:25 +00:00
// Get sender info if we should send that
2022-09-04 18:17:28 +00:00
let opt_sender_info = self . get_sender_signed_node_info ( & dest ) ;
2022-09-03 17:57:25 +00:00
2022-07-04 03:20:30 +00:00
// Wrap question in operation
2022-09-04 18:17:28 +00:00
let operation = RPCOperation ::new_question ( question , opt_sender_info ) ;
2022-07-04 21:58:26 +00:00
let op_id = operation . op_id ( ) ;
// Log rpc send
2022-11-09 22:11:35 +00:00
trace! ( target : " rpc_message " , dir = " send " , kind = " question " , op_id , desc = operation . kind ( ) . desc ( ) , ? dest ) ;
2022-07-04 03:20:30 +00:00
// Produce rendered operation
let RenderedOperation {
2022-07-05 02:44:04 +00:00
message ,
node_id ,
node_ref ,
hop_count ,
2022-10-20 19:09:04 +00:00
} = network_result_try! ( self . render_operation ( dest , & operation ) ? ) ;
2022-07-04 03:20:30 +00:00
2022-07-05 02:44:04 +00:00
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
2022-09-25 22:04:53 +00:00
let timeout = self . unlocked_inner . timeout * ( hop_count as u64 ) ;
2022-07-05 02:44:04 +00:00
2022-07-04 03:20:30 +00:00
// Set up op id eventual
2022-09-25 22:04:53 +00:00
let handle = self . unlocked_inner . waiting_rpc_table . add_op_waiter ( op_id ) ;
2022-07-03 19:52:27 +00:00
2022-07-04 03:20:30 +00:00
// Send question
2022-07-05 02:44:04 +00:00
let bytes = message . len ( ) as u64 ;
2022-06-28 03:46:29 +00:00
let send_ts = intf ::get_timestamp ( ) ;
2022-07-20 13:39:38 +00:00
let send_data_kind = network_result_try! ( self
2021-11-22 16:28:30 +00:00
. network_manager ( )
2022-07-05 02:44:04 +00:00
. send_envelope ( node_ref . clone ( ) , Some ( node_id ) , message )
2021-11-22 16:28:30 +00:00
. await
2022-08-20 01:10:47 +00:00
. map_err ( | e | {
// If we're returning an error, clean up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , true ) ;
RPCError ::network ( e )
} ) ? = > {
2022-08-20 01:10:47 +00:00
// If we couldn't send we're still cleaning up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , true ) ;
2021-11-22 16:28:30 +00:00
}
2022-07-20 13:39:38 +00:00
) ;
2021-11-22 16:28:30 +00:00
// Successfully sent
2022-08-31 01:21:16 +00:00
node_ref . stats_question_sent ( send_ts , bytes , true ) ;
2021-11-22 16:28:30 +00:00
// Pass back waitable reply completion
2022-07-20 13:39:38 +00:00
Ok ( NetworkResult ::value ( WaitableReply {
2022-09-25 22:04:53 +00:00
handle ,
2022-07-04 03:20:30 +00:00
timeout ,
node_ref ,
send_ts ,
send_data_kind ,
2022-07-20 13:39:38 +00:00
} ) )
2021-11-22 16:28:30 +00:00
}
2022-07-04 16:03:21 +00:00
// Issue a statement over the network, possibly using an anonymized route
2022-09-04 18:17:28 +00:00
#[ instrument(level = " debug " , skip(self, statement), err) ]
2022-07-04 16:03:21 +00:00
async fn statement (
2021-11-22 16:28:30 +00:00
& self ,
2022-07-04 16:03:21 +00:00
dest : Destination ,
statement : RPCStatement ,
2022-07-20 13:39:38 +00:00
) -> Result < NetworkResult < ( ) > , RPCError > {
2022-09-04 18:17:28 +00:00
// Get sender info if we should send that
let opt_sender_info = self . get_sender_signed_node_info ( & dest ) ;
2022-07-04 16:03:21 +00:00
// Wrap statement in operation
2022-09-04 18:17:28 +00:00
let operation = RPCOperation ::new_statement ( statement , opt_sender_info ) ;
2021-11-22 16:28:30 +00:00
2022-07-04 21:58:26 +00:00
// Log rpc send
2022-11-09 22:11:35 +00:00
trace! ( target : " rpc_message " , dir = " send " , kind = " statement " , op_id = operation . op_id ( ) , desc = operation . kind ( ) . desc ( ) , ? dest ) ;
2022-07-04 21:58:26 +00:00
2022-07-04 16:03:21 +00:00
// Produce rendered operation
let RenderedOperation {
2022-07-05 02:44:04 +00:00
message ,
node_id ,
node_ref ,
2022-07-05 03:09:15 +00:00
hop_count : _ ,
2022-10-20 19:09:04 +00:00
} = network_result_try! ( self . render_operation ( dest , & operation ) ? ) ;
2021-11-22 16:28:30 +00:00
2022-07-04 16:03:21 +00:00
// Send statement
2022-07-05 02:44:04 +00:00
let bytes = message . len ( ) as u64 ;
2022-07-04 16:03:21 +00:00
let send_ts = intf ::get_timestamp ( ) ;
2022-07-20 13:39:38 +00:00
let _send_data_kind = network_result_try! ( self
2022-07-04 16:03:21 +00:00
. network_manager ( )
2022-07-05 02:44:04 +00:00
. send_envelope ( node_ref . clone ( ) , Some ( node_id ) , message )
2022-07-04 16:03:21 +00:00
. await
2022-08-20 01:10:47 +00:00
. map_err ( | e | {
// If we're returning an error, clean up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , true ) ;
RPCError ::network ( e )
} ) ? = > {
2022-08-20 01:10:47 +00:00
// If we couldn't send we're still cleaning up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , true ) ;
2021-11-22 16:28:30 +00:00
}
2022-07-20 13:39:38 +00:00
) ;
2021-11-22 16:28:30 +00:00
2022-07-04 16:03:21 +00:00
// Successfully sent
2022-08-31 01:21:16 +00:00
node_ref . stats_question_sent ( send_ts , bytes , true ) ;
2022-07-04 16:03:21 +00:00
2022-07-20 13:39:38 +00:00
Ok ( NetworkResult ::value ( ( ) ) )
2022-07-04 16:03:21 +00:00
}
// Convert the 'RespondTo' into a 'Destination' for a response
2022-10-30 02:15:50 +00:00
fn get_respond_to_destination ( & self , request : & RPCMessage ) -> NetworkResult < Destination > {
2022-07-04 16:03:21 +00:00
// Get the question 'respond to'
let respond_to = match request . operation . kind ( ) {
RPCOperationKind ::Question ( q ) = > q . respond_to ( ) ,
_ = > {
panic! ( " not a question " ) ;
}
} ;
// To where should we respond?
match respond_to {
2022-09-04 18:17:28 +00:00
RespondTo ::Sender = > {
2022-10-30 02:15:50 +00:00
// Parse out the header detail from the question
let detail = match & request . header . detail {
RPCMessageHeaderDetail ::Direct ( detail ) = > detail ,
2022-11-02 02:42:34 +00:00
RPCMessageHeaderDetail ::SafetyRouted ( _ )
| RPCMessageHeaderDetail ::PrivateRouted ( _ ) = > {
2022-10-30 02:15:50 +00:00
// If this was sent via a private route, we don't know what the sender was, so drop this
return NetworkResult ::invalid_message (
2022-11-02 02:42:34 +00:00
" can't respond directly to non-direct question " ,
2022-10-30 02:15:50 +00:00
) ;
}
} ;
2022-07-04 16:03:21 +00:00
// Reply directly to the request's source
2022-10-30 02:15:50 +00:00
let sender_id = detail . envelope . get_sender_id ( ) ;
2022-07-04 16:03:21 +00:00
// This may be a different node's reference than the 'sender' in the case of a relay
2022-10-30 02:15:50 +00:00
let peer_noderef = detail . peer_noderef . clone ( ) ;
2022-07-04 16:03:21 +00:00
// If the sender_id is that of the peer, then this is a direct reply
// else it is a relayed reply through the peer
if peer_noderef . node_id ( ) = = sender_id {
2022-10-30 02:15:50 +00:00
NetworkResult ::value ( Destination ::direct ( peer_noderef ) )
2022-07-04 16:03:21 +00:00
} else {
2022-10-30 02:15:50 +00:00
NetworkResult ::value ( Destination ::relay ( peer_noderef , sender_id ) )
2022-07-04 16:03:21 +00:00
}
}
2022-10-30 02:15:50 +00:00
RespondTo ::PrivateRoute ( pr ) = > {
2022-11-02 02:42:34 +00:00
match & request . header . detail {
2022-10-30 02:15:50 +00:00
RPCMessageHeaderDetail ::Direct ( _ ) = > {
2022-11-02 02:42:34 +00:00
// If this was sent directly, we should only ever respond directly
2022-10-30 02:15:50 +00:00
return NetworkResult ::invalid_message (
" not responding to private route from direct question " ,
) ;
}
2022-11-02 02:42:34 +00:00
RPCMessageHeaderDetail ::SafetyRouted ( detail ) = > {
2022-11-02 19:36:01 +00:00
// If this was sent via a safety route, but not received over our private route, don't respond with a safety route,
2022-11-02 02:42:34 +00:00
// it would give away which safety routes belong to this node
NetworkResult ::value ( Destination ::private_route (
pr . clone ( ) ,
SafetySelection ::Unsafe ( detail . sequencing ) ,
) )
}
RPCMessageHeaderDetail ::PrivateRouted ( detail ) = > {
// If this was received over our private route, it's okay to respond to a private route via our safety route
NetworkResult ::value ( Destination ::private_route (
pr . clone ( ) ,
2022-11-02 19:36:01 +00:00
SafetySelection ::Safe ( detail . safety_spec . clone ( ) ) ,
2022-11-02 02:42:34 +00:00
) )
}
}
2022-10-30 02:15:50 +00:00
}
2022-07-04 16:03:21 +00:00
}
}
// Issue a reply over the network, possibly using an anonymized route
// The request must want a response, or this routine fails
2022-09-04 18:17:28 +00:00
#[ instrument(level = " debug " , skip(self, request, answer), err) ]
2022-07-04 16:03:21 +00:00
async fn answer (
& self ,
request : RPCMessage ,
answer : RPCAnswer ,
2022-07-20 13:39:38 +00:00
) -> Result < NetworkResult < ( ) > , RPCError > {
2022-07-04 16:03:21 +00:00
// Extract destination from respond_to
2022-10-30 02:15:50 +00:00
let dest = network_result_try! ( self . get_respond_to_destination ( & request ) ) ;
2022-07-04 16:03:21 +00:00
2022-09-04 18:17:28 +00:00
// Get sender info if we should send that
let opt_sender_info = self . get_sender_signed_node_info ( & dest ) ;
// Wrap answer in operation
let operation = RPCOperation ::new_answer ( & request . operation , answer , opt_sender_info ) ;
2022-07-04 21:58:26 +00:00
// Log rpc send
2022-11-09 22:11:35 +00:00
trace! ( target : " rpc_message " , dir = " send " , kind = " answer " , op_id = operation . op_id ( ) , desc = operation . kind ( ) . desc ( ) , ? dest ) ;
2022-07-04 21:58:26 +00:00
2022-07-04 16:03:21 +00:00
// Produce rendered operation
let RenderedOperation {
2022-07-05 02:44:04 +00:00
message ,
node_id ,
node_ref ,
2022-07-05 03:09:15 +00:00
hop_count : _ ,
2022-10-20 19:09:04 +00:00
} = network_result_try! ( self . render_operation ( dest , & operation ) ? ) ;
2022-07-04 16:03:21 +00:00
2021-11-22 16:28:30 +00:00
// Send the reply
2022-07-05 02:44:04 +00:00
let bytes = message . len ( ) as u64 ;
2022-06-28 03:46:29 +00:00
let send_ts = intf ::get_timestamp ( ) ;
2022-07-20 13:39:38 +00:00
network_result_try! ( self . network_manager ( )
2022-07-05 02:44:04 +00:00
. send_envelope ( node_ref . clone ( ) , Some ( node_id ) , message )
2021-11-22 16:28:30 +00:00
. await
2022-08-20 01:10:47 +00:00
. map_err ( | e | {
// If we're returning an error, clean up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , true ) ;
RPCError ::network ( e )
} ) ? = > {
2022-08-20 01:10:47 +00:00
// If we couldn't send we're still cleaning up
2022-08-31 01:21:16 +00:00
node_ref
. stats_failed_to_send ( send_ts , false ) ;
2022-07-20 13:39:38 +00:00
}
) ;
2021-11-22 16:28:30 +00:00
// Reply successfully sent
2022-08-31 01:21:16 +00:00
node_ref . stats_answer_sent ( bytes ) ;
2021-11-22 16:28:30 +00:00
2022-07-20 13:39:38 +00:00
Ok ( NetworkResult ::value ( ( ) ) )
2021-11-22 16:28:30 +00:00
}
2022-07-04 16:03:21 +00:00
//////////////////////////////////////////////////////////////////////
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self, encoded_msg), err) ]
2022-10-30 23:29:31 +00:00
async fn process_rpc_message ( & self , encoded_msg : RPCMessageEncoded ) -> Result < ( ) , RPCError > {
// Decode operation appropriately based on header detail
let msg = match & encoded_msg . header . detail {
RPCMessageHeaderDetail ::Direct ( detail ) = > {
// Get the routing domain this message came over
let routing_domain = detail . routing_domain ;
// Decode the operation
let sender_node_id = detail . envelope . get_sender_id ( ) ;
// Decode the RPC message
let operation = {
2022-11-05 22:50:20 +00:00
let reader = encoded_msg . data . get_reader ( ) ? ;
2022-10-30 23:29:31 +00:00
let op_reader = reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
. map_err ( RPCError ::protocol )
. map_err ( logthru_rpc! ( ) ) ? ;
RPCOperation ::decode ( & op_reader , Some ( & sender_node_id ) ) ?
} ;
2022-05-26 00:56:13 +00:00
2022-10-30 23:29:31 +00:00
// Get the sender noderef, incorporating and 'sender node info'
let mut opt_sender_nr : Option < NodeRef > = None ;
if let Some ( sender_node_info ) = operation . sender_node_info ( ) {
// Sender NodeInfo was specified, update our routing table with it
if ! self . filter_node_info ( routing_domain , & sender_node_info . node_info ) {
return Err ( RPCError ::invalid_format (
" sender signednodeinfo has invalid peer scope " ,
) ) ;
}
opt_sender_nr = self . routing_table ( ) . register_node_with_signed_node_info (
routing_domain ,
sender_node_id ,
sender_node_info . clone ( ) ,
false ,
) ;
}
2022-09-04 18:17:28 +00:00
2022-10-30 23:29:31 +00:00
// look up sender node, in case it's different than our peer due to relaying
if opt_sender_nr . is_none ( ) {
opt_sender_nr = self . routing_table ( ) . lookup_node_ref ( sender_node_id )
}
2021-11-22 16:28:30 +00:00
2022-10-30 23:29:31 +00:00
// Mark this sender as having seen our node info over this routing domain
// because it managed to reach us over that routing domain
if let Some ( sender_nr ) = & opt_sender_nr {
sender_nr . set_seen_our_node_info ( routing_domain ) ;
}
2022-09-04 18:17:28 +00:00
2022-10-30 23:29:31 +00:00
// Make the RPC message
RPCMessage {
header : encoded_msg . header ,
operation ,
opt_sender_nr ,
}
}
2022-11-02 02:42:34 +00:00
RPCMessageHeaderDetail ::SafetyRouted ( _ ) | RPCMessageHeaderDetail ::PrivateRouted ( _ ) = > {
2022-10-30 23:29:31 +00:00
// Decode the RPC message
let operation = {
2022-11-05 22:50:20 +00:00
let reader = encoded_msg . data . get_reader ( ) ? ;
2022-10-30 23:29:31 +00:00
let op_reader = reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
. map_err ( RPCError ::protocol )
. map_err ( logthru_rpc! ( ) ) ? ;
RPCOperation ::decode ( & op_reader , None ) ?
} ;
// Make the RPC message
RPCMessage {
header : encoded_msg . header ,
operation ,
opt_sender_nr : None ,
}
}
2022-04-17 23:10:10 +00:00
} ;
2022-07-04 16:03:21 +00:00
// Process stats
let kind = match msg . operation . kind ( ) {
RPCOperationKind ::Question ( _ ) = > {
if let Some ( sender_nr ) = msg . opt_sender_nr . clone ( ) {
2022-08-31 01:21:16 +00:00
sender_nr . stats_question_rcvd ( msg . header . timestamp , msg . header . body_len ) ;
2022-07-04 16:03:21 +00:00
}
" question "
2021-11-22 16:28:30 +00:00
}
2022-07-04 16:03:21 +00:00
RPCOperationKind ::Statement ( _ ) = > {
if let Some ( sender_nr ) = msg . opt_sender_nr . clone ( ) {
2022-08-31 01:21:16 +00:00
sender_nr . stats_question_rcvd ( msg . header . timestamp , msg . header . body_len ) ;
2021-11-22 16:28:30 +00:00
}
2022-07-04 16:03:21 +00:00
" statement "
}
RPCOperationKind ::Answer ( _ ) = > {
// Answer stats are processed in wait_for_reply
" answer "
}
2022-04-03 16:58:06 +00:00
} ;
2022-07-04 16:03:21 +00:00
// Log rpc receive
2022-11-09 22:11:35 +00:00
trace! ( target : " rpc_message " , dir = " recv " , kind , op_id = msg . operation . op_id ( ) , desc = msg . operation . kind ( ) . desc ( ) , header = ? msg . header ) ;
2022-07-04 16:03:21 +00:00
// Process specific message kind
match msg . operation . kind ( ) {
RPCOperationKind ::Question ( q ) = > match q . detail ( ) {
RPCQuestionDetail ::StatusQ ( _ ) = > self . process_status_q ( msg ) . await ,
RPCQuestionDetail ::FindNodeQ ( _ ) = > self . process_find_node_q ( msg ) . await ,
2022-09-25 22:04:53 +00:00
RPCQuestionDetail ::AppCallQ ( _ ) = > self . process_app_call_q ( msg ) . await ,
2022-07-04 16:03:21 +00:00
RPCQuestionDetail ::GetValueQ ( _ ) = > self . process_get_value_q ( msg ) . await ,
RPCQuestionDetail ::SetValueQ ( _ ) = > self . process_set_value_q ( msg ) . await ,
RPCQuestionDetail ::WatchValueQ ( _ ) = > self . process_watch_value_q ( msg ) . await ,
RPCQuestionDetail ::SupplyBlockQ ( _ ) = > self . process_supply_block_q ( msg ) . await ,
RPCQuestionDetail ::FindBlockQ ( _ ) = > self . process_find_block_q ( msg ) . await ,
RPCQuestionDetail ::StartTunnelQ ( _ ) = > self . process_start_tunnel_q ( msg ) . await ,
RPCQuestionDetail ::CompleteTunnelQ ( _ ) = > self . process_complete_tunnel_q ( msg ) . await ,
RPCQuestionDetail ::CancelTunnelQ ( _ ) = > self . process_cancel_tunnel_q ( msg ) . await ,
} ,
RPCOperationKind ::Statement ( s ) = > match s . detail ( ) {
RPCStatementDetail ::ValidateDialInfo ( _ ) = > {
self . process_validate_dial_info ( msg ) . await
}
RPCStatementDetail ::Route ( _ ) = > self . process_route ( msg ) . await ,
RPCStatementDetail ::NodeInfoUpdate ( _ ) = > self . process_node_info_update ( msg ) . await ,
RPCStatementDetail ::ValueChanged ( _ ) = > self . process_value_changed ( msg ) . await ,
RPCStatementDetail ::Signal ( _ ) = > self . process_signal ( msg ) . await ,
RPCStatementDetail ::ReturnReceipt ( _ ) = > self . process_return_receipt ( msg ) . await ,
2022-09-25 22:04:53 +00:00
RPCStatementDetail ::AppMessage ( _ ) = > self . process_app_message ( msg ) . await ,
2022-07-04 16:03:21 +00:00
} ,
2022-09-25 22:04:53 +00:00
RPCOperationKind ::Answer ( _ ) = > {
self . unlocked_inner
. waiting_rpc_table
. complete_op_waiter ( msg . operation . op_id ( ) , msg )
. await
}
2021-11-22 16:28:30 +00:00
}
}
2022-07-20 13:39:38 +00:00
async fn rpc_worker (
self ,
stop_token : StopToken ,
receiver : flume ::Receiver < ( Option < Id > , RPCMessageEncoded ) > ,
) {
2022-10-04 15:27:38 +00:00
while let Ok ( Ok ( ( _span_id , msg ) ) ) =
2022-07-20 13:39:38 +00:00
receiver . recv_async ( ) . timeout_at ( stop_token . clone ( ) ) . await
{
2022-10-04 15:27:38 +00:00
let rpc_worker_span = span! ( parent : None , Level ::TRACE , " rpc_worker recv " ) ;
// xxx: causes crash (Missing otel data span extensions)
// rpc_worker_span.follows_from(span_id);
2021-12-17 02:57:28 +00:00
let _ = self
. process_rpc_message ( msg )
2022-10-02 22:47:36 +00:00
. instrument ( rpc_worker_span )
2021-12-17 02:57:28 +00:00
. await
. map_err ( logthru_rpc! ( " couldn't process rpc message " ) ) ;
2021-11-22 16:28:30 +00:00
}
}
2022-07-20 13:39:38 +00:00
#[ instrument(level = " trace " , skip(self, body), err) ]
2022-10-30 02:15:50 +00:00
pub fn enqueue_direct_message (
2021-11-22 16:28:30 +00:00
& self ,
2022-05-31 23:54:52 +00:00
envelope : Envelope ,
2021-11-22 16:28:30 +00:00
peer_noderef : NodeRef ,
2022-08-25 23:21:50 +00:00
connection_descriptor : ConnectionDescriptor ,
2022-09-03 17:57:25 +00:00
routing_domain : RoutingDomain ,
2022-10-30 02:15:50 +00:00
body : Vec < u8 > ,
2022-07-10 21:36:50 +00:00
) -> EyreResult < ( ) > {
2022-07-04 16:03:21 +00:00
let msg = RPCMessageEncoded {
2021-11-22 16:28:30 +00:00
header : RPCMessageHeader {
2022-10-30 02:15:50 +00:00
detail : RPCMessageHeaderDetail ::Direct ( RPCMessageHeaderDetailDirect {
envelope ,
peer_noderef ,
connection_descriptor ,
routing_domain ,
} ) ,
timestamp : intf ::get_timestamp ( ) ,
body_len : body . len ( ) as u64 ,
} ,
data : RPCMessageData { contents : body } ,
} ;
let send_channel = {
let inner = self . inner . lock ( ) ;
inner . send_channel . as_ref ( ) . unwrap ( ) . clone ( )
} ;
let span_id = Span ::current ( ) . id ( ) ;
send_channel
. try_send ( ( span_id , msg ) )
. wrap_err ( " failed to enqueue received RPC message " ) ? ;
Ok ( ( ) )
}
#[ instrument(level = " trace " , skip(self, body), err) ]
2022-11-02 02:42:34 +00:00
pub fn enqueue_safety_routed_message (
2022-11-02 19:36:01 +00:00
& self ,
sequencing : Sequencing ,
2022-11-02 02:42:34 +00:00
body : Vec < u8 > ,
) -> EyreResult < ( ) > {
let msg = RPCMessageEncoded {
header : RPCMessageHeader {
2022-11-02 19:36:01 +00:00
detail : RPCMessageHeaderDetail ::SafetyRouted ( RPCMessageHeaderDetailSafetyRouted {
sequencing ,
} ) ,
2022-11-02 02:42:34 +00:00
timestamp : intf ::get_timestamp ( ) ,
body_len : body . len ( ) as u64 ,
} ,
data : RPCMessageData { contents : body } ,
} ;
let send_channel = {
let inner = self . inner . lock ( ) ;
inner . send_channel . as_ref ( ) . unwrap ( ) . clone ( )
} ;
let span_id = Span ::current ( ) . id ( ) ;
send_channel
. try_send ( ( span_id , msg ) )
. wrap_err ( " failed to enqueue received RPC message " ) ? ;
Ok ( ( ) )
}
#[ instrument(level = " trace " , skip(self, body), err) ]
pub fn enqueue_private_routed_message (
2022-10-30 02:15:50 +00:00
& self ,
private_route : DHTKey ,
2022-11-02 19:36:01 +00:00
safety_spec : SafetySpec ,
2022-10-30 02:15:50 +00:00
body : Vec < u8 > ,
) -> EyreResult < ( ) > {
let msg = RPCMessageEncoded {
header : RPCMessageHeader {
2022-11-02 02:42:34 +00:00
detail : RPCMessageHeaderDetail ::PrivateRouted (
RPCMessageHeaderDetailPrivateRouted {
private_route ,
2022-11-02 19:36:01 +00:00
safety_spec ,
2022-11-02 02:42:34 +00:00
} ,
) ,
2022-06-28 03:46:29 +00:00
timestamp : intf ::get_timestamp ( ) ,
2021-11-22 16:28:30 +00:00
body_len : body . len ( ) as u64 ,
} ,
data : RPCMessageData { contents : body } ,
} ;
let send_channel = {
let inner = self . inner . lock ( ) ;
2021-12-17 02:57:28 +00:00
inner . send_channel . as_ref ( ) . unwrap ( ) . clone ( )
2021-11-22 16:28:30 +00:00
} ;
2022-07-20 13:39:38 +00:00
let span_id = Span ::current ( ) . id ( ) ;
2021-12-17 02:57:28 +00:00
send_channel
2022-07-20 13:39:38 +00:00
. try_send ( ( span_id , msg ) )
2022-07-10 21:36:50 +00:00
. wrap_err ( " failed to enqueue received RPC message " ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
}