2021-11-22 16:28:30 +00:00
mod coders ;
2021-12-17 02:57:28 +00:00
mod debug ;
mod private_route ;
pub use debug ::* ;
pub use private_route ::* ;
2021-11-22 16:28:30 +00:00
use crate ::dht ::* ;
use crate ::intf ::* ;
use crate ::xx ::* ;
use crate ::* ;
use capnp ::message ::ReaderSegments ;
use coders ::* ;
use core ::convert ::{ TryFrom , TryInto } ;
use core ::fmt ;
use network_manager ::* ;
use receipt_manager ::* ;
use routing_table ::* ;
/////////////////////////////////////////////////////////////////////
type OperationId = u64 ;
2021-12-14 19:20:05 +00:00
#[ derive(Debug, Clone) ]
2021-11-22 16:28:30 +00:00
pub enum Destination {
2022-04-17 17:28:39 +00:00
Direct ( NodeRef ) , // Send to node
Relay ( NodeRef , DHTKey ) , // Send to node for relay purposes
PrivateRoute ( PrivateRoute ) , // Send to private route
2021-11-22 16:28:30 +00:00
}
2021-12-14 19:20:05 +00:00
#[ derive(Debug, Clone) ]
2021-11-22 16:28:30 +00:00
pub enum RespondTo {
None ,
2022-04-08 14:17:09 +00:00
Sender ( Option < NodeInfo > ) ,
2021-11-22 16:28:30 +00:00
PrivateRoute ( PrivateRoute ) ,
}
impl RespondTo {
pub fn encode (
& self ,
builder : & mut veilid_capnp ::operation ::respond_to ::Builder ,
) -> Result < ( ) , RPCError > {
match self {
Self ::None = > {
builder . set_none ( ( ) ) ;
}
2022-04-08 14:17:09 +00:00
Self ::Sender ( Some ( ni ) ) = > {
let mut ni_builder = builder . reborrow ( ) . init_sender ( ) ;
encode_node_info ( ni , & mut ni_builder ) ? ;
2022-03-25 02:07:55 +00:00
}
Self ::Sender ( None ) = > {
2022-03-27 01:25:24 +00:00
builder . reborrow ( ) . init_sender ( ) ;
2021-11-22 16:28:30 +00:00
}
Self ::PrivateRoute ( pr ) = > {
let mut pr_builder = builder . reborrow ( ) . init_private_route ( ) ;
2021-11-27 17:44:21 +00:00
encode_private_route ( pr , & mut pr_builder ) ? ;
2021-11-22 16:28:30 +00:00
}
} ;
Ok ( ( ) )
}
}
/// Metadata carried alongside a received RPC message body.
#[derive(Clone, Debug)]
struct RPCMessageHeader {
    /// Timestamp associated with the message.
    timestamp: u64,
    /// Envelope the message arrived in; its sender id is used when replying.
    envelope: envelope::Envelope,
    /// Length of the message body; reported to the routing table statistics
    /// when an answer is received.
    body_len: u64,
    /// Ensures the node doesn't get evicted from the routing table until
    /// we're done with it.
    peer_noderef: NodeRef,
}
/// Raw bytes of an RPC message.
#[derive(Clone, Debug)]
struct RPCMessageData {
    /// RPC messages must be a canonicalized single segment.
    contents: Vec<u8>,
}
impl ReaderSegments for RPCMessageData {
    /// Exposes the message bytes as segment 0; every other index is absent.
    fn get_segment(&self, idx: u32) -> Option<&[u8]> {
        match idx {
            0 => Some(self.contents.as_slice()),
            _ => None,
        }
    }
}
/// A received RPC message: header metadata plus the raw body bytes.
#[derive(Debug)]
struct RPCMessage {
    /// Metadata about the message.
    header: RPCMessageHeader,
    /// The message bytes.
    data: RPCMessageData,
}
struct RPCMessageReader {
header : RPCMessageHeader ,
reader : capnp ::message ::Reader < RPCMessageData > ,
2022-04-03 16:58:06 +00:00
opt_sender_nr : Option < NodeRef > ,
2021-11-22 16:28:30 +00:00
}
fn builder_to_vec < ' a , T > ( builder : capnp ::message ::Builder < T > ) -> Result < Vec < u8 > , RPCError >
where
T : capnp ::message ::Allocator + ' a ,
{
let wordvec = builder
. into_reader ( )
. canonicalize ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( capnp ::Word ::words_to_bytes ( wordvec . as_slice ( ) ) . to_vec ( ) )
}
fn reader_to_vec < ' a , T > ( reader : capnp ::message ::Reader < T > ) -> Result < Vec < u8 > , RPCError >
where
T : capnp ::message ::ReaderSegments + ' a ,
{
2021-12-18 00:18:25 +00:00
let wordvec = reader
. canonicalize ( )
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( capnp ::Word ::words_to_bytes ( wordvec . as_slice ( ) ) . to_vec ( ) )
}
#[ derive(Debug) ]
struct WaitableReply {
op_id : OperationId ,
eventual : EventualValue < RPCMessageReader > ,
timeout : u64 ,
node_ref : NodeRef ,
send_ts : u64 ,
2022-04-23 01:30:09 +00:00
send_data_kind : SendDataKind ,
2021-11-22 16:28:30 +00:00
}
/////////////////////////////////////////////////////////////////////
#[ derive(Clone, Debug, Default) ]
pub struct InfoAnswer {
pub latency : u64 ,
2022-04-16 15:18:54 +00:00
pub node_status : NodeStatus ,
2021-11-22 16:28:30 +00:00
pub sender_info : SenderInfo ,
}
/// Result of a find-node question.
#[derive(Clone, Debug, Default)]
pub struct FindNodeAnswer {
    /// How long it took to get this answer.
    pub latency: u64,
    /// The list of closer peers.
    pub peers: Vec<PeerInfo>,
}
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
network_manager : NetworkManager ,
routing_table : RoutingTable ,
node_id : key ::DHTKey ,
node_id_secret : key ::DHTKeySecret ,
2022-03-10 15:18:47 +00:00
send_channel : Option < flume ::Sender < RPCMessage > > ,
2021-11-22 16:28:30 +00:00
timeout : u64 ,
max_route_hop_count : usize ,
waiting_rpc_table : BTreeMap < OperationId , EventualValue < RPCMessageReader > > ,
worker_join_handles : Vec < JoinHandle < ( ) > > ,
}
#[ derive(Clone) ]
pub struct RPCProcessor {
crypto : Crypto ,
config : VeilidConfig ,
2022-04-16 15:18:54 +00:00
enable_local_peer_scope : bool ,
2021-11-22 16:28:30 +00:00
inner : Arc < Mutex < RPCProcessorInner > > ,
}
impl RPCProcessor {
// Build the initial inner state: default node id and secret, no send channel,
// no waiting operations and no workers.
fn new_inner(network_manager: NetworkManager) -> RPCProcessorInner {
    let network_manager_handle = network_manager.clone();
    let routing_table = network_manager.routing_table();
    RPCProcessorInner {
        network_manager: network_manager_handle,
        routing_table,
        node_id: key::DHTKey::default(),
        node_id_secret: key::DHTKeySecret::default(),
        send_channel: None,
        // per-hop timeout
        timeout: 10000000,
        max_route_hop_count: 7,
        waiting_rpc_table: BTreeMap::new(),
        worker_join_handles: Vec::new(),
    }
}
pub fn new ( network_manager : NetworkManager ) -> Self {
Self {
crypto : network_manager . crypto ( ) ,
config : network_manager . config ( ) ,
2022-04-16 15:18:54 +00:00
enable_local_peer_scope : network_manager
2021-12-27 16:31:31 +00:00
. config ( )
. get ( )
. network
2022-04-16 15:18:54 +00:00
. enable_local_peer_scope ,
2021-11-22 16:28:30 +00:00
inner : Arc ::new ( Mutex ::new ( Self ::new_inner ( network_manager ) ) ) ,
}
}
// Returns a clone of the network manager held in the inner state
pub fn network_manager(&self) -> NetworkManager {
    let inner = self.inner.lock();
    inner.network_manager.clone()
}
// Returns a clone of the routing table held in the inner state
pub fn routing_table(&self) -> RoutingTable {
    let inner = self.inner.lock();
    inner.routing_table.clone()
}
// Returns the node id stored in the inner state
pub fn node_id(&self) -> key::DHTKey {
    let inner = self.inner.lock();
    inner.node_id
}
// Returns the node id secret stored in the inner state
pub fn node_id_secret(&self) -> key::DHTKeySecret {
    let inner = self.inner.lock();
    inner.node_id_secret
}
//////////////////////////////////////////////////////////////////////
// Operation ids are drawn at random rather than counted
fn get_next_op_id(&self) -> OperationId {
    let op_id: OperationId = get_random_u64();
    op_id
}
2022-04-25 00:16:13 +00:00
// Decide whether node info may be accepted.
// With local peer scope enabled nothing is rejected. Otherwise every dial info
// of the node, and of its relay if it has one, must be global.
fn filter_peer_scope(&self, node_info: &NodeInfo) -> bool {
    // if local peer scope is enabled, then don't reject any peer info
    if self.enable_local_peer_scope {
        return true;
    }

    // reject attempts to include non-public addresses in results
    let node_is_public = node_info
        .dial_info_detail_list
        .iter()
        .all(|did| did.dial_info.is_global());
    if !node_is_public {
        return false;
    }

    // the relay, when present, is held to the same rule
    node_info.relay_peer_info.as_ref().map_or(true, |rpi| {
        rpi.node_info
            .dial_info_detail_list
            .iter()
            .all(|did| did.dial_info.is_global())
    })
}
2021-11-22 16:28:30 +00:00
//////////////////////////////////////////////////////////////////////
2022-03-27 01:25:24 +00:00
// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
2021-11-22 16:28:30 +00:00
pub async fn search_dht_single_key (
& self ,
node_id : key ::DHTKey ,
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
) -> Result < NodeRef , RPCError > {
let routing_table = self . routing_table ( ) ;
// xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout
2021-12-18 00:18:25 +00:00
Err ( rpc_error_unimplemented ( " search_dht_single_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
pub async fn search_dht_multi_key (
& self ,
_node_id : key ::DHTKey ,
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
) -> Result < Vec < NodeRef > , RPCError > {
// xxx return closest nodes after the timeout
2021-12-18 00:18:25 +00:00
Err ( rpc_error_unimplemented ( " search_dht_multi_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
2022-03-27 01:25:24 +00:00
// Note: This routine can possible be recursive, hence the SystemPinBoxFuture async form
pub fn resolve_node (
& self ,
node_id : key ::DHTKey ,
) -> SystemPinBoxFuture < Result < NodeRef , RPCError > > {
let this = self . clone ( ) ;
Box ::pin ( async move {
let routing_table = this . routing_table ( ) ;
// First see if we have the node in our routing table already
if let Some ( nr ) = routing_table . lookup_node_ref ( node_id ) {
2022-04-16 15:18:54 +00:00
// ensure we have some dial info for the entry already,
2022-03-27 01:25:24 +00:00
// if not, we should do the find_node anyway
2022-04-17 17:28:39 +00:00
if nr . has_any_dial_info ( ) {
2022-03-27 01:25:24 +00:00
return Ok ( nr ) ;
}
}
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
// If nobody knows where this node is, ask the DHT for it
let ( count , fanout , timeout ) = {
let c = this . config . get ( ) ;
(
c . network . dht . resolve_node_count ,
c . network . dht . resolve_node_fanout ,
c . network . dht . resolve_node_timeout_ms . map ( ms_to_us ) ,
)
} ;
let nr = this
. search_dht_single_key ( node_id , count , fanout , timeout )
. await ? ;
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
if nr . node_id ( ) ! = node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Err ( RPCError ::Timeout ) . map_err ( logthru_rpc! ( ) ) ;
}
Ok ( nr )
} )
2021-11-22 16:28:30 +00:00
}
// set up wait for reply: register an eventual under the operation id and hand back a clone of it
fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<RPCMessageReader> {
    let waiter = EventualValue::new();
    self.inner
        .lock()
        .waiting_rpc_table
        .insert(op_id, waiter.clone());
    waiter
}
// remove wait for reply: forget the waiter for this operation id, if there is one
fn cancel_op_id_waiter(&self, op_id: OperationId) {
    self.inner.lock().waiting_rpc_table.remove(&op_id);
}
// complete the reply
async fn complete_op_id_waiter (
& self ,
op_id : OperationId ,
rpcreader : RPCMessageReader ,
) -> Result < ( ) , RPCError > {
let eventual = {
let mut inner = self . inner . lock ( ) ;
2021-11-27 17:44:21 +00:00
inner
. waiting_rpc_table
. remove ( & op_id )
. ok_or_else ( | | rpc_error_internal ( " Unmatched operation id " ) ) ?
2021-11-22 16:28:30 +00:00
} ;
2021-11-27 17:44:21 +00:00
eventual . resolve ( rpcreader ) . await ;
Ok ( ( ) )
2021-11-22 16:28:30 +00:00
}
// wait for reply
async fn do_wait_for_reply (
& self ,
waitable_reply : & WaitableReply ,
) -> Result < ( RPCMessageReader , u64 ) , RPCError > {
let timeout_ms = u32 ::try_from ( waitable_reply . timeout / 1000 u64 )
. map_err ( map_error_internal! ( " invalid timeout " ) ) ? ;
// wait for eventualvalue
let start_ts = get_timestamp ( ) ;
timeout ( timeout_ms , waitable_reply . eventual . instance ( ) )
. await
. map_err ( | _ | RPCError ::Timeout ) ? ;
match waitable_reply . eventual . take_value ( ) {
None = > panic! ( " there should be a reply value but there wasn't " ) ,
Some ( rpcreader ) = > {
let end_ts = get_timestamp ( ) ;
Ok ( ( rpcreader , end_ts - start_ts ) )
}
}
}
async fn wait_for_reply (
& self ,
waitable_reply : WaitableReply ,
) -> Result < ( RPCMessageReader , u64 ) , RPCError > {
let out = self . do_wait_for_reply ( & waitable_reply ) . await ;
match & out {
Err ( _ ) = > {
self . cancel_op_id_waiter ( waitable_reply . op_id ) ;
2022-04-18 22:49:33 +00:00
self . routing_table ( )
. stats_question_lost ( waitable_reply . node_ref . clone ( ) , waitable_reply . send_ts ) ;
2021-11-22 16:28:30 +00:00
}
Ok ( ( rpcreader , _ ) ) = > {
2022-04-16 15:18:54 +00:00
// Note that we definitely received this node info since we got a reply
waitable_reply . node_ref . set_seen_our_node_info ( ) ;
2022-03-25 02:07:55 +00:00
2021-11-22 16:28:30 +00:00
// Reply received
let recv_ts = get_timestamp ( ) ;
2022-04-18 22:49:33 +00:00
self . routing_table ( ) . stats_answer_rcvd (
waitable_reply . node_ref ,
waitable_reply . send_ts ,
recv_ts ,
rpcreader . header . body_len ,
)
2021-11-22 16:28:30 +00:00
}
} ;
out
}
// Issue a request over the network, possibly using an anonymized route
//
// dest: where the request goes (a node, a node via relay, or a private route)
// message: the operation to send; its op id and respond_to are read from it
// safety_route_spec: when present, the request is wrapped in this safety route
//
// Returns Ok(None) when the operation does not want an answer, otherwise a
// WaitableReply that can be waited on for the answer.
// Fails when the operation is invalid, the route cannot be built, the total hop
// count exceeds max_route_hop_count, the first hop cannot be resolved, or the
// envelope cannot be sent.
async fn request < T : capnp ::message ::ReaderSegments > (
& self ,
dest : Destination ,
message : capnp ::message ::Reader < T > ,
safety_route_spec : Option < & SafetyRouteSpec > ,
) -> Result < Option < WaitableReply > , RPCError > {
2021-12-17 02:57:28 +00:00
log_rpc! ( self . get_rpc_request_debug_info ( & dest , & message , & safety_route_spec ) ) ;
2022-04-18 22:49:33 +00:00
// Read the operation id and whether an answer is expected from the message itself
let ( op_id , wants_answer ) = {
2021-11-22 16:28:30 +00:00
let operation = message
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2022-01-05 17:01:02 +00:00
. map_err ( map_error_internal! ( " invalid operation " ) )
. map_err ( logthru_rpc! ( error ) ) ? ;
2021-11-22 16:28:30 +00:00
let op_id = operation . get_op_id ( ) ;
2022-01-05 17:01:02 +00:00
let wants_answer = self . wants_answer ( & operation ) . map_err ( logthru_rpc! ( ) ) ? ;
2021-12-17 02:57:28 +00:00
2022-04-18 22:49:33 +00:00
( op_id , wants_answer )
2021-11-22 16:28:30 +00:00
} ;
2022-04-23 01:30:09 +00:00
// These are assigned exactly once in whichever match arm below applies
let out_node_id ; // Envelope Node Id
let mut out_noderef : Option < NodeRef > = None ; // Node to send envelope to
let hopcount : usize ; // Total safety + private route hop count
// Create envelope data
2021-11-22 16:28:30 +00:00
let out = {
2022-04-23 01:30:09 +00:00
let out ; // Envelope data
2021-11-22 16:28:30 +00:00
// To where are we sending the request
2022-04-16 15:18:54 +00:00
match & dest {
2022-04-17 17:28:39 +00:00
Destination ::Direct ( node_ref ) | Destination ::Relay ( node_ref , _ ) = > {
2021-11-22 16:28:30 +00:00
// Send to a node without a private route
// --------------------------------------
2022-04-16 15:18:54 +00:00
2022-04-17 17:28:39 +00:00
// Get the actual destination node id accounting for relays
// For a relay, the envelope goes to node_ref but is addressed to the relayed key
let ( node_ref , node_id ) = if let Destination ::Relay ( _ , dht_key ) = dest {
( node_ref . clone ( ) , dht_key )
2022-04-16 15:18:54 +00:00
} else {
let node_id = node_ref . node_id ( ) ;
( node_ref . clone ( ) , node_id )
} ;
// Handle the existence of safety route
2021-11-22 16:28:30 +00:00
match safety_route_spec {
None = > {
// If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing
out = reader_to_vec ( message ) ? ;
// Message goes directly to the node
2022-04-16 15:18:54 +00:00
out_node_id = node_id ;
2021-11-22 16:28:30 +00:00
out_noderef = Some ( node_ref ) ;
hopcount = 1 ;
}
Some ( sr ) = > {
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp ::message ::Builder ::new_default ( ) ;
let private_route =
2022-04-16 15:18:54 +00:00
self . new_stub_private_route ( node_id , & mut pr_builder ) ? ;
2021-11-22 16:28:30 +00:00
let message_vec = reader_to_vec ( message ) ? ;
// first
// The envelope is addressed to the first hop of the safety route;
// out_noderef stays None so that hop is resolved further down
out_node_id = sr
. hops
. first ( )
2021-11-27 17:44:21 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( Some ( sr ) , private_route , message_vec ) ? ;
2021-11-22 16:28:30 +00:00
hopcount = 1 + sr . hops . len ( ) ;
}
} ;
}
Destination ::PrivateRoute ( private_route ) = > {
// Send to private route
// ---------------------
// Encode the private route
let mut pr_msg_builder = ::capnp ::message ::Builder ::new_default ( ) ;
let mut pr_builder =
pr_msg_builder . init_root ::< veilid_capnp ::private_route ::Builder > ( ) ;
2022-04-17 17:28:39 +00:00
encode_private_route ( private_route , & mut pr_builder ) ? ;
2021-11-22 16:28:30 +00:00
let pr_reader = pr_builder . into_reader ( ) ;
// Reply with 'route' operation
let message_vec = reader_to_vec ( message ) ? ;
out_node_id = match safety_route_spec {
None = > {
// If no safety route, the first node is the first hop of the private route
hopcount = private_route . hop_count as usize ;
2022-04-16 15:18:54 +00:00
let out_node_id = match & private_route . hops {
2021-11-22 16:28:30 +00:00
Some ( rh ) = > rh . dial_info . node_id . key ,
_ = > return Err ( rpc_error_internal ( " private route has no hops " ) ) ,
} ;
out = self . wrap_with_route ( None , pr_reader , message_vec ) ? ;
out_node_id
}
Some ( sr ) = > {
// If safety route is in use, first node is the first hop of the safety route
hopcount = 1 + sr . hops . len ( ) + ( private_route . hop_count as usize ) ;
let out_node_id = sr
. hops
. first ( )
2021-11-27 17:44:21 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( Some ( sr ) , pr_reader , message_vec ) ? ;
2021-11-22 16:28:30 +00:00
out_node_id
}
}
}
}
out
} ;
// Verify hop count isn't larger than our maximum routed hop count
if hopcount > self . inner . lock ( ) . max_route_hop_count {
2022-01-05 17:01:02 +00:00
return Err ( rpc_error_internal ( " hop count too long for route " ) )
. map_err ( logthru_rpc! ( warn ) ) ;
2021-11-22 16:28:30 +00:00
}
// calculate actual timeout
// timeout is number of hops times the timeout per hop
let timeout = self . inner . lock ( ) . timeout * ( hopcount as u64 ) ;
// if we need to resolve the first hop, do it
let node_ref = match out_noderef {
None = > {
// resolve node
2022-04-03 16:58:06 +00:00
self . resolve_node ( out_node_id )
2021-12-17 02:57:28 +00:00
. await
. map_err ( logthru_rpc! ( error ) ) ?
2021-11-22 16:28:30 +00:00
}
Some ( nr ) = > {
// got the node in the routing table already
nr
}
} ;
// set up op id eventual
// The waiter is registered before sending so an answer cannot arrive unmatched
let eventual = if wants_answer {
Some ( self . add_op_id_waiter ( op_id ) )
} else {
None
} ;
// send question
let bytes = out . len ( ) as u64 ;
2022-04-23 01:30:09 +00:00
let send_data_kind = match self
2021-11-22 16:28:30 +00:00
. network_manager ( )
2022-04-16 15:18:54 +00:00
. send_envelope ( node_ref . clone ( ) , Some ( out_node_id ) , out )
2021-11-22 16:28:30 +00:00
. await
2021-12-17 02:57:28 +00:00
. map_err ( logthru_rpc! ( error ) )
2021-11-27 17:44:21 +00:00
. map_err ( RPCError ::Internal )
2021-11-22 16:28:30 +00:00
{
2022-04-23 01:30:09 +00:00
Ok ( v ) = > v ,
Err ( e ) = > {
// Make sure to clean up op id waiter in case of error
if eventual . is_some ( ) {
self . cancel_op_id_waiter ( op_id ) ;
}
return Err ( e ) ;
2021-11-22 16:28:30 +00:00
}
2022-04-23 01:30:09 +00:00
} ;
2021-11-22 16:28:30 +00:00
// Successfully sent
let send_ts = get_timestamp ( ) ;
2022-04-18 22:49:33 +00:00
self . routing_table ( )
. stats_question_sent ( node_ref . clone ( ) , send_ts , bytes , wants_answer ) ;
2021-11-22 16:28:30 +00:00
// Pass back waitable reply completion
match eventual {
None = > {
// if we don't want an answer, don't wait for one
Ok ( None )
}
2021-11-26 14:54:38 +00:00
Some ( eventual ) = > Ok ( Some ( WaitableReply {
op_id ,
eventual ,
timeout ,
node_ref ,
send_ts ,
2022-04-23 01:30:09 +00:00
send_data_kind ,
2021-11-22 16:28:30 +00:00
} ) ) ,
}
}
// Issue a reply over the network, possibly using an anonymized route
2022-04-23 01:30:09 +00:00
// The request must want a response, or this routine fails
//
// request_rpcreader: the request being answered; its respond_to decides where the reply goes
// reply_msg: the answer operation to send
// safety_route_spec: when present, the reply is wrapped in this safety route
//
// Fails when the request asked for no response, the private route in the
// request is invalid, the first hop cannot be resolved, or the envelope cannot be sent.
2021-11-22 16:28:30 +00:00
async fn reply < T : capnp ::message ::ReaderSegments > (
& self ,
request_rpcreader : RPCMessageReader ,
reply_msg : capnp ::message ::Reader < T > ,
safety_route_spec : Option < & SafetyRouteSpec > ,
) -> Result < ( ) , RPCError > {
2021-12-17 02:57:28 +00:00
log_rpc! ( self . get_rpc_reply_debug_info ( & request_rpcreader , & reply_msg , & safety_route_spec ) ) ;
2021-11-22 16:28:30 +00:00
//
// These are assigned in whichever match arm below applies
let out_node_id ;
let mut out_noderef : Option < NodeRef > = None ;
let out = {
let out ;
let request_operation = request_rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let reply_vec = reader_to_vec ( reply_msg ) ? ;
// To where should we respond?
match request_operation
. get_respond_to ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " invalid request operation " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::respond_to ::None ( _ ) = > {
// Do not respond
// --------------
2021-12-18 00:18:25 +00:00
return Err ( rpc_error_internal ( " no response requested " ) )
. map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
veilid_capnp ::operation ::respond_to ::Sender ( _ ) = > {
// Respond to envelope source node, possibly through a relay if the request arrived that way
// -------------------------------
match safety_route_spec {
None = > {
// If no safety route is being used, and we're not replying to a private
// route, we can use a direct envelope instead of routing
out = reply_vec ;
// Reply directly to the request's source
out_node_id = request_rpcreader . header . envelope . get_sender_id ( ) ;
// This may be a different node's reference than the 'sender' in the case of a relay
// But in that case replies to inbound requests are returned through the inbound relay anyway
out_noderef = Some ( request_rpcreader . header . peer_noderef . clone ( ) ) ;
}
Some ( sr ) = > {
// No private route was specified for the return
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp ::message ::Builder ::new_default ( ) ;
2021-12-18 00:18:25 +00:00
let private_route = self
. new_stub_private_route (
request_rpcreader . header . envelope . get_sender_id ( ) ,
& mut pr_builder ,
)
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
out = self . wrap_with_route ( Some ( sr ) , private_route , reply_vec ) ? ;
// first
// The envelope is addressed to the first hop of the safety route;
// out_noderef stays None so that hop is resolved further down
out_node_id = sr
. hops
. first ( )
2021-12-18 00:18:25 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
}
} ;
}
veilid_capnp ::operation ::respond_to ::PrivateRoute ( pr ) = > {
// Respond to private route
// ------------------------
// Extract private route for reply
let private_route = match pr {
Ok ( v ) = > v ,
2021-12-18 00:18:25 +00:00
Err ( _ ) = > {
return Err ( rpc_error_internal ( " invalid private route " ) )
. map_err ( logthru_rpc! ( ) )
}
2021-11-22 16:28:30 +00:00
} ;
// Reply with 'route' operation
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( safety_route_spec , private_route , reply_vec ) ? ;
2021-11-22 16:28:30 +00:00
out_node_id = match safety_route_spec {
None = > {
// If no safety route, the first node is the first hop of the private route
if ! private_route . has_first_hop ( ) {
2021-12-18 00:18:25 +00:00
return Err ( rpc_error_internal ( " private route has no hops " ) )
. map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
let hop = private_route
. get_first_hop ( )
. map_err ( map_error_internal! ( " not a valid first hop " ) ) ? ;
decode_public_key (
& hop . get_dial_info ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " not a valid dial info " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. get_node_id ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " not a valid node id " ) )
. map_err ( logthru_rpc! ( ) ) ? ,
2021-11-22 16:28:30 +00:00
)
}
Some ( sr ) = > {
// If safety route is in use, first node is the first hop of the safety route
sr . hops
. first ( )
2021-12-18 00:18:25 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key
}
}
}
}
out
} ;
// if we need to resolve the first hop, do it
let node_ref = match out_noderef {
None = > {
// resolve node
2022-04-03 16:58:06 +00:00
self . resolve_node ( out_node_id ) . await ?
2021-11-22 16:28:30 +00:00
}
Some ( nr ) = > {
// got the node in the routing table already
nr
}
} ;
// Send the reply
let bytes = out . len ( ) as u64 ;
self . network_manager ( )
2022-04-16 15:18:54 +00:00
. send_envelope ( node_ref . clone ( ) , Some ( out_node_id ) , out )
2021-11-22 16:28:30 +00:00
. await
2021-11-27 17:44:21 +00:00
. map_err ( RPCError ::Internal ) ? ;
2021-11-22 16:28:30 +00:00
// Reply successfully sent
// Record the sent answer in the routing table statistics
let send_ts = get_timestamp ( ) ;
2022-04-18 22:49:33 +00:00
self . routing_table ( )
. stats_answer_sent ( node_ref , send_ts , bytes ) ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-04-03 16:58:06 +00:00
fn wants_answer ( & self , operation : & veilid_capnp ::operation ::Reader ) -> Result < bool , RPCError > {
match operation . get_respond_to ( ) . which ( ) {
2021-11-22 16:28:30 +00:00
Ok ( veilid_capnp ::operation ::respond_to ::None ( _ ) ) = > Ok ( false ) ,
Ok ( veilid_capnp ::operation ::respond_to ::Sender ( _ ) ) = > Ok ( true ) ,
Ok ( veilid_capnp ::operation ::respond_to ::PrivateRoute ( _ ) ) = > Ok ( true ) ,
_ = > Err ( rpc_error_internal ( " Unknown respond_to " ) ) ,
}
}
2022-04-16 15:18:54 +00:00
fn get_respond_to_sender_node_info (
2022-04-03 16:58:06 +00:00
& self ,
operation : & veilid_capnp ::operation ::Reader ,
2022-04-16 15:18:54 +00:00
) -> Result < Option < NodeInfo > , RPCError > {
if let veilid_capnp ::operation ::respond_to ::Sender ( Ok ( sender_ni_reader ) ) = operation
2022-04-03 16:58:06 +00:00
. get_respond_to ( )
. which ( )
. map_err ( map_error_capnp_notinschema! ( ) ) ?
{
// Sender DialInfo was specified, update our routing table with it
2022-04-16 15:18:54 +00:00
Ok ( Some ( decode_node_info ( & sender_ni_reader , true ) ? ) )
2021-12-24 01:34:52 +00:00
} else {
2022-04-03 16:58:06 +00:00
Ok ( None )
2021-11-22 16:28:30 +00:00
}
}
//////////////////////////////////////////////////////////////////////
2022-04-17 23:10:10 +00:00
async fn generate_sender_info ( & self , peer_noderef : NodeRef ) -> SenderInfo {
let socket_address = peer_noderef
. last_connection ( )
. await
. map ( | c | c . remote . socket_address ) ;
2021-11-26 14:54:38 +00:00
SenderInfo { socket_address }
2021-11-22 16:28:30 +00:00
}
async fn process_info_q ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
2022-04-17 23:10:10 +00:00
let peer_noderef = rpcreader . header . peer_noderef . clone ( ) ;
let sender_info = self . generate_sender_info ( peer_noderef ) . await ;
2021-11-22 16:28:30 +00:00
let reply_msg = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// Don't bother unless we are going to answer
if ! self . wants_answer ( & operation ) ? {
return Ok ( ( ) ) ;
}
2022-04-03 16:58:06 +00:00
// get InfoQ reader
let iq_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::InfoQ ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_info_q " ) ,
} ;
// Parse out fields
2022-04-16 15:18:54 +00:00
let node_status = decode_node_status (
2022-04-03 16:58:06 +00:00
& iq_reader
2022-04-16 15:18:54 +00:00
. get_node_status ( )
. map_err ( map_error_internal! ( " no valid node status " ) ) ? ,
2022-04-03 16:58:06 +00:00
) ? ;
2022-04-16 15:18:54 +00:00
// update node status for the requesting node to our routing table
2022-04-03 16:58:06 +00:00
if let Some ( sender_nr ) = rpcreader . opt_sender_nr . clone ( ) {
2022-04-16 15:18:54 +00:00
// Update latest node status in routing table for the infoq sender
2022-04-03 16:58:06 +00:00
sender_nr . operate ( | e | {
2022-04-16 15:18:54 +00:00
e . update_node_status ( node_status ) ;
2022-04-03 16:58:06 +00:00
} ) ;
}
2021-11-22 16:28:30 +00:00
// Send info answer
let mut reply_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut answer = reply_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
answer . set_op_id ( operation . get_op_id ( ) ) ;
let mut respond_to = answer . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = answer . reborrow ( ) . init_detail ( ) ;
let mut info_a = detail . init_info_a ( ) ;
2022-04-16 15:18:54 +00:00
// Add node status
let node_status = self . network_manager ( ) . generate_node_status ( ) ;
let mut nsb = info_a . reborrow ( ) . init_node_status ( ) ;
encode_node_status ( & node_status , & mut nsb ) ? ;
2021-11-22 16:28:30 +00:00
// Add sender info
let mut sib = info_a . reborrow ( ) . init_sender_info ( ) ;
encode_sender_info ( & sender_info , & mut sib ) ? ;
reply_msg . into_reader ( )
} ;
self . reply ( rpcreader , reply_msg , None ) . await
}
async fn process_validate_dial_info (
& self ,
rpcreader : RPCMessageReader ,
) -> Result < ( ) , RPCError > {
//
let ( alternate_port , redirect , dial_info , rcpt_data ) = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// This should never want an answer
if self . wants_answer ( & operation ) ? {
return Err ( RPCError ::InvalidFormat ) ;
}
// get validateDialInfo reader
let vdi_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::ValidateDialInfo ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_validate_dial_info " ) ,
} ;
// Parse out fields
let alternate_port = vdi_reader . get_alternate_port ( ) ;
let redirect = vdi_reader . get_redirect ( ) ;
let dial_info = decode_dial_info (
& vdi_reader
. get_dial_info ( )
. map_err ( map_error_internal! ( " no valid dial info " ) ) ? ,
) ? ;
let rcpt_data = vdi_reader
. get_receipt ( )
. map_err ( map_error_internal! ( " no valid receipt " ) ) ? ;
( alternate_port , redirect , dial_info , rcpt_data )
} ;
// Redirect this request if we are asked to
if redirect {
let routing_table = self . routing_table ( ) ;
2021-12-24 01:34:52 +00:00
let filter = dial_info . make_filter ( true ) ;
2022-04-17 17:28:39 +00:00
let peers = routing_table . find_fast_public_nodes_filtered ( & filter ) ;
2021-11-27 17:44:21 +00:00
if peers . is_empty ( ) {
2021-11-22 16:28:30 +00:00
return Err ( rpc_error_internal ( format! (
2021-12-24 01:34:52 +00:00
" no peers matching filter '{:?}' " ,
filter
2021-11-22 16:28:30 +00:00
) ) ) ;
}
for peer in peers {
2021-12-24 01:34:52 +00:00
// See if this peer will validate dial info
2021-12-24 23:02:53 +00:00
let will_validate_dial_info = peer . operate ( | e : & mut BucketEntry | {
2022-04-16 15:18:54 +00:00
if let Some ( ni ) = & e . peer_stats ( ) . status {
2021-12-24 01:34:52 +00:00
ni . will_validate_dial_info
} else {
true
}
2021-12-24 23:02:53 +00:00
} ) ;
if ! will_validate_dial_info {
2021-12-24 01:34:52 +00:00
continue ;
}
2021-11-22 16:28:30 +00:00
// Make a copy of the request, without the redirect flag
let vdi_msg_reader = {
let mut vdi_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = vdi_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut vdi_builder = detail . init_validate_dial_info ( ) ;
vdi_builder . set_alternate_port ( alternate_port ) ;
vdi_builder . set_redirect ( false ) ;
let mut di_builder = vdi_builder . reborrow ( ) . init_dial_info ( ) ;
encode_dial_info ( & dial_info , & mut di_builder ) ? ;
let r_builder = vdi_builder . reborrow ( ) . init_receipt (
rcpt_data
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " receipt too large " ) ) ? ,
) ;
r_builder . copy_from_slice ( rcpt_data ) ;
vdi_msg . into_reader ( )
} ;
// Send the validate_dial_info request until we succeed
self . request ( Destination ::Direct ( peer . clone ( ) ) , vdi_msg_reader , None )
. await ? ;
}
return Ok ( ( ) ) ;
} ;
// Otherwise send a return receipt directly
// Possibly from an alternate port
let network_manager = self . network_manager ( ) ;
network_manager
2022-01-04 19:25:32 +00:00
. send_direct_receipt ( dial_info . clone ( ) , rcpt_data , alternate_port )
2021-11-22 16:28:30 +00:00
. await
2022-01-01 18:38:39 +00:00
. map_err ( map_error_string! ( ) )
. map_err (
logthru_net! ( error " failed to send direct receipt to dial info: {}, alternate_port={} " , dial_info , alternate_port ) ,
) ? ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
async fn process_find_node_q ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
//
let reply_msg = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// find_node must always want an answer
if ! self . wants_answer ( & operation ) ? {
2021-12-18 00:18:25 +00:00
return Err ( RPCError ::InvalidFormat ) . map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
// get findNodeQ reader
let fnq_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::FindNodeQ ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_find_node_q " ) ,
} ;
2022-03-27 01:25:24 +00:00
// get the node id we want to look up
2021-12-18 00:18:25 +00:00
let target_node_id = decode_public_key (
& fnq_reader
. get_node_id ( )
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ,
) ;
2022-03-27 01:25:24 +00:00
2021-11-22 16:28:30 +00:00
// add node information for the requesting node to our routing table
let routing_table = self . routing_table ( ) ;
// find N nodes closest to the target node in our routing table
2022-04-16 15:18:54 +00:00
let own_peer_info = routing_table . get_own_peer_info ( ) ;
2021-11-22 16:28:30 +00:00
let closest_nodes = routing_table . find_closest_nodes (
target_node_id ,
// filter
None ,
// transform
2022-04-16 15:18:54 +00:00
| e | RoutingTable ::transform_to_peer_info ( e , & own_peer_info ) ,
2021-11-22 16:28:30 +00:00
) ;
2021-12-18 00:18:25 +00:00
log_rpc! ( " >>>> Returning {} closest peers " , closest_nodes . len ( ) ) ;
2021-11-22 16:28:30 +00:00
// Send find_node answer
let mut reply_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut answer = reply_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
answer . set_op_id ( operation . get_op_id ( ) ) ;
let mut respond_to = answer . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = answer . reborrow ( ) . init_detail ( ) ;
let info_a = detail . init_find_node_a ( ) ;
let mut peers_builder = info_a . init_peers (
closest_nodes
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " invalid closest nodes list length " ) ) ? ,
) ;
2021-11-27 17:44:21 +00:00
for ( i , closest_node ) in closest_nodes . iter ( ) . enumerate ( ) {
2021-11-22 16:28:30 +00:00
let mut pi_builder = peers_builder . reborrow ( ) . get ( i as u32 ) ;
2021-11-27 17:44:21 +00:00
encode_peer_info ( closest_node , & mut pi_builder ) ? ;
2021-11-22 16:28:30 +00:00
}
reply_msg . into_reader ( )
} ;
self . reply ( rpcreader , reply_msg , None ) . await
}
// Handler for Route operations.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_route(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    // xxx do not process latency for routed messages
    let not_implemented = rpc_error_unimplemented(" process_route ");
    Err(not_implemented)
}
// Handler for GetValueQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_get_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_get_value_q ");
    Err(not_implemented)
}
// Handler for SetValueQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_set_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_set_value_q ");
    Err(not_implemented)
}
// Handler for WatchValueQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_watch_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_watch_value_q ");
    Err(not_implemented)
}
// Handler for ValueChanged operations.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_value_changed(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_value_changed ");
    Err(not_implemented)
}
// Handler for SupplyBlockQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_supply_block_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_supply_block_q ");
    Err(not_implemented)
}
// Handler for FindBlockQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_find_block_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_find_block_q ");
    Err(not_implemented)
}
2022-04-17 23:10:10 +00:00
async fn process_signal ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
let signal_info = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
// This should never want an answer
if self . wants_answer ( & operation ) ? {
return Err ( RPCError ::InvalidFormat ) ;
}
// get signal reader
let sig_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::Signal ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_signal " ) ,
} ;
// Get signal info
decode_signal_info ( & sig_reader ) ?
} ;
// Handle it
let network_manager = self . network_manager ( ) ;
network_manager
. handle_signal ( signal_info )
. await
. map_err ( map_error_string! ( ) )
2021-11-22 16:28:30 +00:00
}
async fn process_return_receipt ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
let rcpt_data = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// This should never want an answer
if self . wants_answer ( & operation ) ? {
return Err ( RPCError ::InvalidFormat ) ;
}
// get returnReceipt reader
let rr_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::ReturnReceipt ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_return_receipt " ) ,
} ;
// Get receipt data
let rcpt_data = rr_reader
. get_receipt ( )
. map_err ( map_error_internal! ( " no valid receipt " ) ) ? ;
rcpt_data . to_vec ( )
} ;
// Handle it
let network_manager = self . network_manager ( ) ;
network_manager
2022-04-17 23:10:10 +00:00
. handle_in_band_receipt ( rcpt_data , rpcreader . header . peer_noderef )
2021-11-22 16:28:30 +00:00
. await
. map_err ( map_error_string! ( ) )
}
// Handler for StartTunnelQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_start_tunnel_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_start_tunnel_q ");
    Err(not_implemented)
}
// Handler for CompleteTunnelQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_complete_tunnel_q(
    &self,
    _rpcreader: RPCMessageReader,
) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_complete_tunnel_q ");
    Err(not_implemented)
}
// Handler for CancelTunnelQ questions.
// Not implemented: the message is ignored and the error built by
// `rpc_error_unimplemented` is returned.
async fn process_cancel_tunnel_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
    let not_implemented = rpc_error_unimplemented(" process_cancel_tunnel_q ");
    Err(not_implemented)
}
async fn process_answer ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
// pass answer to the appropriate rpc waiter
let op_id = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
operation . get_op_id ( )
} ;
Ok ( self . complete_op_id_waiter ( op_id , rpcreader ) . await ? )
}
//////////////////////////////////////////////////////////////////////
async fn process_rpc_message_version_0 ( & self , msg : RPCMessage ) -> Result < ( ) , RPCError > {
let reader = capnp ::message ::Reader ::new ( msg . data , Default ::default ( ) ) ;
2022-04-03 16:58:06 +00:00
let mut opt_sender_nr : Option < NodeRef > = None ;
2022-03-27 01:25:24 +00:00
let which = {
2022-04-03 16:58:06 +00:00
let operation = reader
2021-11-22 16:28:30 +00:00
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
let ( which , is_q ) = match operation
2021-11-22 16:28:30 +00:00
. get_detail ( )
. which ( )
. map_err ( map_error_capnp_notinschema! ( ) ) ?
{
veilid_capnp ::operation ::detail ::InfoQ ( _ ) = > ( 0 u32 , true ) ,
veilid_capnp ::operation ::detail ::InfoA ( _ ) = > ( 1 u32 , false ) ,
veilid_capnp ::operation ::detail ::ValidateDialInfo ( _ ) = > ( 2 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindNodeQ ( _ ) = > ( 3 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindNodeA ( _ ) = > ( 4 u32 , false ) ,
veilid_capnp ::operation ::detail ::Route ( _ ) = > ( 5 u32 , true ) ,
veilid_capnp ::operation ::detail ::GetValueQ ( _ ) = > ( 6 u32 , true ) ,
veilid_capnp ::operation ::detail ::GetValueA ( _ ) = > ( 7 u32 , false ) ,
veilid_capnp ::operation ::detail ::SetValueQ ( _ ) = > ( 8 u32 , true ) ,
veilid_capnp ::operation ::detail ::SetValueA ( _ ) = > ( 9 u32 , false ) ,
veilid_capnp ::operation ::detail ::WatchValueQ ( _ ) = > ( 10 u32 , true ) ,
veilid_capnp ::operation ::detail ::WatchValueA ( _ ) = > ( 11 u32 , false ) ,
veilid_capnp ::operation ::detail ::ValueChanged ( _ ) = > ( 12 u32 , true ) ,
veilid_capnp ::operation ::detail ::SupplyBlockQ ( _ ) = > ( 13 u32 , true ) ,
veilid_capnp ::operation ::detail ::SupplyBlockA ( _ ) = > ( 14 u32 , false ) ,
veilid_capnp ::operation ::detail ::FindBlockQ ( _ ) = > ( 15 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindBlockA ( _ ) = > ( 16 u32 , false ) ,
2022-04-16 15:18:54 +00:00
veilid_capnp ::operation ::detail ::Signal ( _ ) = > ( 17 u32 , true ) ,
veilid_capnp ::operation ::detail ::ReturnReceipt ( _ ) = > ( 18 u32 , true ) ,
veilid_capnp ::operation ::detail ::StartTunnelQ ( _ ) = > ( 19 u32 , true ) ,
veilid_capnp ::operation ::detail ::StartTunnelA ( _ ) = > ( 20 u32 , false ) ,
veilid_capnp ::operation ::detail ::CompleteTunnelQ ( _ ) = > ( 21 u32 , true ) ,
veilid_capnp ::operation ::detail ::CompleteTunnelA ( _ ) = > ( 22 u32 , false ) ,
veilid_capnp ::operation ::detail ::CancelTunnelQ ( _ ) = > ( 23 u32 , true ) ,
veilid_capnp ::operation ::detail ::CancelTunnelA ( _ ) = > ( 24 u32 , false ) ,
2022-03-27 01:25:24 +00:00
} ;
// Accounting for questions we receive
if is_q {
2022-04-16 15:18:54 +00:00
// See if we have some Sender NodeInfo to incorporate
2022-04-03 16:58:06 +00:00
opt_sender_nr =
2022-04-16 15:18:54 +00:00
if let Some ( sender_ni ) = self . get_respond_to_sender_node_info ( & operation ) ? {
// Sender NodeInfo was specified, update our routing table with it
2022-04-25 00:16:13 +00:00
if ! self . filter_peer_scope ( & sender_ni ) {
return Err ( RPCError ::InvalidFormat ) ;
}
2022-03-27 01:25:24 +00:00
let nr = self
. routing_table ( )
2022-04-16 15:18:54 +00:00
. register_node_with_node_info (
2022-04-03 16:58:06 +00:00
msg . header . envelope . get_sender_id ( ) ,
2022-04-16 15:18:54 +00:00
sender_ni ,
2022-03-27 01:25:24 +00:00
)
. map_err ( RPCError ::Internal ) ? ;
Some ( nr )
} else {
2022-04-03 16:58:06 +00:00
// look up sender node, in case it's different than our peer due to relaying
2022-03-27 01:25:24 +00:00
self . routing_table ( )
2022-04-03 16:58:06 +00:00
. lookup_node_ref ( msg . header . envelope . get_sender_id ( ) )
2022-03-27 01:25:24 +00:00
} ;
2022-04-03 16:58:06 +00:00
if let Some ( sender_nr ) = opt_sender_nr . clone ( ) {
2022-04-18 22:49:33 +00:00
self . routing_table ( ) . stats_question_rcvd (
sender_nr ,
msg . header . timestamp ,
msg . header . body_len ,
) ;
2021-11-22 16:28:30 +00:00
}
2022-03-27 01:25:24 +00:00
} ;
which
2021-11-22 16:28:30 +00:00
} ;
2022-03-27 01:25:24 +00:00
2022-04-03 16:58:06 +00:00
let rpcreader = RPCMessageReader {
header : msg . header ,
reader ,
opt_sender_nr ,
} ;
2021-11-22 16:28:30 +00:00
match which {
0 = > self . process_info_q ( rpcreader ) . await , // InfoQ
1 = > self . process_answer ( rpcreader ) . await , // InfoA
2 = > self . process_validate_dial_info ( rpcreader ) . await , // ValidateDialInfo
3 = > self . process_find_node_q ( rpcreader ) . await , // FindNodeQ
4 = > self . process_answer ( rpcreader ) . await , // FindNodeA
5 = > self . process_route ( rpcreader ) . await , // Route
6 = > self . process_get_value_q ( rpcreader ) . await , // GetValueQ
7 = > self . process_answer ( rpcreader ) . await , // GetValueA
8 = > self . process_set_value_q ( rpcreader ) . await , // SetValueQ
9 = > self . process_answer ( rpcreader ) . await , // SetValueA
10 = > self . process_watch_value_q ( rpcreader ) . await , // WatchValueQ
11 = > self . process_answer ( rpcreader ) . await , // WatchValueA
12 = > self . process_value_changed ( rpcreader ) . await , // ValueChanged
13 = > self . process_supply_block_q ( rpcreader ) . await , // SupplyBlockQ
14 = > self . process_answer ( rpcreader ) . await , // SupplyBlockA
15 = > self . process_find_block_q ( rpcreader ) . await , // FindBlockQ
16 = > self . process_answer ( rpcreader ) . await , // FindBlockA
2022-04-16 15:18:54 +00:00
17 = > self . process_signal ( rpcreader ) . await , // SignalQ
18 = > self . process_return_receipt ( rpcreader ) . await , // ReturnReceipt
19 = > self . process_start_tunnel_q ( rpcreader ) . await , // StartTunnelQ
20 = > self . process_answer ( rpcreader ) . await , // StartTunnelA
21 = > self . process_complete_tunnel_q ( rpcreader ) . await , // CompleteTunnelQ
22 = > self . process_answer ( rpcreader ) . await , // CompleteTunnelA
23 = > self . process_cancel_tunnel_q ( rpcreader ) . await , // CancelTunnelQ
24 = > self . process_answer ( rpcreader ) . await , // CancelTunnelA
2021-11-22 16:28:30 +00:00
_ = > panic! ( " must update rpc table " ) ,
}
}
async fn process_rpc_message ( & self , msg : RPCMessage ) -> Result < ( ) , RPCError > {
if msg . header . envelope . get_version ( ) = = 0 {
self . process_rpc_message_version_0 ( msg ) . await
} else {
Err ( RPCError ::Internal ( format! (
" unsupported envelope version: {}, newest supported is version 0 " ,
msg . header . envelope . get_version ( )
) ) )
}
}
2022-03-10 15:18:47 +00:00
async fn rpc_worker ( self , receiver : flume ::Receiver < RPCMessage > ) {
while let Ok ( msg ) = receiver . recv_async ( ) . await {
2021-12-17 02:57:28 +00:00
let _ = self
. process_rpc_message ( msg )
. await
. map_err ( logthru_rpc! ( " couldn't process rpc message " ) ) ;
2021-11-22 16:28:30 +00:00
}
}
pub async fn startup ( & self ) -> Result < ( ) , String > {
2022-03-10 14:51:53 +00:00
trace! ( " startup rpc processor " ) ;
2021-11-22 16:28:30 +00:00
let mut inner = self . inner . lock ( ) ;
// make local copy of node id for easy access
let c = self . config . get ( ) ;
inner . node_id = c . network . node_id ;
inner . node_id_secret = c . network . node_id_secret ;
// set up channel
let mut concurrency = c . network . rpc . concurrency ;
let mut queue_size = c . network . rpc . queue_size ;
2022-01-27 14:53:01 +00:00
let mut timeout = ms_to_us ( c . network . rpc . timeout_ms ) ;
2021-11-22 16:28:30 +00:00
let mut max_route_hop_count = c . network . rpc . max_route_hop_count as usize ;
if concurrency = = 0 {
concurrency = get_concurrency ( ) / 2 ;
if concurrency = = 0 {
concurrency = 1 ;
}
}
if queue_size = = 0 {
queue_size = 1024 ;
}
if timeout = = 0 {
timeout = 10000000 ;
}
if max_route_hop_count = = 0 {
max_route_hop_count = 7 usize ;
}
inner . timeout = timeout ;
inner . max_route_hop_count = max_route_hop_count ;
2022-03-10 15:18:47 +00:00
let channel = flume ::bounded ( queue_size as usize ) ;
2021-12-17 02:57:28 +00:00
inner . send_channel = Some ( channel . 0. clone ( ) ) ;
2021-11-22 16:28:30 +00:00
// spin up N workers
trace! ( " Spinning up {} RPC workers " , concurrency ) ;
for _ in 0 .. concurrency {
let this = self . clone ( ) ;
let receiver = channel . 1. clone ( ) ;
let jh = spawn ( Self ::rpc_worker ( this , receiver ) ) ;
inner . worker_join_handles . push ( jh ) ;
}
Ok ( ( ) )
}
// Shuts the processor down by replacing its inner state with a freshly
// constructed one from `new_inner`.
pub async fn shutdown(&self) {
    // Build the replacement before taking the lock
    let fresh_inner = Self::new_inner(self.network_manager());
    *self.inner.lock() = fresh_inner;
}
2022-01-05 17:01:02 +00:00
pub fn enqueue_message (
2021-11-22 16:28:30 +00:00
& self ,
envelope : envelope ::Envelope ,
body : Vec < u8 > ,
peer_noderef : NodeRef ,
2021-12-17 02:57:28 +00:00
) -> Result < ( ) , String > {
2021-11-22 16:28:30 +00:00
let msg = RPCMessage {
header : RPCMessageHeader {
timestamp : get_timestamp ( ) ,
2021-11-26 14:54:38 +00:00
envelope ,
2021-11-22 16:28:30 +00:00
body_len : body . len ( ) as u64 ,
2021-11-26 14:54:38 +00:00
peer_noderef ,
2021-11-22 16:28:30 +00:00
} ,
data : RPCMessageData { contents : body } ,
} ;
let send_channel = {
let inner = self . inner . lock ( ) ;
2021-12-17 02:57:28 +00:00
inner . send_channel . as_ref ( ) . unwrap ( ) . clone ( )
2021-11-22 16:28:30 +00:00
} ;
2021-12-17 02:57:28 +00:00
send_channel
. try_send ( msg )
. map_err ( | e | format! ( " failed to enqueue received RPC message: {:?} " , e ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-03-25 02:07:55 +00:00
// Gets a 'RespondTo::Sender' that contains either our dial info,
2022-04-24 02:08:02 +00:00
// or None if the peer has seen our dial info before or our node info is not yet valid
// because of an unknown network class
pub fn make_respond_to_sender ( & self , peer : NodeRef ) -> RespondTo {
let our_node_info = self . routing_table ( ) . get_own_peer_info ( ) . node_info ;
if peer . has_seen_our_node_info ( )
| | matches! ( our_node_info . network_class , NetworkClass ::Invalid )
{
2022-03-25 02:07:55 +00:00
RespondTo ::Sender ( None )
} else {
2022-04-24 02:08:02 +00:00
RespondTo ::Sender ( Some ( our_node_info ) )
2022-03-25 02:07:55 +00:00
}
}
2021-11-22 16:28:30 +00:00
// Send InfoQ RPC request, receive InfoA answer
2022-04-16 15:18:54 +00:00
// Can be sent via relays, but not via routes
2021-11-22 16:28:30 +00:00
pub async fn rpc_call_info ( self , peer : NodeRef ) -> Result < InfoAnswer , RPCError > {
let info_q_msg = {
let mut info_q_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = info_q_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
2022-04-24 02:08:02 +00:00
self . make_respond_to_sender ( peer . clone ( ) )
2022-03-27 01:25:24 +00:00
. encode ( & mut respond_to ) ? ;
2021-11-22 16:28:30 +00:00
let detail = question . reborrow ( ) . init_detail ( ) ;
2022-04-03 16:58:06 +00:00
let mut iqb = detail . init_info_q ( ) ;
2022-04-16 15:18:54 +00:00
let mut node_status_builder = iqb . reborrow ( ) . init_node_status ( ) ;
let node_status = self . network_manager ( ) . generate_node_status ( ) ;
encode_node_status ( & node_status , & mut node_status_builder ) ? ;
2021-11-22 16:28:30 +00:00
info_q_msg . into_reader ( )
} ;
// Send the info request
let waitable_reply = self
. request ( Destination ::Direct ( peer . clone ( ) ) , info_q_msg , None )
. await ?
. unwrap ( ) ;
2022-04-23 01:30:09 +00:00
// Note what kind of ping this was and to what peer scope
let send_data_kind = waitable_reply . send_data_kind ;
2021-11-22 16:28:30 +00:00
// Wait for reply
let ( rpcreader , latency ) = self . wait_for_reply ( waitable_reply ) . await ? ;
let response_operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let info_a = match response_operation
. get_detail ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_notinschema! ( ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::detail ::InfoA ( a ) = > {
a . map_err ( map_error_internal! ( " Invalid InfoA " ) ) ?
}
_ = > return Err ( rpc_error_internal ( " Incorrect RPC answer for question " ) ) ,
} ;
// Decode node info
2022-04-16 15:18:54 +00:00
if ! info_a . has_node_status ( ) {
return Err ( rpc_error_internal ( " Missing node status " ) ) ;
2021-11-22 16:28:30 +00:00
}
2022-04-16 15:18:54 +00:00
let nsr = info_a
. get_node_status ( )
. map_err ( map_error_internal! ( " Broken node status " ) ) ? ;
let node_status = decode_node_status ( & nsr ) ? ;
2021-11-22 16:28:30 +00:00
// Decode sender info
let sender_info = if info_a . has_sender_info ( ) {
let sir = info_a
. get_sender_info ( )
. map_err ( map_error_internal! ( " Broken sender info " ) ) ? ;
decode_sender_info ( & sir ) ?
} else {
SenderInfo ::default ( )
} ;
2022-04-16 15:18:54 +00:00
// Update latest node status in routing table
2021-11-22 16:28:30 +00:00
peer . operate ( | e | {
2022-04-16 15:18:54 +00:00
e . update_node_status ( node_status . clone ( ) ) ;
2021-11-22 16:28:30 +00:00
} ) ;
2022-04-23 01:30:09 +00:00
// Report sender_info IP addresses to network manager
if let Some ( socket_address ) = sender_info . socket_address {
match send_data_kind {
SendDataKind ::LocalDirect = > {
self . network_manager ( )
. report_local_socket_address ( socket_address , peer )
. await ;
}
SendDataKind ::GlobalDirect = > {
self . network_manager ( )
. report_global_socket_address ( socket_address , peer )
. await ;
}
SendDataKind ::GlobalIndirect = > {
// Do nothing in this case, as the socket address returned here would be for any node other than ours
}
}
}
2021-11-22 16:28:30 +00:00
// Return the answer for anyone who may care
let out = InfoAnswer {
2021-11-26 14:54:38 +00:00
latency ,
2022-04-16 15:18:54 +00:00
node_status ,
2021-11-26 14:54:38 +00:00
sender_info ,
2021-11-22 16:28:30 +00:00
} ;
Ok ( out )
}
2022-04-16 15:18:54 +00:00
// Can only be sent directly, not via relays or routes
2021-11-22 16:28:30 +00:00
pub async fn rpc_call_validate_dial_info (
& self ,
peer : NodeRef ,
dial_info : DialInfo ,
redirect : bool ,
alternate_port : bool ,
) -> Result < bool , RPCError > {
let network_manager = self . network_manager ( ) ;
2022-01-27 14:53:01 +00:00
let receipt_time = ms_to_us (
self . config
. get ( )
. network
. dht
. validate_dial_info_receipt_time_ms ,
) ;
2021-11-22 16:28:30 +00:00
//
let ( vdi_msg , eventual_value ) = {
let mut vdi_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = vdi_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut vdi_builder = detail . init_validate_dial_info ( ) ;
// Generate receipt and waitable eventual so we can see if we get the receipt back
let ( rcpt_data , eventual_value ) = network_manager
. generate_single_shot_receipt ( receipt_time , [ ] )
. map_err ( map_error_string! ( ) ) ? ;
vdi_builder . set_redirect ( redirect ) ;
vdi_builder . set_alternate_port ( alternate_port ) ;
let mut di_builder = vdi_builder . reborrow ( ) . init_dial_info ( ) ;
encode_dial_info ( & dial_info , & mut di_builder ) ? ;
let r_builder = vdi_builder . reborrow ( ) . init_receipt (
rcpt_data
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " receipt too large " ) ) ? ,
) ;
r_builder . copy_from_slice ( rcpt_data . as_slice ( ) ) ;
( vdi_msg . into_reader ( ) , eventual_value )
} ;
// Send the validate_dial_info request
2022-04-16 15:18:54 +00:00
// This can only be sent directly, as relays can not validate dial info
self . request ( Destination ::Direct ( peer ) , vdi_msg , None )
2021-11-22 16:28:30 +00:00
. await ? ;
// Wait for receipt
match eventual_value . await {
2022-04-17 17:28:39 +00:00
ReceiptEvent ::Returned ( _ ) = > Ok ( true ) ,
2021-11-27 17:44:21 +00:00
ReceiptEvent ::Expired = > Ok ( false ) ,
2022-03-13 16:45:36 +00:00
ReceiptEvent ::Cancelled = > {
Err ( rpc_error_internal ( " receipt was dropped before expiration " ) )
}
2021-11-22 16:28:30 +00:00
}
}
// Send FindNodeQ RPC request, receive FindNodeA answer
2022-04-16 15:18:54 +00:00
// Can be sent via all methods including relays and routes
2021-11-22 16:28:30 +00:00
pub async fn rpc_call_find_node (
self ,
dest : Destination ,
key : key ::DHTKey ,
safety_route : Option < & SafetyRouteSpec > ,
respond_to : RespondTo ,
) -> Result < FindNodeAnswer , RPCError > {
let find_node_q_msg = {
let mut find_node_q_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = find_node_q_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to_builder = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . encode ( & mut respond_to_builder ) ? ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut fnq = detail . init_find_node_q ( ) ;
let mut node_id_builder = fnq . reborrow ( ) . init_node_id ( ) ;
encode_public_key ( & key , & mut node_id_builder ) ? ;
find_node_q_msg . into_reader ( )
} ;
// Send the find_node request
let waitable_reply = self
. request ( dest , find_node_q_msg , safety_route )
. await ?
. unwrap ( ) ;
// Wait for reply
let ( rpcreader , latency ) = self . wait_for_reply ( waitable_reply ) . await ? ;
let response_operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let find_node_a = match response_operation
. get_detail ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_notinschema! ( ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::detail ::FindNodeA ( a ) = > {
a . map_err ( map_error_internal! ( " Invalid FindNodeA " ) ) ?
}
_ = > return Err ( rpc_error_internal ( " Incorrect RPC answer for question " ) ) ,
} ;
let peers_reader = find_node_a
. get_peers ( )
. map_err ( map_error_internal! ( " Missing peers " ) ) ? ;
2021-11-26 14:54:38 +00:00
let mut peers = Vec ::< PeerInfo > ::with_capacity (
2021-11-22 16:28:30 +00:00
peers_reader
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " too many peers " ) ) ? ,
) ;
for p in peers_reader . iter ( ) {
2022-04-16 15:18:54 +00:00
let peer_info = decode_peer_info ( & p , true ) ? ;
2021-11-22 16:28:30 +00:00
2022-04-25 00:16:13 +00:00
if ! self . filter_peer_scope ( & peer_info . node_info ) {
2021-12-27 16:31:31 +00:00
return Err ( RPCError ::InvalidFormat ) ;
2021-11-22 16:28:30 +00:00
}
2021-11-26 14:54:38 +00:00
peers . push ( peer_info ) ;
2021-11-22 16:28:30 +00:00
}
2021-11-26 14:54:38 +00:00
let out = FindNodeAnswer { latency , peers } ;
2021-11-22 16:28:30 +00:00
Ok ( out )
}
2022-04-16 15:18:54 +00:00
// Sends a unidirectional signal to a node
// Can be sent via all methods including relays and routes
//
// The operation is built with respond_to set to none, and nothing is done
// with the value returned by `request` beyond propagating its error.
pub async fn rpc_call_signal(
    &self,
    dest: Destination,
    safety_route: Option<&SafetyRouteSpec>,
    signal_info: SignalInfo,
) -> Result<(), RPCError> {
    let sig_msg = {
        let mut builder = ::capnp::message::Builder::new_default();
        let mut question = builder.init_root::<veilid_capnp::operation::Builder>();
        question.set_op_id(self.get_next_op_id());
        let mut respond_to = question.reborrow().init_respond_to();
        respond_to.set_none(());
        let mut sig_builder = question.reborrow().init_detail().init_signal();
        encode_signal_info(&signal_info, &mut sig_builder)?;
        builder.into_reader()
    };

    // Send the signal request
    self.request(dest, sig_msg, safety_route).await?;
    Ok(())
}
2022-04-17 23:10:10 +00:00
// Sends a unidirectional in-band return receipt
// Can be sent via all methods including relays and routes
//
// `rcpt_data` must parse as signed receipt data; it is checked with
// `Receipt::from_signed_data` before anything is sent. The operation is built
// with respond_to set to none.
pub async fn rpc_call_return_receipt<B: AsRef<[u8]>>(
    &self,
    dest: Destination,
    safety_route: Option<&SafetyRouteSpec>,
    rcpt_data: B,
) -> Result<(), RPCError> {
    let receipt_bytes = rcpt_data.as_ref();

    // Validate receipt before we send it, otherwise this may be arbitrary data!
    let _ = Receipt::from_signed_data(receipt_bytes)
        .map_err(|_| " failed to validate direct receipt ".to_owned())
        .map_err(map_error_string!())?;

    let rr_msg = {
        let mut builder = ::capnp::message::Builder::new_default();
        let mut question = builder.init_root::<veilid_capnp::operation::Builder>();
        question.set_op_id(self.get_next_op_id());
        let mut respond_to = question.reborrow().init_respond_to();
        respond_to.set_none(());
        let rr_builder = question.reborrow().init_detail().init_return_receipt();

        let receipt_len = receipt_bytes.len().try_into().map_err(map_error_protocol!(
            " invalid receipt length in return receipt "
        ))?;
        let r_builder = rr_builder.init_receipt(receipt_len);
        r_builder.copy_from_slice(receipt_bytes);

        builder.into_reader()
    };

    // Send the return receipt request
    self.request(dest, rr_msg, safety_route).await?;
    Ok(())
}
2021-11-22 16:28:30 +00:00
// xxx do not process latency for routed messages
}