proper node info filter for fanout

This commit is contained in:
Christien Rioux 2023-07-20 17:52:45 -04:00
parent 57b5de0639
commit 3224a315c3
5 changed files with 59 additions and 10 deletions

View File

@ -10,6 +10,15 @@ where
} }
pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>; pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>;
pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
Arc::new(|_, _| true)
}
pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_capabilities(&caps))
}
/// Contains the logic for generically searing the Veilid routing table for a set of nodes and applying an /// Contains the logic for generically searing the Veilid routing table for a set of nodes and applying an
/// RPC operation that eventually converges on satisfactory result, or times out and returns some /// RPC operation that eventually converges on satisfactory result, or times out and returns some
@ -40,6 +49,7 @@ where
node_count: usize, node_count: usize,
fanout: usize, fanout: usize,
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
node_info_filter: FanoutNodeInfoFilter,
call_routine: C, call_routine: C,
check_done: D, check_done: D,
} }
@ -57,6 +67,7 @@ where
node_count: usize, node_count: usize,
fanout: usize, fanout: usize,
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
node_info_filter: FanoutNodeInfoFilter,
call_routine: C, call_routine: C,
check_done: D, check_done: D,
) -> Arc<Self> { ) -> Arc<Self> {
@ -74,6 +85,7 @@ where
node_count, node_count,
fanout, fanout,
timeout_us, timeout_us,
node_info_filter,
call_routine, call_routine,
check_done, check_done,
}) })
@ -160,11 +172,26 @@ where
// Do the call for this node // Do the call for this node
match (self.call_routine)(next_node.clone()).await { match (self.call_routine)(next_node.clone()).await {
Ok(Some(v)) => { Ok(Some(v)) => {
// Filter returned nodes
let filtered_v: Vec<PeerInfo> = v
.into_iter()
.filter(|pi| {
let node_ids = pi.node_ids().to_vec();
if !(self.node_info_filter)(
&node_ids,
pi.signed_node_info().node_info(),
) {
return false;
}
true
})
.collect();
// Call succeeded // Call succeeded
// Register the returned nodes and add them to the closest nodes list in sorted order // Register the returned nodes and add them to the closest nodes list in sorted order
let new_nodes = self let new_nodes = self
.routing_table .routing_table
.register_find_node_answer(self.crypto_kind, v); .register_find_node_answer(self.crypto_kind, filtered_v);
self.clone().add_new_nodes(new_nodes); self.clone().add_new_nodes(new_nodes);
} }
Ok(None) => { Ok(None) => {
@ -185,20 +212,33 @@ where
// Get the 'node_count' closest nodes to the key out of our routing table // Get the 'node_count' closest nodes to the key out of our routing table
let closest_nodes = { let closest_nodes = {
let routing_table = self.routing_table.clone(); let routing_table = self.routing_table.clone();
let node_info_filter = self.node_info_filter.clone();
let filter = Box::new( let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| { move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node // Exclude our own node
if opt_entry.is_none() { if opt_entry.is_none() {
return false; return false;
} }
let entry = opt_entry.unwrap();
// Ensure only things that are valid/signed in the PublicInternet domain are returned // Filter entries
rti.filter_has_valid_signed_node_info( entry.with(rti, |_rti, e| {
RoutingDomain::PublicInternet, let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet) else {
true, return false;
opt_entry, };
) // Ensure only things that are valid/signed in the PublicInternet domain are returned
if !signed_node_info.has_any_signature() {
return false;
}
// Check our node info ilter
let node_ids = e.node_ids().to_vec();
if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
return false;
}
true
})
}, },
) as RoutingTableEntryFilter; ) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]); let filters = VecDeque::from([filter]);

View File

@ -510,6 +510,7 @@ impl RPCProcessor {
count, count,
fanout, fanout,
timeout_us, timeout_us,
empty_fanout_node_info_filter(),
call_routine, call_routine,
check_done, check_done,
); );

View File

@ -136,7 +136,9 @@ impl StorageManager {
} }
// Return peers if we have some // Return peers if we have some
log_stor!(debug "Fanout call returned peers {}", gva.answer.peers.len()); #[cfg(feature="network-result-extra")]
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(Some(gva.answer.peers)) Ok(Some(gva.answer.peers))
} }
}; };
@ -158,6 +160,7 @@ impl StorageManager {
key_count, key_count,
fanout, fanout,
timeout_us, timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine, call_routine,
check_done, check_done,
); );

View File

@ -18,7 +18,8 @@ use storage_manager_inner::*;
pub use types::*; pub use types::*;
use super::*; use super::*;
use crate::rpc_processor::*; use routing_table::*;
use rpc_processor::*;
/// The maximum size of a single subkey /// The maximum size of a single subkey
const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN; const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;

View File

@ -116,6 +116,9 @@ impl StorageManager {
} }
// Return peers if we have some // Return peers if we have some
#[cfg(feature="network-result-extra")]
log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
Ok(Some(sva.answer.peers)) Ok(Some(sva.answer.peers))
} }
}; };
@ -137,6 +140,7 @@ impl StorageManager {
key_count, key_count,
fanout, fanout,
timeout_us, timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine, call_routine,
check_done, check_done,
); );