mod coders; mod destination; mod operation_waiter; mod rpc_app_call; mod rpc_app_message; mod rpc_cancel_tunnel; mod rpc_complete_tunnel; mod rpc_error; mod rpc_find_block; mod rpc_find_node; mod rpc_get_value; mod rpc_return_receipt; mod rpc_route; mod rpc_set_value; mod rpc_signal; mod rpc_start_tunnel; mod rpc_status; mod rpc_supply_block; mod rpc_validate_dial_info; mod rpc_value_changed; mod rpc_watch_value; pub use coders::*; pub use destination::*; pub use operation_waiter::*; pub use rpc_error::*; pub use rpc_status::*; use super::*; use crate::crypto::*; use futures_util::StreamExt; use network_manager::*; use receipt_manager::*; use routing_table::*; use stop_token::future::FutureExt; ///////////////////////////////////////////////////////////////////// #[derive(Debug, Clone)] struct RPCMessageHeaderDetailDirect { /// The decoded header of the envelope envelope: Envelope, /// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it peer_noderef: NodeRef, /// The connection from the peer sent the message (not the original sender) connection_descriptor: ConnectionDescriptor, /// The routing domain the message was sent through routing_domain: RoutingDomain, } /// Header details for rpc messages received over only a safety route but not a private route #[derive(Debug, Clone)] struct RPCMessageHeaderDetailSafetyRouted { /// Remote safety route used remote_safety_route: PublicKey, /// The sequencing used for this route sequencing: Sequencing, } /// Header details for rpc messages received over a private route #[derive(Debug, Clone)] struct RPCMessageHeaderDetailPrivateRouted { /// Remote safety route used (or possibly node id the case of no safety route) remote_safety_route: PublicKey, /// The private route we received the rpc over private_route: PublicKey, // The safety spec for replying to this private routed rpc safety_spec: SafetySpec, } #[derive(Debug, Clone)] enum 
RPCMessageHeaderDetail { Direct(RPCMessageHeaderDetailDirect), SafetyRouted(RPCMessageHeaderDetailSafetyRouted), PrivateRouted(RPCMessageHeaderDetailPrivateRouted), } /// The decoded header of an RPC message #[derive(Debug, Clone)] struct RPCMessageHeader { /// Time the message was received, not sent timestamp: Timestamp, /// The length in bytes of the rpc message body body_len: ByteCount, /// The header detail depending on which way the message was received detail: RPCMessageHeaderDetail, } impl RPCMessageHeader {} #[derive(Debug)] pub struct RPCMessageData { contents: Vec, // rpc messages must be a canonicalized single segment } impl RPCMessageData { pub fn new(contents: Vec) -> Self { Self { contents } } pub fn get_reader( &self, ) -> Result, RPCError> { capnp::serialize_packed::read_message( self.contents.as_slice(), capnp::message::ReaderOptions::new(), ) .map_err(RPCError::protocol) } } #[derive(Debug)] struct RPCMessageEncoded { header: RPCMessageHeader, data: RPCMessageData, } #[derive(Debug)] pub(crate) struct RPCMessage { header: RPCMessageHeader, operation: RPCOperation, opt_sender_nr: Option, } pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder) -> Result, RPCError> where T: capnp::message::Allocator + 'a, { let mut buffer = vec![]; capnp::serialize_packed::write_message(&mut buffer, &builder) .map_err(RPCError::protocol) .map_err(logthru_rpc!())?; Ok(buffer) } #[derive(Debug)] struct WaitableReply { handle: OperationWaitHandle, timeout_us: TimestampDuration, node_ref: NodeRef, send_ts: Timestamp, send_data_kind: SendDataKind, safety_route: Option, remote_private_route: Option, reply_private_route: Option, } ///////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Default)] pub struct Answer { pub latency: TimestampDuration, // how long it took to get this answer pub answer: T, // the answer itself } impl Answer { pub fn new(latency: TimestampDuration, answer: T) -> Self { Self { latency, answer } } } /// 
/// An operation that has been fully prepared for envelope construction
struct RenderedOperation {
    /// The rendered operation bytes
    message: Vec<u8>,
    /// Destination node id we're sending to
    node_id: PublicKey,
    /// Node to send envelope to (may not be destination node id in case of relay)
    node_ref: NodeRef,
    /// Total safety + private route hop count + 1 hop for the initial send
    hop_count: usize,
    /// The safety route used to send the message
    safety_route: Option<PublicKey>,
    /// The private route used to send the message
    remote_private_route: Option<PublicKey>,
    /// The private route requested to receive the reply
    reply_private_route: Option<PublicKey>,
}

/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
pub struct SenderSignedNodeInfo {
    /// The current signed node info of the sender if required
    signed_node_info: Option<SignedNodeInfo>,
    /// The last timestamp of the target's node info to assist remote node with sending its latest node info
    target_node_info_ts: Timestamp,
}

impl SenderSignedNodeInfo {
    /// Carry only the target's node info timestamp, without our own signed node info
    pub fn new_no_sni(target_node_info_ts: Timestamp) -> Self {
        Self {
            signed_node_info: None,
            target_node_info_ts,
        }
    }

    /// Carry our own signed node info along with the target's node info timestamp
    pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: Timestamp) -> Self {
        Self {
            signed_node_info: Some(sender_signed_node_info),
            target_node_info_ts,
        }
    }
}

/// The kind of rpc operation being sent, used for statistics accounting
#[derive(Copy, Clone, Debug)]
enum RPCKind {
    Question,
    Statement,
    Answer,
}

/////////////////////////////////////////////////////////////////////

