diff --git a/veilid-python/tests/api.py b/veilid-python/tests/api.py new file mode 100644 index 00000000..d6c44ecb --- /dev/null +++ b/veilid-python/tests/api.py @@ -0,0 +1,70 @@ +import errno +import os +import re +from collections.abc import Callable +from functools import cache + +from veilid.json_api import _JsonVeilidAPI + +import veilid + +ERRNO_PATTERN = re.compile(r"errno (\d+)", re.IGNORECASE) + + +class VeilidTestConnectionError(Exception): + """The test client could not connect to the veilid-server.""" + + pass + + +@cache +def server_info() -> tuple[str, int]: + """Return the hostname and port of the test server.""" + VEILID_SERVER = os.getenv("VEILID_SERVER") + if VEILID_SERVER is None: + return "localhost", 5959 + + hostname, *rest = VEILID_SERVER.split(":") + if rest: + return hostname, int(rest[0]) + return hostname, 5959 + + +async def api_connector(callback: Callable) -> _JsonVeilidAPI: + """Return an API connection if possible. + + If the connection fails due to an inability to connect to the + server's socket, raise an easy-to-catch VeilidTestConnectionError. + """ + + hostname, port = server_info() + try: + return await veilid.json_api_connect(hostname, port, callback) + except OSError as exc: + # This is a little goofy. The underlying Python library handles + # connection errors in 2 ways, depending on how many connections + # it attempted to make: + # + # - If it only tried to connect to one IP address socket, the + # library propagates the one single OSError it got. + # + # - If it tried to connect to multiple sockets, perhaps because + # the hostname resolved to several addresses (e.g. "localhost" + # => 127.0.0.1 and ::1), then the library raises one exception + # with all the failure exception strings joined together. + + # If errno is set, it's the first kind of exception. Check that + # it's the code we expected. + if exc.errno is not None: + if exc.errno == errno.ECONNREFUSED: + raise VeilidTestConnectionError + raise + + # If not, use a regular expression to find all the errno values + # in the combined error string. Check that all of them have the + # code we're looking for. + errnos = ERRNO_PATTERN.findall(str(exc)) + if all(int(err) == errno.ECONNREFUSED for err in errnos): + raise VeilidTestConnectionError + + raise diff --git a/veilid-python/tests/conftest.py b/veilid-python/tests/conftest.py index 6cabbd26..80ea7846 100644 --- a/veilid-python/tests/conftest.py +++ b/veilid-python/tests/conftest.py @@ -1,35 +1,30 @@ -import os -from functools import cache +"""Common test fixtures.""" + from typing import AsyncGenerator +import pytest import pytest_asyncio -import veilid from veilid.json_api import _JsonVeilidAPI +import veilid + +from .api import VeilidTestConnectionError, api_connector + pytest_plugins = ("pytest_asyncio",) -@cache -def server_info() -> tuple[str, int]: - """Return the hostname and port of the test server.""" - VEILID_SERVER = os.getenv("VEILID_SERVER") - if VEILID_SERVER is None: - return "localhost", 5959 - - hostname, *rest = VEILID_SERVER.split(":") - if rest: - return hostname, int(rest[0]) - return hostname, 5959 - - async def simple_update_callback(update: veilid.VeilidUpdate): print(f"VeilidUpdate: {update}") @pytest_asyncio.fixture async def api_connection() -> AsyncGenerator[_JsonVeilidAPI, None]: - hostname, port = server_info() - api = await veilid.json_api_connect(hostname, port, simple_update_callback) + try: + api = await api_connector(simple_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 7ddc21af..1d602b3b 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -1,15 +1,15 @@ # Routing context veilid tests import asyncio +import os import random import sys -import os import pytest -import veilid -from veilid.types import OperationId -from .conftest import server_info +import veilid + +from .api import VeilidTestConnectionError, api_connector ################################################################## @@ -26,18 +26,24 @@ async def test_routing_contexts(api_connection: veilid.VeilidAPI): async with rcp: pass - rc = await (await api_connection.new_routing_context()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: pass rc = await (await api_connection.new_routing_context()).with_custom_privacy( veilid.SafetySelection.safe( - veilid.SafetySpec(None, 2, veilid.Stability.RELIABLE, - veilid.Sequencing.ENSURE_ORDERED) - )) + veilid.SafetySpec( + None, 2, veilid.Stability.RELIABLE, veilid.Sequencing.ENSURE_ORDERED + ) + ) + ) await rc.release() - rc = await (await api_connection.new_routing_context()).with_custom_privacy(veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED)) + rc = await (await api_connection.new_routing_context()).with_custom_privacy( + veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED) + ) await rc.release() @@ -50,10 +56,12 @@ async def test_routing_context_app_message_loopback(): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: await app_message_queue.put(update) - hostname, port = server_info() - api = await veilid.json_api_connect( - hostname, port, app_message_queue_update_callback - ) + try: + api = await api_connector(app_message_queue_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") @@ -61,7 +69,6 @@ async def test_routing_context_app_message_loopback(): # make a routing context that uses a safety route rc = await (await api.new_routing_context()).with_privacy() async with rc: - # make a new local private route prl, blob = await api.new_private_route() @@ -89,8 +96,12 @@ async def test_routing_context_app_call_loopback(): if update.kind == veilid.VeilidUpdateKind.APP_CALL: await app_call_queue.put(update) - hostname, port = server_info() - api = await veilid.json_api_connect(hostname, port, app_call_queue_update_callback) + try: + api = await api_connector(app_call_queue_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") @@ -98,7 +109,6 @@ async def test_routing_context_app_call_loopback(): # make a routing context that uses a safety route rc = await (await api.new_routing_context()).with_privacy() async with rc: - # make a new local private route prl, blob = await api.new_private_route() @@ -131,7 +141,6 @@ async def test_routing_context_app_call_loopback(): @pytest.mark.asyncio async def test_routing_context_app_message_loopback_big_packets(): - app_message_queue: asyncio.Queue = asyncio.Queue() global got_message @@ -146,18 +155,21 @@ async def test_routing_context_app_message_loopback_big_packets(): sent_messages: set[bytes] = set() - hostname, port = server_info() - api = await veilid.json_api_connect( - hostname, port, app_message_queue_update_callback - ) + try: + api = await api_connector(app_message_queue_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") # make a routing context that uses a safety route - rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + rc = await ( + await (await api.new_routing_context()).with_privacy() + ).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) async with rc: - # make a new local private route prl, blob = await api.new_private_route() @@ -166,7 +178,6 @@ async def test_routing_context_app_message_loopback_big_packets(): # do this test 1000 times for _ in range(1000): - # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) await rc.app_message(prr, message) @@ -208,10 +219,12 @@ async def test_routing_context_app_call_loopback_big_packets(): await api.app_call_reply(update.detail.call_id, update.detail.message) - hostname, port = server_info() - api = await veilid.json_api_connect( - hostname, port, app_call_queue_update_callback - ) + try: + api = await api_connector(app_call_queue_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") @@ -221,9 +234,10 @@ async def test_routing_context_app_call_loopback_big_packets(): ) # make a routing context that uses a safety route - rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + rc = await ( + await (await api.new_routing_context()).with_privacy() + ).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) async with rc: - # make a new local private route prl, blob = await api.new_private_route() @@ -232,7 +246,6 @@ async def test_routing_context_app_call_loopback_big_packets(): # do this test 10 times for _ in range(10): - # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) out_message = await rc.app_call(prr, message) @@ -242,20 +255,23 @@ async def test_routing_context_app_call_loopback_big_packets(): app_call_task.cancel() -@pytest.mark.skipif(os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check") +@pytest.mark.skipif( + os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check" +) @pytest.mark.asyncio async def test_routing_context_app_message_loopback_bandwidth(): - app_message_queue: asyncio.Queue = asyncio.Queue() async def app_message_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: await app_message_queue.put(True) - hostname, port = server_info() - api = await veilid.json_api_connect( - hostname, port, app_message_queue_update_callback - ) + try: + api = await api_connector(app_message_queue_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + async with api: # purge routes to ensure we start fresh await api.debug("purge routes") @@ -265,7 +281,6 @@ async def test_routing_context_app_message_loopback_bandwidth(): # rc = await (await api.new_routing_context()).with_privacy() rc = await api.new_routing_context() async with rc: - # make a new local private route prl, blob = await api.new_private_route() @@ -275,12 +290,9 @@ async def test_routing_context_app_message_loopback_bandwidth(): # do this test 1000 times message = random.randbytes(16384) for _ in range(10000): - # send a random sized random app message to our own private route await rc.app_message(prr, message) # we should get the same number of messages back (not storing all that data) for _ in range(10000): - await asyncio.wait_for( - app_message_queue.get(), timeout=10 - ) + await asyncio.wait_for(app_message_queue.get(), timeout=10)