diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs
index caeb894d..b6137796 100644
--- a/veilid-core/src/rpc_processor/rpc_get_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_get_value.rs
@@ -126,7 +126,7 @@ impl RPCProcessor {
             log_rpc!(debug "{}", debug_string_answer);
             let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
-            log_rpc!(debug "Peers: {:#?}", peers);
+            log_rpc!(debug "Peers: {:#?}", peer_ids);
         }
 
         // Validate peers returned are, in fact, closer to the key than the node we sent this to
@@ -224,7 +224,7 @@ impl RPCProcessor {
             );
 
             log_rpc!(debug "{}", debug_string);
-        }
+        }
 
         // See if we have this record ourselves
         let storage_manager = self.storage_manager();
diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs
index cab0d6e6..48f7563b 100644
--- a/veilid-core/src/rpc_processor/rpc_set_value.rs
+++ b/veilid-core/src/rpc_processor/rpc_set_value.rs
@@ -114,7 +114,7 @@ impl RPCProcessor {
         };
 
         let (set, value, peers) = set_value_a.destructure();
-        
+
         #[cfg(feature="debug-dht")]
         {
             let debug_string_value = value.as_ref().map(|v| {
@@ -140,6 +140,7 @@ impl RPCProcessor {
             );
 
             log_rpc!(debug "{}", debug_string_answer);
+            let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
             log_rpc!(debug "Peers: {:#?}", peer_ids);
         }
 
diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index 8092b1d0..ebf8969d 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -176,9 +176,13 @@ impl StorageManager {
             }
             // If we finished with consensus (enough nodes returning the same value)
             TimeoutOr::Value(Ok(Some(()))) => {
-                log_stor!(debug "GetValue Fanout Consensus");
                 // Return the best answer we've got
                 let ctx = context.lock();
+                if ctx.value_count >= consensus_count {
+                    log_stor!(debug "GetValue Fanout Consensus");
+                } else {
+                    log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
+                }
                 Ok(SubkeyResult {
                     value: ctx.value.clone(),
                     descriptor: ctx.descriptor.clone(),
@@ -188,7 +192,11 @@
             TimeoutOr::Value(Ok(None)) => {
                 // Return the best answer we've got
                 let ctx = context.lock();
-                log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count);
+                if ctx.value_count >= consensus_count {
+                    log_stor!(debug "GetValue Fanout Exhausted Consensus");
+                } else {
+                    log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
+                }
                 Ok(SubkeyResult {
                     value: ctx.value.clone(),
                     descriptor: ctx.descriptor.clone(),
diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs
index 17f65448..e1b4be01 100644
--- a/veilid-core/src/storage_manager/set_value.rs
+++ b/veilid-core/src/storage_manager/set_value.rs
@@ -5,7 +5,9 @@ struct OutboundSetValueContext {
     /// The latest value of the subkey, may be the value passed in
     pub value: SignedValueData,
     /// The consensus count for the value we have received
-    pub value_count: usize,
+    pub set_count: usize,
+    /// The number of non-sets since the last set we have received
+    pub missed_since_last_set: usize,
     /// The parsed schema from the descriptor if we have one
     pub schema: DHTSchema,
 }
@@ -38,7 +40,8 @@ impl StorageManager {
         let schema = descriptor.schema()?;
         let context = Arc::new(Mutex::new(OutboundSetValueContext {
             value,
-            value_count: 0,
+            set_count: 0,
+            missed_since_last_set: 0,
             schema,
         }));
 
@@ -98,7 +101,8 @@ impl StorageManager {
                     // If the sequence number is greater, keep it
                     ctx.value = value;
                     // One node has shown us this value so far
-                    ctx.value_count = 1;
+                    ctx.set_count = 1;
+                    ctx.missed_since_last_set = 0;
                 } else {
                     // If the sequence number is older, or an equal sequence number,
                     // node should have not returned a value here.
@@ -108,8 +112,12 @@
                 } else {
                     // It was set on this node and no newer value was found and returned,
                     // so increase our consensus count
-                    ctx.value_count += 1;
+                    ctx.set_count += 1;
+                    ctx.missed_since_last_set = 0;
                 }
+            } else {
+                let mut ctx = context.lock();
+                ctx.missed_since_last_set += 1;
             }
 
             // Return peers if we have some
@@ -122,9 +130,18 @@
         // Routine to call to check if we're done at each step
        let check_done = |_closest_nodes: &[NodeRef]| {
-            // If we have reached sufficient consensus, return done
             let ctx = context.lock();
-            if ctx.value_count >= consensus_count {
+
+            // If we have reached sufficient consensus, return done
+            if ctx.set_count >= consensus_count {
+                return Some(());
+            }
+            // If we have missed more than our consensus count since our last set, return done
+            // This keeps the traversal from searching too many nodes when we aren't converging
+            // Only do this if we have gotten at least half our desired sets.
+            if ctx.set_count >= ((consensus_count + 1) / 2)
+                && ctx.missed_since_last_set >= consensus_count
+            {
                 return Some(());
             }
             None
@@ -150,18 +167,26 @@
                 let ctx = context.lock();
                 Ok(ctx.value.clone())
             }
-            // If we finished with consensus (enough nodes returning the same value)
+            // If we finished with or without consensus (enough nodes returning the same value)
             TimeoutOr::Value(Ok(Some(()))) => {
-                log_stor!(debug "SetValue Fanout Consensus");
                 // Return the best answer we've got
                 let ctx = context.lock();
+                if ctx.set_count >= consensus_count {
+                    log_stor!(debug "SetValue Fanout Consensus");
+                } else {
+                    log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.set_count);
+                }
                 Ok(ctx.value.clone())
             }
-            // If we finished without consensus (ran out of nodes before getting consensus)
+            // If we ran out of nodes before getting consensus)
             TimeoutOr::Value(Ok(None)) => {
                 // Return the best answer we've got
                 let ctx = context.lock();
-                log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count);
+                if ctx.set_count >= consensus_count {
+                    log_stor!(debug "SetValue Fanout Exhausted Consensus");
+                } else {
+                    log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.set_count);
+                }
                 Ok(ctx.value.clone())
             }
             // Failed
diff --git a/veilid-tools/src/async_tag_lock.rs b/veilid-tools/src/async_tag_lock.rs
index cf3950b8..ec9e8ec3 100644
--- a/veilid-tools/src/async_tag_lock.rs
+++ b/veilid-tools/src/async_tag_lock.rs
@@ -33,16 +33,16 @@ where
     fn drop(&mut self) {
         let mut inner = self.table.inner.lock();
         // Inform the table we're dropping this guard
-        let waiters = {
+        let guards = {
             // Get the table entry, it must exist since we have a guard locked
             let entry = inner.table.get_mut(&self.tag).unwrap();
-            // Decrement the number of waiters
-            entry.waiters -= 1;
-            // Return the number of waiters left
-            entry.waiters
+            // Decrement the number of guards
+            entry.guards -= 1;
+            // Return the number of guards left
+            entry.guards
         };
-        // If there are no waiters left, we remove the tag from the table
-        if waiters == 0 {
+        // If there are no guards left, we remove the tag from the table
+        if guards == 0 {
             inner.table.remove(&self.tag).unwrap();
         }
         // Proceed with releasing _guard, which may cause some concurrent tag lock to acquire
@@ -52,7 +52,7 @@ where
 #[derive(Clone, Debug)]
 struct AsyncTagLockTableEntry {
     mutex: Arc<AsyncMutex<()>>,
-    waiters: usize,
+    guards: usize,
 }
 
 struct AsyncTagLockTableInner<T>
@@ -108,11 +108,11 @@ where
                 .entry(tag.clone())
                 .or_insert_with(|| AsyncTagLockTableEntry {
                     mutex: Arc::new(AsyncMutex::new(())),
-                    waiters: 0,
+                    guards: 0,
                 });
 
-            // Increment the number of waiters
-            entry.waiters += 1;
+            // Increment the number of guards
+            entry.guards += 1;
 
             // Return the mutex associated with the tag
             entry.mutex.clone()
@@ -121,16 +121,7 @@ where
         };
         // Lock the tag lock
-        let guard;
-        cfg_if! {
-            if #[cfg(feature="rt-tokio")] {
-                // tokio version
-                guard = mutex.lock_owned().await;
-            } else {
-                // async-std and wasm async-lock version
-                guard = mutex.lock_arc().await;
-            }
-        }
+        let guard = asyncmutex_lock_arc!(mutex);
 
         // Return the locked guard
         AsyncTagLockGuard::new(self.clone(), tag, guard)
     }
@@ -138,32 +129,28 @@
     pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
         // Get or create a tag lock entry
-        let mutex = {
-            let mut inner = self.inner.lock();
+        let mut inner = self.inner.lock();
 
-            // See if this tag is in the table
-            // and if not, add a new mutex for this tag
-            let entry = inner
-                .table
-                .entry(tag.clone())
-                .or_insert_with(|| AsyncTagLockTableEntry {
-                    mutex: Arc::new(AsyncMutex::new(())),
-                    waiters: 0,
-                });
-
-            // Increment the number of waiters
-            entry.waiters += 1;
-
-            // Return the mutex associated with the tag
-            entry.mutex.clone()
-
-            // Drop the table guard
-        };
+        // See if this tag is in the table
+        // and if not, add a new mutex for this tag
+        let entry = inner.table.entry(tag.clone());
 
         // Lock the tag lock
-        let opt_guard = asyncmutex_try_lock_arc!(mutex);
-
-        // Return the locked guard
-        opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard))
+        let guard = match entry {
+            std::collections::hash_map::Entry::Occupied(mut o) => {
+                let e = o.get_mut();
+                let guard = asyncmutex_try_lock_arc!(e.mutex)?;
+                e.guards += 1;
+                guard
+            }
+            std::collections::hash_map::Entry::Vacant(v) => {
+                let mutex = Arc::new(AsyncMutex::new(()));
+                let guard = asyncmutex_try_lock_arc!(mutex)?;
+                v.insert(AsyncTagLockTableEntry { mutex, guards: 1 });
+                guard
+            }
+        };
+        // Return guard
+        Some(AsyncTagLockGuard::new(self.clone(), tag, guard))
     }
 }
diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs
index 76d0a2d8..648da9b2 100644
--- a/veilid-tools/src/tools.rs
+++ b/veilid-tools/src/tools.rs
@@ -51,7 +51,7 @@ cfg_if::cfg_if! {
         #[macro_export]
         macro_rules! asyncmutex_try_lock_arc {
             ($x:expr) => {
-                $x.try_lock_owned().ok()
+                $x.clone().try_lock_owned().ok()
            };
         }
     } else {