pub struct RPCProcessorInner {
    send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
    stop_source: Option<StopSource>,
    worker_join_handles: Vec<MustJoinHandle<()>>,
}

pub struct RPCProcessorUnlockedInner {
    timeout_us: TimestampDuration,
    queue_size: u32,
    concurrency: u32,
    max_route_hop_count: usize,
    validate_dial_info_receipt_time_ms: u32,
    update_callback: UpdateCallback,
    waiting_rpc_table: OperationWaiter<RPCMessage>,
    waiting_app_call_table: OperationWaiter<Vec<u8>>,
}

#[derive(Clone)]
pub struct RPCProcessor {
    crypto: Crypto,
    config: VeilidConfig,
    network_manager: NetworkManager,
    routing_table: RoutingTable,
    inner: Arc<Mutex<RPCProcessorInner>>,
    unlocked_inner: Arc<RPCProcessorUnlockedInner>,
}

impl RPCProcessor {
    fn new_inner() -> RPCProcessorInner {
        RPCProcessorInner {
            send_channel: None,
            stop_source: None,
            worker_join_handles: Vec::new(),
        }
    }

    /// Build the immutable part of the processor state from the rpc/dht configuration
    fn new_unlocked_inner(
        config: VeilidConfig,
        update_callback: UpdateCallback,
    ) -> RPCProcessorUnlockedInner {
        // Read the configuration values we need once, up front
        let c = config.get();

        let queue_size = c.network.rpc.queue_size;
        let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms));
        let max_route_hop_count = c.network.rpc.max_route_hop_count as usize;

        // A configured concurrency of 0 means "choose automatically": half of
        // get_concurrency(), but never fewer than one worker
        let mut concurrency = c.network.rpc.concurrency;
        if concurrency == 0 {
            concurrency = get_concurrency() / 2;
            if concurrency == 0 {
                concurrency = 1;
            }
        }

        let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;

        RPCProcessorUnlockedInner {
            timeout_us,
            queue_size,
            concurrency,
            max_route_hop_count,
            validate_dial_info_receipt_time_ms,
            update_callback,
            waiting_rpc_table: OperationWaiter::new(),
            waiting_app_call_table: OperationWaiter::new(),
        }
    }

    pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
        let config = network_manager.config();
        Self {
            crypto: network_manager.crypto(),
            config: config.clone(),
            network_manager: network_manager.clone(),
            routing_table: network_manager.routing_table(),
            inner: Arc::new(Mutex::new(Self::new_inner())),
            unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
        }
    }

    pub fn network_manager(&self) -> NetworkManager {
        self.network_manager.clone()
    }

    pub fn routing_table(&self) -> RoutingTable {
        self.routing_table.clone()
    }

    //////////////////////////////////////////////////////////////////////

    /// Create the message queue and spawn `concurrency` rpc worker tasks that consume it
    #[instrument(level = "debug", skip_all, err)]
    pub async fn startup(&self) -> EyreResult<()> {
        trace!("startup rpc processor");
        let mut inner = self.inner.lock();

        // Bounded queue shared by all workers
        let (sender, receiver) = flume::bounded(self.unlocked_inner.queue_size as usize);
        inner.send_channel = Some(sender);

        // Every worker gets a token from this source; dropping the source (in shutdown) stops them.
        // Building it locally avoids unwrapping it back out of `inner` for each worker.
        let stop_source = StopSource::new();

        // spin up N workers
        trace!(
            "Spinning up {} RPC workers",
            self.unlocked_inner.concurrency
        );
        for _ in 0..self.unlocked_inner.concurrency {
            let jh = spawn(Self::rpc_worker(
                self.clone(),
                stop_source.token(),
                receiver.clone(),
            ));
            inner.worker_join_handles.push(jh);
        }

        inner.stop_source = Some(stop_source);

        Ok(())
    }

    /// Signal all rpc workers to stop, wait for them to finish, and reset processor state
    #[instrument(level = "debug", skip_all)]
    pub async fn shutdown(&self) {
        debug!("starting rpc processor shutdown");

        // Stop the rpc workers
        let mut unord: FuturesUnordered<_> = {
            let mut inner = self.inner.lock();
            // take the join handles out
            let handles = inner.worker_join_handles.drain(..).collect();
            // dropping the stop source signals every worker's stop token
            drop(inner.stop_source.take());
            handles
        };
        debug!("stopping {} rpc worker tasks", unord.len());

        // Wait for them to complete
        while unord.next().await.is_some() {}

        debug!("resetting rpc processor state");

        // Release the rpc processor
        *self.inner.lock() = Self::new_inner();

        debug!("finished rpc processor shutdown");
    }

    //////////////////////////////////////////////////////////////////////

    /// Determine if a SignedNodeInfo can be placed into the specified routing domain
    fn filter_node_info(
        &self,
        routing_domain: RoutingDomain,
        signed_node_info: &SignedNodeInfo,
    ) -> bool {
        let routing_table = self.routing_table();
        routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info)
    }

    //////////////////////////////////////////////////////////////////////

    /// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
    /// If no node was found in the timeout, this returns None
    ///
    /// Not implemented yet: always returns an `unimplemented` error.
    pub async fn search_dht_single_key(
        &self,
        _node_id: PublicKey,
        _count: u32,
        _fanout: u32,
        _timeout: Option<u64>,
    ) -> Result<Option<NodeRef>, RPCError> {
        //let routing_table = self.routing_table();

        // xxx find node but stop if we find the exact node we want
        // xxx return whatever node is closest after the timeout
        Err(RPCError::unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
    }

    /// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
    ///
    /// Not implemented yet: always returns an `unimplemented` error.
    pub async fn search_dht_multi_key(
        &self,
        _node_id: PublicKey,
        _count: u32,
        _fanout: u32,
        _timeout: Option<u64>,
    ) -> Result<Vec<NodeRef>, RPCError> {
        // xxx return closest nodes after the timeout
        Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
    }

    /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
    /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
    pub fn resolve_node(
        &self,
        node_id: PublicKey,
    ) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
        let this = self.clone();
        Box::pin(async move {
            let routing_table = this.routing_table();

            // First see if we have the node in our routing table already
            if let Some(nr) = routing_table.lookup_node_ref(node_id) {
                // ensure we have some dial info for the entry already,
                // if not, we should do the find_node anyway
                if nr.has_any_dial_info() {
                    return Ok(Some(nr));
                }
            }

            // If nobody knows where this node is, ask the DHT for it
            let (count, fanout, timeout) = {
                let c = this.config.get();
                (
                    c.network.dht.resolve_node_count,
                    c.network.dht.resolve_node_fanout,
                    c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
                )
            };

            let nr = this
                .search_dht_single_key(node_id, count, fanout, timeout)
                .await?;

            if let Some(nr) = &nr {
                if nr.node_id() != node_id {
                    // found a close node, but not exact within our configured resolve_node timeout
                    return Ok(None);
                }
            }

            Ok(nr)
        })
    }

    /// Wait for the reply to a question and record the outcome (answer or loss) in the stats
    #[instrument(level = "trace", skip(self, waitable_reply), err)]
    async fn wait_for_reply(
        &self,
        waitable_reply: WaitableReply,
    ) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> {
        let out = self
            .unlocked_inner
            .waiting_rpc_table
            .wait_for_op(waitable_reply.handle, waitable_reply.timeout_us)
            .await;
        match &out {
            Err(_) | Ok(TimeoutOr::Timeout) => {
                self.record_question_lost(
                    waitable_reply.send_ts,
                    waitable_reply.node_ref.clone(),
                    waitable_reply.safety_route,
                    waitable_reply.remote_private_route,
                    waitable_reply.reply_private_route,
                );
            }
            Ok(TimeoutOr::Value((rpcreader, _))) => {
                // Reply received
                let recv_ts = get_aligned_timestamp();

                // Record answer received
                self.record_answer_received(
                    waitable_reply.send_ts,
                    recv_ts,
                    rpcreader.header.body_len,
                    waitable_reply.node_ref.clone(),
                    waitable_reply.safety_route,
                    waitable_reply.remote_private_route,
                    waitable_reply.reply_private_route,
                )
            }
        };
        out
    }

    /// Wrap an operation with a private route inside a safety route
    fn wrap_with_route(
        &self,
        safety_selection: SafetySelection,
        remote_private_route: PrivateRoute,
        reply_private_route: Option<PublicKey>,
        message_data: Vec<u8>,
    ) -> Result<NetworkResult<RenderedOperation>, RPCError> {
        let routing_table = self.routing_table();
        let rss = routing_table.route_spec_store();

        let pr_is_stub = remote_private_route.is_stub();
        let pr_hop_count = remote_private_route.hop_count;
        let pr_pubkey = remote_private_route.public_key;

        // Compile the safety route with the private route
        let compiled_route: CompiledRoute = match rss
            .compile_safety_route(safety_selection, remote_private_route)
            .map_err(RPCError::internal)?
        {
            Some(cr) => cr,
            None => {
                return Ok(NetworkResult::no_connection_other(
                    "private route could not be compiled at this time",
                ))
            }
        };
        let sr_is_stub = compiled_route.safety_route.is_stub();
        let sr_pubkey = compiled_route.safety_route.public_key;

        // Encrypt routed operation
        // Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
        // xxx use factory method, get version from somewhere...
        let nonce = Crypto::get_random_nonce();
        let dh_secret = self
            .crypto
            .cached_dh(&pr_pubkey, &compiled_route.secret)
            .map_err(RPCError::map_internal("dh failed"))?;
        let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None)
            .map_err(RPCError::map_internal("encryption failed"))?;

        // Make the routed operation
        // xxx: replace MAX_CRYPTO_VERSION with the version from the factory
        let operation = RoutedOperation::new(safety_selection.get_sequencing(), nonce, enc_msg_data);

        // Prepare route operation
        let sr_hop_count = compiled_route.safety_route.hop_count;
        let route_operation = RPCOperationRoute {
            safety_route: compiled_route.safety_route,
            operation,
        };
        let ssni_route = self
            .get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop.clone()))?;
        let operation = RPCOperation::new_statement(
            RPCStatement::new(RPCStatementDetail::Route(route_operation)),
            ssni_route,
        );

        // Convert message to bytes and return it
        let mut route_msg = ::capnp::message::Builder::new_default();
        let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
        operation.encode(&mut route_operation)?;
        let out_message = builder_to_vec(route_msg)?;

        // Get the first hop this is going to
        let out_node_id = compiled_route.first_hop.node_id();
        // Widen before adding: the remote private route's hop count comes from outside this
        // node, so summing in the (possibly narrow) hop count type could overflow
        let out_hop_count = 1 + sr_hop_count as usize + pr_hop_count as usize;

        let out = RenderedOperation {
            message: out_message,
            node_id: out_node_id,
            node_ref: compiled_route.first_hop,
            hop_count: out_hop_count,
            safety_route: if sr_is_stub { None } else { Some(sr_pubkey) },
            remote_private_route: if pr_is_stub { None } else { Some(pr_pubkey) },
            reply_private_route,
        };

        Ok(NetworkResult::value(out))
    }

    /// Produce a byte buffer that represents the wire encoding of the entire
    /// unencrypted envelope body for a RPC message. This incorporates
    /// wrapping a private and/or safety route if they are specified.
