This commit is contained in:
John Smith 2023-06-19 11:29:33 -04:00
parent 9e496e1625
commit ea651e526d
3 changed files with 85 additions and 65 deletions

View File

@ -11,6 +11,21 @@ where
pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>;
/// Contains the logic for generically searing the Veilid routing table for a set of nodes and applying an
/// RPC operation that eventually converges on satisfactory result, or times out and returns some
/// unsatisfactory but acceptable result. Or something.
///
/// The algorithm starts by creating a 'closest_nodes' working set of the nodes closest to some node id currently in our routing table
/// If has pluggable callbacks:
/// * 'check_done' - for checking for a termination condition
/// * 'call_routine' - routine to call for each node that performs an operation and may add more nodes to our closest_nodes set
/// The algorithm is parameterized by:
/// * 'node_count' - the number of nodes to keep in the closest_nodes set
/// * 'fanout' - the number of concurrent calls being processed at the same time
/// The algorithm returns early if 'check_done' returns some value, or if an error is found during the process.
/// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a
/// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found
/// in the given time
pub struct FanoutCall<R, F, C, D>
where
R: Unpin,
@ -68,6 +83,7 @@ where
let mut ctx = self.context.lock();
for nn in new_nodes {
// Make sure the new node isnt already in the list
let mut dup = false;
for cn in &ctx.closest_nodes {
if cn.same_entry(&nn) {
@ -75,9 +91,14 @@ where
}
}
if !dup {
// Add the new node if we haven't already called it before (only one call per node ever)
if let Some(key) = nn.node_ids().get(self.crypto_kind) {
if !ctx.called_nodes.contains(&key) {
ctx.closest_nodes.push(nn.clone());
}
}
}
}
self.routing_table
.sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes);
@ -145,7 +166,7 @@ where
self.clone().add_new_nodes(new_nodes);
}
Ok(None) => {
// Call failed, remove the node so it isn't included in the output
// Call failed, remove the node so it isn't considered as part of the fanout
self.clone().remove_node(next_node);
}
Err(e) => {

View File

@ -71,7 +71,7 @@ impl StorageManager {
)
.await?;
let sva = network_result_value_or_log!(vres => {
// Any other failures, just try the next node
// Any other failures, just try the next node and pretend this one never happened
return Ok(None);
});
@ -88,8 +88,7 @@ impl StorageManager {
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
// Validation failed, ignore this value and pretend we never saw this node
return Ok(None);
}
@ -104,7 +103,7 @@ impl StorageManager {
} else {
// If the sequence number is older, or an equal sequence number,
// node should have not returned a value here.
// Skip this node's closer list because it is misbehaving
// Skip this node and it's closer list because it is misbehaving
return Ok(None);
}
}

View File

@ -1,74 +1,74 @@
# # Routing context veilid tests
# Routing context veilid tests
# import veilid
# import pytest
# import asyncio
# import json
# from . import *
import veilid
import pytest
import asyncio
import json
from . import *
# ##################################################################
# BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
##################################################################
BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
# @pytest.mark.asyncio
# async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
@pytest.mark.asyncio
async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
# @pytest.mark.asyncio
# async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# out = await rc.open_dht_record(BOGUS_KEY, None)
@pytest.mark.asyncio
async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
out = await rc.open_dht_record(BOGUS_KEY, None)
# @pytest.mark.asyncio
# async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# await rc.close_dht_record(BOGUS_KEY)
@pytest.mark.asyncio
async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
await rc.close_dht_record(BOGUS_KEY)
# @pytest.mark.asyncio
# async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# await rc.delete_dht_record(BOGUS_KEY)
@pytest.mark.asyncio
async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
await rc.delete_dht_record(BOGUS_KEY)
# @pytest.mark.asyncio
# async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)
@pytest.mark.asyncio
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)
# @pytest.mark.asyncio
# async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# assert await rc.get_dht_value(rec.key, 0, False) == None
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)
@pytest.mark.asyncio
async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
assert await rc.get_dht_value(rec.key, 0, False) == None
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)
# @pytest.mark.asyncio
# async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
@pytest.mark.asyncio
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
# assert vd != None
vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
assert vd != None
# vd2 = await rc.get_dht_value(rec.key, 0, False)
# assert vd2 != None
vd2 = await rc.get_dht_value(rec.key, 0, False)
assert vd2 != None
# assert vd == vd2
assert vd == vd2
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)