dht fixes

This commit is contained in:
Christien Rioux 2023-09-09 21:44:16 -04:00
parent 07f92b6e3f
commit 60a7e90712
6 changed files with 82 additions and 61 deletions

View File

@ -126,7 +126,7 @@ impl RPCProcessor {
log_rpc!(debug "{}", debug_string_answer); log_rpc!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_rpc!(debug "Peers: {:#?}", peers); log_rpc!(debug "Peers: {:#?}", peer_ids);
} }
// Validate peers returned are, in fact, closer to the key than the node we sent this to // Validate peers returned are, in fact, closer to the key than the node we sent this to

View File

@ -140,6 +140,7 @@ impl RPCProcessor {
); );
log_rpc!(debug "{}", debug_string_answer); log_rpc!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_rpc!(debug "Peers: {:#?}", peer_ids); log_rpc!(debug "Peers: {:#?}", peer_ids);
} }

View File

@ -176,9 +176,13 @@ impl StorageManager {
} }
// If we finished with consensus (enough nodes returning the same value) // If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => { TimeoutOr::Value(Ok(Some(()))) => {
log_stor!(debug "GetValue Fanout Consensus");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus");
} else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult { Ok(SubkeyResult {
value: ctx.value.clone(), value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), descriptor: ctx.descriptor.clone(),
@ -188,7 +192,11 @@ impl StorageManager {
TimeoutOr::Value(Ok(None)) => { TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count); if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult { Ok(SubkeyResult {
value: ctx.value.clone(), value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), descriptor: ctx.descriptor.clone(),

View File

@ -5,7 +5,9 @@ struct OutboundSetValueContext {
/// The latest value of the subkey, may be the value passed in /// The latest value of the subkey, may be the value passed in
pub value: SignedValueData, pub value: SignedValueData,
/// The consensus count for the value we have received /// The consensus count for the value we have received
pub value_count: usize, pub set_count: usize,
/// The number of non-sets since the last set we have received
pub missed_since_last_set: usize,
/// The parsed schema from the descriptor if we have one /// The parsed schema from the descriptor if we have one
pub schema: DHTSchema, pub schema: DHTSchema,
} }
@ -38,7 +40,8 @@ impl StorageManager {
let schema = descriptor.schema()?; let schema = descriptor.schema()?;
let context = Arc::new(Mutex::new(OutboundSetValueContext { let context = Arc::new(Mutex::new(OutboundSetValueContext {
value, value,
value_count: 0, set_count: 0,
missed_since_last_set: 0,
schema, schema,
})); }));
@ -98,7 +101,8 @@ impl StorageManager {
// If the sequence number is greater, keep it // If the sequence number is greater, keep it
ctx.value = value; ctx.value = value;
// One node has shown us this value so far // One node has shown us this value so far
ctx.value_count = 1; ctx.set_count = 1;
ctx.missed_since_last_set = 0;
} else { } else {
// If the sequence number is older, or an equal sequence number, // If the sequence number is older, or an equal sequence number,
// node should have not returned a value here. // node should have not returned a value here.
@ -108,8 +112,12 @@ impl StorageManager {
} else { } else {
// It was set on this node and no newer value was found and returned, // It was set on this node and no newer value was found and returned,
// so increase our consensus count // so increase our consensus count
ctx.value_count += 1; ctx.set_count += 1;
ctx.missed_since_last_set = 0;
} }
} else {
let mut ctx = context.lock();
ctx.missed_since_last_set += 1;
} }
// Return peers if we have some // Return peers if we have some
@ -122,9 +130,18 @@ impl StorageManager {
// Routine to call to check if we're done at each step // Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| { let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_count >= consensus_count {
// If we have reached sufficient consensus, return done
if ctx.set_count >= consensus_count {
return Some(());
}
// If we have missed more than our consensus count since our last set, return done
// This keeps the traversal from searching too many nodes when we aren't converging
// Only do this if we have gotten at least half our desired sets.
if ctx.set_count >= ((consensus_count + 1) / 2)
&& ctx.missed_since_last_set >= consensus_count
{
return Some(()); return Some(());
} }
None None
@ -150,18 +167,26 @@ impl StorageManager {
let ctx = context.lock(); let ctx = context.lock();
Ok(ctx.value.clone()) Ok(ctx.value.clone())
} }
// If we finished with consensus (enough nodes returning the same value) // If we finished with or without consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => { TimeoutOr::Value(Ok(Some(()))) => {
log_stor!(debug "SetValue Fanout Consensus");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.set_count >= consensus_count {
log_stor!(debug "SetValue Fanout Consensus");
} else {
log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.set_count);
}
Ok(ctx.value.clone()) Ok(ctx.value.clone())
} }
// If we finished without consensus (ran out of nodes before getting consensus) // If we ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => { TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count); if ctx.set_count >= consensus_count {
log_stor!(debug "SetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.set_count);
}
Ok(ctx.value.clone()) Ok(ctx.value.clone())
} }
// Failed // Failed

