287 lines
9.7 KiB
Python
287 lines
9.7 KiB
Python
# Routing context veilid tests
|
|
|
|
import asyncio
|
|
import random
|
|
import sys
|
|
import os
|
|
|
|
import pytest
|
|
import veilid
|
|
from veilid.types import OperationId
|
|
|
|
from .conftest import server_info
|
|
|
|
##################################################################
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_routing_contexts(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
pass
|
|
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
rcp = await rc.with_privacy(release=False)
|
|
async with rcp:
|
|
pass
|
|
|
|
rc = await (await api_connection.new_routing_context()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
|
|
async with rc:
|
|
pass
|
|
|
|
rc = await (await api_connection.new_routing_context()).with_custom_privacy(
|
|
veilid.SafetySelection.safe(
|
|
veilid.SafetySpec(None, 2, veilid.Stability.RELIABLE,
|
|
veilid.Sequencing.ENSURE_ORDERED)
|
|
))
|
|
await rc.release()
|
|
|
|
rc = await (await api_connection.new_routing_context()).with_custom_privacy(veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED))
|
|
await rc.release()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_routing_context_app_message_loopback():
|
|
# Seriously, mypy?
|
|
app_message_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
|
|
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
|
|
await app_message_queue.put(update)
|
|
|
|
hostname, port = server_info()
|
|
api = await veilid.json_api_connect(
|
|
hostname, port, app_message_queue_update_callback
|
|
)
|
|
async with api:
|
|
# purge routes to ensure we start fresh
|
|
await api.debug("purge routes")
|
|
|
|
# make a routing context that uses a safety route
|
|
rc = await (await api.new_routing_context()).with_privacy()
|
|
async with rc:
|
|
|
|
# make a new local private route
|
|
prl, blob = await api.new_private_route()
|
|
|
|
# import it as a remote route as well so we can send to it
|
|
prr = await api.import_remote_private_route(blob)
|
|
|
|
# send an app message to our own private route
|
|
message = b"abcd1234"
|
|
await rc.app_message(prr, message)
|
|
|
|
# we should get the same message back
|
|
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
|
app_message_queue.get(), timeout=10
|
|
)
|
|
|
|
assert isinstance(update.detail, veilid.VeilidAppMessage)
|
|
assert update.detail.message == message
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_routing_context_app_call_loopback():
|
|
app_call_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
|
|
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
|
|
await app_call_queue.put(update)
|
|
|
|
hostname, port = server_info()
|
|
api = await veilid.json_api_connect(hostname, port, app_call_queue_update_callback)
|
|
async with api:
|
|
# purge routes to ensure we start fresh
|
|
await api.debug("purge routes")
|
|
|
|
# make a routing context that uses a safety route
|
|
rc = await (await api.new_routing_context()).with_privacy()
|
|
async with rc:
|
|
|
|
# make a new local private route
|
|
prl, blob = await api.new_private_route()
|
|
|
|
# import it as a remote route as well so we can send to it
|
|
prr = await api.import_remote_private_route(blob)
|
|
|
|
# send an app message to our own private route
|
|
request = b"abcd1234"
|
|
app_call_task = asyncio.create_task(
|
|
rc.app_call(prr, request), name="app call task"
|
|
)
|
|
|
|
# we should get the same request back
|
|
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
|
app_call_queue.get(), timeout=10
|
|
)
|
|
appcall = update.detail
|
|
|
|
assert isinstance(appcall, veilid.VeilidAppCall)
|
|
assert appcall.message == request
|
|
|
|
# now we reply to the request
|
|
reply = b"qwer5678"
|
|
await api.app_call_reply(appcall.call_id, reply)
|
|
|
|
# now we should get the reply from the call
|
|
result = await app_call_task
|
|
assert result == reply
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_routing_context_app_message_loopback_big_packets():
|
|
|
|
app_message_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
global got_message
|
|
got_message = 0
|
|
|
|
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
|
|
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
|
|
global got_message
|
|
got_message += 1
|
|
sys.stdout.write("{} ".format(got_message))
|
|
await app_message_queue.put(update)
|
|
|
|
sent_messages: set[bytes] = set()
|
|
|
|
hostname, port = server_info()
|
|
api = await veilid.json_api_connect(
|
|
hostname, port, app_message_queue_update_callback
|
|
)
|
|
async with api:
|
|
# purge routes to ensure we start fresh
|
|
await api.debug("purge routes")
|
|
|
|
# make a routing context that uses a safety route
|
|
rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
|
|
async with rc:
|
|
|
|
# make a new local private route
|
|
prl, blob = await api.new_private_route()
|
|
|
|
# import it as a remote route as well so we can send to it
|
|
prr = await api.import_remote_private_route(blob)
|
|
|
|
# do this test 1000 times
|
|
for _ in range(1000):
|
|
|
|
# send a random sized random app message to our own private route
|
|
message = random.randbytes(random.randint(0, 32768))
|
|
await rc.app_message(prr, message)
|
|
|
|
sent_messages.add(message)
|
|
|
|
# we should get the same messages back
|
|
print(len(sent_messages))
|
|
for n in range(len(sent_messages)):
|
|
print(n)
|
|
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
|
app_message_queue.get(), timeout=10
|
|
)
|
|
assert isinstance(update.detail, veilid.VeilidAppMessage)
|
|
|
|
assert update.detail.message in sent_messages
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_routing_context_app_call_loopback_big_packets():
|
|
global got_message
|
|
got_message = 0
|
|
|
|
app_call_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
|
|
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
|
|
await app_call_queue.put(update)
|
|
|
|
async def app_call_queue_task_handler(api: veilid.VeilidAPI):
|
|
while True:
|
|
update = await app_call_queue.get()
|
|
|
|
global got_message
|
|
got_message += 1
|
|
|
|
sys.stdout.write("{} ".format(got_message))
|
|
sys.stdout.flush()
|
|
|
|
await api.app_call_reply(update.detail.call_id, update.detail.message)
|
|
|
|
hostname, port = server_info()
|
|
api = await veilid.json_api_connect(
|
|
hostname, port, app_call_queue_update_callback
|
|
)
|
|
async with api:
|
|
# purge routes to ensure we start fresh
|
|
await api.debug("purge routes")
|
|
|
|
app_call_task = asyncio.create_task(
|
|
app_call_queue_task_handler(api), name="app call task"
|
|
)
|
|
|
|
# make a routing context that uses a safety route
|
|
rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
|
|
async with rc:
|
|
|
|
# make a new local private route
|
|
prl, blob = await api.new_private_route()
|
|
|
|
# import it as a remote route as well so we can send to it
|
|
prr = await api.import_remote_private_route(blob)
|
|
|
|
# do this test 10 times
|
|
for _ in range(10):
|
|
|
|
# send a random sized random app message to our own private route
|
|
message = random.randbytes(random.randint(0, 32768))
|
|
out_message = await rc.app_call(prr, message)
|
|
|
|
assert message == out_message
|
|
|
|
app_call_task.cancel()
|
|
|
|
|
|
@pytest.mark.skipif(os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check")
|
|
@pytest.mark.asyncio
|
|
async def test_routing_context_app_message_loopback_bandwidth():
|
|
|
|
app_message_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
|
|
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
|
|
await app_message_queue.put(True)
|
|
|
|
hostname, port = server_info()
|
|
api = await veilid.json_api_connect(
|
|
hostname, port, app_message_queue_update_callback
|
|
)
|
|
async with api:
|
|
# purge routes to ensure we start fresh
|
|
await api.debug("purge routes")
|
|
|
|
# make a routing context that uses a safety route
|
|
# rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
|
|
# rc = await (await api.new_routing_context()).with_privacy()
|
|
rc = await api.new_routing_context()
|
|
async with rc:
|
|
|
|
# make a new local private route
|
|
prl, blob = await api.new_private_route()
|
|
|
|
# import it as a remote route as well so we can send to it
|
|
prr = await api.import_remote_private_route(blob)
|
|
|
|
# do this test 1000 times
|
|
message = random.randbytes(16384)
|
|
for _ in range(10000):
|
|
|
|
# send a random sized random app message to our own private route
|
|
await rc.app_message(prr, message)
|
|
|
|
# we should get the same number of messages back (not storing all that data)
|
|
for _ in range(10000):
|
|
await asyncio.wait_for(
|
|
app_message_queue.get(), timeout=10
|
|
)
|