veilid/veilid-python/tests/test_dht.py

75 lines
2.9 KiB
Python

# # Routing context veilid tests
# import veilid
# import pytest
# import asyncio
# import json
# from . import *
# ##################################################################
# BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
# @pytest.mark.asyncio
# async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
# @pytest.mark.asyncio
# async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# out = await rc.open_dht_record(BOGUS_KEY, None)
# @pytest.mark.asyncio
# async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# await rc.close_dht_record(BOGUS_KEY)
# @pytest.mark.asyncio
# async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# with pytest.raises(veilid.VeilidAPIError):
# await rc.delete_dht_record(BOGUS_KEY)
# @pytest.mark.asyncio
# async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)
# @pytest.mark.asyncio
# async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# assert await rc.get_dht_value(rec.key, 0, False) == None
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)
# @pytest.mark.asyncio
# async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
# rc = await api_connection.new_routing_context()
# async with rc:
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
# vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
# assert vd != None
# vd2 = await rc.get_dht_value(rec.key, 0, False)
# assert vd2 != None
# assert vd == vd2
# await rc.close_dht_record(rec.key)
# await rc.delete_dht_record(rec.key)