private route work
This commit is contained in:
@@ -25,8 +25,6 @@ pub use routing_domains::*;
|
||||
pub use routing_table_inner::*;
|
||||
pub use stats_accounting::*;
|
||||
|
||||
const RECENT_PEERS_TABLE_SIZE: usize = 64;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
|
||||
@@ -36,6 +34,8 @@ pub struct LowLevelPortInfo {
|
||||
pub low_level_protocol_ports: LowLevelProtocolPorts,
|
||||
pub protocol_to_port: ProtocolToPortMapping,
|
||||
}
|
||||
pub type RoutingTableEntryFilter =
|
||||
Box<dyn for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool + Send>;
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct RoutingTableHealth {
|
||||
@@ -47,7 +47,7 @@ pub struct RoutingTableHealth {
|
||||
pub dead_entry_count: usize,
|
||||
}
|
||||
|
||||
struct RoutingTableUnlockedInner {
|
||||
pub(super) struct RoutingTableUnlockedInner {
|
||||
// Accessors
|
||||
config: VeilidConfig,
|
||||
network_manager: NetworkManager,
|
||||
@@ -164,7 +164,7 @@ impl RoutingTable {
|
||||
debug!("finished route spec store init");
|
||||
|
||||
let mut inner = self.inner.write();
|
||||
inner.init(self.clone());
|
||||
inner.init(self.clone())?;
|
||||
|
||||
inner.route_spec_store = Some(route_spec_store);
|
||||
|
||||
@@ -192,7 +192,9 @@ impl RoutingTable {
|
||||
inner.route_spec_store.take()
|
||||
};
|
||||
if let Some(rss) = rss {
|
||||
rss.save().await;
|
||||
if let Err(e) = rss.save().await {
|
||||
error!("couldn't save route spec store: {}", e);
|
||||
}
|
||||
}
|
||||
debug!("shutting down routing table");
|
||||
|
||||
@@ -545,23 +547,30 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
/// Makes a filter that finds nodes with a matching inbound dialinfo
|
||||
pub fn make_inbound_dial_info_entry_filter(
|
||||
pub fn make_inbound_dial_info_entry_filter<'a>(
|
||||
routing_domain: RoutingDomain,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
) -> Box<dyn FnMut(&RoutingTableInner, &BucketEntryInner) -> bool> {
|
||||
) -> RoutingTableEntryFilter {
|
||||
// does it have matching public dial info?
|
||||
Box::new(move |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
if ni
|
||||
.first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| {
|
||||
did.matches_filter(&dial_info_filter)
|
||||
})
|
||||
Box::new(move |rti, _k, e| {
|
||||
if let Some(e) = e {
|
||||
e.with(rti, |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
if ni
|
||||
.first_filtered_dial_info_detail(DialInfoDetail::NO_SORT, |did| {
|
||||
did.matches_filter(&dial_info_filter)
|
||||
})
|
||||
.is_some()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
})
|
||||
} else {
|
||||
rti.first_filtered_dial_info_detail(routing_domain.into(), &dial_info_filter)
|
||||
.is_some()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
})
|
||||
}
|
||||
|
||||
@@ -569,48 +578,36 @@ impl RoutingTable {
|
||||
pub fn make_outbound_dial_info_entry_filter(
|
||||
routing_domain: RoutingDomain,
|
||||
dial_info: DialInfo,
|
||||
) -> Box<dyn FnMut(&RoutingTableInner, &BucketEntryInner) -> bool> {
|
||||
) -> RoutingTableEntryFilter {
|
||||
// does the node's outbound capabilities match the dialinfo?
|
||||
Box::new(move |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
let dif = DialInfoFilter::all()
|
||||
.with_protocol_type_set(ni.outbound_protocols)
|
||||
.with_address_type_set(ni.address_types);
|
||||
if dial_info.matches_filter(&dif) {
|
||||
return true;
|
||||
}
|
||||
Box::new(move |rti, _k, e| {
|
||||
if let Some(e) = e {
|
||||
e.with(rti, |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
let dif = DialInfoFilter::all()
|
||||
.with_protocol_type_set(ni.outbound_protocols)
|
||||
.with_address_type_set(ni.address_types);
|
||||
if dial_info.matches_filter(&dif) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
})
|
||||
} else {
|
||||
let dif = rti.get_outbound_dial_info_filter(routing_domain);
|
||||
dial_info.matches_filter(&dif)
|
||||
}
|
||||
false
|
||||
})
|
||||
}
|
||||
|
||||
/// Make a filter that wraps another filter
|
||||
pub fn combine_entry_filters(
|
||||
mut f1: Box<dyn FnMut(&RoutingTableInner, &BucketEntryInner) -> bool>,
|
||||
mut f2: Box<dyn FnMut(&RoutingTableInner, &BucketEntryInner) -> bool>,
|
||||
) -> Box<dyn FnMut(&RoutingTableInner, &BucketEntryInner) -> bool> {
|
||||
Box::new(move |rti, e| {
|
||||
if !f1(rti, e) {
|
||||
return false;
|
||||
}
|
||||
if !f2(rti, e) {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
}
|
||||
|
||||
pub fn find_fast_public_nodes_filtered<F>(
|
||||
pub fn find_fast_public_nodes_filtered(
|
||||
&self,
|
||||
node_count: usize,
|
||||
mut entry_filter: F,
|
||||
) -> Vec<NodeRef>
|
||||
where
|
||||
F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool,
|
||||
{
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
) -> Vec<NodeRef> {
|
||||
self.inner
|
||||
.read()
|
||||
.find_fast_public_nodes_filtered(self.clone(), node_count, entry_filter)
|
||||
.find_fast_public_nodes_filtered(self.clone(), node_count, filters)
|
||||
}
|
||||
|
||||
/// Retrieve up to N of each type of protocol capable nodes
|
||||
@@ -621,14 +618,12 @@ impl RoutingTable {
|
||||
ProtocolType::WS,
|
||||
ProtocolType::WSS,
|
||||
];
|
||||
let protocol_types_len = protocol_types.len();
|
||||
let mut nodes_proto_v4 = vec![0usize, 0usize, 0usize, 0usize];
|
||||
let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize];
|
||||
|
||||
self.find_fastest_nodes(
|
||||
// count
|
||||
protocol_types.len() * 2 * max_per_type,
|
||||
// filter
|
||||
move |rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
let filter = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
entry.with(rti, |_rti, e| {
|
||||
// skip nodes on our local network here
|
||||
@@ -668,64 +663,68 @@ impl RoutingTable {
|
||||
.unwrap_or(false)
|
||||
})
|
||||
},
|
||||
// transform
|
||||
) as RoutingTableEntryFilter;
|
||||
|
||||
let filters = VecDeque::from([filter]);
|
||||
|
||||
self.find_fastest_nodes(
|
||||
protocol_types_len * 2 * max_per_type,
|
||||
filters,
|
||||
|_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>(
|
||||
pub fn find_peers_with_sort_and_filter<C, T, O>(
|
||||
&self,
|
||||
node_count: usize,
|
||||
cur_ts: u64,
|
||||
mut filter: F,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
compare: C,
|
||||
mut transform: T,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
C: FnMut(
|
||||
C: for<'a, 'b> FnMut(
|
||||
&'a RoutingTableInner,
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
) -> core::cmp::Ordering,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform)
|
||||
.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform)
|
||||
}
|
||||
|
||||
pub fn find_fastest_nodes<'a, T, F, O>(
|
||||
pub fn find_fastest_nodes<'a, T, O>(
|
||||
&self,
|
||||
node_count: usize,
|
||||
mut filter: F,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
.find_fastest_nodes(node_count, filter, transform)
|
||||
.find_fastest_nodes(node_count, filters, transform)
|
||||
}
|
||||
|
||||
pub fn find_closest_nodes<'a, F, T, O>(
|
||||
pub fn find_closest_nodes<'a, T, O>(
|
||||
&self,
|
||||
node_id: DHTKey,
|
||||
filter: F,
|
||||
mut transform: T,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
.find_closest_nodes(node_id, filter, transform)
|
||||
.find_closest_nodes(node_id, filters, transform)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn register_find_node_answer(&self, peers: Vec<PeerInfo>) -> Vec<NodeRef> {
|
||||
let node_id = self.node_id();
|
||||
|
@@ -124,7 +124,7 @@ fn route_permutation_to_hop_cache(nodes: &[(DHTKey, NodeInfo)], perm: &[usize])
|
||||
|
||||
/// number of route permutations is the number of unique orderings
|
||||
/// for a set of nodes, given that the first node is fixed
|
||||
fn get_route_permutation_count(hop_count: usize) -> usize {
|
||||
fn _get_route_permutation_count(hop_count: usize) -> usize {
|
||||
if hop_count == 0 {
|
||||
unreachable!();
|
||||
}
|
||||
@@ -374,39 +374,43 @@ impl RouteSpecStore {
|
||||
|
||||
// Get list of all nodes, and sort them for selection
|
||||
let cur_ts = intf::get_timestamp();
|
||||
let filter = |rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| -> bool {
|
||||
// Exclude our own node from routes
|
||||
if v.is_none() {
|
||||
return false;
|
||||
}
|
||||
let v = v.unwrap();
|
||||
let filter = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: DHTKey, v: Option<Arc<BucketEntry>>| -> bool {
|
||||
// Exclude our own node from routes
|
||||
if v.is_none() {
|
||||
return false;
|
||||
}
|
||||
let v = v.unwrap();
|
||||
|
||||
// Exclude nodes on our local network
|
||||
let on_local_network = v.with(rti, |_rti, e| {
|
||||
e.node_info(RoutingDomain::LocalNetwork).is_some()
|
||||
});
|
||||
if on_local_network {
|
||||
return false;
|
||||
}
|
||||
// Exclude nodes on our local network
|
||||
let on_local_network = v.with(rti, |_rti, e| {
|
||||
e.node_info(RoutingDomain::LocalNetwork).is_some()
|
||||
});
|
||||
if on_local_network {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
|
||||
v.with(rti, |_rti, e| {
|
||||
let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) {
|
||||
ni.has_sequencing_matched_dial_info(sequencing)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
let node_status_ok = if let Some(ns) = e.node_status(RoutingDomain::PublicInternet)
|
||||
{
|
||||
ns.will_route()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
|
||||
v.with(rti, move |_rti, e| {
|
||||
let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet)
|
||||
{
|
||||
ni.has_sequencing_matched_dial_info(sequencing)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
let node_status_ok =
|
||||
if let Some(ns) = e.node_status(RoutingDomain::PublicInternet) {
|
||||
ns.will_route()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
node_info_ok && node_status_ok
|
||||
})
|
||||
};
|
||||
let compare = |rti,
|
||||
node_info_ok && node_status_ok
|
||||
})
|
||||
},
|
||||
) as RoutingTableEntryFilter;
|
||||
let filters = VecDeque::from([filter]);
|
||||
let compare = |rti: &RoutingTableInner,
|
||||
v1: &(DHTKey, Option<Arc<BucketEntry>>),
|
||||
v2: &(DHTKey, Option<Arc<BucketEntry>>)|
|
||||
-> Ordering {
|
||||
@@ -461,7 +465,10 @@ impl RouteSpecStore {
|
||||
});
|
||||
cmpout
|
||||
};
|
||||
let transform = |rti, k: DHTKey, v: Option<Arc<BucketEntry>>| -> (DHTKey, NodeInfo) {
|
||||
let transform = |rti: &RoutingTableInner,
|
||||
k: DHTKey,
|
||||
v: Option<Arc<BucketEntry>>|
|
||||
-> (DHTKey, NodeInfo) {
|
||||
// Return the key and the nodeinfo for that key
|
||||
(
|
||||
k,
|
||||
@@ -479,7 +486,7 @@ impl RouteSpecStore {
|
||||
BucketEntryState::Unreliable,
|
||||
);
|
||||
let nodes =
|
||||
rti.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform);
|
||||
rti.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform);
|
||||
|
||||
// If we couldn't find enough nodes, wait until we have more nodes in the routing table
|
||||
if nodes.len() < hop_count {
|
||||
@@ -606,12 +613,49 @@ impl RouteSpecStore {
|
||||
Ok(Some(public_key))
|
||||
}
|
||||
|
||||
pub fn with_route_spec_detail<F, R>(&self, public_key: &DHTKey, f: F) -> Option<R>
|
||||
where
|
||||
F: FnOnce(&RouteSpecDetail) -> R,
|
||||
{
|
||||
let inner = self.inner.lock();
|
||||
Self::detail(&*inner, &public_key).map(f)
|
||||
pub fn validate_signatures(
|
||||
&self,
|
||||
public_key: &DHTKey,
|
||||
signatures: &[DHTSignature],
|
||||
data: &[u8],
|
||||
last_hop_id: DHTKey,
|
||||
) -> EyreResult<Option<(DHTKeySecret, SafetySelection)>> {
|
||||
let inner = &*self.inner.lock();
|
||||
let rsd = Self::detail(inner, &public_key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
|
||||
// Ensure we have the right number of signatures
|
||||
if signatures.len() != rsd.hops.len() - 1 {
|
||||
// Wrong number of signatures
|
||||
log_rpc!(debug "wrong number of signatures ({} should be {}) for routed operation on private route {}", signatures.len(), rsd.hops.len() - 1, public_key);
|
||||
return Ok(None);
|
||||
}
|
||||
// Validate signatures to ensure the route was handled by the nodes and not messed with
|
||||
for (hop_n, hop_public_key) in rsd.hops.iter().enumerate() {
|
||||
// The last hop is not signed, as the whole packet is signed
|
||||
if hop_n == signatures.len() {
|
||||
// Verify the node we received the routed operation from is the last hop in our route
|
||||
if *hop_public_key != last_hop_id {
|
||||
log_rpc!(debug "received routed operation from the wrong hop ({} should be {}) on private route {}", hop_public_key.encode(), last_hop_id.encode(), public_key);
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
// Verify a signature for a hop node along the route
|
||||
if let Err(e) = verify(hop_public_key, data, &signatures[hop_n]) {
|
||||
log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
// We got the correct signatures, return a key ans
|
||||
Ok(Some((
|
||||
rsd.secret_key,
|
||||
SafetySelection::Safe(SafetySpec {
|
||||
preferred_route: Some(*public_key),
|
||||
hop_count: rsd.hops.len(),
|
||||
stability: rsd.stability,
|
||||
sequencing: rsd.sequencing,
|
||||
}),
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn release_route(&self, public_key: DHTKey) {
|
||||
|
@@ -2,7 +2,7 @@ use super::*;
|
||||
|
||||
/// Mechanism required to contact another node
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum ContactMethod {
|
||||
pub enum ContactMethod {
|
||||
/// Node is not reachable by any means
|
||||
Unreachable,
|
||||
/// Connection should have already existed
|
||||
|
@@ -34,7 +34,7 @@ pub struct RoutingTableInner {
|
||||
}
|
||||
|
||||
impl RoutingTableInner {
|
||||
pub fn new(unlocked_inner: Arc<RoutingTableUnlockedInner>) -> RoutingTableInner {
|
||||
pub(super) fn new(unlocked_inner: Arc<RoutingTableUnlockedInner>) -> RoutingTableInner {
|
||||
RoutingTableInner {
|
||||
unlocked_inner,
|
||||
buckets: Vec::new(),
|
||||
@@ -794,22 +794,16 @@ impl RoutingTableInner {
|
||||
// Find Nodes
|
||||
|
||||
// Retrieve the fastest nodes in the routing table matching an entry filter
|
||||
pub fn find_fast_public_nodes_filtered<F>(
|
||||
pub fn find_fast_public_nodes_filtered(
|
||||
&self,
|
||||
outer_self: RoutingTable,
|
||||
node_count: usize,
|
||||
mut entry_filter: F,
|
||||
) -> Vec<NodeRef>
|
||||
where
|
||||
F: FnMut(&RoutingTableInner, &BucketEntryInner) -> bool,
|
||||
{
|
||||
self.find_fastest_nodes(
|
||||
// count
|
||||
node_count,
|
||||
// filter
|
||||
|rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
mut filters: VecDeque<RoutingTableEntryFilter>,
|
||||
) -> Vec<NodeRef> {
|
||||
let public_node_filter = Box::new(
|
||||
|rti: &RoutingTableInner, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
entry.with(rti, |rti, e| {
|
||||
entry.with(rti, |_rti, e| {
|
||||
// skip nodes on local network
|
||||
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
|
||||
return false;
|
||||
@@ -818,12 +812,16 @@ impl RoutingTableInner {
|
||||
if e.node_info(RoutingDomain::PublicInternet).is_none() {
|
||||
return false;
|
||||
}
|
||||
// skip nodes that dont match entry filter
|
||||
entry_filter(rti, e)
|
||||
true
|
||||
})
|
||||
},
|
||||
// transform
|
||||
|_rti, k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
) as RoutingTableEntryFilter;
|
||||
filters.push_front(public_node_filter);
|
||||
|
||||
self.find_fastest_nodes(
|
||||
node_count,
|
||||
filters,
|
||||
|_rti: &RoutingTableInner, k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(outer_self.clone(), k, v.unwrap().clone(), None)
|
||||
},
|
||||
)
|
||||
@@ -858,37 +856,42 @@ impl RoutingTableInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_peers_with_sort_and_filter<'a, 'b, F, C, T, O>(
|
||||
pub fn find_peers_with_sort_and_filter<C, T, O>(
|
||||
&self,
|
||||
node_count: usize,
|
||||
cur_ts: u64,
|
||||
mut filter: F,
|
||||
compare: C,
|
||||
mut filters: VecDeque<RoutingTableEntryFilter>,
|
||||
mut compare: C,
|
||||
mut transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
C: FnMut(
|
||||
C: for<'a, 'b> FnMut(
|
||||
&'a RoutingTableInner,
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
) -> core::cmp::Ordering,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
// collect all the nodes for sorting
|
||||
let mut nodes =
|
||||
Vec::<(DHTKey, Option<Arc<BucketEntry>>)>::with_capacity(self.bucket_entry_count + 1);
|
||||
|
||||
// add our own node (only one of there with the None entry)
|
||||
if filter(self, self.unlocked_inner.node_id, None) {
|
||||
nodes.push((self.unlocked_inner.node_id, None));
|
||||
for filter in &mut filters {
|
||||
if filter(self, self.unlocked_inner.node_id, None) {
|
||||
nodes.push((self.unlocked_inner.node_id, None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// add all nodes from buckets
|
||||
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| {
|
||||
// Apply filter
|
||||
if filter(rti, k, Some(v.clone())) {
|
||||
nodes.push((k, Some(v.clone())));
|
||||
for filter in &mut filters {
|
||||
if filter(rti, k, Some(v.clone())) {
|
||||
nodes.push((k, Some(v.clone())));
|
||||
break;
|
||||
}
|
||||
}
|
||||
Option::<()>::None
|
||||
});
|
||||
@@ -907,97 +910,99 @@ impl RoutingTableInner {
|
||||
out
|
||||
}
|
||||
|
||||
pub fn find_fastest_nodes<'a, T, F, O>(
|
||||
pub fn find_fastest_nodes<T, O>(
|
||||
&self,
|
||||
node_count: usize,
|
||||
mut filter: F,
|
||||
mut filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = intf::get_timestamp();
|
||||
let out = self.find_peers_with_sort_and_filter(
|
||||
node_count,
|
||||
cur_ts,
|
||||
// filter
|
||||
|rti, k, v| {
|
||||
|
||||
// Add filter to remove dead nodes always
|
||||
let filter_dead = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: DHTKey, v: Option<Arc<BucketEntry>>| {
|
||||
if let Some(entry) = &v {
|
||||
// always filter out dead nodes
|
||||
if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) {
|
||||
false
|
||||
} else {
|
||||
filter(rti, k, v)
|
||||
true
|
||||
}
|
||||
} else {
|
||||
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
|
||||
false
|
||||
}
|
||||
},
|
||||
// sort
|
||||
|rti, (a_key, a_entry), (b_key, b_entry)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
}
|
||||
// our own node always comes last (should not happen, here for completeness)
|
||||
if a_entry.is_none() {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
if b_entry.is_none() {
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
// reliable nodes come first
|
||||
let ae = a_entry.as_ref().unwrap();
|
||||
let be = b_entry.as_ref().unwrap();
|
||||
ae.with(rti, |rti, ae| {
|
||||
be.with(rti, |_rti, be| {
|
||||
let ra = ae.check_reliable(cur_ts);
|
||||
let rb = be.check_reliable(cur_ts);
|
||||
if ra != rb {
|
||||
if ra {
|
||||
return core::cmp::Ordering::Less;
|
||||
} else {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
}
|
||||
) as RoutingTableEntryFilter;
|
||||
filters.push_front(filter_dead);
|
||||
|
||||
// latency is the next metric, closer nodes first
|
||||
let a_latency = match ae.peer_stats().latency.as_ref() {
|
||||
None => {
|
||||
// treat unknown latency as slow
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
Some(l) => l,
|
||||
};
|
||||
let b_latency = match be.peer_stats().latency.as_ref() {
|
||||
None => {
|
||||
// treat unknown latency as slow
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
Some(l) => l,
|
||||
};
|
||||
// Sort by average latency
|
||||
a_latency.average.cmp(&b_latency.average)
|
||||
})
|
||||
// Fastest sort
|
||||
let sort = |rti: &RoutingTableInner,
|
||||
(a_key, a_entry): &(DHTKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(DHTKey, Option<Arc<BucketEntry>>)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
}
|
||||
// our own node always comes last (should not happen, here for completeness)
|
||||
if a_entry.is_none() {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
if b_entry.is_none() {
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
// reliable nodes come first
|
||||
let ae = a_entry.as_ref().unwrap();
|
||||
let be = b_entry.as_ref().unwrap();
|
||||
ae.with(rti, |rti, ae| {
|
||||
be.with(rti, |_rti, be| {
|
||||
let ra = ae.check_reliable(cur_ts);
|
||||
let rb = be.check_reliable(cur_ts);
|
||||
if ra != rb {
|
||||
if ra {
|
||||
return core::cmp::Ordering::Less;
|
||||
} else {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
}
|
||||
|
||||
// latency is the next metric, closer nodes first
|
||||
let a_latency = match ae.peer_stats().latency.as_ref() {
|
||||
None => {
|
||||
// treat unknown latency as slow
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
Some(l) => l,
|
||||
};
|
||||
let b_latency = match be.peer_stats().latency.as_ref() {
|
||||
None => {
|
||||
// treat unknown latency as slow
|
||||
return core::cmp::Ordering::Less;
|
||||
}
|
||||
Some(l) => l,
|
||||
};
|
||||
// Sort by average latency
|
||||
a_latency.average.cmp(&b_latency.average)
|
||||
})
|
||||
},
|
||||
// transform,
|
||||
transform,
|
||||
);
|
||||
})
|
||||
};
|
||||
|
||||
let out =
|
||||
self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform);
|
||||
out
|
||||
}
|
||||
|
||||
pub fn find_closest_nodes<'a, F, T, O>(
|
||||
pub fn find_closest_nodes<T, O>(
|
||||
&self,
|
||||
node_id: DHTKey,
|
||||
filter: F,
|
||||
mut transform: T,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = intf::get_timestamp();
|
||||
let node_count = {
|
||||
@@ -1005,41 +1010,39 @@ impl RoutingTableInner {
|
||||
let c = config.get();
|
||||
c.network.dht.max_find_node_count as usize
|
||||
};
|
||||
let out = self.find_peers_with_sort_and_filter(
|
||||
node_count,
|
||||
cur_ts,
|
||||
// filter
|
||||
filter,
|
||||
// sort
|
||||
|rti, (a_key, a_entry), (b_key, b_entry)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
}
|
||||
|
||||
// reliable nodes come first, pessimistically treating our own node as unreliable
|
||||
let ra = a_entry
|
||||
.as_ref()
|
||||
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
|
||||
let rb = b_entry
|
||||
.as_ref()
|
||||
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
|
||||
if ra != rb {
|
||||
if ra {
|
||||
return core::cmp::Ordering::Less;
|
||||
} else {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
}
|
||||
// closest sort
|
||||
let sort = |rti: &RoutingTableInner,
|
||||
(a_key, a_entry): &(DHTKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(DHTKey, Option<Arc<BucketEntry>>)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
}
|
||||
|
||||
// distance is the next metric, closer nodes first
|
||||
let da = distance(a_key, &node_id);
|
||||
let db = distance(b_key, &node_id);
|
||||
da.cmp(&db)
|
||||
},
|
||||
// transform,
|
||||
&mut transform,
|
||||
);
|
||||
// reliable nodes come first, pessimistically treating our own node as unreliable
|
||||
let ra = a_entry
|
||||
.as_ref()
|
||||
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
|
||||
let rb = b_entry
|
||||
.as_ref()
|
||||
.map_or(false, |x| x.with(rti, |_rti, x| x.check_reliable(cur_ts)));
|
||||
if ra != rb {
|
||||
if ra {
|
||||
return core::cmp::Ordering::Less;
|
||||
} else {
|
||||
return core::cmp::Ordering::Greater;
|
||||
}
|
||||
}
|
||||
|
||||
// distance is the next metric, closer nodes first
|
||||
let da = distance(a_key, &node_id);
|
||||
let db = distance(b_key, &node_id);
|
||||
da.cmp(&db)
|
||||
};
|
||||
|
||||
let out =
|
||||
self.find_peers_with_sort_and_filter(node_count, cur_ts, filters, sort, transform);
|
||||
log_rtab!(">> find_closest_nodes: node count = {}", out.len());
|
||||
out
|
||||
}
|
||||
|
Reference in New Issue
Block a user