2023-06-20 02:35:49 +00:00
|
|
|
# Routing context veilid tests
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
import veilid
|
|
|
|
import pytest
|
|
|
|
import asyncio
|
|
|
|
import json
|
|
|
|
from . import *
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
##################################################################
|
|
|
|
# Typed VLD0 key shared by the negative tests below as a key they expect the
# DHT calls to reject with VeilidAPIError.
# NOTE(review): the byte literal here is a single space; VLD0 public keys are
# presumably 32 bytes, so confirm the intended literal was not 32 spaces that
# got collapsed somewhere along the way.
BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
    """Expect ``get_dht_value`` on a key that was never opened to fail.

    No record is created or opened in this routing context, so reading
    subkey 0 of ``BOGUS_KEY`` must raise ``veilid.VeilidAPIError``.
    """
    rc = await api_connection.new_routing_context()
    async with rc:
        with pytest.raises(veilid.VeilidAPIError):
            # The call is expected to raise, so its result is not bound.
            await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
|
2023-06-18 22:47:39 +00:00
|
|
|
|
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
    """Expect ``open_dht_record`` on ``BOGUS_KEY`` to fail.

    ``None`` is passed as the second argument (no writer, per the test
    name); the open attempt must raise ``veilid.VeilidAPIError``.
    """
    rc = await api_connection.new_routing_context()
    async with rc:
        with pytest.raises(veilid.VeilidAPIError):
            # The call is expected to raise, so its result is not bound.
            await rc.open_dht_record(BOGUS_KEY, None)
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
    """Expect ``close_dht_record`` on ``BOGUS_KEY`` to raise ``VeilidAPIError``."""
    routing_context = await api_connection.new_routing_context()
    async with routing_context:
        # Nothing was opened under this key, so the close must be rejected.
        with pytest.raises(veilid.VeilidAPIError):
            await routing_context.close_dht_record(BOGUS_KEY)
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
    """Expect ``delete_dht_record`` on ``BOGUS_KEY`` to raise ``VeilidAPIError``."""
    routing_context = await api_connection.new_routing_context()
    async with routing_context:
        # Nothing was created under this key, so the delete must be rejected.
        with pytest.raises(veilid.VeilidAPIError):
            await routing_context.delete_dht_record(BOGUS_KEY)
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
    """Create a DHT record, then close and delete it without any error."""
    routing_context = await api_connection.new_routing_context()
    async with routing_context:
        crypto_kind = veilid.CryptoKind.CRYPTO_KIND_VLD0
        schema = veilid.DHTSchema.dflt(1)
        record = await routing_context.create_dht_record(crypto_kind, schema)

        # Tear down in the same order as creation implies: close, then delete.
        await routing_context.close_dht_record(record.key)
        await routing_context.delete_dht_record(record.key)
|
2023-06-18 22:47:39 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
    """Reading a subkey that was never written must yield ``None``.

    A fresh record is created, subkey 0 is read without any prior write,
    and the record is closed and deleted afterwards.
    """
    rc = await api_connection.new_routing_context()
    async with rc:
        rec = await rc.create_dht_record(
            veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)
        )
        try:
            # Identity check: `== None` would go through any custom __eq__.
            assert await rc.get_dht_value(rec.key, 0, False) is None
        finally:
            # Clean up even when the assertion fails so the record is not
            # left behind.
            await rc.close_dht_record(rec.key)
            await rc.delete_dht_record(rec.key)
|
2023-06-19 00:57:51 +00:00
|
|
|
|
2023-06-20 02:35:49 +00:00
|
|
|
@pytest.mark.asyncio
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
    """Write subkey 0 of a fresh DHT record and read the same value back.

    Both ``set_dht_value`` and ``get_dht_value`` must return a non-``None``
    result, and the two results must compare equal. The record is closed
    and deleted afterwards.
    """
    rc = await api_connection.new_routing_context()
    async with rc:
        rec = await rc.create_dht_record(
            veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)
        )
        try:
            vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
            assert vd is not None

            vd2 = await rc.get_dht_value(rec.key, 0, False)
            assert vd2 is not None

            # Diagnostic output for a failing comparison. f-strings are used
            # because print() does not fill "{}" placeholders itself.
            print(f"vd: {vd.__dict__}")
            print(f"vd2: {vd2.__dict__}")

            assert vd == vd2
        finally:
            # Clean up even when an assertion fails so the record is not
            # left behind.
            await rc.close_dht_record(rec.key)
            await rc.delete_dht_record(rec.key)
|
2023-06-19 00:57:51 +00:00
|
|
|
|