Merge branch 'fanout-work' into 'main'
Fanout Work Closes #287, #304, and #249 See merge request veilid/veilid!179
This commit is contained in:
commit
f7ee2635b3
@ -116,7 +116,7 @@ impl DiscoveryContext {
|
|||||||
);
|
);
|
||||||
|
|
||||||
log_net!(
|
log_net!(
|
||||||
"request_public_address {:?}: Value({:?})",
|
debug "request_public_address {:?}: Value({:?})",
|
||||||
node_ref,
|
node_ref,
|
||||||
res.answer
|
res.answer
|
||||||
);
|
);
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
/// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info
|
/// Utility to find the closest nodes to a particular key, preferring reliable nodes first,
|
||||||
pub fn find_all_closest_peers(
|
/// including possibly our own node and nodes further away from the key than our own,
|
||||||
|
/// returning their peer info
|
||||||
|
pub fn find_preferred_closest_peers(
|
||||||
&self,
|
&self,
|
||||||
key: TypedKey,
|
key: TypedKey,
|
||||||
capabilities: &[Capability],
|
capabilities: &[Capability],
|
||||||
@ -49,7 +51,7 @@ impl RoutingTable {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
|
let own_peer_info = self.get_own_peer_info(RoutingDomain::PublicInternet);
|
||||||
let closest_nodes = match self.find_closest_nodes(
|
let closest_nodes = match self.find_preferred_closest_nodes(
|
||||||
node_count,
|
node_count,
|
||||||
key,
|
key,
|
||||||
filters,
|
filters,
|
||||||
@ -68,9 +70,10 @@ impl RoutingTable {
|
|||||||
NetworkResult::value(closest_nodes)
|
NetworkResult::value(closest_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Utility to find nodes that are closer to a key than our own node, returning their peer info
|
/// Utility to find nodes that are closer to a key than our own node,
|
||||||
|
/// preferring reliable nodes first, and returning their peer info
|
||||||
/// Can filter based on a particular set of capabiltiies
|
/// Can filter based on a particular set of capabiltiies
|
||||||
pub fn find_peers_closer_to_key(
|
pub fn find_preferred_peers_closer_to_key(
|
||||||
&self,
|
&self,
|
||||||
key: TypedKey,
|
key: TypedKey,
|
||||||
required_capabilities: Vec<Capability>,
|
required_capabilities: Vec<Capability>,
|
||||||
@ -126,7 +129,7 @@ impl RoutingTable {
|
|||||||
};
|
};
|
||||||
|
|
||||||
//
|
//
|
||||||
let closest_nodes = match self.find_closest_nodes(
|
let closest_nodes = match self.find_preferred_closest_nodes(
|
||||||
node_count,
|
node_count,
|
||||||
key,
|
key,
|
||||||
filters,
|
filters,
|
||||||
|
@ -54,6 +54,9 @@ const ROUTING_TABLE: &str = "routing_table";
|
|||||||
const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map";
|
const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map";
|
||||||
const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
|
const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
|
||||||
|
|
||||||
|
// Critical sections
|
||||||
|
const LOCK_TAG_TICK: &str = "TICK";
|
||||||
|
|
||||||
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
|
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
|
||||||
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
|
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -939,7 +942,7 @@ impl RoutingTable {
|
|||||||
|
|
||||||
let filters = VecDeque::from([filter]);
|
let filters = VecDeque::from([filter]);
|
||||||
|
|
||||||
self.find_fastest_nodes(
|
self.find_preferred_fastest_nodes(
|
||||||
protocol_types_len * 2 * max_per_type,
|
protocol_types_len * 2 * max_per_type,
|
||||||
filters,
|
filters,
|
||||||
|_rti, entry: Option<Arc<BucketEntry>>| {
|
|_rti, entry: Option<Arc<BucketEntry>>| {
|
||||||
@ -990,7 +993,7 @@ impl RoutingTable {
|
|||||||
.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform)
|
.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_fastest_nodes<'a, T, O>(
|
pub fn find_preferred_fastest_nodes<'a, T, O>(
|
||||||
&self,
|
&self,
|
||||||
node_count: usize,
|
node_count: usize,
|
||||||
filters: VecDeque<RoutingTableEntryFilter>,
|
filters: VecDeque<RoutingTableEntryFilter>,
|
||||||
@ -1001,10 +1004,10 @@ impl RoutingTable {
|
|||||||
{
|
{
|
||||||
self.inner
|
self.inner
|
||||||
.read()
|
.read()
|
||||||
.find_fastest_nodes(node_count, filters, transform)
|
.find_preferred_fastest_nodes(node_count, filters, transform)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_closest_nodes<'a, T, O>(
|
pub fn find_preferred_closest_nodes<'a, T, O>(
|
||||||
&self,
|
&self,
|
||||||
node_count: usize,
|
node_count: usize,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
@ -1016,14 +1019,14 @@ impl RoutingTable {
|
|||||||
{
|
{
|
||||||
self.inner
|
self.inner
|
||||||
.read()
|
.read()
|
||||||
.find_closest_nodes(node_count, node_id, filters, transform)
|
.find_preferred_closest_nodes(node_count, node_id, filters, transform)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sort_and_clean_closest_noderefs(
|
pub fn sort_and_clean_closest_noderefs(
|
||||||
&self,
|
&self,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
closest_nodes: &mut Vec<NodeRef>,
|
closest_nodes: &[NodeRef],
|
||||||
) {
|
) -> Vec<NodeRef> {
|
||||||
self.inner
|
self.inner
|
||||||
.read()
|
.read()
|
||||||
.sort_and_clean_closest_noderefs(node_id, closest_nodes)
|
.sort_and_clean_closest_noderefs(node_id, closest_nodes)
|
||||||
|
@ -129,9 +129,11 @@ impl RoutingDomainEditor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Briefly pause routing table ticker while changes are made
|
// Briefly pause routing table ticker while changes are made
|
||||||
if pause_tasks {
|
let _tick_guard = if pause_tasks {
|
||||||
self.routing_table.pause_tasks(true).await;
|
Some(self.routing_table.pause_tasks().await)
|
||||||
}
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
// Apply changes
|
// Apply changes
|
||||||
let mut changed = false;
|
let mut changed = false;
|
||||||
@ -262,8 +264,5 @@ impl RoutingDomainEditor {
|
|||||||
rss.reset();
|
rss.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unpause routing table ticker
|
|
||||||
self.routing_table.pause_tasks(false).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ use super::*;
|
|||||||
use weak_table::PtrWeakHashSet;
|
use weak_table::PtrWeakHashSet;
|
||||||
|
|
||||||
const RECENT_PEERS_TABLE_SIZE: usize = 64;
|
const RECENT_PEERS_TABLE_SIZE: usize = 64;
|
||||||
|
|
||||||
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
|
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
|
||||||
//////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@ -34,8 +35,9 @@ pub struct RoutingTableInner {
|
|||||||
pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
|
pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
|
||||||
/// Storage for private/safety RouteSpecs
|
/// Storage for private/safety RouteSpecs
|
||||||
pub(super) route_spec_store: Option<RouteSpecStore>,
|
pub(super) route_spec_store: Option<RouteSpecStore>,
|
||||||
/// Tick paused or not
|
/// Async tagged critical sections table
|
||||||
pub(super) tick_paused: bool,
|
/// Tag: "tick" -> in ticker
|
||||||
|
pub(super) critical_sections: AsyncTagLockTable<&'static str>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RoutingTableInner {
|
impl RoutingTableInner {
|
||||||
@ -52,7 +54,7 @@ impl RoutingTableInner {
|
|||||||
self_transfer_stats: TransferStatsDownUp::default(),
|
self_transfer_stats: TransferStatsDownUp::default(),
|
||||||
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
|
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
|
||||||
route_spec_store: None,
|
route_spec_store: None,
|
||||||
tick_paused: false,
|
critical_sections: AsyncTagLockTable::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -963,7 +965,7 @@ impl RoutingTableInner {
|
|||||||
}) as RoutingTableEntryFilter;
|
}) as RoutingTableEntryFilter;
|
||||||
filters.push_front(public_node_filter);
|
filters.push_front(public_node_filter);
|
||||||
|
|
||||||
self.find_fastest_nodes(
|
self.find_preferred_fastest_nodes(
|
||||||
node_count,
|
node_count,
|
||||||
filters,
|
filters,
|
||||||
|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
|
|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
|
||||||
@ -1062,7 +1064,7 @@ impl RoutingTableInner {
|
|||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_fastest_nodes<T, O>(
|
pub fn find_preferred_fastest_nodes<T, O>(
|
||||||
&self,
|
&self,
|
||||||
node_count: usize,
|
node_count: usize,
|
||||||
mut filters: VecDeque<RoutingTableEntryFilter>,
|
mut filters: VecDeque<RoutingTableEntryFilter>,
|
||||||
@ -1154,7 +1156,7 @@ impl RoutingTableInner {
|
|||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_closest_nodes<T, O>(
|
pub fn find_preferred_closest_nodes<T, O>(
|
||||||
&self,
|
&self,
|
||||||
node_count: usize,
|
node_count: usize,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
@ -1242,8 +1244,8 @@ impl RoutingTableInner {
|
|||||||
pub fn sort_and_clean_closest_noderefs(
|
pub fn sort_and_clean_closest_noderefs(
|
||||||
&self,
|
&self,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
closest_nodes: &mut Vec<NodeRef>,
|
closest_nodes: &[NodeRef],
|
||||||
) {
|
) -> Vec<NodeRef> {
|
||||||
// Lock all noderefs
|
// Lock all noderefs
|
||||||
let kind = node_id.kind;
|
let kind = node_id.kind;
|
||||||
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
|
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
|
||||||
@ -1263,7 +1265,7 @@ impl RoutingTableInner {
|
|||||||
closest_nodes_locked.sort_by(sort);
|
closest_nodes_locked.sort_by(sort);
|
||||||
|
|
||||||
// Unlock noderefs
|
// Unlock noderefs
|
||||||
*closest_nodes = closest_nodes_locked.iter().map(|x| x.unlocked()).collect();
|
closest_nodes_locked.iter().map(|x| x.unlocked()).collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1271,7 +1273,6 @@ fn make_closest_noderef_sort(
|
|||||||
crypto: Crypto,
|
crypto: Crypto,
|
||||||
node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering {
|
) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering {
|
||||||
let cur_ts = get_aligned_timestamp();
|
|
||||||
let kind = node_id.kind;
|
let kind = node_id.kind;
|
||||||
// Get cryptoversion to check distance with
|
// Get cryptoversion to check distance with
|
||||||
let vcrypto = crypto.get(kind).unwrap();
|
let vcrypto = crypto.get(kind).unwrap();
|
||||||
@ -1282,19 +1283,8 @@ fn make_closest_noderef_sort(
|
|||||||
return core::cmp::Ordering::Equal;
|
return core::cmp::Ordering::Equal;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reliable nodes come first, pessimistically treating our own node as unreliable
|
|
||||||
a.operate(|_rti, a_entry| {
|
a.operate(|_rti, a_entry| {
|
||||||
b.operate(|_rti, b_entry| {
|
b.operate(|_rti, b_entry| {
|
||||||
let ra = a_entry.check_reliable(cur_ts);
|
|
||||||
let rb = b_entry.check_reliable(cur_ts);
|
|
||||||
if ra != rb {
|
|
||||||
if ra {
|
|
||||||
return core::cmp::Ordering::Less;
|
|
||||||
} else {
|
|
||||||
return core::cmp::Ordering::Greater;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get keys
|
// get keys
|
||||||
let a_key = a_entry.node_ids().get(kind).unwrap();
|
let a_key = a_entry.node_ids().get(kind).unwrap();
|
||||||
let b_key = b_entry.node_ids().get(kind).unwrap();
|
let b_key = b_entry.node_ids().get(kind).unwrap();
|
||||||
|
@ -126,9 +126,13 @@ impl RoutingTable {
|
|||||||
/// to run tick tasks which may run at slower tick rates as configured
|
/// to run tick tasks which may run at slower tick rates as configured
|
||||||
pub async fn tick(&self) -> EyreResult<()> {
|
pub async fn tick(&self) -> EyreResult<()> {
|
||||||
// Don't tick if paused
|
// Don't tick if paused
|
||||||
if self.inner.read().tick_paused {
|
let opt_tick_guard = {
|
||||||
|
let inner = self.inner.read();
|
||||||
|
inner.critical_sections.try_lock_tag(LOCK_TAG_TICK)
|
||||||
|
};
|
||||||
|
let Some(_tick_guard) = opt_tick_guard else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
};
|
||||||
|
|
||||||
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
|
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
|
||||||
self.unlocked_inner.rolling_transfers_task.tick().await?;
|
self.unlocked_inner.rolling_transfers_task.tick().await?;
|
||||||
@ -183,22 +187,9 @@ impl RoutingTable {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub(crate) async fn pause_tasks(&self, paused: bool) {
|
pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
|
||||||
let cancel = {
|
let critical_sections = self.inner.read().critical_sections.clone();
|
||||||
let mut inner = self.inner.write();
|
critical_sections.lock_tag(LOCK_TAG_TICK).await
|
||||||
if !inner.tick_paused && paused {
|
|
||||||
inner.tick_paused = true;
|
|
||||||
true
|
|
||||||
} else if inner.tick_paused && !paused {
|
|
||||||
inner.tick_paused = false;
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if cancel {
|
|
||||||
self.cancel_tasks().await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn cancel_tasks(&self) {
|
pub(crate) async fn cancel_tasks(&self) {
|
||||||
|
@ -67,7 +67,7 @@ impl RoutingTable {
|
|||||||
) as RoutingTableEntryFilter;
|
) as RoutingTableEntryFilter;
|
||||||
filters.push_front(filter);
|
filters.push_front(filter);
|
||||||
|
|
||||||
let noderefs = routing_table.find_fastest_nodes(
|
let noderefs = routing_table.find_preferred_fastest_nodes(
|
||||||
min_peer_count,
|
min_peer_count,
|
||||||
filters,
|
filters,
|
||||||
|_rti, entry: Option<Arc<BucketEntry>>| {
|
|_rti, entry: Option<Arc<BucketEntry>>| {
|
||||||
|
@ -4,8 +4,7 @@ struct FanoutContext<R>
|
|||||||
where
|
where
|
||||||
R: Unpin,
|
R: Unpin,
|
||||||
{
|
{
|
||||||
closest_nodes: Vec<NodeRef>,
|
fanout_queue: FanoutQueue,
|
||||||
called_nodes: HashSet<TypedKey>,
|
|
||||||
result: Option<Result<R, RPCError>>,
|
result: Option<Result<R, RPCError>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,8 +71,7 @@ where
|
|||||||
check_done: D,
|
check_done: D,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let context = Mutex::new(FanoutContext {
|
let context = Mutex::new(FanoutContext {
|
||||||
closest_nodes: Vec::with_capacity(node_count),
|
fanout_queue: FanoutQueue::new(node_id.kind),
|
||||||
called_nodes: HashSet::new(),
|
|
||||||
result: None,
|
result: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -91,82 +89,44 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_new_nodes(self: Arc<Self>, new_nodes: Vec<NodeRef>) {
|
fn evaluate_done(self: Arc<Self>, ctx: &mut FanoutContext<R>) -> bool {
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
|
|
||||||
for nn in new_nodes {
|
|
||||||
// Make sure the new node isnt already in the list
|
|
||||||
let mut dup = false;
|
|
||||||
for cn in &ctx.closest_nodes {
|
|
||||||
if cn.same_entry(&nn) {
|
|
||||||
dup = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !dup {
|
|
||||||
// Add the new node if we haven't already called it before (only one call per node ever)
|
|
||||||
if let Some(key) = nn.node_ids().get(self.crypto_kind) {
|
|
||||||
if !ctx.called_nodes.contains(&key) {
|
|
||||||
ctx.closest_nodes.push(nn.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.routing_table
|
|
||||||
.sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes);
|
|
||||||
ctx.closest_nodes.truncate(self.node_count);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_node(self: Arc<Self>, dead_node: NodeRef) {
|
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
for n in 0..ctx.closest_nodes.len() {
|
|
||||||
let cn = &ctx.closest_nodes[n];
|
|
||||||
if cn.same_entry(&dead_node) {
|
|
||||||
ctx.closest_nodes.remove(n);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_next_node(self: Arc<Self>) -> Option<NodeRef> {
|
|
||||||
let mut next_node = None;
|
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
for cn in ctx.closest_nodes.clone() {
|
|
||||||
if let Some(key) = cn.node_ids().get(self.crypto_kind) {
|
|
||||||
if !ctx.called_nodes.contains(&key) {
|
|
||||||
// New fanout call candidate found
|
|
||||||
next_node = Some(cn.clone());
|
|
||||||
ctx.called_nodes.insert(key);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next_node
|
|
||||||
}
|
|
||||||
|
|
||||||
fn evaluate_done(self: Arc<Self>) -> bool {
|
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
|
|
||||||
// If we have a result, then we're done
|
// If we have a result, then we're done
|
||||||
if ctx.result.is_some() {
|
if ctx.result.is_some() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for a new done result
|
// Check for a new done result
|
||||||
ctx.result = (self.check_done)(&ctx.closest_nodes).map(|o| Ok(o));
|
ctx.result = (self.check_done)(ctx.fanout_queue.nodes()).map(|o| Ok(o));
|
||||||
ctx.result.is_some()
|
ctx.result.is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn add_to_fanout_queue(self: Arc<Self>, new_nodes: &[NodeRef]) {
|
||||||
|
let ctx = &mut *self.context.lock();
|
||||||
|
let this = self.clone();
|
||||||
|
ctx.fanout_queue.add(&new_nodes, |current_nodes| {
|
||||||
|
let mut current_nodes_vec = this
|
||||||
|
.routing_table
|
||||||
|
.sort_and_clean_closest_noderefs(this.node_id, current_nodes);
|
||||||
|
current_nodes_vec.truncate(self.node_count);
|
||||||
|
current_nodes_vec
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async fn fanout_processor(self: Arc<Self>) {
|
async fn fanout_processor(self: Arc<Self>) {
|
||||||
// Check to see if we have a result or are done
|
// Loop until we have a result or are done
|
||||||
while !self.clone().evaluate_done() {
|
loop {
|
||||||
// Get the closest node we haven't processed yet
|
// Get the closest node we haven't processed yet if we're not done yet
|
||||||
let next_node = self.clone().get_next_node();
|
let next_node = {
|
||||||
|
let mut ctx = self.context.lock();
|
||||||
|
if self.clone().evaluate_done(&mut ctx) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ctx.fanout_queue.next()
|
||||||
|
};
|
||||||
|
|
||||||
// If we don't have a node to process, stop fanning out
|
// If we don't have a node to process, stop fanning out
|
||||||
let Some(next_node) = next_node else {
|
let Some(next_node) = next_node else {
|
||||||
return;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Do the call for this node
|
// Do the call for this node
|
||||||
@ -188,20 +148,18 @@ where
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Call succeeded
|
// Call succeeded
|
||||||
// Register the returned nodes and add them to the closest nodes list in sorted order
|
// Register the returned nodes and add them to the fanout queue in sorted order
|
||||||
let new_nodes = self
|
let new_nodes = self
|
||||||
.routing_table
|
.routing_table
|
||||||
.register_find_node_answer(self.crypto_kind, filtered_v);
|
.register_find_node_answer(self.crypto_kind, filtered_v);
|
||||||
self.clone().add_new_nodes(new_nodes);
|
self.clone().add_to_fanout_queue(&new_nodes);
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
// Call failed, remove the node so it isn't considered as part of the fanout
|
// Call failed, node will node be considered again
|
||||||
self.clone().remove_node(next_node);
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Error happened, abort everything and return the error
|
// Error happened, abort everything and return the error
|
||||||
let mut ctx = self.context.lock();
|
self.context.lock().result = Some(Err(e));
|
||||||
ctx.result = Some(Err(e));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -231,7 +189,7 @@ where
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check our node info ilter
|
// Check our node info filter
|
||||||
let node_ids = e.node_ids().to_vec();
|
let node_ids = e.node_ids().to_vec();
|
||||||
if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
|
if !(node_info_filter)(&node_ids, signed_node_info.node_info()) {
|
||||||
return false;
|
return false;
|
||||||
@ -248,12 +206,10 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
routing_table
|
routing_table
|
||||||
.find_closest_nodes(self.node_count, self.node_id, filters, transform)
|
.find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
|
||||||
.map_err(RPCError::invalid_format)?
|
.map_err(RPCError::invalid_format)?
|
||||||
};
|
};
|
||||||
|
self.clone().add_to_fanout_queue(&closest_nodes);
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
ctx.closest_nodes = closest_nodes;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,9 +228,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Do a quick check to see if we're already done
|
// Do a quick check to see if we're already done
|
||||||
if self.clone().evaluate_done() {
|
{
|
||||||
let mut ctx = self.context.lock();
|
let mut ctx = self.context.lock();
|
||||||
return TimeoutOr::value(ctx.result.take().transpose());
|
if self.clone().evaluate_done(&mut ctx) {
|
||||||
|
return TimeoutOr::value(ctx.result.take().transpose());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If not, do the fanout
|
// If not, do the fanout
|
||||||
@ -287,19 +245,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Wait for them to complete
|
// Wait for them to complete
|
||||||
timeout(timeout_ms, async {
|
timeout(timeout_ms, async { while unord.next().await.is_some() {} })
|
||||||
while let Some(_) = unord.next().await {
|
.await
|
||||||
if self.clone().evaluate_done() {
|
.into_timeout_or()
|
||||||
break;
|
.map(|_| {
|
||||||
}
|
// Finished, return whatever value we came up with
|
||||||
}
|
self.context.lock().result.take().transpose()
|
||||||
})
|
})
|
||||||
.await
|
|
||||||
.into_timeout_or()
|
|
||||||
.map(|_| {
|
|
||||||
// Finished, return whatever value we came up with
|
|
||||||
let mut ctx = self.context.lock();
|
|
||||||
ctx.result.take().transpose()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
73
veilid-core/src/rpc_processor/fanout_queue.rs
Normal file
73
veilid-core/src/rpc_processor/fanout_queue.rs
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub struct FanoutQueue {
|
||||||
|
crypto_kind: CryptoKind,
|
||||||
|
current_nodes: VecDeque<NodeRef>,
|
||||||
|
returned_nodes: HashSet<TypedKey>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FanoutQueue {
|
||||||
|
// Create a queue for fanout candidates that have a crypto-kind compatible node id
|
||||||
|
pub fn new(crypto_kind: CryptoKind) -> Self {
|
||||||
|
Self {
|
||||||
|
crypto_kind,
|
||||||
|
current_nodes: VecDeque::new(),
|
||||||
|
returned_nodes: HashSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new nodes to list of fanout candidates
|
||||||
|
// Run a cleanup routine afterwards to trim down the list of candidates so it doesn't grow too large
|
||||||
|
pub fn add<F: FnOnce(&[NodeRef]) -> Vec<NodeRef>>(
|
||||||
|
&mut self,
|
||||||
|
new_nodes: &[NodeRef],
|
||||||
|
cleanup: F,
|
||||||
|
) {
|
||||||
|
for nn in new_nodes {
|
||||||
|
// Ensure the node has a comparable key with our current crypto kind
|
||||||
|
let Some(key) = nn.node_ids().get(self.crypto_kind) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
// Check if we have already done this node before (only one call per node ever)
|
||||||
|
if self.returned_nodes.contains(&key) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the new node isnt already in the list
|
||||||
|
let mut dup = false;
|
||||||
|
for cn in &self.current_nodes {
|
||||||
|
if cn.same_entry(nn) {
|
||||||
|
dup = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !dup {
|
||||||
|
// Add the new node
|
||||||
|
self.current_nodes.push_front(nn.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the deque is a single slice
|
||||||
|
self.current_nodes.make_contiguous();
|
||||||
|
|
||||||
|
// Sort and trim the candidate set
|
||||||
|
self.current_nodes =
|
||||||
|
VecDeque::from_iter(cleanup(self.current_nodes.as_slices().0).iter().cloned());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return next fanout candidate
|
||||||
|
pub fn next(&mut self) -> Option<NodeRef> {
|
||||||
|
let cn = self.current_nodes.pop_front()?;
|
||||||
|
self.current_nodes.make_contiguous();
|
||||||
|
let key = cn.node_ids().get(self.crypto_kind).unwrap();
|
||||||
|
|
||||||
|
// Ensure we don't return this node again
|
||||||
|
self.returned_nodes.insert(key);
|
||||||
|
Some(cn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a slice of all the current fanout candidates
|
||||||
|
pub fn nodes(&self) -> &[NodeRef] {
|
||||||
|
self.current_nodes.as_slices().0
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
mod coders;
|
mod coders;
|
||||||
mod destination;
|
mod destination;
|
||||||
mod fanout_call;
|
mod fanout_call;
|
||||||
|
mod fanout_queue;
|
||||||
mod operation_waiter;
|
mod operation_waiter;
|
||||||
mod rpc_app_call;
|
mod rpc_app_call;
|
||||||
mod rpc_app_message;
|
mod rpc_app_message;
|
||||||
@ -31,6 +32,7 @@ mod rpc_start_tunnel;
|
|||||||
pub use coders::*;
|
pub use coders::*;
|
||||||
pub use destination::*;
|
pub use destination::*;
|
||||||
pub use fanout_call::*;
|
pub use fanout_call::*;
|
||||||
|
pub use fanout_queue::*;
|
||||||
pub use operation_waiter::*;
|
pub use operation_waiter::*;
|
||||||
pub use rpc_error::*;
|
pub use rpc_error::*;
|
||||||
pub use rpc_status::*;
|
pub use rpc_status::*;
|
||||||
|
@ -105,7 +105,7 @@ impl RPCProcessor {
|
|||||||
// Get a chunk of the routing table near the requested node id
|
// Get a chunk of the routing table near the requested node id
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let closest_nodes =
|
let closest_nodes =
|
||||||
network_result_try!(routing_table.find_all_closest_peers(node_id, &capabilities));
|
network_result_try!(routing_table.find_preferred_closest_peers(node_id, &capabilities));
|
||||||
|
|
||||||
// Make FindNode answer
|
// Make FindNode answer
|
||||||
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;
|
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;
|
||||||
|
@ -78,7 +78,7 @@ impl RPCProcessor {
|
|||||||
log_rpc!(debug "{}", debug_string);
|
log_rpc!(debug "{}", debug_string);
|
||||||
|
|
||||||
let waitable_reply = network_result_try!(
|
let waitable_reply = network_result_try!(
|
||||||
self.question(dest, question, Some(question_context))
|
self.question(dest.clone(), question, Some(question_context))
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -99,29 +99,35 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let (value, peers, descriptor) = get_value_a.destructure();
|
let (value, peers, descriptor) = get_value_a.destructure();
|
||||||
|
#[cfg(feature="debug-dht")]
|
||||||
|
{
|
||||||
|
let debug_string_value = value.as_ref().map(|v| {
|
||||||
|
format!(" len={} seq={} writer={}",
|
||||||
|
v.value_data().data().len(),
|
||||||
|
v.value_data().seq(),
|
||||||
|
v.value_data().writer(),
|
||||||
|
)
|
||||||
|
}).unwrap_or_default();
|
||||||
|
|
||||||
|
let debug_string_answer = format!(
|
||||||
|
"OUT <== GetValueA({} #{}{}{} peers={}) <= {}",
|
||||||
|
key,
|
||||||
|
subkey,
|
||||||
|
debug_string_value,
|
||||||
|
if descriptor.is_some() {
|
||||||
|
" +desc"
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
},
|
||||||
|
peers.len(),
|
||||||
|
dest
|
||||||
|
);
|
||||||
|
|
||||||
let debug_string_value = value.as_ref().map(|v| {
|
log_rpc!(debug "{}", debug_string_answer);
|
||||||
format!(" len={} seq={} writer={}",
|
|
||||||
v.value_data().data().len(),
|
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
|
||||||
v.value_data().seq(),
|
log_rpc!(debug "Peers: {:#?}", peer_ids);
|
||||||
v.value_data().writer(),
|
}
|
||||||
)
|
|
||||||
}).unwrap_or_default();
|
|
||||||
|
|
||||||
let debug_string_answer = format!(
|
|
||||||
"OUT <== GetValueA({} #{}{}{} peers={})",
|
|
||||||
key,
|
|
||||||
subkey,
|
|
||||||
debug_string_value,
|
|
||||||
if descriptor.is_some() {
|
|
||||||
" +desc"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
},
|
|
||||||
peers.len(),
|
|
||||||
);
|
|
||||||
|
|
||||||
log_rpc!(debug "{}", debug_string_answer);
|
|
||||||
|
|
||||||
// Validate peers returned are, in fact, closer to the key than the node we sent this to
|
// Validate peers returned are, in fact, closer to the key than the node we sent this to
|
||||||
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
|
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
|
||||||
@ -201,21 +207,24 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// Get the nodes that we know about that are closer to the the key than our own node
|
// Get the nodes that we know about that are closer to the the key than our own node
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT]));
|
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
|
||||||
|
|
||||||
let debug_string = format!(
|
#[cfg(feature="debug-dht")]
|
||||||
"IN <=== GetValueQ({} #{}{}) <== {}",
|
{
|
||||||
key,
|
let debug_string = format!(
|
||||||
subkey,
|
"IN <=== GetValueQ({} #{}{}) <== {}",
|
||||||
if want_descriptor {
|
key,
|
||||||
" +wantdesc"
|
subkey,
|
||||||
} else {
|
if want_descriptor {
|
||||||
""
|
" +wantdesc"
|
||||||
},
|
} else {
|
||||||
msg.header.direct_sender_node_id()
|
""
|
||||||
);
|
},
|
||||||
|
msg.header.direct_sender_node_id()
|
||||||
|
);
|
||||||
|
|
||||||
log_rpc!(debug "{}", debug_string);
|
log_rpc!(debug "{}", debug_string);
|
||||||
|
}
|
||||||
|
|
||||||
// See if we have this record ourselves
|
// See if we have this record ourselves
|
||||||
let storage_manager = self.storage_manager();
|
let storage_manager = self.storage_manager();
|
||||||
|
@ -92,7 +92,7 @@ impl RPCProcessor {
|
|||||||
log_rpc!(debug "{}", debug_string);
|
log_rpc!(debug "{}", debug_string);
|
||||||
|
|
||||||
let waitable_reply = network_result_try!(
|
let waitable_reply = network_result_try!(
|
||||||
self.question(dest, question, Some(question_context))
|
self.question(dest.clone(), question, Some(question_context))
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -114,28 +114,36 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let (set, value, peers) = set_value_a.destructure();
|
let (set, value, peers) = set_value_a.destructure();
|
||||||
|
|
||||||
let debug_string_value = value.as_ref().map(|v| {
|
|
||||||
format!(" len={} writer={}",
|
|
||||||
v.value_data().data().len(),
|
|
||||||
v.value_data().writer(),
|
|
||||||
)
|
|
||||||
}).unwrap_or_default();
|
|
||||||
|
|
||||||
let debug_string_answer = format!(
|
|
||||||
"OUT <== SetValueA({} #{}{}{} peers={})",
|
|
||||||
key,
|
|
||||||
subkey,
|
|
||||||
if set {
|
|
||||||
" +set"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
},
|
|
||||||
debug_string_value,
|
|
||||||
peers.len(),
|
|
||||||
);
|
|
||||||
|
|
||||||
log_rpc!(debug "{}", debug_string_answer);
|
#[cfg(feature="debug-dht")]
|
||||||
|
{
|
||||||
|
let debug_string_value = value.as_ref().map(|v| {
|
||||||
|
format!(" len={} writer={}",
|
||||||
|
v.value_data().data().len(),
|
||||||
|
v.value_data().writer(),
|
||||||
|
)
|
||||||
|
}).unwrap_or_default();
|
||||||
|
|
||||||
|
|
||||||
|
let debug_string_answer = format!(
|
||||||
|
"OUT <== SetValueA({} #{}{}{} peers={}) <= {}",
|
||||||
|
key,
|
||||||
|
subkey,
|
||||||
|
if set {
|
||||||
|
" +set"
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
},
|
||||||
|
debug_string_value,
|
||||||
|
peers.len(),
|
||||||
|
dest,
|
||||||
|
);
|
||||||
|
|
||||||
|
log_rpc!(debug "{}", debug_string_answer);
|
||||||
|
|
||||||
|
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
|
||||||
|
log_rpc!(debug "Peers: {:#?}", peer_ids);
|
||||||
|
}
|
||||||
|
|
||||||
// Validate peers returned are, in fact, closer to the key than the node we sent this to
|
// Validate peers returned are, in fact, closer to the key than the node we sent this to
|
||||||
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
|
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
|
||||||
@ -213,7 +221,7 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// Get the nodes that we know about that are closer to the the key than our own node
|
// Get the nodes that we know about that are closer to the the key than our own node
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key, vec![CAP_DHT]));
|
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
|
||||||
|
|
||||||
let debug_string = format!(
|
let debug_string = format!(
|
||||||
"IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",
|
"IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",
|
||||||
@ -235,7 +243,7 @@ impl RPCProcessor {
|
|||||||
// If there are less than 'set_value_count' peers that are closer, then store here too
|
// If there are less than 'set_value_count' peers that are closer, then store here too
|
||||||
let set_value_count = {
|
let set_value_count = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
c.network.dht.set_value_fanout as usize
|
c.network.dht.set_value_count as usize
|
||||||
};
|
};
|
||||||
let (set, new_value) = if closer_to_key_peers.len() >= set_value_count {
|
let (set, new_value) = if closer_to_key_peers.len() >= set_value_count {
|
||||||
// Not close enough
|
// Not close enough
|
||||||
|
@ -176,9 +176,13 @@ impl StorageManager {
|
|||||||
}
|
}
|
||||||
// If we finished with consensus (enough nodes returning the same value)
|
// If we finished with consensus (enough nodes returning the same value)
|
||||||
TimeoutOr::Value(Ok(Some(()))) => {
|
TimeoutOr::Value(Ok(Some(()))) => {
|
||||||
log_stor!(debug "GetValue Fanout Consensus");
|
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
|
if ctx.value_count >= consensus_count {
|
||||||
|
log_stor!(debug "GetValue Fanout Consensus");
|
||||||
|
} else {
|
||||||
|
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
|
||||||
|
}
|
||||||
Ok(SubkeyResult {
|
Ok(SubkeyResult {
|
||||||
value: ctx.value.clone(),
|
value: ctx.value.clone(),
|
||||||
descriptor: ctx.descriptor.clone(),
|
descriptor: ctx.descriptor.clone(),
|
||||||
@ -188,7 +192,11 @@ impl StorageManager {
|
|||||||
TimeoutOr::Value(Ok(None)) => {
|
TimeoutOr::Value(Ok(None)) => {
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count);
|
if ctx.value_count >= consensus_count {
|
||||||
|
log_stor!(debug "GetValue Fanout Exhausted Consensus");
|
||||||
|
} else {
|
||||||
|
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
|
||||||
|
}
|
||||||
Ok(SubkeyResult {
|
Ok(SubkeyResult {
|
||||||
value: ctx.value.clone(),
|
value: ctx.value.clone(),
|
||||||
descriptor: ctx.descriptor.clone(),
|
descriptor: ctx.descriptor.clone(),
|
||||||
|
@ -93,13 +93,14 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
|
log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
|
||||||
|
self.uncommitted_value = None;
|
||||||
self.value = uncommitted_value;
|
self.value = uncommitted_value;
|
||||||
}
|
}
|
||||||
Ok(self.value)
|
Ok(self.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rollback(&mut self) -> T {
|
pub fn rollback(&mut self) -> T {
|
||||||
if let Some(uv) = self.uncommitted_value {
|
if let Some(uv) = self.uncommitted_value.take() {
|
||||||
log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
|
log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
|
||||||
}
|
}
|
||||||
return self.value;
|
return self.value;
|
||||||
|
@ -7,6 +7,21 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use hashlink::LruCache;
|
use hashlink::LruCache;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
/// A dead record that is yet to be purged from disk and statistics
|
||||||
|
struct DeadRecord<D>
|
||||||
|
where
|
||||||
|
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
||||||
|
{
|
||||||
|
// The key used in the record_index
|
||||||
|
key: RecordTableKey,
|
||||||
|
// The actual record
|
||||||
|
record: Record<D>,
|
||||||
|
// True if this record is accounted for in the total storage
|
||||||
|
// and needs to have the statistics updated or not when purged
|
||||||
|
in_total_storage: bool,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RecordStore<D>
|
pub struct RecordStore<D>
|
||||||
where
|
where
|
||||||
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
||||||
@ -15,16 +30,24 @@ where
|
|||||||
name: String,
|
name: String,
|
||||||
limits: RecordStoreLimits,
|
limits: RecordStoreLimits,
|
||||||
|
|
||||||
|
/// The tabledb used for record data
|
||||||
record_table: Option<TableDB>,
|
record_table: Option<TableDB>,
|
||||||
|
/// The tabledb used for subkey data
|
||||||
subkey_table: Option<TableDB>,
|
subkey_table: Option<TableDB>,
|
||||||
|
/// The in-memory index that keeps track of what records are in the tabledb
|
||||||
record_index: LruCache<RecordTableKey, Record<D>>,
|
record_index: LruCache<RecordTableKey, Record<D>>,
|
||||||
|
/// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
|
||||||
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
|
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
|
||||||
|
/// Total storage space or subkey data inclusive of structures in memory
|
||||||
subkey_cache_total_size: LimitedSize<usize>,
|
subkey_cache_total_size: LimitedSize<usize>,
|
||||||
|
/// Total storage space of records in the tabledb inclusive of subkey data and structures
|
||||||
total_storage_space: LimitedSize<u64>,
|
total_storage_space: LimitedSize<u64>,
|
||||||
|
/// Records to be removed from the tabledb upon next purge
|
||||||
dead_records: Vec<(RecordTableKey, Record<D>)>,
|
dead_records: Vec<DeadRecord<D>>,
|
||||||
|
/// The list of records that have changed since last flush to disk (optimization for batched writes)
|
||||||
changed_records: HashSet<RecordTableKey>,
|
changed_records: HashSet<RecordTableKey>,
|
||||||
|
|
||||||
|
/// A mutex to ensure we handle this concurrently
|
||||||
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
|
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,33 +120,45 @@ where
|
|||||||
|
|
||||||
// Sort the record index by last touched time and insert in sorted order
|
// Sort the record index by last touched time and insert in sorted order
|
||||||
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
|
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
|
||||||
let mut dead_records = Vec::new();
|
let mut dead_records = Vec::<DeadRecord<D>>::new();
|
||||||
for ri in record_index_saved {
|
for ri in record_index_saved {
|
||||||
// total the storage space
|
// total the storage space
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.add(mem::size_of::<RecordTableKey>() as u64)
|
.add((mem::size_of::<RecordTableKey>() + ri.1.total_size()) as u64)
|
||||||
.unwrap();
|
|
||||||
self.total_storage_space
|
|
||||||
.add(ri.1.total_size() as u64)
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
if let Err(_) = self.total_storage_space.commit() {
|
if let Err(_) = self.total_storage_space.commit() {
|
||||||
// If we overflow the limit, kill off the record
|
// Revert the total storage space because the commit failed
|
||||||
dead_records.push((ri.0, ri.1));
|
self.total_storage_space.rollback();
|
||||||
|
|
||||||
|
// If we overflow the limit, kill off the record, noting that it has not yet been added to the total storage space
|
||||||
|
dead_records.push(DeadRecord {
|
||||||
|
key: ri.0,
|
||||||
|
record: ri.1,
|
||||||
|
in_total_storage: false,
|
||||||
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to index and ensure we deduplicate in the case of an error
|
// add to index and ensure we deduplicate in the case of an error
|
||||||
if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| {
|
if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| {
|
||||||
// If the configuration change, we only want to keep the 'limits.max_records' records
|
// If the configuration change, we only want to keep the 'limits.max_records' records
|
||||||
dead_records.push((k, v));
|
dead_records.push(DeadRecord {
|
||||||
|
key: k,
|
||||||
|
record: v,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}) {
|
}) {
|
||||||
// This shouldn't happen, but deduplicate anyway
|
// This shouldn't happen, but deduplicate anyway
|
||||||
log_stor!(warn "duplicate record in table: {:?}", ri.0);
|
log_stor!(warn "duplicate record in table: {:?}", ri.0);
|
||||||
dead_records.push((ri.0, v));
|
dead_records.push(DeadRecord {
|
||||||
|
key: ri.0,
|
||||||
|
record: v,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (k, v) in dead_records {
|
for dr in dead_records {
|
||||||
self.add_dead_record(k, v);
|
self.dead_records.push(dr);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.record_table = Some(record_table);
|
self.record_table = Some(record_table);
|
||||||
@ -132,7 +167,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn add_dead_record(&mut self, key: RecordTableKey, record: Record<D>) {
|
fn add_dead_record(&mut self, key: RecordTableKey, record: Record<D>) {
|
||||||
self.dead_records.push((key, record));
|
self.dead_records.push(DeadRecord {
|
||||||
|
key,
|
||||||
|
record,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mark_record_changed(&mut self, key: RecordTableKey) {
|
fn mark_record_changed(&mut self, key: RecordTableKey) {
|
||||||
@ -208,23 +247,23 @@ where
|
|||||||
let rt_xact = record_table.transact();
|
let rt_xact = record_table.transact();
|
||||||
let st_xact = subkey_table.transact();
|
let st_xact = subkey_table.transact();
|
||||||
let dead_records = mem::take(&mut self.dead_records);
|
let dead_records = mem::take(&mut self.dead_records);
|
||||||
for (k, v) in dead_records {
|
for dr in dead_records {
|
||||||
// Record should already be gone from index
|
// Record should already be gone from index
|
||||||
if self.record_index.contains_key(&k) {
|
if self.record_index.contains_key(&dr.key) {
|
||||||
log_stor!(error "dead record found in index: {:?}", k);
|
log_stor!(error "dead record found in index: {:?}", dr.key);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete record
|
// Delete record
|
||||||
if let Err(e) = rt_xact.delete(0, &k.bytes()) {
|
if let Err(e) = rt_xact.delete(0, &dr.key.bytes()) {
|
||||||
log_stor!(error "record could not be deleted: {}", e);
|
log_stor!(error "record could not be deleted: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete subkeys
|
// Delete subkeys
|
||||||
let stored_subkeys = v.stored_subkeys();
|
let stored_subkeys = dr.record.stored_subkeys();
|
||||||
for sk in stored_subkeys.iter() {
|
for sk in stored_subkeys.iter() {
|
||||||
// From table
|
// From table
|
||||||
let stk = SubkeyTableKey {
|
let stk = SubkeyTableKey {
|
||||||
key: k.key,
|
key: dr.key.key,
|
||||||
subkey: sk,
|
subkey: sk,
|
||||||
};
|
};
|
||||||
let stkb = stk.bytes();
|
let stkb = stk.bytes();
|
||||||
@ -237,11 +276,12 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove from total size
|
// Remove from total size
|
||||||
self.total_storage_space
|
if dr.in_total_storage {
|
||||||
.saturating_sub(mem::size_of::<RecordTableKey>() as u64);
|
self.total_storage_space.saturating_sub(
|
||||||
self.total_storage_space
|
(mem::size_of::<RecordTableKey>() + dr.record.total_size()) as u64,
|
||||||
.saturating_sub(v.total_size() as u64);
|
);
|
||||||
self.total_storage_space.commit().unwrap();
|
self.total_storage_space.commit().unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Err(e) = rt_xact.commit().await {
|
if let Err(e) = rt_xact.commit().await {
|
||||||
log_stor!(error "failed to commit record table transaction: {}", e);
|
log_stor!(error "failed to commit record table transaction: {}", e);
|
||||||
@ -306,6 +346,9 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?;
|
.map_err(VeilidAPIError::internal)?;
|
||||||
|
|
||||||
|
// Update storage space (won't fail due to check_limit above)
|
||||||
|
self.total_storage_space.commit().unwrap();
|
||||||
|
|
||||||
// Save to record index
|
// Save to record index
|
||||||
let mut dead_records = Vec::new();
|
let mut dead_records = Vec::new();
|
||||||
if let Some(v) = self.record_index.insert_with_callback(rtk, record, |k, v| {
|
if let Some(v) = self.record_index.insert_with_callback(rtk, record, |k, v| {
|
||||||
@ -319,9 +362,6 @@ where
|
|||||||
self.add_dead_record(dr.0, dr.1);
|
self.add_dead_record(dr.0, dr.1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update storage space
|
|
||||||
self.total_storage_space.commit().unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -407,7 +447,7 @@ where
|
|||||||
subkey: ValueSubkey,
|
subkey: ValueSubkey,
|
||||||
want_descriptor: bool,
|
want_descriptor: bool,
|
||||||
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
||||||
// record from index
|
// Get record from index
|
||||||
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
|
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
|
||||||
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
||||||
Some(record.descriptor().clone())
|
Some(record.descriptor().clone())
|
||||||
@ -545,9 +585,9 @@ where
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get record from index
|
// Get record subkey count and total size of all record subkey data exclusive of structures
|
||||||
let Some((subkey_count, total_size)) = self.with_record(key, |record| {
|
let Some((subkey_count, prior_record_data_size)) = self.with_record(key, |record| {
|
||||||
(record.subkey_count(), record.total_size())
|
(record.subkey_count(), record.record_data_size())
|
||||||
}) else {
|
}) else {
|
||||||
apibail_invalid_argument!("no record at this key", "key", key);
|
apibail_invalid_argument!("no record at this key", "key", key);
|
||||||
};
|
};
|
||||||
@ -563,14 +603,14 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the previous subkey and ensure we aren't going over the record size limit
|
// Get the previous subkey and ensure we aren't going over the record size limit
|
||||||
let mut prior_record_data_size = 0usize;
|
let mut prior_subkey_size = 0usize;
|
||||||
|
|
||||||
// If subkey exists in subkey cache, use that
|
// If subkey exists in subkey cache, use that
|
||||||
let stk = SubkeyTableKey { key, subkey };
|
let stk = SubkeyTableKey { key, subkey };
|
||||||
let stk_bytes = stk.bytes();
|
let stk_bytes = stk.bytes();
|
||||||
|
|
||||||
if let Some(record_data) = self.subkey_cache.peek(&stk) {
|
if let Some(record_data) = self.subkey_cache.peek(&stk) {
|
||||||
prior_record_data_size = record_data.total_size();
|
prior_subkey_size = record_data.data_size();
|
||||||
} else {
|
} else {
|
||||||
// If not in cache, try to pull from table store
|
// If not in cache, try to pull from table store
|
||||||
if let Some(record_data) = subkey_table
|
if let Some(record_data) = subkey_table
|
||||||
@ -578,26 +618,26 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?
|
.map_err(VeilidAPIError::internal)?
|
||||||
{
|
{
|
||||||
prior_record_data_size = record_data.total_size();
|
prior_subkey_size = record_data.data_size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make new record data
|
// Make new record data
|
||||||
let record_data = RecordData::new(signed_value_data);
|
let subkey_record_data = RecordData::new(signed_value_data);
|
||||||
|
|
||||||
// Check new total record size
|
// Check new total record size
|
||||||
let new_record_data_size = record_data.total_size();
|
let new_subkey_size = subkey_record_data.data_size();
|
||||||
let new_total_size = total_size + new_record_data_size - prior_record_data_size;
|
let new_record_data_size = prior_record_data_size - prior_subkey_size + new_subkey_size;
|
||||||
if new_total_size > self.limits.max_record_total_size {
|
if new_record_data_size > self.limits.max_record_total_size {
|
||||||
apibail_generic!("dht record too large");
|
apibail_generic!("dht record too large");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check new total storage space
|
// Check new total storage space
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.sub(prior_record_data_size as u64)
|
.sub(prior_subkey_size as u64)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.add(new_record_data_size as u64)
|
.add(new_subkey_size as u64)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
if !self.total_storage_space.check_limit() {
|
if !self.total_storage_space.check_limit() {
|
||||||
apibail_try_again!();
|
apibail_try_again!();
|
||||||
@ -605,17 +645,17 @@ where
|
|||||||
|
|
||||||
// Write subkey
|
// Write subkey
|
||||||
subkey_table
|
subkey_table
|
||||||
.store_json(0, &stk_bytes, &record_data)
|
.store_json(0, &stk_bytes, &subkey_record_data)
|
||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?;
|
.map_err(VeilidAPIError::internal)?;
|
||||||
|
|
||||||
// Write to subkey cache
|
// Write to subkey cache
|
||||||
self.add_to_subkey_cache(stk, record_data);
|
self.add_to_subkey_cache(stk, subkey_record_data);
|
||||||
|
|
||||||
// Update record
|
// Update record
|
||||||
self.with_record_mut(key, |record| {
|
self.with_record_mut(key, |record| {
|
||||||
record.store_subkey(subkey);
|
record.store_subkey(subkey);
|
||||||
record.set_record_data_size(new_total_size);
|
record.set_record_data_size(new_record_data_size);
|
||||||
})
|
})
|
||||||
.expect("record should still be here");
|
.expect("record should still be here");
|
||||||
|
|
||||||
@ -666,7 +706,7 @@ where
|
|||||||
out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
|
out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
|
||||||
out += &format!("Dead Records: {}\n", self.dead_records.len());
|
out += &format!("Dead Records: {}\n", self.dead_records.len());
|
||||||
for dr in &self.dead_records {
|
for dr in &self.dead_records {
|
||||||
out += &format!(" {}\n", dr.0.key.to_string());
|
out += &format!(" {}\n", dr.key.key.to_string());
|
||||||
}
|
}
|
||||||
out += &format!("Changed Records: {}\n", self.changed_records.len());
|
out += &format!("Changed Records: {}\n", self.changed_records.len());
|
||||||
for cr in &self.changed_records {
|
for cr in &self.changed_records {
|
||||||
|
@ -5,7 +5,9 @@ struct OutboundSetValueContext {
|
|||||||
/// The latest value of the subkey, may be the value passed in
|
/// The latest value of the subkey, may be the value passed in
|
||||||
pub value: SignedValueData,
|
pub value: SignedValueData,
|
||||||
/// The consensus count for the value we have received
|
/// The consensus count for the value we have received
|
||||||
pub value_count: usize,
|
pub set_count: usize,
|
||||||
|
/// The number of non-sets since the last set we have received
|
||||||
|
pub missed_since_last_set: usize,
|
||||||
/// The parsed schema from the descriptor if we have one
|
/// The parsed schema from the descriptor if we have one
|
||||||
pub schema: DHTSchema,
|
pub schema: DHTSchema,
|
||||||
}
|
}
|
||||||
@ -38,7 +40,8 @@ impl StorageManager {
|
|||||||
let schema = descriptor.schema()?;
|
let schema = descriptor.schema()?;
|
||||||
let context = Arc::new(Mutex::new(OutboundSetValueContext {
|
let context = Arc::new(Mutex::new(OutboundSetValueContext {
|
||||||
value,
|
value,
|
||||||
value_count: 0,
|
set_count: 0,
|
||||||
|
missed_since_last_set: 0,
|
||||||
schema,
|
schema,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
@ -98,7 +101,8 @@ impl StorageManager {
|
|||||||
// If the sequence number is greater, keep it
|
// If the sequence number is greater, keep it
|
||||||
ctx.value = value;
|
ctx.value = value;
|
||||||
// One node has shown us this value so far
|
// One node has shown us this value so far
|
||||||
ctx.value_count = 1;
|
ctx.set_count = 1;
|
||||||
|
ctx.missed_since_last_set = 0;
|
||||||
} else {
|
} else {
|
||||||
// If the sequence number is older, or an equal sequence number,
|
// If the sequence number is older, or an equal sequence number,
|
||||||
// node should have not returned a value here.
|
// node should have not returned a value here.
|
||||||
@ -108,8 +112,12 @@ impl StorageManager {
|
|||||||
} else {
|
} else {
|
||||||
// It was set on this node and no newer value was found and returned,
|
// It was set on this node and no newer value was found and returned,
|
||||||
// so increase our consensus count
|
// so increase our consensus count
|
||||||
ctx.value_count += 1;
|
ctx.set_count += 1;
|
||||||
|
ctx.missed_since_last_set = 0;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
let mut ctx = context.lock();
|
||||||
|
ctx.missed_since_last_set += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return peers if we have some
|
// Return peers if we have some
|
||||||
@ -122,9 +130,18 @@ impl StorageManager {
|
|||||||
|
|
||||||
// Routine to call to check if we're done at each step
|
// Routine to call to check if we're done at each step
|
||||||
let check_done = |_closest_nodes: &[NodeRef]| {
|
let check_done = |_closest_nodes: &[NodeRef]| {
|
||||||
// If we have reached sufficient consensus, return done
|
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
if ctx.value_count >= consensus_count {
|
|
||||||
|
// If we have reached sufficient consensus, return done
|
||||||
|
if ctx.set_count >= consensus_count {
|
||||||
|
return Some(());
|
||||||
|
}
|
||||||
|
// If we have missed more than our consensus count since our last set, return done
|
||||||
|
// This keeps the traversal from searching too many nodes when we aren't converging
|
||||||
|
// Only do this if we have gotten at least half our desired sets.
|
||||||
|
if ctx.set_count >= ((consensus_count + 1) / 2)
|
||||||
|
&& ctx.missed_since_last_set >= consensus_count
|
||||||
|
{
|
||||||
return Some(());
|
return Some(());
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
@ -150,18 +167,26 @@ impl StorageManager {
|
|||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
Ok(ctx.value.clone())
|
Ok(ctx.value.clone())
|
||||||
}
|
}
|
||||||
// If we finished with consensus (enough nodes returning the same value)
|
// If we finished with or without consensus (enough nodes returning the same value)
|
||||||
TimeoutOr::Value(Ok(Some(()))) => {
|
TimeoutOr::Value(Ok(Some(()))) => {
|
||||||
log_stor!(debug "SetValue Fanout Consensus");
|
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
|
if ctx.set_count >= consensus_count {
|
||||||
|
log_stor!(debug "SetValue Fanout Consensus");
|
||||||
|
} else {
|
||||||
|
log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.set_count);
|
||||||
|
}
|
||||||
Ok(ctx.value.clone())
|
Ok(ctx.value.clone())
|
||||||
}
|
}
|
||||||
// If we finished without consensus (ran out of nodes before getting consensus)
|
// If we ran out of nodes before getting consensus)
|
||||||
TimeoutOr::Value(Ok(None)) => {
|
TimeoutOr::Value(Ok(None)) => {
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count);
|
if ctx.set_count >= consensus_count {
|
||||||
|
log_stor!(debug "SetValue Fanout Exhausted Consensus");
|
||||||
|
} else {
|
||||||
|
log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.set_count);
|
||||||
|
}
|
||||||
Ok(ctx.value.clone())
|
Ok(ctx.value.clone())
|
||||||
}
|
}
|
||||||
// Failed
|
// Failed
|
||||||
|
@ -74,7 +74,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<Record<D>>() + self.descriptor.total_size() + self.record_data_size
|
(mem::size_of::<Self>() - mem::size_of::<SignedValueDescriptor>())
|
||||||
|
+ self.descriptor.total_size()
|
||||||
|
+ self.record_data_size
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn detail(&self) -> &D {
|
// pub fn detail(&self) -> &D {
|
||||||
|
@ -12,7 +12,11 @@ impl RecordData {
|
|||||||
pub fn signed_value_data(&self) -> &SignedValueData {
|
pub fn signed_value_data(&self) -> &SignedValueData {
|
||||||
&self.signed_value_data
|
&self.signed_value_data
|
||||||
}
|
}
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.signed_value_data.data_size()
|
||||||
|
}
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<RecordData>() + self.signed_value_data.value_data().data().len()
|
(mem::size_of::<Self>() - mem::size_of::<SignedValueData>())
|
||||||
|
+ self.signed_value_data.total_size()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,10 @@ impl SignedValueData {
|
|||||||
&self.signature
|
&self.signature
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.value_data.data_size()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
(mem::size_of::<Self>() - mem::size_of::<ValueData>()) + self.value_data.total_size()
|
(mem::size_of::<Self>() - mem::size_of::<ValueData>()) + self.value_data.total_size()
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,10 @@ impl ValueData {
|
|||||||
&self.data
|
&self.data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.data.len()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<Self>() + self.data.len()
|
mem::size_of::<Self>() + self.data.len()
|
||||||
}
|
}
|
||||||
|
@ -120,16 +120,16 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
|
|||||||
defaultRouteHopCount: 1,
|
defaultRouteHopCount: 1,
|
||||||
),
|
),
|
||||||
dht: VeilidConfigDHT(
|
dht: VeilidConfigDHT(
|
||||||
|
maxFindNodeCount: 20,
|
||||||
resolveNodeTimeoutMs: 10000,
|
resolveNodeTimeoutMs: 10000,
|
||||||
resolveNodeCount: 1,
|
resolveNodeCount: 1,
|
||||||
resolveNodeFanout: 4,
|
resolveNodeFanout: 4,
|
||||||
maxFindNodeCount: 20,
|
|
||||||
getValueTimeoutMs: 10000,
|
getValueTimeoutMs: 10000,
|
||||||
getValueCount: 3,
|
getValueCount: 3,
|
||||||
getValueFanout: 4,
|
getValueFanout: 4,
|
||||||
setValueTimeoutMs: 10000,
|
setValueTimeoutMs: 10000,
|
||||||
setValueCount: 4,
|
setValueCount: 5,
|
||||||
setValueFanout: 6,
|
setValueFanout: 4,
|
||||||
minPeerCount: 20,
|
minPeerCount: 20,
|
||||||
minPeerRefreshTimeMs: 60000,
|
minPeerRefreshTimeMs: 60000,
|
||||||
validateDialInfoReceiptTimeMs: 2000,
|
validateDialInfoReceiptTimeMs: 2000,
|
||||||
|
@ -3,7 +3,7 @@ SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
|
|||||||
|
|
||||||
pushd $SCRIPTDIR 2>/dev/null
|
pushd $SCRIPTDIR 2>/dev/null
|
||||||
if [[ "$1" == "wasm" ]]; then
|
if [[ "$1" == "wasm" ]]; then
|
||||||
WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --features=rt-wasm-bindgen
|
WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --no-default-features --features=rt-wasm-bindgen
|
||||||
elif [[ "$1" == "ios" ]]; then
|
elif [[ "$1" == "ios" ]]; then
|
||||||
SYMROOT=/tmp/testout
|
SYMROOT=/tmp/testout
|
||||||
APPNAME=veilidtools-tests
|
APPNAME=veilidtools-tests
|
||||||
|
@ -33,16 +33,16 @@ where
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let mut inner = self.table.inner.lock();
|
let mut inner = self.table.inner.lock();
|
||||||
// Inform the table we're dropping this guard
|
// Inform the table we're dropping this guard
|
||||||
let waiters = {
|
let guards = {
|
||||||
// Get the table entry, it must exist since we have a guard locked
|
// Get the table entry, it must exist since we have a guard locked
|
||||||
let entry = inner.table.get_mut(&self.tag).unwrap();
|
let entry = inner.table.get_mut(&self.tag).unwrap();
|
||||||
// Decrement the number of waiters
|
// Decrement the number of guards
|
||||||
entry.waiters -= 1;
|
entry.guards -= 1;
|
||||||
// Return the number of waiters left
|
// Return the number of guards left
|
||||||
entry.waiters
|
entry.guards
|
||||||
};
|
};
|
||||||
// If there are no waiters left, we remove the tag from the table
|
// If there are no guards left, we remove the tag from the table
|
||||||
if waiters == 0 {
|
if guards == 0 {
|
||||||
inner.table.remove(&self.tag).unwrap();
|
inner.table.remove(&self.tag).unwrap();
|
||||||
}
|
}
|
||||||
// Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
|
// Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
|
||||||
@ -52,7 +52,7 @@ where
|
|||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
struct AsyncTagLockTableEntry {
|
struct AsyncTagLockTableEntry {
|
||||||
mutex: Arc<AsyncMutex<()>>,
|
mutex: Arc<AsyncMutex<()>>,
|
||||||
waiters: usize,
|
guards: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct AsyncTagLockTableInner<T>
|
struct AsyncTagLockTableInner<T>
|
||||||
@ -108,11 +108,11 @@ where
|
|||||||
.entry(tag.clone())
|
.entry(tag.clone())
|
||||||
.or_insert_with(|| AsyncTagLockTableEntry {
|
.or_insert_with(|| AsyncTagLockTableEntry {
|
||||||
mutex: Arc::new(AsyncMutex::new(())),
|
mutex: Arc::new(AsyncMutex::new(())),
|
||||||
waiters: 0,
|
guards: 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Increment the number of waiters
|
// Increment the number of guards
|
||||||
entry.waiters += 1;
|
entry.guards += 1;
|
||||||
|
|
||||||
// Return the mutex associated with the tag
|
// Return the mutex associated with the tag
|
||||||
entry.mutex.clone()
|
entry.mutex.clone()
|
||||||
@ -121,18 +121,36 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Lock the tag lock
|
// Lock the tag lock
|
||||||
let guard;
|
let guard = asyncmutex_lock_arc!(mutex);
|
||||||
cfg_if! {
|
|
||||||
if #[cfg(feature="rt-tokio")] {
|
|
||||||
// tokio version
|
|
||||||
guard = mutex.lock_owned().await;
|
|
||||||
} else {
|
|
||||||
// async-std and wasm async-lock version
|
|
||||||
guard = mutex.lock_arc().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the locked guard
|
// Return the locked guard
|
||||||
AsyncTagLockGuard::new(self.clone(), tag, guard)
|
AsyncTagLockGuard::new(self.clone(), tag, guard)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
|
||||||
|
// Get or create a tag lock entry
|
||||||
|
let mut inner = self.inner.lock();
|
||||||
|
|
||||||
|
// See if this tag is in the table
|
||||||
|
// and if not, add a new mutex for this tag
|
||||||
|
let entry = inner.table.entry(tag.clone());
|
||||||
|
|
||||||
|
// Lock the tag lock
|
||||||
|
let guard = match entry {
|
||||||
|
std::collections::hash_map::Entry::Occupied(mut o) => {
|
||||||
|
let e = o.get_mut();
|
||||||
|
let guard = asyncmutex_try_lock_arc!(e.mutex)?;
|
||||||
|
e.guards += 1;
|
||||||
|
guard
|
||||||
|
}
|
||||||
|
std::collections::hash_map::Entry::Vacant(v) => {
|
||||||
|
let mutex = Arc::new(AsyncMutex::new(()));
|
||||||
|
let guard = asyncmutex_try_lock_arc!(mutex)?;
|
||||||
|
v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
|
||||||
|
guard
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Return guard
|
||||||
|
Some(AsyncTagLockGuard::new(self.clone(), tag, guard))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,29 @@ pub async fn test_simple_single_contention() {
|
|||||||
assert_eq!(table.len(), 1);
|
assert_eq!(table.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn test_simple_try() {
|
||||||
|
info!("test_simple_try");
|
||||||
|
|
||||||
|
let table = AsyncTagLockTable::new();
|
||||||
|
|
||||||
|
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
|
||||||
|
let a2 = SocketAddr::new("1.2.3.5".parse().unwrap(), 1235);
|
||||||
|
|
||||||
|
{
|
||||||
|
let _g1 = table.lock_tag(a1).await;
|
||||||
|
|
||||||
|
let opt_g2 = table.try_lock_tag(a1);
|
||||||
|
let opt_g3 = table.try_lock_tag(a2);
|
||||||
|
|
||||||
|
assert!(opt_g2.is_none());
|
||||||
|
assert!(opt_g3.is_some());
|
||||||
|
}
|
||||||
|
let opt_g4 = table.try_lock_tag(a1);
|
||||||
|
assert!(opt_g4.is_some());
|
||||||
|
|
||||||
|
assert_eq!(table.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn test_simple_double_contention() {
|
pub async fn test_simple_double_contention() {
|
||||||
info!("test_simple_double_contention");
|
info!("test_simple_double_contention");
|
||||||
|
|
||||||
@ -153,6 +176,7 @@ pub async fn test_parallel_single_contention() {
|
|||||||
|
|
||||||
pub async fn test_all() {
|
pub async fn test_all() {
|
||||||
test_simple_no_contention().await;
|
test_simple_no_contention().await;
|
||||||
|
test_simple_try().await;
|
||||||
test_simple_single_contention().await;
|
test_simple_single_contention().await;
|
||||||
test_parallel_single_contention().await;
|
test_parallel_single_contention().await;
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,13 @@ cfg_if::cfg_if! {
|
|||||||
$x.clone().lock_owned().await
|
$x.clone().lock_owned().await
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! asyncmutex_try_lock_arc {
|
||||||
|
($x:expr) => {
|
||||||
|
$x.clone().try_lock_owned().ok()
|
||||||
|
};
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! asyncmutex_try_lock {
|
macro_rules! asyncmutex_try_lock {
|
||||||
@ -60,6 +67,12 @@ cfg_if::cfg_if! {
|
|||||||
$x.lock_arc().await
|
$x.lock_arc().await
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! asyncmutex_try_lock_arc {
|
||||||
|
($x:expr) => {
|
||||||
|
$x.try_lock_arc()
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@
|
|||||||
use cfg_if::*;
|
use cfg_if::*;
|
||||||
use parking_lot::Once;
|
use parking_lot::Once;
|
||||||
use veilid_tools::tests::*;
|
use veilid_tools::tests::*;
|
||||||
use veilid_tools::*;
|
|
||||||
|
|
||||||
use wasm_bindgen_test::*;
|
use wasm_bindgen_test::*;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user