From ea651e526d32c096657f0e62aa7aa1622570de9a Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 19 Jun 2023 11:29:33 -0400 Subject: [PATCH] xfer --- veilid-core/src/rpc_processor/fanout_call.rs | 25 +++- veilid-core/src/storage_manager/set_value.rs | 7 +- veilid-python/tests/test_dht.py | 118 +++++++++---------- 3 files changed, 85 insertions(+), 65 deletions(-) diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index e55606f5..3486e8e2 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -11,6 +11,21 @@ where pub type FanoutCallReturnType = Result>, RPCError>; +/// Contains the logic for generically searing the Veilid routing table for a set of nodes and applying an +/// RPC operation that eventually converges on satisfactory result, or times out and returns some +/// unsatisfactory but acceptable result. Or something. +/// +/// The algorithm starts by creating a 'closest_nodes' working set of the nodes closest to some node id currently in our routing table +/// If has pluggable callbacks: +/// * 'check_done' - for checking for a termination condition +/// * 'call_routine' - routine to call for each node that performs an operation and may add more nodes to our closest_nodes set +/// The algorithm is parameterized by: +/// * 'node_count' - the number of nodes to keep in the closest_nodes set +/// * 'fanout' - the number of concurrent calls being processed at the same time +/// The algorithm returns early if 'check_done' returns some value, or if an error is found during the process. +/// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a +/// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found +/// in the given time pub struct FanoutCall where R: Unpin, @@ -68,6 +83,7 @@ where let mut ctx = self.context.lock(); for nn in new_nodes { + // Make sure the new node isnt already in the list let mut dup = false; for cn in &ctx.closest_nodes { if cn.same_entry(&nn) { @@ -75,7 +91,12 @@ where } } if !dup { - ctx.closest_nodes.push(nn.clone()); + // Add the new node if we haven't already called it before (only one call per node ever) + if let Some(key) = nn.node_ids().get(self.crypto_kind) { + if !ctx.called_nodes.contains(&key) { + ctx.closest_nodes.push(nn.clone()); + } + } } } @@ -145,7 +166,7 @@ where self.clone().add_new_nodes(new_nodes); } Ok(None) => { - // Call failed, remove the node so it isn't included in the output + // Call failed, remove the node so it isn't considered as part of the fanout self.clone().remove_node(next_node); } Err(e) => { diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 2ea9ddf2..896a623d 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -71,7 +71,7 @@ impl StorageManager { ) .await?; let sva = network_result_value_or_log!(vres => { - // Any other failures, just try the next node + // Any other failures, just try the next node and pretend this one never happened return Ok(None); }); @@ -88,8 +88,7 @@ impl StorageManager { subkey, value.value_data(), ) { - // Validation failed, ignore this value - // Move to the next node + // Validation failed, ignore this value and pretend we never saw this node return Ok(None); } @@ -104,7 +103,7 @@ impl StorageManager { } else { // If the sequence number is older, or an equal sequence number, // node should have not returned a value here. - // Skip this node's closer list because it is misbehaving + // Skip this node and it's closer list because it is misbehaving return Ok(None); } } diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index d7f196fb..630a6d12 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -1,74 +1,74 @@ -# # Routing context veilid tests +# Routing context veilid tests -# import veilid -# import pytest -# import asyncio -# import json -# from . import * +import veilid +import pytest +import asyncio +import json +from . import * -# ################################################################## -# BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' ')) +################################################################## +BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' ')) -# @pytest.mark.asyncio -# async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# with pytest.raises(veilid.VeilidAPIError): -# out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False) +@pytest.mark.asyncio +async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + with pytest.raises(veilid.VeilidAPIError): + out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False) -# @pytest.mark.asyncio -# async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# with pytest.raises(veilid.VeilidAPIError): -# out = await rc.open_dht_record(BOGUS_KEY, None) +@pytest.mark.asyncio +async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + with pytest.raises(veilid.VeilidAPIError): + out = await rc.open_dht_record(BOGUS_KEY, None) -# @pytest.mark.asyncio -# async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# with pytest.raises(veilid.VeilidAPIError): -# await rc.close_dht_record(BOGUS_KEY) +@pytest.mark.asyncio +async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + with pytest.raises(veilid.VeilidAPIError): + await rc.close_dht_record(BOGUS_KEY) -# @pytest.mark.asyncio -# async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# with pytest.raises(veilid.VeilidAPIError): -# await rc.delete_dht_record(BOGUS_KEY) +@pytest.mark.asyncio +async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + with pytest.raises(veilid.VeilidAPIError): + await rc.delete_dht_record(BOGUS_KEY) -# @pytest.mark.asyncio -# async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) -# await rc.close_dht_record(rec.key) -# await rc.delete_dht_record(rec.key) +@pytest.mark.asyncio +async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) + await rc.close_dht_record(rec.key) + await rc.delete_dht_record(rec.key) -# @pytest.mark.asyncio -# async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) -# assert await rc.get_dht_value(rec.key, 0, False) == None -# await rc.close_dht_record(rec.key) -# await rc.delete_dht_record(rec.key) +@pytest.mark.asyncio +async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) + assert await rc.get_dht_value(rec.key, 0, False) == None + await rc.close_dht_record(rec.key) + await rc.delete_dht_record(rec.key) -# @pytest.mark.asyncio -# async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): -# rc = await api_connection.new_routing_context() -# async with rc: -# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) +@pytest.mark.asyncio +async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): + rc = await api_connection.new_routing_context() + async with rc: + rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) -# vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH") -# assert vd != None + vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH") + assert vd != None -# vd2 = await rc.get_dht_value(rec.key, 0, False) -# assert vd2 != None + vd2 = await rc.get_dht_value(rec.key, 0, False) + assert vd2 != None -# assert vd == vd2 + assert vd == vd2 -# await rc.close_dht_record(rec.key) -# await rc.delete_dht_record(rec.key) + await rc.close_dht_record(rec.key) + await rc.delete_dht_record(rec.key)