From cc715dfc968a9b87836b745419785d02c1fb2c67 Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 24 Mar 2022 22:07:55 -0400 Subject: [PATCH] rpc sender dialinfo --- veilid-core/src/routing_table/bucket_entry.rs | 21 ++++++++++++- veilid-core/src/routing_table/mod.rs | 16 +++++++++- veilid-core/src/routing_table/node_ref.rs | 16 ++++++++-- veilid-core/src/rpc_processor/mod.rs | 31 ++++++++++++++++--- 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 64c63d9a..d0cece65 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -30,6 +30,7 @@ pub enum BucketEntryState { pub struct BucketEntry { pub(super) ref_count: u32, min_max_version: Option<(u8, u8)>, + seen_our_dial_info: bool, last_connection: Option<(ConnectionDescriptor, u64)>, dial_infos: Vec, latency_stats_accounting: LatencyStatsAccounting, @@ -43,6 +44,7 @@ impl BucketEntry { Self { ref_count: 0, min_max_version: None, + seen_our_dial_info: false, last_connection: None, dial_infos: Vec::new(), latency_stats_accounting: LatencyStatsAccounting::new(), @@ -132,6 +134,14 @@ impl BucketEntry { self.peer_stats.node_info = Some(node_info); } + pub fn set_seen_our_dial_info(&mut self, seen: bool) { + self.seen_our_dial_info = seen; + } + + pub fn has_seen_our_dial_info(&self) -> bool { + self.seen_our_dial_info + } + ///// stats methods // called every ROLLING_TRANSFERS_INTERVAL_SECS seconds pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) { @@ -168,8 +178,17 @@ impl BucketEntry { } } + // Check if this node needs a ping right now to validate it is still reachable pub(super) fn needs_ping(&self, cur_ts: u64) -> bool { - // if we need a ping right now to validate us + // See which ping pattern we are to use + let mut state = self.state(cur_ts); + + // If the current dial info hasn't been recognized, + // then we gotta ping regardless so treat the node as unreliable, briefly + if !self.seen_our_dial_info && matches!(state, BucketEntryState::Reliable) { + state = BucketEntryState::Unreliable; + } + match self.state(cur_ts) { BucketEntryState::Reliable => { // If we are in a reliable state, we need a ping on an exponential scale diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 468f4b77..51c43e9d 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -220,6 +220,11 @@ impl RoutingTable { timestamp, }); + // Re-sort dial info to endure preference ordering + inner + .dial_info_details + .sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); + info!( "{}Dial Info: {}", if dial_info.is_local() { @@ -257,6 +262,14 @@ impl RoutingTable { } fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { + // Clear 'seen dial info' bits on routing table entries so we know to ping them + for b in inner.buckets { + for e in b.entries_mut() { + e.1.set_seen_our_dial_info(false); + } + } + + // Release any waiters let mut new_eventual = Eventual::new(); core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual); spawn(new_eventual.resolve()).detach(); @@ -454,11 +467,12 @@ impl RoutingTable { let rpc_processor = self.rpc_processor(); let res = rpc_processor + .clone() .rpc_call_find_node( Destination::Direct(node_ref.clone()), node_id, None, - RespondTo::Sender, + rpc_processor.get_respond_to_sender(node_ref.clone()), ) .await .map_err(map_to_string) diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 47ad5f17..04351df9 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -35,9 +35,9 @@ impl NodeRef { self.node_id } - // pub fn dial_info_filter(&self) -> DialInfoFilter { - // self.dial_info_filter.clone() - // } + pub fn dial_info_filter(&self) -> &DialInfoFilter { + &self.dial_info_filter + } pub fn operate(&self, f: F) -> T where @@ -46,6 +46,16 @@ impl NodeRef { self.routing_table.operate_on_bucket_entry(self.node_id, f) } + // Returns if this node has seen and acknowledged our node's dial info yet + pub fn has_seen_our_dial_info(&self) -> bool { + let nm = self.routing_table.network_manager(); + self.operate(|e| e.has_seen_our_dial_info()) + } + pub fn set_seen_our_dial_info(&self) { + let nm = self.routing_table.network_manager(); + self.operate(|e| e.set_seen_our_dial_info(true)); + } + // Returns the best dial info to attempt a connection to this node pub fn best_dial_info(&self) -> Option { let nm = self.routing_table.network_manager(); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 415f9dc8..335d25da 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -31,7 +31,7 @@ pub enum Destination { #[derive(Debug, Clone)] pub enum RespondTo { None, - Sender, + Sender(Option), PrivateRoute(PrivateRoute), } @@ -44,8 +44,12 @@ impl RespondTo { Self::None => { builder.set_none(()); } - Self::Sender => { - builder.set_sender(()); + Self::Sender(Some(di)) => { + let mut di_builder = builder.init_sender(); + encode_dial_info(di, &mut di_builder)?; + } + Self::Sender(None) => { + builder.init_sender(); } Self::PrivateRoute(pr) => { let mut pr_builder = builder.reborrow().init_private_route(); @@ -357,6 +361,9 @@ impl RPCProcessor { } } Ok((rpcreader, _)) => { + // Note that we definitely received this peer info since we got a reply + waitable_reply.node_ref.set_seen_our_dial_info(); + // Reply received let recv_ts = get_timestamp(); if waitable_reply.is_ping { @@ -1333,6 +1340,21 @@ impl RPCProcessor { Ok(()) } + // Gets a 'RespondTo::Sender' that contains either our dial info, + // or None if the peer has seen our dial info before + pub fn get_respond_to_sender(&self, peer: NodeRef) -> RespondTo { + if peer.has_seen_our_dial_info() { + RespondTo::Sender(None) + } else if let Some(did) = self + .routing_table() + .first_filtered_dial_info_detail(peer.dial_info_filter()) + { + RespondTo::Sender(Some(did.dial_info.clone())) + } else { + RespondTo::Sender(None) + } + } + // Send InfoQ RPC request, receive InfoA answer pub async fn rpc_call_info(self, peer: NodeRef) -> Result { let info_q_msg = { @@ -1340,7 +1362,8 @@ impl RPCProcessor { let mut question = info_q_msg.init_root::(); question.set_op_id(self.get_next_op_id()); let mut respond_to = question.reborrow().init_respond_to(); - respond_to.set_sender(()); + self.get_respond_to_sender(peer.clone()) + .encode(&mut respond_to); let detail = question.reborrow().init_detail(); detail.init_info_q();