fanout work

This commit is contained in:
Christien Rioux 2023-09-08 20:38:31 -04:00
parent aa490d5e65
commit 0aa7cf5ef2
10 changed files with 146 additions and 129 deletions

View File

@ -1,8 +1,10 @@
use super::*; use super::*;
impl RoutingTable { impl RoutingTable {
/// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info /// Utility to find the closest nodes to a particular key, preferring reliable nodes first,
pub fn find_all_closest_peers( /// including possibly our own node and nodes further away from the key than our own,
/// returning their peer info
pub fn find_preferred_closest_peers(
&self, &self,
key: TypedKey, key: TypedKey,
capabilities: &[Capability], capabilities: &[Capability],
@ -49,7 +51,7 @@ impl RoutingTable {
}; };
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet); let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
let closest_nodes = match self.find_closest_nodes( let closest_nodes = match self.find_preferred_closest_nodes(
node_count, node_count,
key, key,
filters, filters,
@ -68,9 +70,10 @@ impl RoutingTable {
NetworkResult::value(closest_nodes) NetworkResult::value(closest_nodes)
} }
/// Utility to find nodes that are closer to a key than our own node, returning their peer info /// Utility to find nodes that are closer to a key than our own node,
/// preferring reliable nodes first, and returning their peer info
/// Can filter based on a particular set of capabiltiies /// Can filter based on a particular set of capabiltiies
pub fn find_peers_closer_to_key( pub fn find_preferred_peers_closer_to_key(
&self, &self,
key: TypedKey, key: TypedKey,
required_capabilities: Vec<Capability>, required_capabilities: Vec<Capability>,
@ -126,7 +129,7 @@ impl RoutingTable {
}; };
// //
let closest_nodes = match self.find_closest_nodes( let closest_nodes = match self.find_preferred_closest_nodes(
node_count, node_count,
key, key,
filters, filters,

View File

@ -939,7 +939,7 @@ impl RoutingTable {
let filters = VecDeque::from([filter]); let filters = VecDeque::from([filter]);
self.find_fastest_nodes( self.find_preferred_fastest_nodes(
protocol_types_len * 2 * max_per_type, protocol_types_len * 2 * max_per_type,
filters, filters,
|_rti, entry: Option<Arc<BucketEntry>>| { |_rti, entry: Option<Arc<BucketEntry>>| {
@ -990,7 +990,7 @@ impl RoutingTable {
.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform) .find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform)
} }
pub fn find_fastest_nodes<'a, T, O>( pub fn find_preferred_fastest_nodes<'a, T, O>(
&self, &self,
node_count: usize, node_count: usize,
filters: VecDeque<RoutingTableEntryFilter>, filters: VecDeque<RoutingTableEntryFilter>,
@ -1001,10 +1001,10 @@ impl RoutingTable {
{ {
self.inner self.inner
.read() .read()
.find_fastest_nodes(node_count, filters, transform) .find_preferred_fastest_nodes(node_count, filters, transform)
} }
pub fn find_closest_nodes<'a, T, O>( pub fn find_preferred_closest_nodes<'a, T, O>(
&self, &self,
node_count: usize, node_count: usize,
node_id: TypedKey, node_id: TypedKey,
@ -1016,14 +1016,14 @@ impl RoutingTable {
{ {
self.inner self.inner
.read() .read()
.find_closest_nodes(node_count, node_id, filters, transform) .find_preferred_closest_nodes(node_count, node_id, filters, transform)
} }
pub fn sort_and_clean_closest_noderefs( pub fn sort_and_clean_closest_noderefs(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
closest_nodes: &mut Vec<NodeRef>, closest_nodes: &[NodeRef],
) { ) -> Vec<NodeRef> {
self.inner self.inner
.read() .read()
.sort_and_clean_closest_noderefs(node_id, closest_nodes) .sort_and_clean_closest_noderefs(node_id, closest_nodes)

View File

@ -963,7 +963,7 @@ impl RoutingTableInner {
}) as RoutingTableEntryFilter; }) as RoutingTableEntryFilter;
filters.push_front(public_node_filter); filters.push_front(public_node_filter);
self.find_fastest_nodes( self.find_preferred_fastest_nodes(
node_count, node_count,
filters, filters,
|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| { |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
@ -1062,7 +1062,7 @@ impl RoutingTableInner {
out out
} }
pub fn find_fastest_nodes<T, O>( pub fn find_preferred_fastest_nodes<T, O>(
&self, &self,
node_count: usize, node_count: usize,
mut filters: VecDeque<RoutingTableEntryFilter>, mut filters: VecDeque<RoutingTableEntryFilter>,
@ -1154,7 +1154,7 @@ impl RoutingTableInner {
out out
} }
pub fn find_closest_nodes<T, O>( pub fn find_preferred_closest_nodes<T, O>(
&self, &self,
node_count: usize, node_count: usize,
node_id: TypedKey, node_id: TypedKey,
@ -1242,8 +1242,8 @@ impl RoutingTableInner {
pub fn sort_and_clean_closest_noderefs( pub fn sort_and_clean_closest_noderefs(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
closest_nodes: &mut Vec<NodeRef>, closest_nodes: &[NodeRef],
) { ) -> Vec<NodeRef> {
// Lock all noderefs // Lock all noderefs
let kind = node_id.kind; let kind = node_id.kind;
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
@ -1263,7 +1263,7 @@ impl RoutingTableInner {
closest_nodes_locked.sort_by(sort); closest_nodes_locked.sort_by(sort);
// Unlock noderefs // Unlock noderefs
*closest_nodes = closest_nodes_locked.iter().map(|x| x.unlocked()).collect(); closest_nodes_locked.iter().map(|x| x.unlocked()).collect()
} }
} }
@ -1271,7 +1271,6 @@ fn make_closest_noderef_sort(
crypto: Crypto, crypto: Crypto,
node_id: TypedKey, node_id: TypedKey,
) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering { ) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering {
let cur_ts = get_aligned_timestamp();
let kind = node_id.kind; let kind = node_id.kind;
// Get cryptoversion to check distance with // Get cryptoversion to check distance with
let vcrypto = crypto.get(kind).unwrap(); let vcrypto = crypto.get(kind).unwrap();
@ -1282,19 +1281,8 @@ fn make_closest_noderef_sort(
return core::cmp::Ordering::Equal; return core::cmp::Ordering::Equal;
} }
// reliable nodes come first, pessimistically treating our own node as unreliable
a.operate(|_rti, a_entry| { a.operate(|_rti, a_entry| {
b.operate(|_rti, b_entry| { b.operate(|_rti, b_entry| {
let ra = a_entry.check_reliable(cur_ts);
let rb = b_entry.check_reliable(cur_ts);
if ra != rb {
if ra {
return core::cmp::Ordering::Less;
} else {
return core::cmp::Ordering::Greater;
}
}
// get keys // get keys
let a_key = a_entry.node_ids().get(kind).unwrap(); let a_key = a_entry.node_ids().get(kind).unwrap();
let b_key = b_entry.node_ids().get(kind).unwrap(); let b_key = b_entry.node_ids().get(kind).unwrap();

View File

@ -67,7 +67,7 @@ impl RoutingTable {
) as RoutingTableEntryFilter; ) as RoutingTableEntryFilter;
filters.push_front(filter); filters.push_front(filter);
let noderefs = routing_table.find_fastest_nodes( let noderefs = routing_table.find_preferred_fastest_nodes(
min_peer_count, min_peer_count,
filters, filters,
|_rti, entry: Option<Arc<BucketEntry>>| { |_rti, entry: Option<Arc<BucketEntry>>| {

View File

@ -4,8 +4,7 @@ struct FanoutContext<R>
where where
R: Unpin, R: Unpin,
{ {
closest_nodes: Vec<NodeRef>, fanout_queue: FanoutQueue,
called_nodes: HashSet<TypedKey>,
result: Option<Result<R, RPCError>>, result: Option<Result<R, RPCError>>,
} }
@ -72,8 +71,7 @@ where
check_done: D, check_done: D,
) -> Arc<Self> { ) -> Arc<Self> {
let context = Mutex::new(FanoutContext { let context = Mutex::new(FanoutContext {
closest_nodes: Vec::with_capacity(node_count), fanout_queue: FanoutQueue::new(node_id.kind),
called_nodes: HashSet::new(),
result: None, result: None,
}); });
@ -91,82 +89,44 @@ where
}) })
} }
fn add_new_nodes(self: Arc<Self>, new_nodes: Vec<NodeRef>) { fn evaluate_done(self: Arc<Self>, ctx: &mut FanoutContext<R>) -> bool {
let mut ctx = self.context.lock();
for nn in new_nodes {
// Make sure the new node isnt already in the list
let mut dup = false;
for cn in &ctx.closest_nodes {
if cn.same_entry(&nn) {
dup = true;
break;
}
}
if !dup {
// Add the new node if we haven't already called it before (only one call per node ever)
if let Some(key) = nn.node_ids().get(self.crypto_kind) {
if !ctx.called_nodes.contains(&key) {
ctx.closest_nodes.push(nn.clone());
}
}
}
}
self.routing_table
.sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes);
ctx.closest_nodes.truncate(self.node_count);
}
fn remove_node(self: Arc<Self>, dead_node: NodeRef) {
let mut ctx = self.context.lock();
for n in 0..ctx.closest_nodes.len() {
let cn = &ctx.closest_nodes[n];
if cn.same_entry(&dead_node) {
ctx.closest_nodes.remove(n);
break;
}
}
}
fn get_next_node(self: Arc<Self>) -> Option<NodeRef> {
let mut next_node = None;
let mut ctx = self.context.lock();
for cn in ctx.closest_nodes.clone() {
if let Some(key) = cn.node_ids().get(self.crypto_kind) {
if !ctx.called_nodes.contains(&key) {
// New fanout call candidate found
next_node = Some(cn.clone());
ctx.called_nodes.insert(key);
break;
}
}
}
next_node
}
fn evaluate_done(self: Arc<Self>) -> bool {
let mut ctx = self.context.lock();
// If we have a result, then we're done // If we have a result, then we're done
if ctx.result.is_some() { if ctx.result.is_some() {
return true; return true;
} }
// Check for a new done result // Check for a new done result
ctx.result = (self.check_done)(&ctx.closest_nodes).map(|o| Ok(o)); ctx.result = (self.check_done)(ctx.fanout_queue.nodes()).map(|o| Ok(o));
ctx.result.is_some() ctx.result.is_some()
} }
fn add_to_fanout_queue(self: Arc<Self>, new_nodes: &[NodeRef]) {
let ctx = &mut *self.context.lock();
let this = self.clone();
ctx.fanout_queue.add(&new_nodes, |current_nodes| {
let mut current_nodes_vec = this
.routing_table
.sort_and_clean_closest_noderefs(this.node_id, current_nodes);
current_nodes_vec.truncate(self.node_count);
current_nodes_vec
});
}
async fn fanout_processor(self: Arc<Self>) { async fn fanout_processor(self: Arc<Self>) {
// Check to see if we have a result or are done // Loop until we have a result or are done
while !self.clone().evaluate_done() { loop {
// Get the closest node we haven't processed yet // Get the closest node we haven't processed yet if we're not done yet
let next_node = self.clone().get_next_node(); let next_node = {
let mut ctx = self.context.lock();
if self.clone().evaluate_done(&mut ctx) {
break;
}
self.context.lock().fanout_queue.next()
};
// If we don't have a node to process, stop fanning out // If we don't have a node to process, stop fanning out
let Some(next_node) = next_node else { let Some(next_node) = next_node else {
return; break;
}; };
// Do the call for this node // Do the call for this node
@ -188,20 +148,18 @@ where
.collect(); .collect();
// Call succeeded // Call succeeded
// Register the returned nodes and add them to the closest nodes list in sorted order // Register the returned nodes and add them to the fanout queue in sorted order
let new_nodes = self let new_nodes = self
.routing_table .routing_table
.register_find_node_answer(self.crypto_kind, filtered_v); .register_find_node_answer(self.crypto_kind, filtered_v);
self.clone().add_new_nodes(new_nodes); self.clone().add_to_fanout_queue(&new_nodes);
} }
Ok(None) => { Ok(None) => {
// Call failed, remove the node so it isn't considered as part of the fanout // Call failed, node will node be considered again
self.clone().remove_node(next_node);
} }
Err(e) => { Err(e) => {
// Error happened, abort everything and return the error // Error happened, abort everything and return the error
let mut ctx = self.context.lock(); self.context.lock().result = Some(Err(e));
ctx.result = Some(Err(e));
return; return;
} }
}; };
@ -231,7 +189,7 @@ where
return false; return false;
} }
// Check our node info ilter // Check our node info filter
let node_ids = e.node_ids().to_vec(); let node_ids = e.node_ids().to_vec();
if !(node_info_filter)(&node_ids, signed_node_info.node_info()) { if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
return false; return false;
@ -248,12 +206,10 @@ where
}; };
routing_table routing_table
.find_closest_nodes(self.node_count, self.node_id, filters, transform) .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
.map_err(RPCError::invalid_format)? .map_err(RPCError::invalid_format)?
}; };
self.clone().add_to_fanout_queue(&closest_nodes);
let mut ctx = self.context.lock();
ctx.closest_nodes = closest_nodes;
Ok(()) Ok(())
} }
@ -272,9 +228,11 @@ where
} }
// Do a quick check to see if we're already done // Do a quick check to see if we're already done
if self.clone().evaluate_done() { {
let mut ctx = self.context.lock(); let mut ctx = self.context.lock();
return TimeoutOr::value(ctx.result.take().transpose()); if self.clone().evaluate_done(&mut ctx) {
return TimeoutOr::value(ctx.result.take().transpose());
}
} }
// If not, do the fanout // If not, do the fanout
@ -287,19 +245,12 @@ where
} }
} }
// Wait for them to complete // Wait for them to complete
timeout(timeout_ms, async { timeout(timeout_ms, async { while unord.next().await.is_some() {} })
while let Some(_) = unord.next().await { .await
if self.clone().evaluate_done() { .into_timeout_or()
break; .map(|_| {
} // Finished, return whatever value we came up with
} self.context.lock().result.take().transpose()
}) })
.await
.into_timeout_or()
.map(|_| {
// Finished, return whatever value we came up with
let mut ctx = self.context.lock();
ctx.result.take().transpose()
})
} }
} }

View File

@ -0,0 +1,73 @@
use super::*;
pub struct FanoutQueue {
crypto_kind: CryptoKind,
current_nodes: VecDeque<NodeRef>,
returned_nodes: HashSet<TypedKey>,
}
impl FanoutQueue {
// Create a queue for fanout candidates that have a crypto-kind compatible node id
pub fn new(crypto_kind: CryptoKind) -> Self {
Self {
crypto_kind,
current_nodes: VecDeque::new(),
returned_nodes: HashSet::new(),
}
}
// Add new nodes to list of fanout candidates
// Run a cleanup routine afterwards to trim down the list of candidates so it doesn't grow too large
pub fn add<F: FnOnce(&[NodeRef]) -> Vec<NodeRef>>(
&mut self,
new_nodes: &[NodeRef],
cleanup: F,
) {
for nn in new_nodes {
// Ensure the node has a comparable key with our current crypto kind
let Some(key) = nn.node_ids().get(self.crypto_kind) else {
continue;
};
// Check if we have already done this node before (only one call per node ever)
if self.returned_nodes.contains(&key) {
continue;
}
// Make sure the new node isnt already in the list
let mut dup = false;
for cn in &self.current_nodes {
if cn.same_entry(nn) {
dup = true;
break;
}
}
if !dup {
// Add the new node
self.current_nodes.push_front(nn.clone());
}
}
// Make sure the deque is a single slice
self.current_nodes.make_contiguous();
// Sort and trim the candidate set
self.current_nodes =
VecDeque::from_iter(cleanup(self.current_nodes.as_slices().0).iter().cloned());
}
// Return next fanout candidate
pub fn next(&mut self) -> Option<NodeRef> {
let cn = self.current_nodes.pop_front()?;
self.current_nodes.make_contiguous();
let key = cn.node_ids().get(self.crypto_kind).unwrap();
// Ensure we don't return this node again
self.returned_nodes.insert(key);
Some(cn)
}
// Get a slice of all the current fanout candidates
pub fn nodes(&self) -> &[NodeRef] {
self.current_nodes.as_slices().0
}
}

View File

@ -1,6 +1,7 @@
mod coders; mod coders;
mod destination; mod destination;
mod fanout_call; mod fanout_call;
mod fanout_queue;
mod operation_waiter; mod operation_waiter;
mod rpc_app_call; mod rpc_app_call;
mod rpc_app_message; mod rpc_app_message;
@ -31,6 +32,7 @@ mod rpc_start_tunnel;
pub use coders::*; pub use coders::*;
pub use destination::*; pub use destination::*;
pub use fanout_call::*; pub use fanout_call::*;
pub use fanout_queue::*;
pub use operation_waiter::*; pub use operation_waiter::*;
pub use rpc_error::*; pub use rpc_error::*;
pub use rpc_status::*; pub use rpc_status::*;

View File

@ -105,7 +105,7 @@ impl RPCProcessor {
// Get a chunk of the routing table near the requested node id // Get a chunk of the routing table near the requested node id
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let closest_nodes = let closest_nodes =
network_result_try!(routing_table.find_all_closest_peers(node_id, &capabilities)); network_result_try!(routing_table.find_preferred_closest_peers(node_id, &capabilities));
// Make FindNode answer // Make FindNode answer
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;

View File

@ -201,7 +201,7 @@ impl RPCProcessor {
// Get the nodes that we know about that are closer to the the key than our own node // Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
let debug_string = format!( let debug_string = format!(
"IN <=== GetValueQ({} #{}{}) <== {}", "IN <=== GetValueQ({} #{}{}) <== {}",

View File

@ -213,7 +213,7 @@ impl RPCProcessor {
// Get the nodes that we know about that are closer to the the key than our own node // Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT])); let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
let debug_string = format!( let debug_string = format!(
"IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}", "IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",