fanout debugging
This commit is contained in:
parent
fcd9772e00
commit
2c779b2257
@ -288,7 +288,11 @@ where
|
|||||||
}
|
}
|
||||||
// Wait for them to complete
|
// Wait for them to complete
|
||||||
timeout(timeout_ms, async {
|
timeout(timeout_ms, async {
|
||||||
while let Some(_) = unord.next().await {}
|
while let Some(_) = unord.next().await {
|
||||||
|
if self.clone().evaluate_done() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.into_timeout_or()
|
.into_timeout_or()
|
||||||
|
@ -13,7 +13,6 @@ struct OutboundGetValueContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl StorageManager {
|
impl StorageManager {
|
||||||
|
|
||||||
/// Perform a 'get value' query on the network
|
/// Perform a 'get value' query on the network
|
||||||
pub async fn outbound_get_value(
|
pub async fn outbound_get_value(
|
||||||
&self,
|
&self,
|
||||||
@ -74,15 +73,14 @@ impl StorageManager {
|
|||||||
if let Some(descriptor) = gva.answer.descriptor {
|
if let Some(descriptor) = gva.answer.descriptor {
|
||||||
let mut ctx = context.lock();
|
let mut ctx = context.lock();
|
||||||
if ctx.descriptor.is_none() && ctx.schema.is_none() {
|
if ctx.descriptor.is_none() && ctx.schema.is_none() {
|
||||||
ctx.schema =
|
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
|
||||||
Some(descriptor.schema().map_err(RPCError::invalid_format)?);
|
|
||||||
ctx.descriptor = Some(descriptor);
|
ctx.descriptor = Some(descriptor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep the value if we got one and it is newer and it passes schema validation
|
// Keep the value if we got one and it is newer and it passes schema validation
|
||||||
if let Some(value) = gva.answer.value {
|
if let Some(value) = gva.answer.value {
|
||||||
log_stor!(debug "Got value back: len={}", value.value_data().data().len());
|
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
|
||||||
let mut ctx = context.lock();
|
let mut ctx = context.lock();
|
||||||
|
|
||||||
// Ensure we have a schema and descriptor
|
// Ensure we have a schema and descriptor
|
||||||
@ -126,8 +124,7 @@ impl StorageManager {
|
|||||||
} else {
|
} else {
|
||||||
// If the sequence number is older, ignore it
|
// If the sequence number is older, ignore it
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
// If we have no prior value, keep it
|
// If we have no prior value, keep it
|
||||||
ctx.value = Some(value);
|
ctx.value = Some(value);
|
||||||
// One node has shown us this value so far
|
// One node has shown us this value so far
|
||||||
@ -136,7 +133,7 @@ impl StorageManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Return peers if we have some
|
// Return peers if we have some
|
||||||
#[cfg(feature="network-result-extra")]
|
#[cfg(feature = "network-result-extra")]
|
||||||
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
|
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
|
||||||
|
|
||||||
Ok(Some(gva.answer.peers))
|
Ok(Some(gva.answer.peers))
|
||||||
@ -147,7 +144,8 @@ impl StorageManager {
|
|||||||
let check_done = |_closest_nodes: &[NodeRef]| {
|
let check_done = |_closest_nodes: &[NodeRef]| {
|
||||||
// If we have reached sufficient consensus, return done
|
// If we have reached sufficient consensus, return done
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count {
|
if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count
|
||||||
|
{
|
||||||
return Some(());
|
return Some(());
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
@ -167,14 +165,31 @@ impl StorageManager {
|
|||||||
|
|
||||||
match fanout_call.run().await {
|
match fanout_call.run().await {
|
||||||
// If we don't finish in the timeout (too much time passed checking for consensus)
|
// If we don't finish in the timeout (too much time passed checking for consensus)
|
||||||
TimeoutOr::Timeout |
|
TimeoutOr::Timeout => {
|
||||||
|
log_stor!(debug "GetValue Fanout Timeout");
|
||||||
|
// Return the best answer we've got
|
||||||
|
let ctx = context.lock();
|
||||||
|
Ok(SubkeyResult {
|
||||||
|
value: ctx.value.clone(),
|
||||||
|
descriptor: ctx.descriptor.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
// If we finished with consensus (enough nodes returning the same value)
|
// If we finished with consensus (enough nodes returning the same value)
|
||||||
TimeoutOr::Value(Ok(Some(()))) |
|
TimeoutOr::Value(Ok(Some(()))) => {
|
||||||
|
log_stor!(debug "GetValue Fanout Consensus");
|
||||||
|
// Return the best answer we've got
|
||||||
|
let ctx = context.lock();
|
||||||
|
Ok(SubkeyResult {
|
||||||
|
value: ctx.value.clone(),
|
||||||
|
descriptor: ctx.descriptor.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
// If we finished without consensus (ran out of nodes before getting consensus)
|
// If we finished without consensus (ran out of nodes before getting consensus)
|
||||||
TimeoutOr::Value(Ok(None)) => {
|
TimeoutOr::Value(Ok(None)) => {
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
Ok(SubkeyResult{
|
log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count);
|
||||||
|
Ok(SubkeyResult {
|
||||||
value: ctx.value.clone(),
|
value: ctx.value.clone(),
|
||||||
descriptor: ctx.descriptor.clone(),
|
descriptor: ctx.descriptor.clone(),
|
||||||
})
|
})
|
||||||
@ -182,22 +197,31 @@ impl StorageManager {
|
|||||||
// Failed
|
// Failed
|
||||||
TimeoutOr::Value(Err(e)) => {
|
TimeoutOr::Value(Err(e)) => {
|
||||||
// If we finished with an error, return that
|
// If we finished with an error, return that
|
||||||
|
log_stor!(debug "GetValue Fanout Error: {}", e);
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a recieved 'Get Value' query
|
/// Handle a recieved 'Get Value' query
|
||||||
pub async fn inbound_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
|
pub async fn inbound_get_value(
|
||||||
|
&self,
|
||||||
|
key: TypedKey,
|
||||||
|
subkey: ValueSubkey,
|
||||||
|
want_descriptor: bool,
|
||||||
|
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
|
||||||
let mut inner = self.lock().await?;
|
let mut inner = self.lock().await?;
|
||||||
let res = match inner.handle_get_remote_value(key, subkey, want_descriptor).await {
|
let res = match inner
|
||||||
|
.handle_get_remote_value(key, subkey, want_descriptor)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(res) => res,
|
Ok(res) => res,
|
||||||
Err(VeilidAPIError::Internal { message }) => {
|
Err(VeilidAPIError::Internal { message }) => {
|
||||||
apibail_internal!(message);
|
apibail_internal!(message);
|
||||||
},
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Ok(NetworkResult::invalid_message(e));
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
},
|
}
|
||||||
};
|
};
|
||||||
Ok(NetworkResult::value(res))
|
Ok(NetworkResult::value(res))
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,6 @@ struct OutboundSetValueContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl StorageManager {
|
impl StorageManager {
|
||||||
|
|
||||||
/// Perform a 'set value' query on the network
|
/// Perform a 'set value' query on the network
|
||||||
pub async fn outbound_set_value(
|
pub async fn outbound_set_value(
|
||||||
&self,
|
&self,
|
||||||
@ -49,7 +48,6 @@ impl StorageManager {
|
|||||||
let context = context.clone();
|
let context = context.clone();
|
||||||
let descriptor = descriptor.clone();
|
let descriptor = descriptor.clone();
|
||||||
async move {
|
async move {
|
||||||
|
|
||||||
let send_descriptor = true; // xxx check if next_node needs the descriptor or not
|
let send_descriptor = true; // xxx check if next_node needs the descriptor or not
|
||||||
|
|
||||||
// get most recent value to send
|
// get most recent value to send
|
||||||
@ -81,6 +79,7 @@ impl StorageManager {
|
|||||||
|
|
||||||
// Keep the value if we got one and it is newer and it passes schema validation
|
// Keep the value if we got one and it is newer and it passes schema validation
|
||||||
if let Some(value) = sva.answer.value {
|
if let Some(value) = sva.answer.value {
|
||||||
|
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
|
||||||
|
|
||||||
// Validate with schema
|
// Validate with schema
|
||||||
if !ctx.schema.check_subkey_value_data(
|
if !ctx.schema.check_subkey_value_data(
|
||||||
@ -101,14 +100,12 @@ impl StorageManager {
|
|||||||
// One node has shown us this value so far
|
// One node has shown us this value so far
|
||||||
ctx.value_count = 1;
|
ctx.value_count = 1;
|
||||||
} else {
|
} else {
|
||||||
// If the sequence number is older, or an equal sequence number,
|
// If the sequence number is older, or an equal sequence number,
|
||||||
// node should have not returned a value here.
|
// node should have not returned a value here.
|
||||||
// Skip this node and it's closer list because it is misbehaving
|
// Skip this node and it's closer list because it is misbehaving
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
// It was set on this node and no newer value was found and returned,
|
// It was set on this node and no newer value was found and returned,
|
||||||
// so increase our consensus count
|
// so increase our consensus count
|
||||||
ctx.value_count += 1;
|
ctx.value_count += 1;
|
||||||
@ -116,7 +113,7 @@ impl StorageManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Return peers if we have some
|
// Return peers if we have some
|
||||||
#[cfg(feature="network-result-extra")]
|
#[cfg(feature = "network-result-extra")]
|
||||||
log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
|
log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
|
||||||
|
|
||||||
Ok(Some(sva.answer.peers))
|
Ok(Some(sva.answer.peers))
|
||||||
@ -147,18 +144,30 @@ impl StorageManager {
|
|||||||
|
|
||||||
match fanout_call.run().await {
|
match fanout_call.run().await {
|
||||||
// If we don't finish in the timeout (too much time passed checking for consensus)
|
// If we don't finish in the timeout (too much time passed checking for consensus)
|
||||||
TimeoutOr::Timeout |
|
TimeoutOr::Timeout => {
|
||||||
|
log_stor!(debug "SetValue Fanout Timeout");
|
||||||
|
// Return the best answer we've got
|
||||||
|
let ctx = context.lock();
|
||||||
|
Ok(ctx.value.clone())
|
||||||
|
}
|
||||||
// If we finished with consensus (enough nodes returning the same value)
|
// If we finished with consensus (enough nodes returning the same value)
|
||||||
TimeoutOr::Value(Ok(Some(()))) |
|
TimeoutOr::Value(Ok(Some(()))) => {
|
||||||
|
log_stor!(debug "SetValue Fanout Consensus");
|
||||||
|
// Return the best answer we've got
|
||||||
|
let ctx = context.lock();
|
||||||
|
Ok(ctx.value.clone())
|
||||||
|
}
|
||||||
// If we finished without consensus (ran out of nodes before getting consensus)
|
// If we finished without consensus (ran out of nodes before getting consensus)
|
||||||
TimeoutOr::Value(Ok(None)) => {
|
TimeoutOr::Value(Ok(None)) => {
|
||||||
// Return the best answer we've got
|
// Return the best answer we've got
|
||||||
let ctx = context.lock();
|
let ctx = context.lock();
|
||||||
|
log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count);
|
||||||
Ok(ctx.value.clone())
|
Ok(ctx.value.clone())
|
||||||
}
|
}
|
||||||
// Failed
|
// Failed
|
||||||
TimeoutOr::Value(Err(e)) => {
|
TimeoutOr::Value(Err(e)) => {
|
||||||
// If we finished with an error, return that
|
// If we finished with an error, return that
|
||||||
|
log_stor!(debug "SetValue Fanout Error: {}", e);
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -167,7 +176,13 @@ impl StorageManager {
|
|||||||
/// Handle a recieved 'Set Value' query
|
/// Handle a recieved 'Set Value' query
|
||||||
/// Returns a None if the value passed in was set
|
/// Returns a None if the value passed in was set
|
||||||
/// Returns a Some(current value) if the value was older and the current value was kept
|
/// Returns a Some(current value) if the value was older and the current value was kept
|
||||||
pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option<SignedValueDescriptor>) -> VeilidAPIResult<NetworkResult<Option<SignedValueData>>> {
|
pub async fn inbound_set_value(
|
||||||
|
&self,
|
||||||
|
key: TypedKey,
|
||||||
|
subkey: ValueSubkey,
|
||||||
|
value: SignedValueData,
|
||||||
|
descriptor: Option<SignedValueDescriptor>,
|
||||||
|
) -> VeilidAPIResult<NetworkResult<Option<SignedValueData>>> {
|
||||||
let mut inner = self.lock().await?;
|
let mut inner = self.lock().await?;
|
||||||
|
|
||||||
// See if this is a remote or local value
|
// See if this is a remote or local value
|
||||||
@ -198,19 +213,23 @@ impl StorageManager {
|
|||||||
if let Some(descriptor) = descriptor {
|
if let Some(descriptor) = descriptor {
|
||||||
// Descriptor must match last one if it is provided
|
// Descriptor must match last one if it is provided
|
||||||
if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal {
|
if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal {
|
||||||
return Ok(NetworkResult::invalid_message("setvalue descriptor does not match last descriptor"));
|
return Ok(NetworkResult::invalid_message(
|
||||||
|
"setvalue descriptor does not match last descriptor",
|
||||||
|
));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Descriptor was not provided always go with last descriptor
|
// Descriptor was not provided always go with last descriptor
|
||||||
}
|
}
|
||||||
last_descriptor
|
last_descriptor
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
if let Some(descriptor) = descriptor {
|
if let Some(descriptor) = descriptor {
|
||||||
descriptor
|
descriptor
|
||||||
} else {
|
} else {
|
||||||
// No descriptor
|
// No descriptor
|
||||||
return Ok(NetworkResult::invalid_message("descriptor must be provided"));
|
return Ok(NetworkResult::invalid_message(
|
||||||
|
"descriptor must be provided",
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -228,16 +247,18 @@ impl StorageManager {
|
|||||||
let res = if is_local {
|
let res = if is_local {
|
||||||
inner.handle_set_local_value(key, subkey, value).await
|
inner.handle_set_local_value(key, subkey, value).await
|
||||||
} else {
|
} else {
|
||||||
inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await
|
inner
|
||||||
|
.handle_set_remote_value(key, subkey, value, actual_descriptor)
|
||||||
|
.await
|
||||||
};
|
};
|
||||||
match res {
|
match res {
|
||||||
Ok(()) => {},
|
Ok(()) => {}
|
||||||
Err(VeilidAPIError::Internal { message }) => {
|
Err(VeilidAPIError::Internal { message }) => {
|
||||||
apibail_internal!(message);
|
apibail_internal!(message);
|
||||||
},
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Ok(NetworkResult::invalid_message(e));
|
return Ok(NetworkResult::invalid_message(e));
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
Ok(NetworkResult::value(None))
|
Ok(NetworkResult::value(None))
|
||||||
}
|
}
|
||||||
|
@ -117,15 +117,15 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async =>
|
|||||||
),
|
),
|
||||||
dht: VeilidConfigDHT(
|
dht: VeilidConfigDHT(
|
||||||
resolveNodeTimeoutMs: 10000,
|
resolveNodeTimeoutMs: 10000,
|
||||||
resolveNodeCount: 20,
|
resolveNodeCount: 1,
|
||||||
resolveNodeFanout: 3,
|
resolveNodeFanout: 4,
|
||||||
maxFindNodeCount: 20,
|
maxFindNodeCount: 20,
|
||||||
getValueTimeoutMs: 10000,
|
getValueTimeoutMs: 10000,
|
||||||
getValueCount: 20,
|
getValueCount: 3,
|
||||||
getValueFanout: 3,
|
getValueFanout: 4,
|
||||||
setValueTimeoutMs: 10000,
|
setValueTimeoutMs: 10000,
|
||||||
setValueCount: 20,
|
setValueCount: 4,
|
||||||
setValueFanout: 5,
|
setValueFanout: 6,
|
||||||
minPeerCount: 20,
|
minPeerCount: 20,
|
||||||
minPeerRefreshTimeMs: 60000,
|
minPeerRefreshTimeMs: 60000,
|
||||||
validateDialInfoReceiptTimeMs: 2000,
|
validateDialInfoReceiptTimeMs: 2000,
|
||||||
|
Loading…
Reference in New Issue
Block a user