From 2d075626f18ee472a12b1950e82943aa708c08de Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 21 Sep 2023 22:24:37 -0400 Subject: [PATCH] fixes --- veilid-core/src/rpc_processor/fanout_call.rs | 28 ++++++++++++-------- veilid-core/src/storage_manager/get_value.rs | 6 ++++- veilid-core/src/storage_manager/set_value.rs | 7 ++++- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 1c4ff0a0..3ae25137 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -113,21 +113,21 @@ where }); } - async fn fanout_processor(self: Arc) { + async fn fanout_processor(self: Arc) -> bool { // Loop until we have a result or are done loop { // Get the closest node we haven't processed yet if we're not done yet let next_node = { let mut ctx = self.context.lock(); if self.clone().evaluate_done(&mut ctx) { - break; + break true; } ctx.fanout_queue.next() }; // If we don't have a node to process, stop fanning out let Some(next_node) = next_node else { - break; + break false; }; // Do the call for this node @@ -161,7 +161,7 @@ where Err(e) => { // Error happened, abort everything and return the error self.context.lock().result = Some(Err(e)); - return; + break true; } }; } @@ -248,12 +248,18 @@ where } } // Wait for them to complete - timeout(timeout_ms, async { while unord.next().await.is_some() {} }) - .await - .into_timeout_or() - .map(|_| { - // Finished, return whatever value we came up with - self.context.lock().result.take().transpose() - }) + timeout(timeout_ms, async { + while let Some(is_done) = unord.next().await { + if is_done { + break; + } + } + }) + .await + .into_timeout_or() + .map(|_| { + // Finished, return whatever value we came up with + self.context.lock().result.take().transpose() + }) } } diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index ebf8969d..85a28a10 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -166,9 +166,13 @@ impl StorageManager { match fanout_call.run().await { // If we don't finish in the timeout (too much time passed checking for consensus) TimeoutOr::Timeout => { - log_stor!(debug "GetValue Fanout Timeout"); // Return the best answer we've got let ctx = context.lock(); + if ctx.value_count >= consensus_count { + log_stor!(debug "GetValue Fanout Timeout Consensus"); + } else { + log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count); + } Ok(SubkeyResult { value: ctx.value.clone(), descriptor: ctx.descriptor.clone(), diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 15de4467..5c812f24 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -162,9 +162,14 @@ impl StorageManager { match fanout_call.run().await { // If we don't finish in the timeout (too much time passed checking for consensus) TimeoutOr::Timeout => { - log_stor!(debug "SetValue Fanout Timeout"); // Return the best answer we've got let ctx = context.lock(); + if ctx.set_count >= consensus_count { + log_stor!(debug "SetValue Fanout Timeout Consensus"); + } else { + log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.set_count); + } + Ok(ctx.value.clone()) } // If we finished with or without consensus (enough nodes returning the same value)