#[instrument(level = "debug", skip(self, operation), err)] fn render_operation( &self, dest: Destination, operation: &RPCOperation, ) -> Result, RPCError> { let out: NetworkResult; // Encode message to a builder and make a message reader for it // Then produce the message as an unencrypted byte buffer let message = { let mut msg_builder = ::capnp::message::Builder::new_default(); let mut op_builder = msg_builder.init_root::(); operation.encode(&mut op_builder)?; builder_to_vec(msg_builder)? }; // Get reply private route if we are asking for one to be used in our 'respond to' let reply_private_route = match operation.kind() { RPCOperationKind::Question(q) => match q.respond_to() { RespondTo::Sender => None, RespondTo::PrivateRoute(pr) => Some(pr.public_key), }, RPCOperationKind::Statement(_) | RPCOperationKind::Answer(_) => None, }; // To where are we sending the request match dest { Destination::Direct { target: ref node_ref, safety_selection, } | Destination::Relay { relay: ref node_ref, target: _, safety_selection, } => { // Send to a node without a private route // -------------------------------------- // Get the actual destination node id accounting for relays let (node_ref, node_id) = if let Destination::Relay { relay: _, target: ref dht_key, safety_selection: _, } = dest { (node_ref.clone(), dht_key.clone()) } else { let node_id = node_ref.node_id(); (node_ref.clone(), node_id) }; // Handle the existence of safety route match safety_selection { SafetySelection::Unsafe(sequencing) => { // Apply safety selection sequencing requirement if it is more strict than the node_ref's sequencing requirement let mut node_ref = node_ref.clone(); if sequencing > node_ref.sequencing() { node_ref.set_sequencing(sequencing) } // Reply private route should be None here, even for questions assert!(reply_private_route.is_none()); // If no safety route is being used, and we're not sending to a private // route, we can use a direct envelope instead of routing out = 
NetworkResult::value(RenderedOperation { message, node_id, node_ref, hop_count: 1, safety_route: None, remote_private_route: None, reply_private_route: None, }); } SafetySelection::Safe(_) => { // No private route was specified for the request // but we are using a safety route, so we must create an empty private route let peer_info = match node_ref.make_peer_info(RoutingDomain::PublicInternet) { None => { return Ok(NetworkResult::no_connection_other( "No PublicInternet peer info for stub private route", )) } Some(pi) => pi, }; let private_route = PrivateRoute::new_stub(node_id, RouteNode::PeerInfo(peer_info)); // Wrap with safety route out = self.wrap_with_route( safety_selection, private_route, reply_private_route, message, )?; } }; } Destination::PrivateRoute { private_route, safety_selection, } => { // Send to private route // --------------------- // Reply with 'route' operation out = self.wrap_with_route( safety_selection, private_route, reply_private_route, message, )?; } } Ok(out) } /// Get signed node info to package with RPC messages to improve /// routing table caching when it is okay to do so #[instrument(level = "trace", skip(self), ret, err)] fn get_sender_signed_node_info( &self, dest: &Destination, ) -> Result { // Don't do this if the sender is to remain private // Otherwise we would be attaching the original sender's identity to the final destination, // thus defeating the purpose of the safety route entirely :P match dest.get_safety_selection() { SafetySelection::Unsafe(_) => {} SafetySelection::Safe(_) => { return Ok(SenderSignedNodeInfo::default()); } } // Get the target we're sending to let routing_table = self.routing_table(); let target = match dest { Destination::Direct { target, safety_selection: _, } => target.clone(), Destination::Relay { relay: _, target, safety_selection: _, } => { if let Some(target) = routing_table.lookup_node_ref(*target) { target } else { // Target was not in our routing table return 
Ok(SenderSignedNodeInfo::default()); } } Destination::PrivateRoute { private_route: _, safety_selection: _, } => { return Ok(SenderSignedNodeInfo::default()); } }; let Some(routing_domain) = target.best_routing_domain() else { // No routing domain for target? return Err(RPCError::internal(format!("No routing domain for target: {}", target))); }; // Get the target's node info timestamp let target_node_info_ts = target.node_info_ts(routing_domain); // Don't return our node info if it's not valid yet let Some(own_peer_info) = routing_table.get_own_peer_info(routing_domain) else { return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts)); }; // Get our node info timestamp let our_node_info_ts = own_peer_info.signed_node_info.timestamp(); // If the target has seen our node info already don't send it again if target.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) { return Ok(SenderSignedNodeInfo::new_no_sni(target_node_info_ts)); } Ok(SenderSignedNodeInfo::new( own_peer_info.signed_node_info, target_node_info_ts, )) } /// Record failure to send to node or route fn record_send_failure( &self, rpc_kind: RPCKind, send_ts: Timestamp, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, ) { let wants_answer = matches!(rpc_kind, RPCKind::Question); // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { node_ref.stats_failed_to_send(send_ts, wants_answer); return; } // If safety route was in use, record failure to send there if let Some(sr_pubkey) = &safety_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, sr_pubkey, |s| s.record_send_failed()); } else { // If no safety route was in use, then it's the private route's fault if we have one if let Some(pr_pubkey) = &remote_private_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, pr_pubkey, |s| s.record_send_failed()); } } } /// Record question lost to node or 
route fn record_question_lost( &self, send_ts: Timestamp, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, private_route: Option, ) { // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { node_ref.stats_question_lost(); return; } // Get route spec store let rss = self.routing_table.route_spec_store(); // If safety route was used, record question lost there if let Some(sr_pubkey) = &safety_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, sr_pubkey, |s| { s.record_question_lost(); }); } // If remote private route was used, record question lost there if let Some(rpr_pubkey) = &remote_private_route { rss.with_route_stats(send_ts, rpr_pubkey, |s| { s.record_question_lost(); }); } // If private route was used, record question lost there if let Some(pr_pubkey) = &private_route { rss.with_route_stats(send_ts, pr_pubkey, |s| { s.record_question_lost(); }); } } /// Record success sending to node or route fn record_send_success( &self, rpc_kind: RPCKind, send_ts: Timestamp, bytes: ByteCount, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, ) { let wants_answer = matches!(rpc_kind, RPCKind::Question); // Record for node if this was not sent via a route if safety_route.is_none() && remote_private_route.is_none() { node_ref.stats_question_sent(send_ts, bytes, wants_answer); return; } // Get route spec store let rss = self.routing_table.route_spec_store(); // If safety route was used, record send there if let Some(sr_pubkey) = &safety_route { rss.with_route_stats(send_ts, sr_pubkey, |s| { s.record_sent(send_ts, bytes); }); } // If remote private route was used, record send there if let Some(pr_pubkey) = &remote_private_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, pr_pubkey, |s| { s.record_sent(send_ts, bytes); }); } } /// Record answer received from node or route fn record_answer_received( &self, 
send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount, node_ref: NodeRef, safety_route: Option, remote_private_route: Option, reply_private_route: Option, ) { // Record stats for remote node if this was direct if safety_route.is_none() && remote_private_route.is_none() && reply_private_route.is_none() { node_ref.stats_answer_rcvd(send_ts, recv_ts, bytes); return; } // Get route spec store let rss = self.routing_table.route_spec_store(); // Get latency for all local routes let mut total_local_latency = TimestampDuration::new(0u64); let total_latency: TimestampDuration = recv_ts.saturating_sub(send_ts); // If safety route was used, record route there if let Some(sr_pubkey) = &safety_route { rss.with_route_stats(send_ts, sr_pubkey, |s| { // If we received an answer, the safety route we sent over can be considered tested s.record_tested(recv_ts); // If we used a safety route to send, use our last tested latency total_local_latency += s.latency_stats().average }); } // If local private route was used, record route there if let Some(pr_pubkey) = &reply_private_route { rss.with_route_stats(send_ts, pr_pubkey, |s| { // Record received bytes s.record_received(recv_ts, bytes); // If we used a private route to receive, use our last tested latency total_local_latency += s.latency_stats().average }); } // If remote private route was used, record there if let Some(rpr_pubkey) = &remote_private_route { rss.with_route_stats(send_ts, rpr_pubkey, |s| { // Record received bytes s.record_received(recv_ts, bytes); // The remote route latency is recorded using the total latency minus the total local latency let remote_latency = total_latency.saturating_sub(total_local_latency); s.record_latency(remote_latency); }); // If we sent to a private route without a safety route // We need to mark our own node info as having been seen so we can optimize sending it if let Err(e) = rss.mark_remote_private_route_seen_our_node_info(&rpr_pubkey, recv_ts) { log_rpc!(error "private route missing: 
{}", e); } // We can't record local route latency if a remote private route was used because // there is no way other than the prior latency estimation to determine how much time was spent // in the remote private route // Instead, we rely on local route testing to give us latency numbers for our local routes } else { // If no remote private route was used, then record half the total latency on our local routes // This is fine because if we sent with a local safety route, // then we must have received with a local private route too, per the design rules if let Some(sr_pubkey) = &safety_route { let rss = self.routing_table.route_spec_store(); rss.with_route_stats(send_ts, sr_pubkey, |s| { s.record_latency(total_latency / 2u64); }); } if let Some(pr_pubkey) = &reply_private_route { rss.with_route_stats(send_ts, pr_pubkey, |s| { s.record_latency(total_latency / 2u64); }); } } } /// Record question or statement received from node or route fn record_question_received(&self, msg: &RPCMessage) { let recv_ts = msg.header.timestamp; let bytes = msg.header.body_len; // Process messages based on how they were received match &msg.header.detail { // Process direct messages RPCMessageHeaderDetail::Direct(_) => { if let Some(sender_nr) = msg.opt_sender_nr.clone() { sender_nr.stats_question_rcvd(recv_ts, bytes); return; } } // Process messages that arrived with no private route (private route stub) RPCMessageHeaderDetail::SafetyRouted(d) => { let rss = self.routing_table.route_spec_store(); // This may record nothing if the remote safety route is not also // a remote private route that been imported, but that's okay rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| { s.record_received(recv_ts, bytes); }); } // Process messages that arrived to our private route RPCMessageHeaderDetail::PrivateRouted(d) => { let rss = self.routing_table.route_spec_store(); // This may record nothing if the remote safety route is not also // a remote private route that been imported, but 
that's okay // it could also be a node id if no remote safety route was used // in which case this also will do nothing rss.with_route_stats(recv_ts, &d.remote_safety_route, |s| { s.record_received(recv_ts, bytes); }); // Record for our local private route we received over rss.with_route_stats(recv_ts, &d.private_route, |s| { s.record_received(recv_ts, bytes); }); } } } /// Issue a question over the network, possibly using an anonymized route #[instrument(level = "debug", skip(self, question), err)] async fn question( &self, dest: Destination, question: RPCQuestion, ) -> Result, RPCError> { // Get sender signed node info if we should send that let ssni = self.get_sender_signed_node_info(&dest)?; // Wrap question in operation let operation = RPCOperation::new_question(question, ssni); let op_id = operation.op_id(); // Log rpc send trace!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest); // Produce rendered operation let RenderedOperation { message, node_id, node_ref, hop_count, safety_route, remote_private_route, reply_private_route, } = network_result_try!(self.render_operation(dest.clone(), &operation)?); // Calculate answer timeout // Timeout is number of hops times the timeout per hop let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64); // Set up op id eventual let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id); // Send question let bytes: ByteCount = (message.len() as u64).into(); let send_ts = get_aligned_timestamp(); let send_data_kind = network_result_try!(self .network_manager() .send_envelope(node_ref.clone(), Some(node_id), message) .await .map_err(|e| { // If we're returning an error, clean up self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); RPCError::network(e) })? 
=> {
                // If we couldn't send we're still cleaning up
                self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route);
            }
        );

        // Successfully sent
        self.record_send_success(
            RPCKind::Question,
            send_ts,
            bytes,
            node_ref.clone(),
            safety_route,
            remote_private_route,
        );

        // Pass back waitable reply completion
        Ok(NetworkResult::value(WaitableReply {
            handle,
            timeout_us,
            node_ref,
            send_ts,
            send_data_kind,
            safety_route,
            remote_private_route,
            reply_private_route,
        }))
    }

    /// Issue a statement over the network, possibly using an anonymized route.
    ///
    /// Statements are one-way: unlike questions, no reply waiter is returned;
    /// on success the result is `NetworkResult::value(())`.
    ///
    /// Send bookkeeping: `record_send_failure` is called both when the envelope
    /// send returns an error (which is then propagated as `RPCError::network`)
    /// and in the `network_result_try!` fallback arm (which, per its comment,
    /// runs when the message couldn't be sent). A successful send is recorded
    /// with `record_send_success` using the rendered message size.
    #[instrument(level = "debug", skip(self, statement), err)]
    async fn statement(
        &self,
        dest: Destination,
        statement: RPCStatement,
    ) -> Result, RPCError> {
        // Get sender signed node info if we should send that
        let ssni = self.get_sender_signed_node_info(&dest)?;

        // Wrap statement in operation
        let operation = RPCOperation::new_statement(statement, ssni);

        // Log rpc send
        trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);

        // Produce rendered operation
        // (hop count and reply private route are discarded for a statement)
        let RenderedOperation {
            message,
            node_id,
            node_ref,
            hop_count: _,
            safety_route,
            remote_private_route,
            reply_private_route: _,
        } = network_result_try!(self.render_operation(dest, &operation)?);

        // Send statement
        let bytes: ByteCount = (message.len() as u64).into();
        let send_ts = get_aligned_timestamp();
        let _send_data_kind = network_result_try!(self
            .network_manager()
            .send_envelope(node_ref.clone(), Some(node_id), message)
            .await
            .map_err(|e| {
                // If we're returning an error, clean up
                self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
                RPCError::network(e)
            })? => {
                // If we couldn't send we're still cleaning up
                self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
            }
        );

        // Successfully sent
        self.record_send_success(
            RPCKind::Statement,
            send_ts,
            bytes,
            node_ref,
            safety_route,
            remote_private_route,
        );

        Ok(NetworkResult::value(()))
    }

    /// Issue a reply over the network, possibly using an anonymized route.
    ///
    /// The destination is derived from `request` via `get_respond_to_destination`.
    /// The request must want a response, or this routine fails. The answer is
    /// wrapped with `RPCOperation::new_answer` against the request's operation.
    /// Send success/failure bookkeeping follows the same pattern as `statement`,
    /// recorded under `RPCKind::Answer`.
    #[instrument(level = "debug", skip(self, request, answer), err)]
    async fn answer(
        &self,
        request: RPCMessage,
        answer: RPCAnswer,
    ) -> Result, RPCError> {
        // Extract destination from respond_to
        let dest = network_result_try!(self.get_respond_to_destination(&request));

        // Get sender signed node info if we should send that
        let ssni = self.get_sender_signed_node_info(&dest)?;

        // Wrap answer in operation
        let operation = RPCOperation::new_answer(&request.operation, answer, ssni);

        // Log rpc send
        trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);

        // Produce rendered operation
        // (hop count and reply private route are discarded for an answer)
        let RenderedOperation {
            message,
            node_id,
            node_ref,
            hop_count: _,
            safety_route,
            remote_private_route,
            reply_private_route: _,
        } = network_result_try!(self.render_operation(dest, &operation)?);

        // Send the reply
        let bytes: ByteCount = (message.len() as u64).into();
        let send_ts = get_aligned_timestamp();
        network_result_try!(self.network_manager()
            .send_envelope(node_ref.clone(), Some(node_id), message)
            .await
            .map_err(|e| {
                // If we're returning an error, clean up
                self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route);
                RPCError::network(e)
            })? => {
                // If we couldn't send we're still cleaning up
                self.record_send_failure(RPCKind::Answer, send_ts, node_ref.clone(), safety_route, remote_private_route);
            }
        );

        // Reply successfully sent
        self.record_send_success(
            RPCKind::Answer,
            send_ts,
            bytes,
            node_ref,
            safety_route,
            remote_private_route,
        );

        Ok(NetworkResult::value(()))
    }

    //////////////////////////////////////////////////////////////////////

    /// Decode one queued RPC message, update routing/stats state, and dispatch it.
    ///
    /// * Direct messages are decoded using the envelope's sender id. If the
    ///   operation carries sender node info, it is checked with
    ///   `filter_node_info` for the message's routing domain (failure returns
    ///   `RPCError::invalid_format`) and registered in the routing table;
    ///   otherwise, or if registration yields no noderef, the sender is looked up.
    /// * Safety-routed and private-routed messages are decoded without a sender
    ///   id and carry no sender noderef.
    ///
    /// Questions and statements are then passed to their matching `process_*`
    /// handler; answers complete the pending operation waiter for their op id.
    #[instrument(level = "trace", skip(self, encoded_msg), err)]
    async fn process_rpc_message(
        &self,
        encoded_msg: RPCMessageEncoded,
    ) -> Result, RPCError> {
        // Decode operation appropriately based on header detail
        let msg = match &encoded_msg.header.detail {
            RPCMessageHeaderDetail::Direct(detail) => {
                // Get the routing domain this message came over
                let routing_domain = detail.routing_domain;

                // Identify the original sender from the envelope; with relaying this
                // may differ from the peer that delivered the message
                let sender_node_id = detail.envelope.get_sender_id();

                // Decode the RPC message
                let operation = {
                    let reader = encoded_msg.data.get_reader()?;
                    let op_reader = reader
                        .get_root::()
                        .map_err(RPCError::protocol)
                        .map_err(logthru_rpc!())?;
                    RPCOperation::decode(&op_reader, Some(&sender_node_id))?
                };

                // Get the sender noderef, incorporating any 'sender node info'
                let mut opt_sender_nr: Option = None;
                if let Some(sender_node_info) = operation.sender_node_info() {
                    // Sender NodeInfo was specified, update our routing table with it.
                    // Node info that fails the routing-domain filter rejects the whole message.
                    if !self.filter_node_info(routing_domain, &sender_node_info) {
                        return Err(RPCError::invalid_format(
                            "sender signednodeinfo has invalid peer scope",
                        ));
                    }
                    opt_sender_nr = self.routing_table().register_node_with_signed_node_info(
                        routing_domain,
                        sender_node_id,
                        sender_node_info.clone(),
                        false,
                    );
                }

                // look up sender node, in case it's different than our peer due to relaying
                // (also reached when registration above produced no noderef)
                if opt_sender_nr.is_none() {
                    opt_sender_nr = self.routing_table().lookup_node_ref(sender_node_id)
                }

                // Update the 'seen our node info' timestamp to determine if this node needs a
                // 'node info update' ping
                if let Some(sender_nr) = &opt_sender_nr {
                    sender_nr.set_our_node_info_ts(routing_domain, operation.target_node_info_ts());
                }

                // Make the RPC message
                RPCMessage {
                    header: encoded_msg.header,
                    operation,
                    opt_sender_nr,
                }
            }
            RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
                // Decode the RPC message (no sender id is available over a route)
                let operation = {
                    let reader = encoded_msg.data.get_reader()?;
                    let op_reader = reader
                        .get_root::()
                        .map_err(RPCError::protocol)
                        .map_err(logthru_rpc!())?;
                    RPCOperation::decode(&op_reader, None)?
                };

                // Make the RPC message
                RPCMessage {
                    header: encoded_msg.header,
                    operation,
                    opt_sender_nr: None,
                }
            }
        };

        // Process stats for questions/statements received
        let kind = match msg.operation.kind() {
            RPCOperationKind::Question(_) => {
                self.record_question_received(&msg);

                if let Some(sender_nr) = msg.opt_sender_nr.clone() {
                    sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
                }
                "question"
            }
            RPCOperationKind::Statement(_) => {
                // NOTE(review): statements are counted through the same
                // `stats_question_rcvd` counter as questions; confirm this is
                // intended rather than a missing statement-specific counter.
                if let Some(sender_nr) = msg.opt_sender_nr.clone() {
                    sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
                }
                "statement"
            }
            RPCOperationKind::Answer(_) => {
                // Answer stats are processed in wait_for_reply
                "answer"
            }
        };

        // Log rpc receive
        trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);

        // Process specific message kind
        match msg.operation.kind() {
            RPCOperationKind::Question(q) => match q.detail() {
                RPCQuestionDetail::StatusQ(_) => self.process_status_q(msg).await,
                RPCQuestionDetail::FindNodeQ(_) => self.process_find_node_q(msg).await,
                RPCQuestionDetail::AppCallQ(_) => self.process_app_call_q(msg).await,
                RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await,
                RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await,
                RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await,
                RPCQuestionDetail::SupplyBlockQ(_) => self.process_supply_block_q(msg).await,
                RPCQuestionDetail::FindBlockQ(_) => self.process_find_block_q(msg).await,
                RPCQuestionDetail::StartTunnelQ(_) => self.process_start_tunnel_q(msg).await,
                RPCQuestionDetail::CompleteTunnelQ(_) => self.process_complete_tunnel_q(msg).await,
                RPCQuestionDetail::CancelTunnelQ(_) => self.process_cancel_tunnel_q(msg).await,
            },
            RPCOperationKind::Statement(s) => match s.detail() {
                RPCStatementDetail::ValidateDialInfo(_) => {
                    self.process_validate_dial_info(msg).await
                }
                RPCStatementDetail::Route(_) => self.process_route(msg).await,
                RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await,
                RPCStatementDetail::Signal(_) => self.process_signal(msg).await,
                RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await,
                RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await,
            },
            RPCOperationKind::Answer(_) => {
                // Complete the pending operation waiter for this op id; errors propagate
                self.unlocked_inner
                    .waiting_rpc_table
                    .complete_op_waiter(msg.operation.op_id(), msg)
                    .await?;
                Ok(NetworkResult::value(()))
            }
        }
    }

    /// Worker loop that drains the incoming RPC queue and processes each message.
    ///
    /// Runs until the stop token fires or receiving from the channel fails
    /// (for flume, once all senders have been dropped). Each message is
    /// processed inside a fresh root span; processing errors are logged and
    /// the loop moves on to the next message.
    async fn rpc_worker(
        self,
        stop_token: StopToken,
        receiver: flume::Receiver<(Option, RPCMessageEncoded)>,
    ) {
        // Outer `Ok`: the stop token has not fired; inner `Ok`: a message was received
        while let Ok(Ok((_span_id, msg))) =
            receiver.recv_async().timeout_at(stop_token.clone()).await
        {
            // Span with no parent, so each message is traced independently
            let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv");
            // xxx: causes crash (Missing otel data span extensions)
            // rpc_worker_span.follows_from(span_id);

            let res = match self
                .process_rpc_message(msg)
                .instrument(rpc_worker_span)
                .await
            {
                Err(e) => {
                    log_rpc!(error "couldn't process rpc message: {}", e);
                    continue;
                }
                Ok(v) => v,
            };

            // Non-value network results are handled here (presumably logged) by the macro
            network_result_value_or_log!(res => {});
        }
    }

    /// Queue an RPC message that arrived directly from a peer for the RPC worker.
    ///
    /// `peer_noderef` and `connection_descriptor` describe the peer that delivered
    /// the message, which with relaying may not be the original sender. The
    /// message is timestamped at enqueue time and tagged with the current
    /// tracing span id.
    ///
    /// # Errors
    /// Returns an error if `try_send` fails; it does not wait for queue space.
    ///
    /// NOTE(review): panics via `unwrap()` if `send_channel` is `None`
    /// (presumably before startup or after shutdown); consider returning an error.
    #[instrument(level = "trace", skip(self, body), err)]
    pub fn enqueue_direct_message(
        &self,
        envelope: Envelope,
        peer_noderef: NodeRef,
        connection_descriptor: ConnectionDescriptor,
        routing_domain: RoutingDomain,
        body: Vec,
    ) -> EyreResult<()> {
        let msg = RPCMessageEncoded {
            header: RPCMessageHeader {
                detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
                    envelope,
                    peer_noderef,
                    connection_descriptor,
                    routing_domain,
                }),
                timestamp: get_aligned_timestamp(),
                body_len: ByteCount::new(body.len() as u64),
            },
            data: RPCMessageData { contents: body },
        };

        // Clone the sender inside a scope so the lock guard is dropped before sending
        let send_channel = {
            let inner = self.inner.lock();
            inner.send_channel.as_ref().unwrap().clone()
        };
        let span_id = Span::current().id();
        send_channel
            .try_send((span_id, msg))
            .wrap_err("failed to enqueue received RPC message")?;
        Ok(())
    }

    /// Queue an RPC message received over a safety route only (no private route).
    ///
    /// Records the remote safety route key and the sequencing used; the message
    /// is timestamped at enqueue time and tagged with the current tracing span id.
    ///
    /// # Errors
    /// Returns an error if `try_send` fails; it does not wait for queue space.
    ///
    /// NOTE(review): panics via `unwrap()` if `send_channel` is `None`
    /// (presumably before startup or after shutdown); consider returning an error.
    #[instrument(level = "trace", skip(self, body), err)]
    pub fn enqueue_safety_routed_message(
        &self,
        remote_safety_route: PublicKey,
        sequencing: Sequencing,
        body: Vec,
    ) -> EyreResult<()> {
        let msg = RPCMessageEncoded {
            header: RPCMessageHeader {
                detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
                    remote_safety_route,
                    sequencing,
                }),
                timestamp: get_aligned_timestamp(),
                body_len: (body.len() as u64).into(),
            },
            data: RPCMessageData { contents: body },
        };

        // Clone the sender inside a scope so the lock guard is dropped before sending
        let send_channel = {
            let inner = self.inner.lock();
            inner.send_channel.as_ref().unwrap().clone()
        };
        let span_id = Span::current().id();
        send_channel
            .try_send((span_id, msg))
            .wrap_err("failed to enqueue received RPC message")?;
        Ok(())
    }

    /// Queue an RPC message received over one of our private routes.
    ///
    /// Records the remote safety route key, the private route it arrived on,
    /// and the safety spec to use when replying; the message is timestamped at
    /// enqueue time and tagged with the current tracing span id.
    ///
    /// # Errors
    /// Returns an error if `try_send` fails; it does not wait for queue space.
    ///
    /// NOTE(review): panics via `unwrap()` if `send_channel` is `None`
    /// (presumably before startup or after shutdown); consider returning an error.
    #[instrument(level = "trace", skip(self, body), err)]
    pub fn enqueue_private_routed_message(
        &self,
        remote_safety_route: PublicKey,
        private_route: PublicKey,
        safety_spec: SafetySpec,
        body: Vec,
    ) -> EyreResult<()> {
        let msg = RPCMessageEncoded {
            header: RPCMessageHeader {
                detail: RPCMessageHeaderDetail::PrivateRouted(
                    RPCMessageHeaderDetailPrivateRouted {
                        remote_safety_route,
                        private_route,
                        safety_spec,
                    },
                ),
                timestamp: get_aligned_timestamp(),
                body_len: (body.len() as u64).into(),
            },
            data: RPCMessageData { contents: body },
        };

        // Clone the sender inside a scope so the lock guard is dropped before sending
        let send_channel = {
            let inner = self.inner.lock();
            inner.send_channel.as_ref().unwrap().clone()
        };
        let span_id = Span::current().id();
        send_channel
            .try_send((span_id, msg))
            .wrap_err("failed to enqueue received RPC message")?;
        Ok(())
    }
}