This commit is contained in:
Christien Rioux 2023-09-21 22:24:37 -04:00
parent ca1a802b5b
commit 2d075626f1
3 changed files with 28 additions and 13 deletions

View File

@ -113,21 +113,21 @@ where
}); });
} }
async fn fanout_processor(self: Arc<Self>) { async fn fanout_processor(self: Arc<Self>) -> bool {
// Loop until we have a result or are done // Loop until we have a result or are done
loop { loop {
// Get the closest node we haven't processed yet if we're not done yet // Get the closest node we haven't processed yet if we're not done yet
let next_node = { let next_node = {
let mut ctx = self.context.lock(); let mut ctx = self.context.lock();
if self.clone().evaluate_done(&mut ctx) { if self.clone().evaluate_done(&mut ctx) {
break; break true;
} }
ctx.fanout_queue.next() ctx.fanout_queue.next()
}; };
// If we don't have a node to process, stop fanning out // If we don't have a node to process, stop fanning out
let Some(next_node) = next_node else { let Some(next_node) = next_node else {
break; break false;
}; };
// Do the call for this node // Do the call for this node
@ -161,7 +161,7 @@ where
Err(e) => { Err(e) => {
// Error happened, abort everything and return the error // Error happened, abort everything and return the error
self.context.lock().result = Some(Err(e)); self.context.lock().result = Some(Err(e));
return; break true;
} }
}; };
} }
@ -248,12 +248,18 @@ where
} }
} }
// Wait for them to complete // Wait for them to complete
timeout(timeout_ms, async { while unord.next().await.is_some() {} }) timeout(timeout_ms, async {
.await while let Some(is_done) = unord.next().await {
.into_timeout_or() if is_done {
.map(|_| { break;
// Finished, return whatever value we came up with }
self.context.lock().result.take().transpose() }
}) })
.await
.into_timeout_or()
.map(|_| {
// Finished, return whatever value we came up with
self.context.lock().result.take().transpose()
})
} }
} }

View File

@ -166,9 +166,13 @@ impl StorageManager {
match fanout_call.run().await { match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
log_stor!(debug "GetValue Fanout Timeout");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult { Ok(SubkeyResult {
value: ctx.value.clone(), value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), descriptor: ctx.descriptor.clone(),

View File

@ -162,9 +162,14 @@ impl StorageManager {
match fanout_call.run().await { match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
log_stor!(debug "SetValue Fanout Timeout");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.set_count >= consensus_count {
log_stor!(debug "SetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.set_count);
}
Ok(ctx.value.clone()) Ok(ctx.value.clone())
} }
// If we finished with or without consensus (enough nodes returning the same value) // If we finished with or without consensus (enough nodes returning the same value)