View File

@ -33,16 +33,16 @@ where
fn drop(&mut self) { fn drop(&mut self) {
let mut inner = self.table.inner.lock(); let mut inner = self.table.inner.lock();
// Inform the table we're dropping this guard // Inform the table we're dropping this guard
let waiters = { let guards = {
// Get the table entry, it must exist since we have a guard locked // Get the table entry, it must exist since we have a guard locked
let entry = inner.table.get_mut(&self.tag).unwrap(); let entry = inner.table.get_mut(&self.tag).unwrap();
// Decrement the number of waiters // Decrement the number of guards
entry.waiters -= 1; entry.guards -= 1;
// Return the number of waiters left // Return the number of guards left
entry.waiters entry.guards
}; };
// If there are no waiters left, we remove the tag from the table // If there are no guards left, we remove the tag from the table
if waiters == 0 { if guards == 0 {
inner.table.remove(&self.tag).unwrap(); inner.table.remove(&self.tag).unwrap();
} }
// Proceed with releasing _guard, which may cause some concurrent tag lock to acquire // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
@ -52,7 +52,7 @@ where
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AsyncTagLockTableEntry { struct AsyncTagLockTableEntry {
mutex: Arc<AsyncMutex<()>>, mutex: Arc<AsyncMutex<()>>,
waiters: usize, guards: usize,
} }
struct AsyncTagLockTableInner<T> struct AsyncTagLockTableInner<T>
@ -108,11 +108,11 @@ where
.entry(tag.clone()) .entry(tag.clone())
.or_insert_with(|| AsyncTagLockTableEntry { .or_insert_with(|| AsyncTagLockTableEntry {
mutex: Arc::new(AsyncMutex::new(())), mutex: Arc::new(AsyncMutex::new(())),
waiters: 0, guards: 0,
}); });
// Increment the number of waiters // Increment the number of guards
entry.waiters += 1; entry.guards += 1;
// Return the mutex associated with the tag // Return the mutex associated with the tag
entry.mutex.clone() entry.mutex.clone()
@ -121,16 +121,7 @@ where
}; };
// Lock the tag lock // Lock the tag lock
let guard; let guard = asyncmutex_lock_arc!(mutex);
cfg_if! {
if #[cfg(feature="rt-tokio")] {
// tokio version
guard = mutex.lock_owned().await;
} else {
// async-std and wasm async-lock version
guard = mutex.lock_arc().await;
}
}
// Return the locked guard // Return the locked guard
AsyncTagLockGuard::new(self.clone(), tag, guard) AsyncTagLockGuard::new(self.clone(), tag, guard)
@ -138,32 +129,28 @@ where
pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> { pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
// Get or create a tag lock entry // Get or create a tag lock entry
let mutex = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
// See if this tag is in the table // See if this tag is in the table
// and if not, add a new mutex for this tag // and if not, add a new mutex for this tag
let entry = inner let entry = inner.table.entry(tag.clone());
.table
.entry(tag.clone())
.or_insert_with(|| AsyncTagLockTableEntry {
mutex: Arc::new(AsyncMutex::new(())),
waiters: 0,
});
// Increment the number of waiters
entry.waiters += 1;
// Return the mutex associated with the tag
entry.mutex.clone()
// Drop the table guard
};
// Lock the tag lock // Lock the tag lock
let opt_guard = asyncmutex_try_lock_arc!(mutex); let guard = match entry {
std::collections::hash_map::Entry::Occupied(mut o) => {
// Return the locked guard let e = o.get_mut();
opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard)) let guard = asyncmutex_try_lock_arc!(e.mutex)?;
e.guards += 1;
guard
}
std::collections::hash_map::Entry::Vacant(v) => {
let mutex = Arc::new(AsyncMutex::new(()));
let guard = asyncmutex_try_lock_arc!(mutex)?;
v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
guard
}
};
// Return guard
Some(AsyncTagLockGuard::new(self.clone(), tag, guard))
} }
} }

View File

@ -51,7 +51,7 @@ cfg_if::cfg_if! {
#[macro_export] #[macro_export]
macro_rules! asyncmutex_try_lock_arc { macro_rules! asyncmutex_try_lock_arc {
($x:expr) => { ($x:expr) => {
$x.try_lock_owned().ok() $x.clone().try_lock_owned().ok()
}; };
} }
} else { } else {