2021-11-22 16:28:30 +00:00
mod coders ;
2021-12-17 02:57:28 +00:00
mod debug ;
mod private_route ;
pub use debug ::* ;
pub use private_route ::* ;
2021-11-22 16:28:30 +00:00
use crate ::dht ::* ;
use crate ::intf ::* ;
use crate ::xx ::* ;
use crate ::* ;
use capnp ::message ::ReaderSegments ;
use coders ::* ;
use core ::convert ::{ TryFrom , TryInto } ;
use core ::fmt ;
use network_manager ::* ;
use receipt_manager ::* ;
use routing_table ::* ;
/////////////////////////////////////////////////////////////////////
type OperationId = u64 ;
2021-12-14 19:20:05 +00:00
#[ derive(Debug, Clone) ]
2021-11-22 16:28:30 +00:00
pub enum Destination {
Direct ( NodeRef ) ,
PrivateRoute ( PrivateRoute ) ,
}
2021-12-14 19:20:05 +00:00
#[ derive(Debug, Clone) ]
2021-11-22 16:28:30 +00:00
pub enum RespondTo {
None ,
2022-04-08 14:17:09 +00:00
Sender ( Option < NodeInfo > ) ,
2021-11-22 16:28:30 +00:00
PrivateRoute ( PrivateRoute ) ,
}
impl RespondTo {
pub fn encode (
& self ,
builder : & mut veilid_capnp ::operation ::respond_to ::Builder ,
) -> Result < ( ) , RPCError > {
match self {
Self ::None = > {
builder . set_none ( ( ) ) ;
}
2022-04-08 14:17:09 +00:00
Self ::Sender ( Some ( ni ) ) = > {
let mut ni_builder = builder . reborrow ( ) . init_sender ( ) ;
encode_node_info ( ni , & mut ni_builder ) ? ;
2022-03-25 02:07:55 +00:00
}
Self ::Sender ( None ) = > {
2022-03-27 01:25:24 +00:00
builder . reborrow ( ) . init_sender ( ) ;
2021-11-22 16:28:30 +00:00
}
Self ::PrivateRoute ( pr ) = > {
let mut pr_builder = builder . reborrow ( ) . init_private_route ( ) ;
2021-11-27 17:44:21 +00:00
encode_private_route ( pr , & mut pr_builder ) ? ;
2021-11-22 16:28:30 +00:00
}
} ;
Ok ( ( ) )
}
}
#[ derive(Debug, Clone) ]
struct RPCMessageHeader {
timestamp : u64 ,
envelope : envelope ::Envelope ,
body_len : u64 ,
peer_noderef : NodeRef , // ensures node doesn't get evicted from routing table until we're done with it
}
#[ derive(Debug, Clone) ]
struct RPCMessageData {
contents : Vec < u8 > , // rpc messages must be a canonicalized single segment
}
impl ReaderSegments for RPCMessageData {
2021-11-27 17:44:21 +00:00
fn get_segment ( & self , idx : u32 ) -> Option < & [ u8 ] > {
2021-11-22 16:28:30 +00:00
if idx > 0 {
None
} else {
Some ( self . contents . as_slice ( ) )
}
}
}
#[ derive(Debug) ]
struct RPCMessage {
header : RPCMessageHeader ,
data : RPCMessageData ,
}
struct RPCMessageReader {
header : RPCMessageHeader ,
reader : capnp ::message ::Reader < RPCMessageData > ,
2022-04-03 16:58:06 +00:00
opt_sender_nr : Option < NodeRef > ,
2021-11-22 16:28:30 +00:00
}
fn builder_to_vec < ' a , T > ( builder : capnp ::message ::Builder < T > ) -> Result < Vec < u8 > , RPCError >
where
T : capnp ::message ::Allocator + ' a ,
{
let wordvec = builder
. into_reader ( )
. canonicalize ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( capnp ::Word ::words_to_bytes ( wordvec . as_slice ( ) ) . to_vec ( ) )
}
fn reader_to_vec < ' a , T > ( reader : capnp ::message ::Reader < T > ) -> Result < Vec < u8 > , RPCError >
where
T : capnp ::message ::ReaderSegments + ' a ,
{
2021-12-18 00:18:25 +00:00
let wordvec = reader
. canonicalize ( )
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( capnp ::Word ::words_to_bytes ( wordvec . as_slice ( ) ) . to_vec ( ) )
}
#[ derive(Debug) ]
struct WaitableReply {
op_id : OperationId ,
eventual : EventualValue < RPCMessageReader > ,
timeout : u64 ,
node_ref : NodeRef ,
send_ts : u64 ,
is_ping : bool ,
}
/////////////////////////////////////////////////////////////////////
#[ derive(Clone, Debug, Default) ]
pub struct InfoAnswer {
pub latency : u64 ,
pub node_info : NodeInfo ,
pub sender_info : SenderInfo ,
}
#[ derive(Clone, Debug, Default) ]
pub struct FindNodeAnswer {
pub latency : u64 , // how long it took to get this answer
pub peers : Vec < PeerInfo > , // the list of closer peers
}
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
network_manager : NetworkManager ,
routing_table : RoutingTable ,
node_id : key ::DHTKey ,
node_id_secret : key ::DHTKeySecret ,
2022-03-10 15:18:47 +00:00
send_channel : Option < flume ::Sender < RPCMessage > > ,
2021-11-22 16:28:30 +00:00
timeout : u64 ,
max_route_hop_count : usize ,
waiting_rpc_table : BTreeMap < OperationId , EventualValue < RPCMessageReader > > ,
worker_join_handles : Vec < JoinHandle < ( ) > > ,
}
#[ derive(Clone) ]
pub struct RPCProcessor {
crypto : Crypto ,
config : VeilidConfig ,
2021-12-27 16:31:31 +00:00
default_peer_scope : PeerScope ,
2021-11-22 16:28:30 +00:00
inner : Arc < Mutex < RPCProcessorInner > > ,
}
impl RPCProcessor {
fn new_inner ( network_manager : NetworkManager ) -> RPCProcessorInner {
RPCProcessorInner {
network_manager : network_manager . clone ( ) ,
routing_table : network_manager . routing_table ( ) ,
node_id : key ::DHTKey ::default ( ) ,
node_id_secret : key ::DHTKeySecret ::default ( ) ,
2021-12-17 02:57:28 +00:00
send_channel : None ,
2021-11-22 16:28:30 +00:00
timeout : 10000000 ,
max_route_hop_count : 7 ,
waiting_rpc_table : BTreeMap ::new ( ) ,
worker_join_handles : Vec ::new ( ) ,
}
}
pub fn new ( network_manager : NetworkManager ) -> Self {
Self {
crypto : network_manager . crypto ( ) ,
config : network_manager . config ( ) ,
2021-12-27 16:31:31 +00:00
default_peer_scope : if ! network_manager
. config ( )
. get ( )
. network
. enable_local_peer_scope
{
PeerScope ::Global
} else {
PeerScope ::All
} ,
2021-11-22 16:28:30 +00:00
inner : Arc ::new ( Mutex ::new ( Self ::new_inner ( network_manager ) ) ) ,
}
}
pub fn network_manager ( & self ) -> NetworkManager {
self . inner . lock ( ) . network_manager . clone ( )
}
pub fn routing_table ( & self ) -> RoutingTable {
self . inner . lock ( ) . routing_table . clone ( )
}
pub fn node_id ( & self ) -> key ::DHTKey {
self . inner . lock ( ) . node_id
}
pub fn node_id_secret ( & self ) -> key ::DHTKeySecret {
self . inner . lock ( ) . node_id_secret
}
//////////////////////////////////////////////////////////////////////
fn get_next_op_id ( & self ) -> OperationId {
get_random_u64 ( )
}
2021-12-27 16:31:31 +00:00
fn filter_peer_scope ( & self , peer_info : & PeerInfo ) -> bool {
// reject attempts to include non-public addresses in results
if self . default_peer_scope = = PeerScope ::Global {
2022-04-08 14:17:09 +00:00
for di in & peer_info . node_info . dial_infos {
if ! di . is_global ( ) {
// non-public address causes rejection
return false ;
}
}
for di in & peer_info . node_info . relay_dial_infos {
2021-12-27 16:31:31 +00:00
if ! di . is_global ( ) {
// non-public address causes rejection
return false ;
}
}
}
true
}
2021-11-22 16:28:30 +00:00
//////////////////////////////////////////////////////////////////////
2022-03-27 01:25:24 +00:00
// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
2021-11-22 16:28:30 +00:00
pub async fn search_dht_single_key (
& self ,
node_id : key ::DHTKey ,
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
) -> Result < NodeRef , RPCError > {
let routing_table = self . routing_table ( ) ;
// xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout
2021-12-18 00:18:25 +00:00
Err ( rpc_error_unimplemented ( " search_dht_single_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
pub async fn search_dht_multi_key (
& self ,
_node_id : key ::DHTKey ,
_count : u32 ,
_fanout : u32 ,
_timeout : Option < u64 > ,
) -> Result < Vec < NodeRef > , RPCError > {
// xxx return closest nodes after the timeout
2021-12-18 00:18:25 +00:00
Err ( rpc_error_unimplemented ( " search_dht_multi_key " ) ) . map_err ( logthru_rpc! ( error ) )
2021-11-22 16:28:30 +00:00
}
// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
2022-03-27 01:25:24 +00:00
// Note: This routine can possible be recursive, hence the SystemPinBoxFuture async form
pub fn resolve_node (
& self ,
node_id : key ::DHTKey ,
) -> SystemPinBoxFuture < Result < NodeRef , RPCError > > {
let this = self . clone ( ) ;
Box ::pin ( async move {
let routing_table = this . routing_table ( ) ;
// First see if we have the node in our routing table already
if let Some ( nr ) = routing_table . lookup_node_ref ( node_id ) {
// ensure we have dial_info for the entry already,
// if not, we should do the find_node anyway
2022-04-08 14:17:09 +00:00
if ! nr . has_dial_info ( ) {
2022-03-27 01:25:24 +00:00
return Ok ( nr ) ;
}
}
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
// If nobody knows where this node is, ask the DHT for it
let ( count , fanout , timeout ) = {
let c = this . config . get ( ) ;
(
c . network . dht . resolve_node_count ,
c . network . dht . resolve_node_fanout ,
c . network . dht . resolve_node_timeout_ms . map ( ms_to_us ) ,
)
} ;
let nr = this
. search_dht_single_key ( node_id , count , fanout , timeout )
. await ? ;
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
if nr . node_id ( ) ! = node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Err ( RPCError ::Timeout ) . map_err ( logthru_rpc! ( ) ) ;
}
Ok ( nr )
} )
2021-11-22 16:28:30 +00:00
}
// set up wait for reply
fn add_op_id_waiter ( & self , op_id : OperationId ) -> EventualValue < RPCMessageReader > {
let mut inner = self . inner . lock ( ) ;
let e = EventualValue ::new ( ) ;
inner . waiting_rpc_table . insert ( op_id , e . clone ( ) ) ;
e
}
// remove wait for reply
fn cancel_op_id_waiter ( & self , op_id : OperationId ) {
let mut inner = self . inner . lock ( ) ;
inner . waiting_rpc_table . remove ( & op_id ) ;
}
// complete the reply
async fn complete_op_id_waiter (
& self ,
op_id : OperationId ,
rpcreader : RPCMessageReader ,
) -> Result < ( ) , RPCError > {
let eventual = {
let mut inner = self . inner . lock ( ) ;
2021-11-27 17:44:21 +00:00
inner
. waiting_rpc_table
. remove ( & op_id )
. ok_or_else ( | | rpc_error_internal ( " Unmatched operation id " ) ) ?
2021-11-22 16:28:30 +00:00
} ;
2021-11-27 17:44:21 +00:00
eventual . resolve ( rpcreader ) . await ;
Ok ( ( ) )
2021-11-22 16:28:30 +00:00
}
// wait for reply
async fn do_wait_for_reply (
& self ,
waitable_reply : & WaitableReply ,
) -> Result < ( RPCMessageReader , u64 ) , RPCError > {
let timeout_ms = u32 ::try_from ( waitable_reply . timeout / 1000 u64 )
. map_err ( map_error_internal! ( " invalid timeout " ) ) ? ;
// wait for eventualvalue
let start_ts = get_timestamp ( ) ;
timeout ( timeout_ms , waitable_reply . eventual . instance ( ) )
. await
. map_err ( | _ | RPCError ::Timeout ) ? ;
match waitable_reply . eventual . take_value ( ) {
None = > panic! ( " there should be a reply value but there wasn't " ) ,
Some ( rpcreader ) = > {
let end_ts = get_timestamp ( ) ;
Ok ( ( rpcreader , end_ts - start_ts ) )
}
}
}
async fn wait_for_reply (
& self ,
waitable_reply : WaitableReply ,
) -> Result < ( RPCMessageReader , u64 ) , RPCError > {
let out = self . do_wait_for_reply ( & waitable_reply ) . await ;
match & out {
Err ( _ ) = > {
self . cancel_op_id_waiter ( waitable_reply . op_id ) ;
2021-11-26 14:54:38 +00:00
if waitable_reply . is_ping {
self . routing_table ( )
2022-03-20 14:52:03 +00:00
. stats_ping_lost ( waitable_reply . node_ref . clone ( ) , waitable_reply . send_ts ) ;
2021-11-26 14:54:38 +00:00
} else {
2022-03-20 14:52:03 +00:00
self . routing_table ( ) . stats_question_lost (
waitable_reply . node_ref . clone ( ) ,
waitable_reply . send_ts ,
) ;
2021-11-26 14:54:38 +00:00
}
2021-11-22 16:28:30 +00:00
}
Ok ( ( rpcreader , _ ) ) = > {
2022-03-25 02:07:55 +00:00
// Note that we definitely received this peer info since we got a reply
waitable_reply . node_ref . set_seen_our_dial_info ( ) ;
2021-11-22 16:28:30 +00:00
// Reply received
let recv_ts = get_timestamp ( ) ;
2021-11-26 14:54:38 +00:00
if waitable_reply . is_ping {
2022-03-20 14:52:03 +00:00
self . routing_table ( ) . stats_pong_rcvd (
2021-11-26 14:54:38 +00:00
waitable_reply . node_ref ,
waitable_reply . send_ts ,
recv_ts ,
rpcreader . header . body_len ,
)
} else {
2022-03-20 14:52:03 +00:00
self . routing_table ( ) . stats_answer_rcvd (
2021-11-26 14:54:38 +00:00
waitable_reply . node_ref ,
waitable_reply . send_ts ,
recv_ts ,
rpcreader . header . body_len ,
)
}
2021-11-22 16:28:30 +00:00
}
} ;
out
}
// Issue a request over the network, possibly using an anonymized route
// If the request doesn't want a reply, returns immediately
// If the request wants a reply then it waits for one asynchronously
// If it doesn't receive a response in a sufficient time, then it returns a timeout error
async fn request < T : capnp ::message ::ReaderSegments > (
& self ,
dest : Destination ,
message : capnp ::message ::Reader < T > ,
safety_route_spec : Option < & SafetyRouteSpec > ,
) -> Result < Option < WaitableReply > , RPCError > {
2021-12-17 02:57:28 +00:00
log_rpc! ( self . get_rpc_request_debug_info ( & dest , & message , & safety_route_spec ) ) ;
2021-11-22 16:28:30 +00:00
let ( op_id , wants_answer , is_ping ) = {
let operation = message
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2022-01-05 17:01:02 +00:00
. map_err ( map_error_internal! ( " invalid operation " ) )
. map_err ( logthru_rpc! ( error ) ) ? ;
2021-11-22 16:28:30 +00:00
let op_id = operation . get_op_id ( ) ;
2022-01-05 17:01:02 +00:00
let wants_answer = self . wants_answer ( & operation ) . map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let is_ping = operation . get_detail ( ) . has_info_q ( ) ;
2021-12-17 02:57:28 +00:00
2021-11-22 16:28:30 +00:00
( op_id , wants_answer , is_ping )
} ;
let out_node_id ;
let mut out_noderef : Option < NodeRef > = None ;
let hopcount : usize ;
let out = {
let out ;
// To where are we sending the request
match dest {
Destination ::Direct ( node_ref ) = > {
// Send to a node without a private route
// --------------------------------------
match safety_route_spec {
None = > {
// If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing
out = reader_to_vec ( message ) ? ;
// Message goes directly to the node
out_node_id = node_ref . node_id ( ) ;
out_noderef = Some ( node_ref ) ;
hopcount = 1 ;
}
Some ( sr ) = > {
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp ::message ::Builder ::new_default ( ) ;
let private_route =
self . new_stub_private_route ( node_ref . node_id ( ) , & mut pr_builder ) ? ;
let message_vec = reader_to_vec ( message ) ? ;
// first
out_node_id = sr
. hops
. first ( )
2021-11-27 17:44:21 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( Some ( sr ) , private_route , message_vec ) ? ;
2021-11-22 16:28:30 +00:00
hopcount = 1 + sr . hops . len ( ) ;
}
} ;
}
Destination ::PrivateRoute ( private_route ) = > {
// Send to private route
// ---------------------
// Encode the private route
let mut pr_msg_builder = ::capnp ::message ::Builder ::new_default ( ) ;
let mut pr_builder =
pr_msg_builder . init_root ::< veilid_capnp ::private_route ::Builder > ( ) ;
encode_private_route ( & private_route , & mut pr_builder ) ? ;
let pr_reader = pr_builder . into_reader ( ) ;
// Reply with 'route' operation
let message_vec = reader_to_vec ( message ) ? ;
out_node_id = match safety_route_spec {
None = > {
// If no safety route, the first node is the first hop of the private route
hopcount = private_route . hop_count as usize ;
let out_node_id = match private_route . hops {
Some ( rh ) = > rh . dial_info . node_id . key ,
_ = > return Err ( rpc_error_internal ( " private route has no hops " ) ) ,
} ;
out = self . wrap_with_route ( None , pr_reader , message_vec ) ? ;
out_node_id
}
Some ( sr ) = > {
// If safety route is in use, first node is the first hop of the safety route
hopcount = 1 + sr . hops . len ( ) + ( private_route . hop_count as usize ) ;
let out_node_id = sr
. hops
. first ( )
2021-11-27 17:44:21 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( Some ( sr ) , pr_reader , message_vec ) ? ;
2021-11-22 16:28:30 +00:00
out_node_id
}
}
}
}
out
} ;
// Verify hop count isn't larger than out maximum routed hop count
if hopcount > self . inner . lock ( ) . max_route_hop_count {
2022-01-05 17:01:02 +00:00
return Err ( rpc_error_internal ( " hop count too long for route " ) )
. map_err ( logthru_rpc! ( warn ) ) ;
2021-11-22 16:28:30 +00:00
}
// calculate actual timeout
// timeout is number of hops times the timeout per hop
let timeout = self . inner . lock ( ) . timeout * ( hopcount as u64 ) ;
// if we need to resolve the first hop, do it
let node_ref = match out_noderef {
None = > {
// resolve node
2022-04-03 16:58:06 +00:00
self . resolve_node ( out_node_id )
2021-12-17 02:57:28 +00:00
. await
. map_err ( logthru_rpc! ( error ) ) ?
2021-11-22 16:28:30 +00:00
}
Some ( nr ) = > {
// got the node in the routing table already
nr
}
} ;
// set up op id eventual
let eventual = if wants_answer {
Some ( self . add_op_id_waiter ( op_id ) )
} else {
None
} ;
// send question
let bytes = out . len ( ) as u64 ;
if let Err ( e ) = self
. network_manager ( )
. send_envelope ( node_ref . clone ( ) , out )
. await
2021-12-17 02:57:28 +00:00
. map_err ( logthru_rpc! ( error ) )
2021-11-27 17:44:21 +00:00
. map_err ( RPCError ::Internal )
2021-11-22 16:28:30 +00:00
{
// Make sure to clean up op id waiter in case of error
if eventual . is_some ( ) {
self . cancel_op_id_waiter ( op_id ) ;
}
return Err ( e ) ;
}
// Successfully sent
let send_ts = get_timestamp ( ) ;
2021-11-26 14:54:38 +00:00
if is_ping {
self . routing_table ( )
2022-03-20 14:52:03 +00:00
. stats_ping_sent ( node_ref . clone ( ) , send_ts , bytes ) ;
2021-11-26 14:54:38 +00:00
} else {
self . routing_table ( )
2022-03-20 14:52:03 +00:00
. stats_question_sent ( node_ref . clone ( ) , send_ts , bytes ) ;
2021-11-26 14:54:38 +00:00
}
2021-11-22 16:28:30 +00:00
// Pass back waitable reply completion
match eventual {
None = > {
// if we don't want an answer, don't wait for one
Ok ( None )
}
2021-11-26 14:54:38 +00:00
Some ( eventual ) = > Ok ( Some ( WaitableReply {
op_id ,
eventual ,
timeout ,
node_ref ,
send_ts ,
is_ping ,
2021-11-22 16:28:30 +00:00
} ) ) ,
}
}
// Issue a reply over the network, possibly using an anonymized route
// If the request doesn't want a reply, this routine does nothing
async fn reply < T : capnp ::message ::ReaderSegments > (
& self ,
request_rpcreader : RPCMessageReader ,
reply_msg : capnp ::message ::Reader < T > ,
safety_route_spec : Option < & SafetyRouteSpec > ,
) -> Result < ( ) , RPCError > {
2021-12-17 02:57:28 +00:00
log_rpc! ( self . get_rpc_reply_debug_info ( & request_rpcreader , & reply_msg , & safety_route_spec ) ) ;
2021-11-22 16:28:30 +00:00
//
let out_node_id ;
let mut out_noderef : Option < NodeRef > = None ;
let is_pong = {
let operation = reply_msg
. get_root ::< veilid_capnp ::operation ::Reader > ( )
. map_err ( map_error_internal! ( " invalid operation " ) ) ? ;
operation . get_detail ( ) . has_info_a ( )
} ;
let out = {
let out ;
let request_operation = request_rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let reply_vec = reader_to_vec ( reply_msg ) ? ;
// To where should we respond?
match request_operation
. get_respond_to ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " invalid request operation " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::respond_to ::None ( _ ) = > {
// Do not respond
// --------------
2021-12-18 00:18:25 +00:00
return Err ( rpc_error_internal ( " no response requested " ) )
. map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
veilid_capnp ::operation ::respond_to ::Sender ( _ ) = > {
// Respond to envelope source node, possibly through a relay if the request arrived that way
// -------------------------------
match safety_route_spec {
None = > {
// If no safety route is being used, and we're not replying to a private
// route, we can use a direct envelope instead of routing
out = reply_vec ;
// Reply directly to the request's source
out_node_id = request_rpcreader . header . envelope . get_sender_id ( ) ;
// This may be a different node's reference than the 'sender' in the case of a relay
// But in that case replies to inbound requests are returned through the inbound relay anyway
out_noderef = Some ( request_rpcreader . header . peer_noderef . clone ( ) ) ;
}
Some ( sr ) = > {
// No private route was specified for the return
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp ::message ::Builder ::new_default ( ) ;
2021-12-18 00:18:25 +00:00
let private_route = self
. new_stub_private_route (
request_rpcreader . header . envelope . get_sender_id ( ) ,
& mut pr_builder ,
)
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
out = self . wrap_with_route ( Some ( sr ) , private_route , reply_vec ) ? ;
// first
out_node_id = sr
. hops
. first ( )
2021-12-18 00:18:25 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key ;
}
} ;
}
veilid_capnp ::operation ::respond_to ::PrivateRoute ( pr ) = > {
// Respond to private route
// ------------------------
// Extract private route for reply
let private_route = match pr {
Ok ( v ) = > v ,
2021-12-18 00:18:25 +00:00
Err ( _ ) = > {
return Err ( rpc_error_internal ( " invalid private route " ) )
. map_err ( logthru_rpc! ( ) )
}
2021-11-22 16:28:30 +00:00
} ;
// Reply with 'route' operation
2021-11-27 17:44:21 +00:00
out = self . wrap_with_route ( safety_route_spec , private_route , reply_vec ) ? ;
2021-11-22 16:28:30 +00:00
out_node_id = match safety_route_spec {
None = > {
// If no safety route, the first node is the first hop of the private route
if ! private_route . has_first_hop ( ) {
2021-12-18 00:18:25 +00:00
return Err ( rpc_error_internal ( " private route has no hops " ) )
. map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
let hop = private_route
. get_first_hop ( )
. map_err ( map_error_internal! ( " not a valid first hop " ) ) ? ;
decode_public_key (
& hop . get_dial_info ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " not a valid dial info " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. get_node_id ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_internal! ( " not a valid node id " ) )
. map_err ( logthru_rpc! ( ) ) ? ,
2021-11-22 16:28:30 +00:00
)
}
Some ( sr ) = > {
// If safety route is in use, first node is the first hop of the safety route
sr . hops
. first ( )
2021-12-18 00:18:25 +00:00
. ok_or_else ( | | rpc_error_internal ( " no hop in safety route " ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
. dial_info
. node_id
. key
}
}
}
}
out
} ;
// if we need to resolve the first hop, do it
let node_ref = match out_noderef {
None = > {
// resolve node
2022-04-03 16:58:06 +00:00
self . resolve_node ( out_node_id ) . await ?
2021-11-22 16:28:30 +00:00
}
Some ( nr ) = > {
// got the node in the routing table already
nr
}
} ;
// Send the reply
let bytes = out . len ( ) as u64 ;
self . network_manager ( )
. send_envelope ( node_ref . clone ( ) , out )
. await
2021-11-27 17:44:21 +00:00
. map_err ( RPCError ::Internal ) ? ;
2021-11-22 16:28:30 +00:00
// Reply successfully sent
let send_ts = get_timestamp ( ) ;
2021-11-26 14:54:38 +00:00
if is_pong {
2022-03-20 14:52:03 +00:00
self . routing_table ( )
. stats_pong_sent ( node_ref , send_ts , bytes ) ;
2021-11-26 14:54:38 +00:00
} else {
2022-03-20 14:52:03 +00:00
self . routing_table ( )
. stats_answer_sent ( node_ref , send_ts , bytes ) ;
2021-11-26 14:54:38 +00:00
}
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-04-03 16:58:06 +00:00
fn wants_answer ( & self , operation : & veilid_capnp ::operation ::Reader ) -> Result < bool , RPCError > {
match operation . get_respond_to ( ) . which ( ) {
2021-11-22 16:28:30 +00:00
Ok ( veilid_capnp ::operation ::respond_to ::None ( _ ) ) = > Ok ( false ) ,
Ok ( veilid_capnp ::operation ::respond_to ::Sender ( _ ) ) = > Ok ( true ) ,
Ok ( veilid_capnp ::operation ::respond_to ::PrivateRoute ( _ ) ) = > Ok ( true ) ,
_ = > Err ( rpc_error_internal ( " Unknown respond_to " ) ) ,
}
}
2022-04-03 16:58:06 +00:00
fn get_respond_to_sender_dial_info (
& self ,
operation : & veilid_capnp ::operation ::Reader ,
) -> Result < Option < DialInfo > , RPCError > {
if let veilid_capnp ::operation ::respond_to ::Sender ( Ok ( sender_di_reader ) ) = operation
. get_respond_to ( )
. which ( )
. map_err ( map_error_capnp_notinschema! ( ) ) ?
{
// Sender DialInfo was specified, update our routing table with it
Ok ( Some ( decode_dial_info ( & sender_di_reader ) ? ) )
2021-12-24 01:34:52 +00:00
} else {
2022-04-03 16:58:06 +00:00
Ok ( None )
2021-11-22 16:28:30 +00:00
}
}
//////////////////////////////////////////////////////////////////////
fn generate_sender_info ( & self , rpcreader : & RPCMessageReader ) -> SenderInfo {
2022-01-04 14:53:30 +00:00
let socket_address = rpcreader
. header
. peer_noderef
. operate ( | entry | entry . last_connection ( ) . map ( | c | c . remote . socket_address ) ) ;
2021-11-26 14:54:38 +00:00
SenderInfo { socket_address }
2021-11-22 16:28:30 +00:00
}
async fn process_info_q ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
//
let reply_msg = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// Don't bother unless we are going to answer
if ! self . wants_answer ( & operation ) ? {
return Ok ( ( ) ) ;
}
2022-04-03 16:58:06 +00:00
// get InfoQ reader
let iq_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::InfoQ ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_info_q " ) ,
} ;
// Parse out fields
let node_info = decode_node_info (
& iq_reader
. get_node_info ( )
. map_err ( map_error_internal! ( " no valid node info " ) ) ? ,
) ? ;
// add node information for the requesting node to our routing table
if let Some ( sender_nr ) = rpcreader . opt_sender_nr . clone ( ) {
// Update latest node info in routing table for the infoq sender
sender_nr . operate ( | e | {
e . update_node_info ( node_info ) ;
} ) ;
}
2021-11-22 16:28:30 +00:00
// Send info answer
let mut reply_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut answer = reply_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
answer . set_op_id ( operation . get_op_id ( ) ) ;
let mut respond_to = answer . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = answer . reborrow ( ) . init_detail ( ) ;
let mut info_a = detail . init_info_a ( ) ;
// Add node info
2022-04-03 16:58:06 +00:00
let node_info = self . network_manager ( ) . generate_node_info ( ) ;
2021-11-22 16:28:30 +00:00
let mut nib = info_a . reborrow ( ) . init_node_info ( ) ;
encode_node_info ( & node_info , & mut nib ) ? ;
// Add sender info
let sender_info = self . generate_sender_info ( & rpcreader ) ;
let mut sib = info_a . reborrow ( ) . init_sender_info ( ) ;
encode_sender_info ( & sender_info , & mut sib ) ? ;
reply_msg . into_reader ( )
} ;
self . reply ( rpcreader , reply_msg , None ) . await
}
async fn process_validate_dial_info (
& self ,
rpcreader : RPCMessageReader ,
) -> Result < ( ) , RPCError > {
//
let ( alternate_port , redirect , dial_info , rcpt_data ) = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// This should never want an answer
if self . wants_answer ( & operation ) ? {
return Err ( RPCError ::InvalidFormat ) ;
}
// get validateDialInfo reader
let vdi_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::ValidateDialInfo ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_validate_dial_info " ) ,
} ;
// Parse out fields
let alternate_port = vdi_reader . get_alternate_port ( ) ;
let redirect = vdi_reader . get_redirect ( ) ;
let dial_info = decode_dial_info (
& vdi_reader
. get_dial_info ( )
. map_err ( map_error_internal! ( " no valid dial info " ) ) ? ,
) ? ;
let rcpt_data = vdi_reader
. get_receipt ( )
. map_err ( map_error_internal! ( " no valid receipt " ) ) ? ;
( alternate_port , redirect , dial_info , rcpt_data )
} ;
// Redirect this request if we are asked to
if redirect {
let routing_table = self . routing_table ( ) ;
2021-12-24 01:34:52 +00:00
let filter = dial_info . make_filter ( true ) ;
2021-12-24 23:02:53 +00:00
let peers = routing_table . find_fast_nodes_filtered ( & filter ) ;
2021-11-27 17:44:21 +00:00
if peers . is_empty ( ) {
2021-11-22 16:28:30 +00:00
return Err ( rpc_error_internal ( format! (
2021-12-24 01:34:52 +00:00
" no peers matching filter '{:?}' " ,
filter
2021-11-22 16:28:30 +00:00
) ) ) ;
}
for peer in peers {
2021-12-24 01:34:52 +00:00
// See if this peer will validate dial info
2021-12-24 23:02:53 +00:00
let will_validate_dial_info = peer . operate ( | e : & mut BucketEntry | {
2021-12-24 01:34:52 +00:00
if let Some ( ni ) = & e . peer_stats ( ) . node_info {
ni . will_validate_dial_info
} else {
true
}
2021-12-24 23:02:53 +00:00
} ) ;
if ! will_validate_dial_info {
2021-12-24 01:34:52 +00:00
continue ;
}
2021-11-22 16:28:30 +00:00
// Make a copy of the request, without the redirect flag
let vdi_msg_reader = {
let mut vdi_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = vdi_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut vdi_builder = detail . init_validate_dial_info ( ) ;
vdi_builder . set_alternate_port ( alternate_port ) ;
vdi_builder . set_redirect ( false ) ;
let mut di_builder = vdi_builder . reborrow ( ) . init_dial_info ( ) ;
encode_dial_info ( & dial_info , & mut di_builder ) ? ;
let r_builder = vdi_builder . reborrow ( ) . init_receipt (
rcpt_data
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " receipt too large " ) ) ? ,
) ;
r_builder . copy_from_slice ( rcpt_data ) ;
vdi_msg . into_reader ( )
} ;
// Send the validate_dial_info request until we succeed
self . request ( Destination ::Direct ( peer . clone ( ) ) , vdi_msg_reader , None )
. await ? ;
}
return Ok ( ( ) ) ;
} ;
// Otherwise send a return receipt directly
// Possibly from an alternate port
let network_manager = self . network_manager ( ) ;
network_manager
2022-01-04 19:25:32 +00:00
. send_direct_receipt ( dial_info . clone ( ) , rcpt_data , alternate_port )
2021-11-22 16:28:30 +00:00
. await
2022-01-01 18:38:39 +00:00
. map_err ( map_error_string! ( ) )
. map_err (
logthru_net! ( error " failed to send direct receipt to dial info: {}, alternate_port={} " , dial_info , alternate_port ) ,
) ? ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
async fn process_find_node_q ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
//
let reply_msg = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// find_node must always want an answer
if ! self . wants_answer ( & operation ) ? {
2021-12-18 00:18:25 +00:00
return Err ( RPCError ::InvalidFormat ) . map_err ( logthru_rpc! ( ) ) ;
2021-11-22 16:28:30 +00:00
}
// get findNodeQ reader
let fnq_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::FindNodeQ ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_find_node_q " ) ,
} ;
2022-03-27 01:25:24 +00:00
// get the node id we want to look up
2021-12-18 00:18:25 +00:00
let target_node_id = decode_public_key (
& fnq_reader
. get_node_id ( )
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ,
) ;
2022-03-27 01:25:24 +00:00
// get the peerinfo/dialinfos of the requesting node
let dil_reader = fnq_reader
. reborrow ( )
. get_dial_info_list ( )
. map_err ( map_error_capnp_error! ( ) ) ? ;
let mut dial_infos = Vec ::< DialInfo > ::with_capacity (
dil_reader
. len ( )
. try_into ( )
. map_err ( map_error_protocol! ( " too many dial infos " ) ) ? ,
) ;
for di in dil_reader . iter ( ) {
dial_infos . push ( decode_dial_info ( & di ) ? )
2021-11-22 16:28:30 +00:00
}
2022-03-27 01:25:24 +00:00
let peer_info = PeerInfo {
node_id : NodeId ::new ( rpcreader . header . envelope . get_sender_id ( ) ) ,
dial_infos ,
} ;
2021-11-22 16:28:30 +00:00
// filter out attempts to pass non-public addresses in for peers
2021-12-27 16:31:31 +00:00
if ! self . filter_peer_scope ( & peer_info ) {
return Err ( RPCError ::InvalidFormat ) ;
2021-11-22 16:28:30 +00:00
}
// add node information for the requesting node to our routing table
let routing_table = self . routing_table ( ) ;
2021-12-14 14:48:33 +00:00
let _requesting_node_ref = routing_table
2021-11-22 16:28:30 +00:00
. register_node_with_dial_info ( peer_info . node_id . key , & peer_info . dial_infos )
. map_err ( map_error_string! ( ) ) ? ;
// find N nodes closest to the target node in our routing table
2021-12-27 16:31:31 +00:00
let own_peer_info = routing_table . get_own_peer_info ( self . default_peer_scope ) ;
2021-11-22 16:28:30 +00:00
let closest_nodes = routing_table . find_closest_nodes (
target_node_id ,
// filter
None ,
// transform
2021-12-27 16:31:31 +00:00
| e | {
RoutingTable ::transform_to_peer_info ( e , self . default_peer_scope , & own_peer_info )
} ,
2021-11-22 16:28:30 +00:00
) ;
2021-12-18 00:18:25 +00:00
log_rpc! ( " >>>> Returning {} closest peers " , closest_nodes . len ( ) ) ;
2021-11-22 16:28:30 +00:00
// Send find_node answer
let mut reply_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut answer = reply_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
answer . set_op_id ( operation . get_op_id ( ) ) ;
let mut respond_to = answer . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = answer . reborrow ( ) . init_detail ( ) ;
let info_a = detail . init_find_node_a ( ) ;
let mut peers_builder = info_a . init_peers (
closest_nodes
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " invalid closest nodes list length " ) ) ? ,
) ;
2021-11-27 17:44:21 +00:00
for ( i , closest_node ) in closest_nodes . iter ( ) . enumerate ( ) {
2021-11-22 16:28:30 +00:00
let mut pi_builder = peers_builder . reborrow ( ) . get ( i as u32 ) ;
2021-11-27 17:44:21 +00:00
encode_peer_info ( closest_node , & mut pi_builder ) ? ;
2021-11-22 16:28:30 +00:00
}
reply_msg . into_reader ( )
} ;
self . reply ( rpcreader , reply_msg , None ) . await
}
async fn process_route ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
// xxx do not process latency for routed messages
Err ( rpc_error_unimplemented ( " process_route " ) )
}
async fn process_get_value_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_get_value_q " ) )
}
async fn process_set_value_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_set_value_q " ) )
}
async fn process_watch_value_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_watch_value_q " ) )
}
async fn process_value_changed ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_value_changed " ) )
}
async fn process_supply_block_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_supply_block_q " ) )
}
async fn process_find_block_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_find_block_q " ) )
}
async fn process_signal_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_signal_q " ) )
}
async fn process_return_receipt ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
let rcpt_data = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
// This should never want an answer
if self . wants_answer ( & operation ) ? {
return Err ( RPCError ::InvalidFormat ) ;
}
// get returnReceipt reader
let rr_reader = match operation . get_detail ( ) . which ( ) {
Ok ( veilid_capnp ::operation ::detail ::Which ::ReturnReceipt ( Ok ( x ) ) ) = > x ,
_ = > panic! ( " invalid operation type in process_return_receipt " ) ,
} ;
// Get receipt data
let rcpt_data = rr_reader
. get_receipt ( )
. map_err ( map_error_internal! ( " no valid receipt " ) ) ? ;
rcpt_data . to_vec ( )
} ;
// Handle it
let network_manager = self . network_manager ( ) ;
network_manager
. process_receipt ( rcpt_data )
. await
. map_err ( map_error_string! ( ) )
}
async fn process_start_tunnel_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_start_tunnel_q " ) )
}
async fn process_complete_tunnel_q (
& self ,
_rpcreader : RPCMessageReader ,
) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_complete_tunnel_q " ) )
}
async fn process_cancel_tunnel_q ( & self , _rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
Err ( rpc_error_unimplemented ( " process_cancel_tunnel_q " ) )
}
async fn process_answer ( & self , rpcreader : RPCMessageReader ) -> Result < ( ) , RPCError > {
// pass answer to the appropriate rpc waiter
let op_id = {
let operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
operation . get_op_id ( )
} ;
Ok ( self . complete_op_id_waiter ( op_id , rpcreader ) . await ? )
}
//////////////////////////////////////////////////////////////////////
async fn process_rpc_message_version_0 ( & self , msg : RPCMessage ) -> Result < ( ) , RPCError > {
let reader = capnp ::message ::Reader ::new ( msg . data , Default ::default ( ) ) ;
2022-04-03 16:58:06 +00:00
let mut opt_sender_nr : Option < NodeRef > = None ;
2022-03-27 01:25:24 +00:00
let which = {
2022-04-03 16:58:06 +00:00
let operation = reader
2021-11-22 16:28:30 +00:00
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
let ( which , is_q ) = match operation
2021-11-22 16:28:30 +00:00
. get_detail ( )
. which ( )
. map_err ( map_error_capnp_notinschema! ( ) ) ?
{
veilid_capnp ::operation ::detail ::InfoQ ( _ ) = > ( 0 u32 , true ) ,
veilid_capnp ::operation ::detail ::InfoA ( _ ) = > ( 1 u32 , false ) ,
veilid_capnp ::operation ::detail ::ValidateDialInfo ( _ ) = > ( 2 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindNodeQ ( _ ) = > ( 3 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindNodeA ( _ ) = > ( 4 u32 , false ) ,
veilid_capnp ::operation ::detail ::Route ( _ ) = > ( 5 u32 , true ) ,
veilid_capnp ::operation ::detail ::GetValueQ ( _ ) = > ( 6 u32 , true ) ,
veilid_capnp ::operation ::detail ::GetValueA ( _ ) = > ( 7 u32 , false ) ,
veilid_capnp ::operation ::detail ::SetValueQ ( _ ) = > ( 8 u32 , true ) ,
veilid_capnp ::operation ::detail ::SetValueA ( _ ) = > ( 9 u32 , false ) ,
veilid_capnp ::operation ::detail ::WatchValueQ ( _ ) = > ( 10 u32 , true ) ,
veilid_capnp ::operation ::detail ::WatchValueA ( _ ) = > ( 11 u32 , false ) ,
veilid_capnp ::operation ::detail ::ValueChanged ( _ ) = > ( 12 u32 , true ) ,
veilid_capnp ::operation ::detail ::SupplyBlockQ ( _ ) = > ( 13 u32 , true ) ,
veilid_capnp ::operation ::detail ::SupplyBlockA ( _ ) = > ( 14 u32 , false ) ,
veilid_capnp ::operation ::detail ::FindBlockQ ( _ ) = > ( 15 u32 , true ) ,
veilid_capnp ::operation ::detail ::FindBlockA ( _ ) = > ( 16 u32 , false ) ,
veilid_capnp ::operation ::detail ::SignalQ ( _ ) = > ( 17 u32 , true ) ,
veilid_capnp ::operation ::detail ::SignalA ( _ ) = > ( 18 u32 , false ) ,
veilid_capnp ::operation ::detail ::ReturnReceipt ( _ ) = > ( 19 u32 , true ) ,
veilid_capnp ::operation ::detail ::StartTunnelQ ( _ ) = > ( 20 u32 , true ) ,
veilid_capnp ::operation ::detail ::StartTunnelA ( _ ) = > ( 21 u32 , false ) ,
veilid_capnp ::operation ::detail ::CompleteTunnelQ ( _ ) = > ( 22 u32 , true ) ,
veilid_capnp ::operation ::detail ::CompleteTunnelA ( _ ) = > ( 23 u32 , false ) ,
veilid_capnp ::operation ::detail ::CancelTunnelQ ( _ ) = > ( 24 u32 , true ) ,
veilid_capnp ::operation ::detail ::CancelTunnelA ( _ ) = > ( 25 u32 , false ) ,
2022-03-27 01:25:24 +00:00
} ;
// Accounting for questions we receive
if is_q {
// See if we have some Sender DialInfo to incorporate
2022-04-03 16:58:06 +00:00
opt_sender_nr =
if let Some ( sender_di ) = self . get_respond_to_sender_dial_info ( & operation ) ? {
2022-03-27 01:25:24 +00:00
// Sender DialInfo was specified, update our routing table with it
let nr = self
. routing_table ( )
. update_node_with_single_dial_info (
2022-04-03 16:58:06 +00:00
msg . header . envelope . get_sender_id ( ) ,
2022-03-27 01:25:24 +00:00
& sender_di ,
)
. map_err ( RPCError ::Internal ) ? ;
Some ( nr )
} else {
2022-04-03 16:58:06 +00:00
// look up sender node, in case it's different than our peer due to relaying
2022-03-27 01:25:24 +00:00
self . routing_table ( )
2022-04-03 16:58:06 +00:00
. lookup_node_ref ( msg . header . envelope . get_sender_id ( ) )
2022-03-27 01:25:24 +00:00
} ;
2022-04-03 16:58:06 +00:00
if let Some ( sender_nr ) = opt_sender_nr . clone ( ) {
2022-03-27 01:25:24 +00:00
if which = = 0 u32 {
self . routing_table ( ) . stats_ping_rcvd (
sender_nr ,
2022-04-03 16:58:06 +00:00
msg . header . timestamp ,
msg . header . body_len ,
2022-03-27 01:25:24 +00:00
) ;
} else {
self . routing_table ( ) . stats_question_rcvd (
sender_nr ,
2022-04-03 16:58:06 +00:00
msg . header . timestamp ,
msg . header . body_len ,
2022-03-27 01:25:24 +00:00
) ;
}
2021-11-22 16:28:30 +00:00
}
2022-03-27 01:25:24 +00:00
} ;
which
2021-11-22 16:28:30 +00:00
} ;
2022-03-27 01:25:24 +00:00
2022-04-03 16:58:06 +00:00
let rpcreader = RPCMessageReader {
header : msg . header ,
reader ,
opt_sender_nr ,
} ;
2021-11-22 16:28:30 +00:00
match which {
0 = > self . process_info_q ( rpcreader ) . await , // InfoQ
1 = > self . process_answer ( rpcreader ) . await , // InfoA
2 = > self . process_validate_dial_info ( rpcreader ) . await , // ValidateDialInfo
3 = > self . process_find_node_q ( rpcreader ) . await , // FindNodeQ
4 = > self . process_answer ( rpcreader ) . await , // FindNodeA
5 = > self . process_route ( rpcreader ) . await , // Route
6 = > self . process_get_value_q ( rpcreader ) . await , // GetValueQ
7 = > self . process_answer ( rpcreader ) . await , // GetValueA
8 = > self . process_set_value_q ( rpcreader ) . await , // SetValueQ
9 = > self . process_answer ( rpcreader ) . await , // SetValueA
10 = > self . process_watch_value_q ( rpcreader ) . await , // WatchValueQ
11 = > self . process_answer ( rpcreader ) . await , // WatchValueA
12 = > self . process_value_changed ( rpcreader ) . await , // ValueChanged
13 = > self . process_supply_block_q ( rpcreader ) . await , // SupplyBlockQ
14 = > self . process_answer ( rpcreader ) . await , // SupplyBlockA
15 = > self . process_find_block_q ( rpcreader ) . await , // FindBlockQ
16 = > self . process_answer ( rpcreader ) . await , // FindBlockA
17 = > self . process_signal_q ( rpcreader ) . await , // SignalQ
18 = > self . process_answer ( rpcreader ) . await , // SignalA
19 = > self . process_return_receipt ( rpcreader ) . await , // ReturnReceipt
20 = > self . process_start_tunnel_q ( rpcreader ) . await , // StartTunnelQ
21 = > self . process_answer ( rpcreader ) . await , // StartTunnelA
22 = > self . process_complete_tunnel_q ( rpcreader ) . await , // CompleteTunnelQ
23 = > self . process_answer ( rpcreader ) . await , // CompleteTunnelA
24 = > self . process_cancel_tunnel_q ( rpcreader ) . await , // CancelTunnelQ
25 = > self . process_answer ( rpcreader ) . await , // CancelTunnelA
_ = > panic! ( " must update rpc table " ) ,
}
}
async fn process_rpc_message ( & self , msg : RPCMessage ) -> Result < ( ) , RPCError > {
if msg . header . envelope . get_version ( ) = = 0 {
self . process_rpc_message_version_0 ( msg ) . await
} else {
Err ( RPCError ::Internal ( format! (
" unsupported envelope version: {}, newest supported is version 0 " ,
msg . header . envelope . get_version ( )
) ) )
}
}
2022-03-10 15:18:47 +00:00
async fn rpc_worker ( self , receiver : flume ::Receiver < RPCMessage > ) {
while let Ok ( msg ) = receiver . recv_async ( ) . await {
2021-12-17 02:57:28 +00:00
let _ = self
. process_rpc_message ( msg )
. await
. map_err ( logthru_rpc! ( " couldn't process rpc message " ) ) ;
2021-11-22 16:28:30 +00:00
}
}
pub async fn startup ( & self ) -> Result < ( ) , String > {
2022-03-10 14:51:53 +00:00
trace! ( " startup rpc processor " ) ;
2021-11-22 16:28:30 +00:00
let mut inner = self . inner . lock ( ) ;
// make local copy of node id for easy access
let c = self . config . get ( ) ;
inner . node_id = c . network . node_id ;
inner . node_id_secret = c . network . node_id_secret ;
// set up channel
let mut concurrency = c . network . rpc . concurrency ;
let mut queue_size = c . network . rpc . queue_size ;
2022-01-27 14:53:01 +00:00
let mut timeout = ms_to_us ( c . network . rpc . timeout_ms ) ;
2021-11-22 16:28:30 +00:00
let mut max_route_hop_count = c . network . rpc . max_route_hop_count as usize ;
if concurrency = = 0 {
concurrency = get_concurrency ( ) / 2 ;
if concurrency = = 0 {
concurrency = 1 ;
}
}
if queue_size = = 0 {
queue_size = 1024 ;
}
if timeout = = 0 {
timeout = 10000000 ;
}
if max_route_hop_count = = 0 {
max_route_hop_count = 7 usize ;
}
inner . timeout = timeout ;
inner . max_route_hop_count = max_route_hop_count ;
2022-03-10 15:18:47 +00:00
let channel = flume ::bounded ( queue_size as usize ) ;
2021-12-17 02:57:28 +00:00
inner . send_channel = Some ( channel . 0. clone ( ) ) ;
2021-11-22 16:28:30 +00:00
// spin up N workers
trace! ( " Spinning up {} RPC workers " , concurrency ) ;
for _ in 0 .. concurrency {
let this = self . clone ( ) ;
let receiver = channel . 1. clone ( ) ;
let jh = spawn ( Self ::rpc_worker ( this , receiver ) ) ;
inner . worker_join_handles . push ( jh ) ;
}
Ok ( ( ) )
}
pub async fn shutdown ( & self ) {
* self . inner . lock ( ) = Self ::new_inner ( self . network_manager ( ) ) ;
}
2022-01-05 17:01:02 +00:00
pub fn enqueue_message (
2021-11-22 16:28:30 +00:00
& self ,
envelope : envelope ::Envelope ,
body : Vec < u8 > ,
peer_noderef : NodeRef ,
2021-12-17 02:57:28 +00:00
) -> Result < ( ) , String > {
2021-11-22 16:28:30 +00:00
let msg = RPCMessage {
header : RPCMessageHeader {
timestamp : get_timestamp ( ) ,
2021-11-26 14:54:38 +00:00
envelope ,
2021-11-22 16:28:30 +00:00
body_len : body . len ( ) as u64 ,
2021-11-26 14:54:38 +00:00
peer_noderef ,
2021-11-22 16:28:30 +00:00
} ,
data : RPCMessageData { contents : body } ,
} ;
let send_channel = {
let inner = self . inner . lock ( ) ;
2021-12-17 02:57:28 +00:00
inner . send_channel . as_ref ( ) . unwrap ( ) . clone ( )
2021-11-22 16:28:30 +00:00
} ;
2021-12-17 02:57:28 +00:00
send_channel
. try_send ( msg )
. map_err ( | e | format! ( " failed to enqueue received RPC message: {:?} " , e ) ) ? ;
2021-11-22 16:28:30 +00:00
Ok ( ( ) )
}
2022-03-25 02:07:55 +00:00
// Gets a 'RespondTo::Sender' that contains either our dial info,
// or None if the peer has seen our dial info before
pub fn get_respond_to_sender ( & self , peer : NodeRef ) -> RespondTo {
if peer . has_seen_our_dial_info ( ) {
RespondTo ::Sender ( None )
} else if let Some ( did ) = self
. routing_table ( )
. first_filtered_dial_info_detail ( peer . dial_info_filter ( ) )
{
2022-03-27 01:25:24 +00:00
RespondTo ::Sender ( Some ( did . dial_info ) )
2022-03-25 02:07:55 +00:00
} else {
RespondTo ::Sender ( None )
}
}
2021-11-22 16:28:30 +00:00
// Send InfoQ RPC request, receive InfoA answer
pub async fn rpc_call_info ( self , peer : NodeRef ) -> Result < InfoAnswer , RPCError > {
let info_q_msg = {
let mut info_q_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = info_q_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
2022-03-25 02:07:55 +00:00
self . get_respond_to_sender ( peer . clone ( ) )
2022-03-27 01:25:24 +00:00
. encode ( & mut respond_to ) ? ;
2021-11-22 16:28:30 +00:00
let detail = question . reborrow ( ) . init_detail ( ) ;
2022-04-03 16:58:06 +00:00
let mut iqb = detail . init_info_q ( ) ;
let mut node_info_builder = iqb . reborrow ( ) . init_node_info ( ) ;
let node_info = self . network_manager ( ) . generate_node_info ( ) ;
encode_node_info ( & node_info , & mut node_info_builder ) ? ;
2021-11-22 16:28:30 +00:00
info_q_msg . into_reader ( )
} ;
// Send the info request
let waitable_reply = self
. request ( Destination ::Direct ( peer . clone ( ) ) , info_q_msg , None )
. await ?
. unwrap ( ) ;
// Wait for reply
let ( rpcreader , latency ) = self . wait_for_reply ( waitable_reply ) . await ? ;
let response_operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let info_a = match response_operation
. get_detail ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_notinschema! ( ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::detail ::InfoA ( a ) = > {
a . map_err ( map_error_internal! ( " Invalid InfoA " ) ) ?
}
_ = > return Err ( rpc_error_internal ( " Incorrect RPC answer for question " ) ) ,
} ;
// Decode node info
if ! info_a . has_node_info ( ) {
return Err ( rpc_error_internal ( " Missing node info " ) ) ;
}
let nir = info_a
. get_node_info ( )
. map_err ( map_error_internal! ( " Broken node info " ) ) ? ;
let node_info = decode_node_info ( & nir ) ? ;
// Decode sender info
let sender_info = if info_a . has_sender_info ( ) {
let sir = info_a
. get_sender_info ( )
. map_err ( map_error_internal! ( " Broken sender info " ) ) ? ;
decode_sender_info ( & sir ) ?
} else {
SenderInfo ::default ( )
} ;
// Update latest node info in routing table
peer . operate ( | e | {
e . update_node_info ( node_info . clone ( ) ) ;
} ) ;
// Return the answer for anyone who may care
let out = InfoAnswer {
2021-11-26 14:54:38 +00:00
latency ,
node_info ,
sender_info ,
2021-11-22 16:28:30 +00:00
} ;
Ok ( out )
}
pub async fn rpc_call_validate_dial_info (
& self ,
peer : NodeRef ,
dial_info : DialInfo ,
redirect : bool ,
alternate_port : bool ,
) -> Result < bool , RPCError > {
let network_manager = self . network_manager ( ) ;
2022-01-27 14:53:01 +00:00
let receipt_time = ms_to_us (
self . config
. get ( )
. network
. dht
. validate_dial_info_receipt_time_ms ,
) ;
2021-11-22 16:28:30 +00:00
//
let ( vdi_msg , eventual_value ) = {
let mut vdi_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = vdi_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . set_none ( ( ) ) ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut vdi_builder = detail . init_validate_dial_info ( ) ;
// Generate receipt and waitable eventual so we can see if we get the receipt back
let ( rcpt_data , eventual_value ) = network_manager
. generate_single_shot_receipt ( receipt_time , [ ] )
. map_err ( map_error_string! ( ) ) ? ;
vdi_builder . set_redirect ( redirect ) ;
vdi_builder . set_alternate_port ( alternate_port ) ;
let mut di_builder = vdi_builder . reborrow ( ) . init_dial_info ( ) ;
encode_dial_info ( & dial_info , & mut di_builder ) ? ;
let r_builder = vdi_builder . reborrow ( ) . init_receipt (
rcpt_data
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " receipt too large " ) ) ? ,
) ;
r_builder . copy_from_slice ( rcpt_data . as_slice ( ) ) ;
( vdi_msg . into_reader ( ) , eventual_value )
} ;
// Send the validate_dial_info request
self . request ( Destination ::Direct ( peer . clone ( ) ) , vdi_msg , None )
. await ? ;
// Wait for receipt
match eventual_value . await {
2021-11-27 17:44:21 +00:00
ReceiptEvent ::Returned = > Ok ( true ) ,
ReceiptEvent ::Expired = > Ok ( false ) ,
2022-03-13 16:45:36 +00:00
ReceiptEvent ::Cancelled = > {
Err ( rpc_error_internal ( " receipt was dropped before expiration " ) )
}
2021-11-22 16:28:30 +00:00
}
}
// Send FindNodeQ RPC request, receive FindNodeA answer
pub async fn rpc_call_find_node (
self ,
dest : Destination ,
key : key ::DHTKey ,
safety_route : Option < & SafetyRouteSpec > ,
respond_to : RespondTo ,
) -> Result < FindNodeAnswer , RPCError > {
let find_node_q_msg = {
let mut find_node_q_msg = ::capnp ::message ::Builder ::new_default ( ) ;
let mut question = find_node_q_msg . init_root ::< veilid_capnp ::operation ::Builder > ( ) ;
question . set_op_id ( self . get_next_op_id ( ) ) ;
let mut respond_to_builder = question . reborrow ( ) . init_respond_to ( ) ;
respond_to . encode ( & mut respond_to_builder ) ? ;
let detail = question . reborrow ( ) . init_detail ( ) ;
let mut fnq = detail . init_find_node_q ( ) ;
let mut node_id_builder = fnq . reborrow ( ) . init_node_id ( ) ;
encode_public_key ( & key , & mut node_id_builder ) ? ;
2021-12-27 16:31:31 +00:00
let own_peer_info = self
. routing_table ( )
. get_own_peer_info ( self . default_peer_scope ) ;
2021-11-22 16:28:30 +00:00
2022-03-27 01:25:24 +00:00
let mut dil_builder = fnq . reborrow ( ) . init_dial_info_list (
own_peer_info
. dial_infos
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " too many dial infos in peer info " ) ) ? ,
) ;
for idx in 0 .. own_peer_info . dial_infos . len ( ) {
let mut di_builder = dil_builder . reborrow ( ) . get ( idx as u32 ) ;
encode_dial_info ( & own_peer_info . dial_infos [ idx ] , & mut di_builder ) ? ;
}
2021-11-22 16:28:30 +00:00
find_node_q_msg . into_reader ( )
} ;
// Send the find_node request
let waitable_reply = self
. request ( dest , find_node_q_msg , safety_route )
. await ?
. unwrap ( ) ;
// Wait for reply
let ( rpcreader , latency ) = self . wait_for_reply ( waitable_reply ) . await ? ;
let response_operation = rpcreader
. reader
. get_root ::< veilid_capnp ::operation ::Reader > ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_error! ( ) )
. map_err ( logthru_rpc! ( ) ) ? ;
2021-11-22 16:28:30 +00:00
let find_node_a = match response_operation
. get_detail ( )
. which ( )
2021-12-18 00:18:25 +00:00
. map_err ( map_error_capnp_notinschema! ( ) )
. map_err ( logthru_rpc! ( ) ) ?
2021-11-22 16:28:30 +00:00
{
veilid_capnp ::operation ::detail ::FindNodeA ( a ) = > {
a . map_err ( map_error_internal! ( " Invalid FindNodeA " ) ) ?
}
_ = > return Err ( rpc_error_internal ( " Incorrect RPC answer for question " ) ) ,
} ;
let peers_reader = find_node_a
. get_peers ( )
. map_err ( map_error_internal! ( " Missing peers " ) ) ? ;
2021-11-26 14:54:38 +00:00
let mut peers = Vec ::< PeerInfo > ::with_capacity (
2021-11-22 16:28:30 +00:00
peers_reader
. len ( )
. try_into ( )
. map_err ( map_error_internal! ( " too many peers " ) ) ? ,
) ;
for p in peers_reader . iter ( ) {
let peer_info = decode_peer_info ( & p ) ? ;
2021-12-27 16:31:31 +00:00
if ! self . filter_peer_scope ( & peer_info ) {
return Err ( RPCError ::InvalidFormat ) ;
2021-11-22 16:28:30 +00:00
}
2021-11-26 14:54:38 +00:00
peers . push ( peer_info ) ;
2021-11-22 16:28:30 +00:00
}
2021-11-26 14:54:38 +00:00
let out = FindNodeAnswer { latency , peers } ;
2021-11-22 16:28:30 +00:00
Ok ( out )
}
// xxx do not process latency for routed messages
}