rpc sender dialinfo
This commit is contained in:
parent
c276dd7796
commit
cc715dfc96
@ -30,6 +30,7 @@ pub enum BucketEntryState {
|
|||||||
pub struct BucketEntry {
|
pub struct BucketEntry {
|
||||||
pub(super) ref_count: u32,
|
pub(super) ref_count: u32,
|
||||||
min_max_version: Option<(u8, u8)>,
|
min_max_version: Option<(u8, u8)>,
|
||||||
|
seen_our_dial_info: bool,
|
||||||
last_connection: Option<(ConnectionDescriptor, u64)>,
|
last_connection: Option<(ConnectionDescriptor, u64)>,
|
||||||
dial_infos: Vec<DialInfo>,
|
dial_infos: Vec<DialInfo>,
|
||||||
latency_stats_accounting: LatencyStatsAccounting,
|
latency_stats_accounting: LatencyStatsAccounting,
|
||||||
@ -43,6 +44,7 @@ impl BucketEntry {
|
|||||||
Self {
|
Self {
|
||||||
ref_count: 0,
|
ref_count: 0,
|
||||||
min_max_version: None,
|
min_max_version: None,
|
||||||
|
seen_our_dial_info: false,
|
||||||
last_connection: None,
|
last_connection: None,
|
||||||
dial_infos: Vec::new(),
|
dial_infos: Vec::new(),
|
||||||
latency_stats_accounting: LatencyStatsAccounting::new(),
|
latency_stats_accounting: LatencyStatsAccounting::new(),
|
||||||
@ -132,6 +134,14 @@ impl BucketEntry {
|
|||||||
self.peer_stats.node_info = Some(node_info);
|
self.peer_stats.node_info = Some(node_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_seen_our_dial_info(&mut self, seen: bool) {
|
||||||
|
self.seen_our_dial_info = seen;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_seen_our_dial_info(&self) -> bool {
|
||||||
|
self.seen_our_dial_info
|
||||||
|
}
|
||||||
|
|
||||||
///// stats methods
|
///// stats methods
|
||||||
// called every ROLLING_TRANSFERS_INTERVAL_SECS seconds
|
// called every ROLLING_TRANSFERS_INTERVAL_SECS seconds
|
||||||
pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
|
pub(super) fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
|
||||||
@ -168,8 +178,17 @@ impl BucketEntry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if this node needs a ping right now to validate it is still reachable
|
||||||
pub(super) fn needs_ping(&self, cur_ts: u64) -> bool {
|
pub(super) fn needs_ping(&self, cur_ts: u64) -> bool {
|
||||||
// if we need a ping right now to validate us
|
// See which ping pattern we are to use
|
||||||
|
let mut state = self.state(cur_ts);
|
||||||
|
|
||||||
|
// If the current dial info hasn't been recognized,
|
||||||
|
// then we gotta ping regardless so treat the node as unreliable, briefly
|
||||||
|
if !self.seen_our_dial_info && matches!(state, BucketEntryState::Reliable) {
|
||||||
|
state = BucketEntryState::Unreliable;
|
||||||
|
}
|
||||||
|
|
||||||
match self.state(cur_ts) {
|
match self.state(cur_ts) {
|
||||||
BucketEntryState::Reliable => {
|
BucketEntryState::Reliable => {
|
||||||
// If we are in a reliable state, we need a ping on an exponential scale
|
// If we are in a reliable state, we need a ping on an exponential scale
|
||||||
|
@ -220,6 +220,11 @@ impl RoutingTable {
|
|||||||
timestamp,
|
timestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Re-sort dial info to endure preference ordering
|
||||||
|
inner
|
||||||
|
.dial_info_details
|
||||||
|
.sort_by(|a, b| a.dial_info.cmp(&b.dial_info));
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"{}Dial Info: {}",
|
"{}Dial Info: {}",
|
||||||
if dial_info.is_local() {
|
if dial_info.is_local() {
|
||||||
@ -257,6 +262,14 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
|
fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
|
||||||
|
// Clear 'seen dial info' bits on routing table entries so we know to ping them
|
||||||
|
for b in inner.buckets {
|
||||||
|
for e in b.entries_mut() {
|
||||||
|
e.1.set_seen_our_dial_info(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release any waiters
|
||||||
let mut new_eventual = Eventual::new();
|
let mut new_eventual = Eventual::new();
|
||||||
core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
|
core::mem::swap(&mut inner.eventual_changed_dial_info, &mut new_eventual);
|
||||||
spawn(new_eventual.resolve()).detach();
|
spawn(new_eventual.resolve()).detach();
|
||||||
@ -454,11 +467,12 @@ impl RoutingTable {
|
|||||||
let rpc_processor = self.rpc_processor();
|
let rpc_processor = self.rpc_processor();
|
||||||
|
|
||||||
let res = rpc_processor
|
let res = rpc_processor
|
||||||
|
.clone()
|
||||||
.rpc_call_find_node(
|
.rpc_call_find_node(
|
||||||
Destination::Direct(node_ref.clone()),
|
Destination::Direct(node_ref.clone()),
|
||||||
node_id,
|
node_id,
|
||||||
None,
|
None,
|
||||||
RespondTo::Sender,
|
rpc_processor.get_respond_to_sender(node_ref.clone()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(map_to_string)
|
.map_err(map_to_string)
|
||||||
|
@ -35,9 +35,9 @@ impl NodeRef {
|
|||||||
self.node_id
|
self.node_id
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn dial_info_filter(&self) -> DialInfoFilter {
|
pub fn dial_info_filter(&self) -> &DialInfoFilter {
|
||||||
// self.dial_info_filter.clone()
|
&self.dial_info_filter
|
||||||
// }
|
}
|
||||||
|
|
||||||
pub fn operate<T, F>(&self, f: F) -> T
|
pub fn operate<T, F>(&self, f: F) -> T
|
||||||
where
|
where
|
||||||
@ -46,6 +46,16 @@ impl NodeRef {
|
|||||||
self.routing_table.operate_on_bucket_entry(self.node_id, f)
|
self.routing_table.operate_on_bucket_entry(self.node_id, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns if this node has seen and acknowledged our node's dial info yet
|
||||||
|
pub fn has_seen_our_dial_info(&self) -> bool {
|
||||||
|
let nm = self.routing_table.network_manager();
|
||||||
|
self.operate(|e| e.has_seen_our_dial_info())
|
||||||
|
}
|
||||||
|
pub fn set_seen_our_dial_info(&self) {
|
||||||
|
let nm = self.routing_table.network_manager();
|
||||||
|
self.operate(|e| e.set_seen_our_dial_info(true));
|
||||||
|
}
|
||||||
|
|
||||||
// Returns the best dial info to attempt a connection to this node
|
// Returns the best dial info to attempt a connection to this node
|
||||||
pub fn best_dial_info(&self) -> Option<DialInfo> {
|
pub fn best_dial_info(&self) -> Option<DialInfo> {
|
||||||
let nm = self.routing_table.network_manager();
|
let nm = self.routing_table.network_manager();
|
||||||
|
@ -31,7 +31,7 @@ pub enum Destination {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum RespondTo {
|
pub enum RespondTo {
|
||||||
None,
|
None,
|
||||||
Sender,
|
Sender(Option<DialInfo>),
|
||||||
PrivateRoute(PrivateRoute),
|
PrivateRoute(PrivateRoute),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,8 +44,12 @@ impl RespondTo {
|
|||||||
Self::None => {
|
Self::None => {
|
||||||
builder.set_none(());
|
builder.set_none(());
|
||||||
}
|
}
|
||||||
Self::Sender => {
|
Self::Sender(Some(di)) => {
|
||||||
builder.set_sender(());
|
let mut di_builder = builder.init_sender();
|
||||||
|
encode_dial_info(di, &mut di_builder)?;
|
||||||
|
}
|
||||||
|
Self::Sender(None) => {
|
||||||
|
builder.init_sender();
|
||||||
}
|
}
|
||||||
Self::PrivateRoute(pr) => {
|
Self::PrivateRoute(pr) => {
|
||||||
let mut pr_builder = builder.reborrow().init_private_route();
|
let mut pr_builder = builder.reborrow().init_private_route();
|
||||||
@ -357,6 +361,9 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok((rpcreader, _)) => {
|
Ok((rpcreader, _)) => {
|
||||||
|
// Note that we definitely received this peer info since we got a reply
|
||||||
|
waitable_reply.node_ref.set_seen_our_dial_info();
|
||||||
|
|
||||||
// Reply received
|
// Reply received
|
||||||
let recv_ts = get_timestamp();
|
let recv_ts = get_timestamp();
|
||||||
if waitable_reply.is_ping {
|
if waitable_reply.is_ping {
|
||||||
@ -1333,6 +1340,21 @@ impl RPCProcessor {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gets a 'RespondTo::Sender' that contains either our dial info,
|
||||||
|
// or None if the peer has seen our dial info before
|
||||||
|
pub fn get_respond_to_sender(&self, peer: NodeRef) -> RespondTo {
|
||||||
|
if peer.has_seen_our_dial_info() {
|
||||||
|
RespondTo::Sender(None)
|
||||||
|
} else if let Some(did) = self
|
||||||
|
.routing_table()
|
||||||
|
.first_filtered_dial_info_detail(peer.dial_info_filter())
|
||||||
|
{
|
||||||
|
RespondTo::Sender(Some(did.dial_info.clone()))
|
||||||
|
} else {
|
||||||
|
RespondTo::Sender(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Send InfoQ RPC request, receive InfoA answer
|
// Send InfoQ RPC request, receive InfoA answer
|
||||||
pub async fn rpc_call_info(self, peer: NodeRef) -> Result<InfoAnswer, RPCError> {
|
pub async fn rpc_call_info(self, peer: NodeRef) -> Result<InfoAnswer, RPCError> {
|
||||||
let info_q_msg = {
|
let info_q_msg = {
|
||||||
@ -1340,7 +1362,8 @@ impl RPCProcessor {
|
|||||||
let mut question = info_q_msg.init_root::<veilid_capnp::operation::Builder>();
|
let mut question = info_q_msg.init_root::<veilid_capnp::operation::Builder>();
|
||||||
question.set_op_id(self.get_next_op_id());
|
question.set_op_id(self.get_next_op_id());
|
||||||
let mut respond_to = question.reborrow().init_respond_to();
|
let mut respond_to = question.reborrow().init_respond_to();
|
||||||
respond_to.set_sender(());
|
self.get_respond_to_sender(peer.clone())
|
||||||
|
.encode(&mut respond_to);
|
||||||
let detail = question.reborrow().init_detail();
|
let detail = question.reborrow().init_detail();
|
||||||
detail.init_info_q();
|
detail.init_info_q();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user