Merge branch 'tek/python-chat-demo' into 'main'

Python chat demo

A Python based demonstration app which facilitates a 1:1 conversation between two parties. Routing privacy is enabled in this initial version.

See merge request veilid/veilid!106
This commit is contained in:
TC 2023-08-02 20:19:18 +00:00
commit 9a65f1b1bf
14 changed files with 399 additions and 164 deletions

195
veilid-python/demo/chat.py Executable file
View File

@ -0,0 +1,195 @@
#!/usr/bin/env python
"""A simple chat server using Veilid's DHT."""
import argparse
import asyncio
import sys
import config
import veilid
QUIT = b"QUIT"
async def noop_callback(*args, **kwargs):
"""In the real world, we'd use this to process interesting incoming events."""
return
async def chatter(rc: veilid.api.RoutingContext, key: str, send_channel: int, recv_channel: int):
"""Read input, write it to the DHT, and print the response from the DHT."""
last_seq = -1
send_subkey = veilid.types.ValueSubkey(send_channel)
recv_subkey = veilid.types.ValueSubkey(recv_channel)
# Prime the pumps. Especially when starting the conversation, this
# causes the DHT key to propagate to the network.
await rc.set_dht_value(key, send_subkey, b"Hello from the world!")
while True:
try:
msg = input("SEND> ")
except EOFError:
# Cat got your tongue? Hang up.
print("Closing the chat.")
await rc.set_dht_value(key, send_subkey, QUIT)
return
# Write the input message to the DHT key.
await rc.set_dht_value(key, send_subkey, msg.encode())
# In the real world, don't do this. People may tease you for it.
# This is meant to be easy to understand for demonstration
# purposes, not a great pattern. Instead, you'd want to use the
# callback function to handle events asynchronously.
while True:
# Try to get an updated version of the receiving subkey.
resp = await rc.get_dht_value(key, recv_subkey, True)
if resp is None:
continue
# If the other party hasn't sent a newer message, try again.
if resp.seq == last_seq:
continue
if resp.data == QUIT:
print("Other end closed the chat.")
return
print(f"RECV< {resp.data.decode()}")
last_seq = resp.seq
break
async def start(host: str, port: int, name: str):
"""Begin a conversation with a friend."""
conn = await veilid.json_api_connect(host, port, noop_callback)
keys = config.read_keys()
my_key = veilid.KeyPair(keys["self"])
members = [
veilid.types.DHTSchemaSMPLMember(my_key.key(), 1),
veilid.types.DHTSchemaSMPLMember(keys["peers"][name], 1),
]
router = await(await conn.new_routing_context()).with_privacy()
async with router:
rec = await router.create_dht_record(veilid.DHTSchema.smpl(0, members))
print(f"New chat key: {rec.key}")
print("Give that to your friend!")
# Close this key first. We'll reopen it for writing with our saved key.
await router.close_dht_record(rec.key)
await router.open_dht_record(rec.key, veilid.KeyPair(keys["self"]))
try:
# Write to the 1st subkey and read from the 2nd.
await chatter(router, rec.key, 0, 1)
finally:
await router.close_dht_record(rec.key)
await router.delete_dht_record(rec.key)
async def respond(host: str, port: int, key: str):
"""Reply to a friend's chat."""
conn = await veilid.json_api_connect(host, port, noop_callback)
keys = config.read_keys()
my_key = veilid.KeyPair(keys["self"])
router = await(await conn.new_routing_context()).with_privacy()
async with router:
await router.open_dht_record(key, my_key)
# As the responder, we're writing to the 2nd subkey and reading from the 1st.
await chatter(router, key, 1, 0)
async def keygen(host: str, port: int):
"""Generate a keypair."""
conn = await veilid.json_api_connect(host, port, noop_callback)
crypto_system = await conn.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0)
async with crypto_system:
my_key = await crypto_system.generate_key_pair()
keys = config.read_keys()
if keys["self"]:
print("You already have a keypair.")
sys.exit(1)
keys["self"] = my_key
config.write_keys(keys)
print(f"Your new public key is {my_key.key()}. Share it with your friends!")
async def add_friend(host: str, port: int, name: str, pubkey: str):
"""Add a friend's public key."""
keys = config.read_keys()
keys["peers"][name] = pubkey
config.write_keys(keys)
async def clean(host: str, port: int, key: str):
"""Delete a DHT key."""
conn = await veilid.json_api_connect(host, port, noop_callback)
router = await(await conn.new_routing_context()).with_privacy()
async with router:
await router.close_dht_record(key)
await router.delete_dht_record(key)
def handle_command_line(arglist: list[str]):
"""Process the command line.
This isn't the interesting part."""
parser = argparse.ArgumentParser(description="Veilid chat demonstration")
parser.add_argument("--host", default="localhost", help="Address of the Veilid server host.")
parser.add_argument("--port", type=int, default=5959, help="Port of the Veilid server.")
subparsers = parser.add_subparsers(required=True)
cmd_start = subparsers.add_parser("start", help=start.__doc__)
cmd_start.add_argument("name", help="Your friend's name")
cmd_start.set_defaults(func=start)
cmd_respond = subparsers.add_parser("respond", help=respond.__doc__)
cmd_respond.add_argument("key", help="The chat's DHT key")
cmd_respond.set_defaults(func=respond)
cmd_keygen = subparsers.add_parser("keygen", help=keygen.__doc__)
cmd_keygen.set_defaults(func=keygen)
cmd_add_friend = subparsers.add_parser("add-friend", help=add_friend.__doc__)
cmd_add_friend.add_argument("name", help="Your friend's name")
cmd_add_friend.add_argument("pubkey", help="Your friend's public key")
cmd_add_friend.set_defaults(func=add_friend)
cmd_clean = subparsers.add_parser("clean", help=clean.__doc__)
cmd_clean.add_argument("key", help="DHT key to delete")
cmd_clean.set_defaults(func=clean)
args = parser.parse_args(arglist)
kwargs = args.__dict__
func = kwargs.pop("func")
asyncio.run(func(**kwargs))
if __name__ == "__main__":
handle_command_line(sys.argv[1:])

View File

@ -0,0 +1,26 @@
"""Load and save configuration."""
import json
from pathlib import Path
KEYFILE = Path(".demokeys")
def read_keys() -> dict:
"""Load the stored keys from disk."""
try:
keydata = KEYFILE.read_text()
except FileNotFoundError:
return {
"self": None,
"peers": {},
}
return json.loads(keydata)
def write_keys(keydata: dict):
"""Save the keys to disk."""
KEYFILE.write_text(json.dumps(keydata, indent=2))

View File

@ -17,3 +17,9 @@ pytest-asyncio = "^0.21.0"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 99
[tool.mypy]
check_untyped_defs = true

View File

@ -27,9 +27,7 @@ async def test_get_node_id(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_fail_connect(): async def test_fail_connect():
with pytest.raises(socket.gaierror) as exc: with pytest.raises(socket.gaierror) as exc:
await veilid.json_api_connect( await veilid.json_api_connect("fuahwelifuh32luhwafluehawea", 1, simple_update_callback)
"fuahwelifuh32luhwafluehawea", 1, simple_update_callback
)
assert exc.value.errno == socket.EAI_NONAME assert exc.value.errno == socket.EAI_NONAME

View File

@ -12,14 +12,13 @@ async def test_best_crypto_system(api_connection: veilid.VeilidAPI):
async with cs: async with cs:
assert await cs.default_salt_length() == 16 assert await cs.default_salt_length() == 16
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_crypto_system(api_connection: veilid.VeilidAPI): async def test_get_crypto_system(api_connection: veilid.VeilidAPI):
cs: CryptoSystem = await api_connection.get_crypto_system( cs: CryptoSystem = await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0)
veilid.CryptoKind.CRYPTO_KIND_VLD0
)
async with cs: async with cs:
assert await cs.default_salt_length() == 16 assert await cs.default_salt_length() == 16
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_crypto_system_invalid(api_connection: veilid.VeilidAPI): async def test_get_crypto_system_invalid(api_connection: veilid.VeilidAPI):
@ -45,4 +44,3 @@ async def test_hash_and_verify_password(api_connection: veilid.VeilidAPI):
# Password mismatch # Password mismatch
phash2 = await cs.hash_password(b"abc1234", salt) phash2 = await cs.hash_password(b"abc1234", salt)
assert not await cs.verify_password(b"abc12345", phash) assert not await cs.verify_password(b"abc12345", phash)

View File

@ -47,7 +47,9 @@ async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await api_connection.new_routing_context()
async with rc: async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(1), veilid.CryptoKind.CRYPTO_KIND_VLD0) rec = await rc.create_dht_record(
veilid.DHTSchema.dflt(1), veilid.CryptoKind.CRYPTO_KIND_VLD0
)
await rc.close_dht_record(rec.key) await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key) await rc.delete_dht_record(rec.key)

View File

@ -34,9 +34,7 @@ async def test_routing_contexts(api_connection: veilid.VeilidAPI):
rc = await (await api_connection.new_routing_context()).with_custom_privacy( rc = await (await api_connection.new_routing_context()).with_custom_privacy(
veilid.SafetySelection.safe( veilid.SafetySelection.safe(
veilid.SafetySpec( veilid.SafetySpec(None, 2, veilid.Stability.RELIABLE, veilid.Sequencing.ENSURE_ORDERED)
None, 2, veilid.Stability.RELIABLE, veilid.Sequencing.ENSURE_ORDERED
)
) )
) )
await rc.release() await rc.release()
@ -117,14 +115,10 @@ async def test_routing_context_app_call_loopback():
# send an app message to our own private route # send an app message to our own private route
request = b"abcd1234" request = b"abcd1234"
app_call_task = asyncio.create_task( app_call_task = asyncio.create_task(rc.app_call(prr, request), name="app call task")
rc.app_call(prr, request), name="app call task"
)
# we should get the same request back # we should get the same request back
update: veilid.VeilidUpdate = await asyncio.wait_for( update: veilid.VeilidUpdate = await asyncio.wait_for(app_call_queue.get(), timeout=10)
app_call_queue.get(), timeout=10
)
appcall = update.detail appcall = update.detail
assert isinstance(appcall, veilid.VeilidAppCall) assert isinstance(appcall, veilid.VeilidAppCall)
@ -164,9 +158,9 @@ async def test_routing_context_app_message_loopback_big_packets():
await api.debug("purge routes") await api.debug("purge routes")
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await ( rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(
await (await api.new_routing_context()).with_privacy() veilid.Sequencing.ENSURE_ORDERED
).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) )
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -223,14 +217,12 @@ async def test_routing_context_app_call_loopback_big_packets():
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
app_call_task = asyncio.create_task( app_call_task = asyncio.create_task(app_call_queue_task_handler(api), name="app call task")
app_call_queue_task_handler(api), name="app call task"
)
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await ( rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(
await (await api.new_routing_context()).with_privacy() veilid.Sequencing.ENSURE_ORDERED
).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) )
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -249,9 +241,7 @@ async def test_routing_context_app_call_loopback_big_packets():
app_call_task.cancel() app_call_task.cancel()
@pytest.mark.skipif( @pytest.mark.skipif(os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check")
os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_routing_context_app_message_loopback_bandwidth(): async def test_routing_context_app_message_loopback_bandwidth():
app_message_queue: asyncio.Queue = asyncio.Queue() app_message_queue: asyncio.Queue = asyncio.Queue()

View File

@ -8,6 +8,7 @@ from veilid.api import CryptoSystem
TEST_DB = "__pytest_db" TEST_DB = "__pytest_db"
TEST_NONEXISTENT_DB = "__pytest_nonexistent_db" TEST_NONEXISTENT_DB = "__pytest_nonexistent_db"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_delete_table_db_nonexistent(api_connection: veilid.VeilidAPI): async def test_delete_table_db_nonexistent(api_connection: veilid.VeilidAPI):
deleted = await api_connection.delete_table_db(TEST_NONEXISTENT_DB) deleted = await api_connection.delete_table_db(TEST_NONEXISTENT_DB)
@ -25,11 +26,12 @@ async def test_open_delete_table_db(api_connection: veilid.VeilidAPI):
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
await api_connection.delete_table_db(TEST_DB) await api_connection.delete_table_db(TEST_DB)
# drop the db # drop the db
# now delete should succeed # now delete should succeed
deleted = await api_connection.delete_table_db(TEST_DB) deleted = await api_connection.delete_table_db(TEST_DB)
assert deleted assert deleted
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_open_twice_table_db(api_connection: veilid.VeilidAPI): async def test_open_twice_table_db(api_connection: veilid.VeilidAPI):
# delete test db if it exists # delete test db if it exists
@ -37,12 +39,12 @@ async def test_open_twice_table_db(api_connection: veilid.VeilidAPI):
tdb = await api_connection.open_table_db(TEST_DB, 1) tdb = await api_connection.open_table_db(TEST_DB, 1)
tdb2 = await api_connection.open_table_db(TEST_DB, 1) tdb2 = await api_connection.open_table_db(TEST_DB, 1)
# delete should fail because open # delete should fail because open
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
await api_connection.delete_table_db(TEST_DB) await api_connection.delete_table_db(TEST_DB)
await tdb.release() await tdb.release()
# delete should fail because open # delete should fail because open
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
await api_connection.delete_table_db(TEST_DB) await api_connection.delete_table_db(TEST_DB)
@ -62,7 +64,7 @@ async def test_open_twice_table_db_store_load(api_connection: veilid.VeilidAPI):
async with tdb: async with tdb:
tdb2 = await api_connection.open_table_db(TEST_DB, 1) tdb2 = await api_connection.open_table_db(TEST_DB, 1)
async with tdb2: async with tdb2:
# store into first db copy # store into first db copy
await tdb.store(b"asdf", b"1234") await tdb.store(b"asdf", b"1234")
# load from second db copy # load from second db copy
assert await tdb.load(b"asdf") == b"1234" assert await tdb.load(b"asdf") == b"1234"
@ -71,6 +73,7 @@ async def test_open_twice_table_db_store_load(api_connection: veilid.VeilidAPI):
deleted = await api_connection.delete_table_db(TEST_DB) deleted = await api_connection.delete_table_db(TEST_DB)
assert deleted assert deleted
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_open_twice_table_db_store_delete_load(api_connection: veilid.VeilidAPI): async def test_open_twice_table_db_store_delete_load(api_connection: veilid.VeilidAPI):
# delete test db if it exists # delete test db if it exists
@ -80,12 +83,11 @@ async def test_open_twice_table_db_store_delete_load(api_connection: veilid.Veil
async with tdb: async with tdb:
tdb2 = await api_connection.open_table_db(TEST_DB, 1) tdb2 = await api_connection.open_table_db(TEST_DB, 1)
async with tdb2: async with tdb2:
# store into first db copy
# store into first db copy
await tdb.store(b"asdf", b"1234") await tdb.store(b"asdf", b"1234")
# delete from second db copy and clean up # delete from second db copy and clean up
await tdb2.delete(b"asdf") await tdb2.delete(b"asdf")
# load from first db copy # load from first db copy
assert await tdb.load(b"asdf") == None assert await tdb.load(b"asdf") == None
@ -104,24 +106,22 @@ async def test_resize_table_db(api_connection: veilid.VeilidAPI):
# reopen the db with more columns should fail if it is already open # reopen the db with more columns should fail if it is already open
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
await api_connection.open_table_db(TEST_DB, 2) await api_connection.open_table_db(TEST_DB, 2)
tdb2 = await api_connection.open_table_db(TEST_DB, 2) tdb2 = await api_connection.open_table_db(TEST_DB, 2)
async with tdb2: async with tdb2:
# write something to second column # write something to second column
await tdb2.store(b"qwer", b"5678", col = 1) await tdb2.store(b"qwer", b"5678", col=1)
# reopen the db with fewer columns # reopen the db with fewer columns
tdb = await api_connection.open_table_db(TEST_DB, 1) tdb = await api_connection.open_table_db(TEST_DB, 1)
async with tdb: async with tdb:
# Should fail access to second column
# Should fail access to second column
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
await tdb.load(b"qwer", col = 1) await tdb.load(b"qwer", col=1)
# Should succeed with access to second column # Should succeed with access to second column
assert await tdb2.load(b"qwer", col = 1) == b"5678" assert await tdb2.load(b"qwer", col=1) == b"5678"
# now delete should succeed # now delete should succeed
deleted = await api_connection.delete_table_db(TEST_DB) deleted = await api_connection.delete_table_db(TEST_DB)
assert deleted assert deleted

View File

@ -6,7 +6,6 @@ from .state import VeilidState
class RoutingContext(ABC): class RoutingContext(ABC):
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
return self return self
@ -23,21 +22,21 @@ class RoutingContext(ABC):
pass pass
@abstractmethod @abstractmethod
async def with_privacy(self, release = True) -> Self: async def with_privacy(self, release=True) -> Self:
pass pass
@abstractmethod @abstractmethod
async def with_custom_privacy(self, safety_selection: types.SafetySelection, release = True) -> Self: async def with_custom_privacy(
self, safety_selection: types.SafetySelection, release=True
) -> Self:
pass pass
@abstractmethod @abstractmethod
async def with_sequencing(self, sequencing: types.Sequencing, release = True) -> Self: async def with_sequencing(self, sequencing: types.Sequencing, release=True) -> Self:
pass pass
@abstractmethod @abstractmethod
async def app_call( async def app_call(self, target: types.TypedKey | types.RouteId, request: bytes) -> bytes:
self, target: types.TypedKey | types.RouteId, request: bytes
) -> bytes:
pass pass
@abstractmethod @abstractmethod
@ -166,7 +165,6 @@ class TableDb(ABC):
class CryptoSystem(ABC): class CryptoSystem(ABC):
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
return self return self
@ -183,9 +181,7 @@ class CryptoSystem(ABC):
pass pass
@abstractmethod @abstractmethod
async def cached_dh( async def cached_dh(self, key: types.PublicKey, secret: types.SecretKey) -> types.SharedSecret:
self, key: types.PublicKey, secret: types.SecretKey
) -> types.SharedSecret:
pass pass
@abstractmethod @abstractmethod
@ -211,9 +207,7 @@ class CryptoSystem(ABC):
pass pass
@abstractmethod @abstractmethod
async def derive_shared_secret( async def derive_shared_secret(self, password: bytes, salt: bytes) -> types.SharedSecret:
self, password: bytes, salt: bytes
) -> types.SharedSecret:
pass pass
@abstractmethod @abstractmethod
@ -233,9 +227,7 @@ class CryptoSystem(ABC):
pass pass
@abstractmethod @abstractmethod
async def validate_key_pair( async def validate_key_pair(self, key: types.PublicKey, secret: types.SecretKey) -> bool:
self, key: types.PublicKey, secret: types.SecretKey
) -> bool:
pass pass
@abstractmethod @abstractmethod
@ -255,9 +247,7 @@ class CryptoSystem(ABC):
pass pass
@abstractmethod @abstractmethod
async def verify( async def verify(self, key: types.PublicKey, data: bytes, signature: types.Signature):
self, key: types.PublicKey, data: bytes, signature: types.Signature
):
pass pass
@abstractmethod @abstractmethod

View File

@ -13,6 +13,10 @@ class VeilidAPIError(Exception):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(self.label, *args, **kwargs) super().__init__(self.label, *args, **kwargs)
def __str__(self) -> str:
args = [('label', self.label)] + sorted(vars(self).items())
return " ".join(f"{key}={value!r}" for (key, value) in args)
@classmethod @classmethod
def from_json(cls, json: dict) -> Self: def from_json(cls, json: dict) -> Self:
kind = json["kind"] kind = json["kind"]

View File

@ -6,20 +6,45 @@ from typing import Awaitable, Callable, Optional, Self
from jsonschema import exceptions, validators from jsonschema import exceptions, validators
from . import schema from . import schema
from .api import (CryptoSystem, RoutingContext, TableDb, TableDbTransaction, from .api import CryptoSystem, RoutingContext, TableDb, TableDbTransaction, VeilidAPI
VeilidAPI)
from .error import raise_api_result from .error import raise_api_result
from .operations import (CryptoSystemOperation, Operation, from .operations import (
RoutingContextOperation, TableDbOperation, CryptoSystemOperation,
TableDbTransactionOperation) Operation,
RoutingContextOperation,
TableDbOperation,
TableDbTransactionOperation,
)
from .state import VeilidState, VeilidUpdate from .state import VeilidState, VeilidUpdate
from .types import (CryptoKey, CryptoKeyDistance, CryptoKind, from .types import (
DHTRecordDescriptor, DHTSchema, HashDigest, KeyPair, CryptoKey,
NewPrivateRouteResult, Nonce, OperationId, PublicKey, CryptoKeyDistance,
RouteId, SafetySelection, SecretKey, Sequencing, SharedSecret, Signature, CryptoKind,
Stability, Timestamp, TypedKey, TypedKeyPair, DHTRecordDescriptor,
TypedSignature, ValueData, ValueSubkey, VeilidJSONEncoder, DHTSchema,
VeilidVersion, urlsafe_b64decode_no_pad) HashDigest,
KeyPair,
NewPrivateRouteResult,
Nonce,
OperationId,
PublicKey,
RouteId,
SafetySelection,
SecretKey,
Sequencing,
SharedSecret,
Signature,
Stability,
Timestamp,
TypedKey,
TypedKeyPair,
TypedSignature,
ValueData,
ValueSubkey,
VeilidJSONEncoder,
VeilidVersion,
urlsafe_b64decode_no_pad,
)
############################################################## ##############################################################
@ -200,10 +225,7 @@ class _JsonVeilidAPI(VeilidAPI):
self.writer.write(reqbytes) self.writer.write(reqbytes)
async def send_ndjson_request( async def send_ndjson_request(
self, self, op: Operation, validate: Optional[Callable[[dict, dict], None]] = None, **kwargs
op: Operation,
validate: Optional[Callable[[dict, dict], None]] = None,
**kwargs
) -> dict: ) -> dict:
# Get next id # Get next id
await self.lock.acquire() await self.lock.acquire()
@ -249,9 +271,7 @@ class _JsonVeilidAPI(VeilidAPI):
return response return response
async def control(self, args: list[str]) -> str: async def control(self, args: list[str]) -> str:
return raise_api_result( return raise_api_result(await self.send_ndjson_request(Operation.CONTROL, args=args))
await self.send_ndjson_request(Operation.CONTROL, args=args)
)
async def get_state(self) -> VeilidState: async def get_state(self) -> VeilidState:
return VeilidState.from_json( return VeilidState.from_json(
@ -266,9 +286,7 @@ class _JsonVeilidAPI(VeilidAPI):
async def new_private_route(self) -> tuple[RouteId, bytes]: async def new_private_route(self) -> tuple[RouteId, bytes]:
return NewPrivateRouteResult.from_json( return NewPrivateRouteResult.from_json(
raise_api_result( raise_api_result(await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE))
await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE)
)
).to_tuple() ).to_tuple()
async def new_custom_private_route( async def new_custom_private_route(
@ -288,17 +306,13 @@ class _JsonVeilidAPI(VeilidAPI):
async def import_remote_private_route(self, blob: bytes) -> RouteId: async def import_remote_private_route(self, blob: bytes) -> RouteId:
return RouteId( return RouteId(
raise_api_result( raise_api_result(
await self.send_ndjson_request( await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob)
Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob
)
) )
) )
async def release_private_route(self, route_id: RouteId): async def release_private_route(self, route_id: RouteId):
raise_api_result( raise_api_result(
await self.send_ndjson_request( await self.send_ndjson_request(Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id)
Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id
)
) )
async def app_call_reply(self, call_id: OperationId, message: bytes): async def app_call_reply(self, call_id: OperationId, message: bytes):
@ -309,9 +323,7 @@ class _JsonVeilidAPI(VeilidAPI):
) )
async def new_routing_context(self) -> RoutingContext: async def new_routing_context(self) -> RoutingContext:
rc_id = raise_api_result( rc_id = raise_api_result(await self.send_ndjson_request(Operation.NEW_ROUTING_CONTEXT))
await self.send_ndjson_request(Operation.NEW_ROUTING_CONTEXT)
)
return _JsonRoutingContext(self, rc_id) return _JsonRoutingContext(self, rc_id)
async def open_table_db(self, name: str, column_count: int) -> TableDb: async def open_table_db(self, name: str, column_count: int) -> TableDb:
@ -334,9 +346,7 @@ class _JsonVeilidAPI(VeilidAPI):
return _JsonCryptoSystem(self, cs_id) return _JsonCryptoSystem(self, cs_id)
async def best_crypto_system(self) -> CryptoSystem: async def best_crypto_system(self) -> CryptoSystem:
cs_id = raise_api_result( cs_id = raise_api_result(await self.send_ndjson_request(Operation.BEST_CRYPTO_SYSTEM))
await self.send_ndjson_request(Operation.BEST_CRYPTO_SYSTEM)
)
return _JsonCryptoSystem(self, cs_id) return _JsonCryptoSystem(self, cs_id)
async def verify_signatures( async def verify_signatures(
@ -375,27 +385,19 @@ class _JsonVeilidAPI(VeilidAPI):
map( map(
lambda x: TypedKeyPair(x), lambda x: TypedKeyPair(x),
raise_api_result( raise_api_result(
await self.send_ndjson_request( await self.send_ndjson_request(Operation.GENERATE_KEY_PAIR, kind=kind)
Operation.GENERATE_KEY_PAIR, kind=kind
)
), ),
) )
) )
async def now(self) -> Timestamp: async def now(self) -> Timestamp:
return Timestamp( return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW)))
raise_api_result(await self.send_ndjson_request(Operation.NOW))
)
async def debug(self, command: str) -> str: async def debug(self, command: str) -> str:
return raise_api_result( return raise_api_result(await self.send_ndjson_request(Operation.DEBUG, command=command))
await self.send_ndjson_request(Operation.DEBUG, command=command)
)
async def veilid_version_string(self) -> str: async def veilid_version_string(self) -> str:
return raise_api_result( return raise_api_result(await self.send_ndjson_request(Operation.VEILID_VERSION_STRING))
await self.send_ndjson_request(Operation.VEILID_VERSION_STRING)
)
async def veilid_version(self) -> VeilidVersion: async def veilid_version(self) -> VeilidVersion:
v = await self.send_ndjson_request(Operation.VEILID_VERSION) v = await self.send_ndjson_request(Operation.VEILID_VERSION)
@ -424,11 +426,9 @@ class _JsonRoutingContext(RoutingContext):
if not self.done: if not self.done:
# attempt to clean up server-side anyway # attempt to clean up server-side anyway
self.api.send_one_way_ndjson_request( self.api.send_one_way_ndjson_request(
Operation.ROUTING_CONTEXT, Operation.ROUTING_CONTEXT, rc_id=self.rc_id, rc_op=RoutingContextOperation.RELEASE
rc_id=self.rc_id,
rc_op=RoutingContextOperation.RELEASE
) )
# complain # complain
raise AssertionError("Should have released routing context before dropping object") raise AssertionError("Should have released routing context before dropping object")
@ -442,11 +442,11 @@ class _JsonRoutingContext(RoutingContext):
Operation.ROUTING_CONTEXT, Operation.ROUTING_CONTEXT,
validate=validate_rc_op, validate=validate_rc_op,
rc_id=self.rc_id, rc_id=self.rc_id,
rc_op=RoutingContextOperation.RELEASE rc_op=RoutingContextOperation.RELEASE,
) )
self.done = True self.done = True
async def with_privacy(self, release = True) -> Self: async def with_privacy(self, release=True) -> Self:
new_rc_id = raise_api_result( new_rc_id = raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT, Operation.ROUTING_CONTEXT,
@ -459,7 +459,7 @@ class _JsonRoutingContext(RoutingContext):
await self.release() await self.release()
return self.__class__(self.api, new_rc_id) return self.__class__(self.api, new_rc_id)
async def with_custom_privacy(self, safety_selection: SafetySelection, release = True) -> Self: async def with_custom_privacy(self, safety_selection: SafetySelection, release=True) -> Self:
new_rc_id = raise_api_result( new_rc_id = raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT, Operation.ROUTING_CONTEXT,
@ -473,7 +473,7 @@ class _JsonRoutingContext(RoutingContext):
await self.release() await self.release()
return self.__class__(self.api, new_rc_id) return self.__class__(self.api, new_rc_id)
async def with_sequencing(self, sequencing: Sequencing, release = True) -> Self: async def with_sequencing(self, sequencing: Sequencing, release=True) -> Self:
new_rc_id = raise_api_result( new_rc_id = raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT, Operation.ROUTING_CONTEXT,
@ -664,7 +664,9 @@ class _JsonTableDbTransaction(TableDbTransaction):
) )
# complain # complain
raise AssertionError("Should have committed or rolled back transaction before dropping object") raise AssertionError(
"Should have committed or rolled back transaction before dropping object"
)
def is_done(self) -> bool: def is_done(self) -> bool:
return self.done return self.done
@ -672,7 +674,7 @@ class _JsonTableDbTransaction(TableDbTransaction):
async def commit(self): async def commit(self):
if self.done: if self.done:
raise AssertionError("Transaction is already done") raise AssertionError("Transaction is already done")
raise_api_result( raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
Operation.TABLE_DB_TRANSACTION, Operation.TABLE_DB_TRANSACTION,
@ -736,12 +738,9 @@ class _JsonTableDb(TableDb):
def __del__(self): def __del__(self):
if not self.done: if not self.done:
# attempt to clean up server-side anyway # attempt to clean up server-side anyway
self.api.send_one_way_ndjson_request( self.api.send_one_way_ndjson_request(
Operation.TABLE_DB, Operation.TABLE_DB, db_id=self.db_id, db_op=TableDbOperation.RELEASE
db_id=self.db_id,
db_op=TableDbOperation.RELEASE
) )
# complain # complain
@ -757,11 +756,10 @@ class _JsonTableDb(TableDb):
Operation.TABLE_DB, Operation.TABLE_DB,
validate=validate_db_op, validate=validate_db_op,
db_id=self.db_id, db_id=self.db_id,
db_op=TableDbOperation.RELEASE db_op=TableDbOperation.RELEASE,
) )
self.done = True self.done = True
async def get_column_count(self) -> int: async def get_column_count(self) -> int:
return raise_api_result( return raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(
@ -859,12 +857,9 @@ class _JsonCryptoSystem(CryptoSystem):
def __del__(self): def __del__(self):
if not self.done: if not self.done:
# attempt to clean up server-side anyway # attempt to clean up server-side anyway
self.api.send_one_way_ndjson_request( self.api.send_one_way_ndjson_request(
Operation.CRYPTO_SYSTEM, Operation.CRYPTO_SYSTEM, cs_id=self.cs_id, cs_op=CryptoSystemOperation.RELEASE
cs_id=self.cs_id,
cs_op=CryptoSystemOperation.RELEASE
) )
# complain # complain
@ -872,7 +867,7 @@ class _JsonCryptoSystem(CryptoSystem):
def is_done(self) -> bool: def is_done(self) -> bool:
return self.done return self.done
async def release(self): async def release(self):
if self.done: if self.done:
return return
@ -880,7 +875,7 @@ class _JsonCryptoSystem(CryptoSystem):
Operation.CRYPTO_SYSTEM, Operation.CRYPTO_SYSTEM,
validate=validate_cs_op, validate=validate_cs_op,
cs_id=self.cs_id, cs_id=self.cs_id,
cs_op=CryptoSystemOperation.RELEASE cs_op=CryptoSystemOperation.RELEASE,
) )
self.done = True self.done = True
@ -1142,9 +1137,7 @@ class _JsonCryptoSystem(CryptoSystem):
) )
) )
async def crypt_no_auth( async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes:
self, body: bytes, nonce: Nonce, shared_secret: SharedSecret
) -> bytes:
return urlsafe_b64decode_no_pad( return urlsafe_b64decode_no_pad(
raise_api_result( raise_api_result(
await self.api.send_ndjson_request( await self.api.send_ndjson_request(

View File

@ -1,6 +1,7 @@
from enum import StrEnum from enum import StrEnum
from typing import Self from typing import Self
class Operation(StrEnum): class Operation(StrEnum):
CONTROL = "Control" CONTROL = "Control"
GET_STATE = "GetState" GET_STATE = "GetState"
@ -28,6 +29,7 @@ class Operation(StrEnum):
VEILID_VERSION_STRING = "VeilidVersionString" VEILID_VERSION_STRING = "VeilidVersionString"
VEILID_VERSION = "VeilidVersion" VEILID_VERSION = "VeilidVersion"
class RoutingContextOperation(StrEnum): class RoutingContextOperation(StrEnum):
INVALID_ID = "InvalidId" INVALID_ID = "InvalidId"
RELEASE = "Release" RELEASE = "Release"
@ -45,6 +47,7 @@ class RoutingContextOperation(StrEnum):
WATCH_DHT_VALUES = "WatchDhtValues" WATCH_DHT_VALUES = "WatchDhtValues"
CANCEL_DHT_WATCH = "CancelDhtWatch" CANCEL_DHT_WATCH = "CancelDhtWatch"
class TableDbOperation(StrEnum): class TableDbOperation(StrEnum):
INVALID_ID = "InvalidId" INVALID_ID = "InvalidId"
RELEASE = "Release" RELEASE = "Release"
@ -55,6 +58,7 @@ class TableDbOperation(StrEnum):
LOAD = "Load" LOAD = "Load"
DELETE = "Delete" DELETE = "Delete"
class TableDbTransactionOperation(StrEnum): class TableDbTransactionOperation(StrEnum):
INVALID_ID = "InvalidId" INVALID_ID = "InvalidId"
COMMIT = "Commit" COMMIT = "Commit"
@ -62,6 +66,7 @@ class TableDbTransactionOperation(StrEnum):
STORE = "Store" STORE = "Store"
DELETE = "Delete" DELETE = "Delete"
class CryptoSystemOperation(StrEnum): class CryptoSystemOperation(StrEnum):
INVALID_ID = "InvalidId" INVALID_ID = "InvalidId"
RELEASE = "Release" RELEASE = "Release"
@ -85,7 +90,8 @@ class CryptoSystemOperation(StrEnum):
DECRYPT_AEAD = "DecryptAead" DECRYPT_AEAD = "DecryptAead"
ENCRYPT_AEAD = "EncryptAead" ENCRYPT_AEAD = "EncryptAead"
CRYPT_NO_AUTH = "CryptNoAuth" CRYPT_NO_AUTH = "CryptNoAuth"
class RecvMessageType(StrEnum): class RecvMessageType(StrEnum):
RESPONSE = "Response" RESPONSE = "Response"
UPDATE = "Update" UPDATE = "Update"

View File

@ -2,9 +2,18 @@ from enum import StrEnum
from typing import Optional, Self from typing import Optional, Self
from .config import VeilidConfig from .config import VeilidConfig
from .types import (ByteCount, RouteId, Timestamp, TimestampDuration, TypedKey, from .types import (
ValueData, ValueSubkey, VeilidLogLevel, OperationId, ByteCount,
urlsafe_b64decode_no_pad) RouteId,
Timestamp,
TimestampDuration,
TypedKey,
ValueData,
ValueSubkey,
VeilidLogLevel,
OperationId,
urlsafe_b64decode_no_pad,
)
class AttachmentState(StrEnum): class AttachmentState(StrEnum):
@ -200,9 +209,7 @@ class PeerTableData:
@classmethod @classmethod
def from_json(cls, j: dict) -> Self: def from_json(cls, j: dict) -> Self:
"""JSON object hook""" """JSON object hook"""
return cls( return cls(j["node_ids"], j["peer_address"], PeerStats.from_json(j["peer_stats"]))
j["node_ids"], j["peer_address"], PeerStats.from_json(j["peer_stats"])
)
class VeilidStateNetwork: class VeilidStateNetwork:
@ -276,9 +283,7 @@ class VeilidLog:
message: str message: str
backtrace: Optional[str] backtrace: Optional[str]
def __init__( def __init__(self, log_level: VeilidLogLevel, message: str, backtrace: Optional[str]):
self, log_level: VeilidLogLevel, message: str, backtrace: Optional[str]
):
self.log_level = log_level self.log_level = log_level
self.message = message self.message = message
self.backtrace = backtrace self.backtrace = backtrace
@ -349,9 +354,7 @@ class VeilidValueChange:
count: int count: int
value: ValueData value: ValueData
def __init__( def __init__(self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData):
self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData
):
self.key = key self.key = key
self.subkeys = subkeys self.subkeys = subkeys
self.count = count self.count = count

View File

@ -1,8 +1,8 @@
import base64 import base64
import json import json
from enum import StrEnum from enum import StrEnum
from typing import Any, Optional, Self, Tuple
from functools import total_ordering from functools import total_ordering
from typing import Any, Optional, Self, Tuple
#################################################################### ####################################################################
@ -83,6 +83,7 @@ class SafetySelectionKind(StrEnum):
UNSAFE = "Unsafe" UNSAFE = "Unsafe"
SAFE = "Safe" SAFE = "Safe"
#################################################################### ####################################################################
@ -249,7 +250,12 @@ class VeilidVersion:
return False return False
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, VeilidVersion) and self.data == other.data and self.seq == other.seq and self.writer == other.writer return (
isinstance(other, VeilidVersion)
and self.data == other.data
and self.seq == other.seq
and self.writer == other.writer
)
@property @property
def major(self): def major(self):
@ -319,8 +325,7 @@ class DHTSchema:
if DHTSchemaKind(j["kind"]) == DHTSchemaKind.SMPL: if DHTSchemaKind(j["kind"]) == DHTSchemaKind.SMPL:
return cls.smpl( return cls.smpl(
j["o_cnt"], j["o_cnt"],
[DHTSchemaSMPLMember.from_json(member) [DHTSchemaSMPLMember.from_json(member) for member in j["members"]],
for member in j["members"]],
) )
raise Exception("Unknown DHTSchema kind", j["kind"]) raise Exception("Unknown DHTSchema kind", j["kind"])
@ -346,13 +351,15 @@ class DHTRecordDescriptor:
self.owner_secret = owner_secret self.owner_secret = owner_secret
self.schema = schema self.schema = schema
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(key={self.key!r})>"
@classmethod @classmethod
def from_json(cls, j: dict) -> Self: def from_json(cls, j: dict) -> Self:
return cls( return cls(
TypedKey(j["key"]), TypedKey(j["key"]),
PublicKey(j["owner"]), PublicKey(j["owner"]),
None if j["owner_secret"] is None else SecretKey( None if j["owner_secret"] is None else SecretKey(j["owner_secret"]),
j["owner_secret"]),
DHTSchema.from_json(j["schema"]), DHTSchema.from_json(j["schema"]),
) )
@ -371,6 +378,9 @@ class ValueData:
self.data = data self.data = data
self.writer = writer self.writer = writer
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(seq={self.seq!r}, data={self.data!r}, writer={self.writer!r})>"
def __lt__(self, other): def __lt__(self, other):
if other is None: if other is None:
return True return True
@ -387,7 +397,12 @@ class ValueData:
return False return False
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, ValueData) and self.data == other.data and self.seq == other.seq and self.writer == other.writer return (
isinstance(other, ValueData)
and self.data == other.data
and self.seq == other.seq
and self.writer == other.writer
)
@classmethod @classmethod
def from_json(cls, j: dict) -> Self: def from_json(cls, j: dict) -> Self:
@ -403,13 +418,20 @@ class ValueData:
#################################################################### ####################################################################
class SafetySpec: class SafetySpec:
preferred_route: Optional[RouteId] preferred_route: Optional[RouteId]
hop_count: int hop_count: int
stability: Stability stability: Stability
sequencing: Sequencing sequencing: Sequencing
def __init__(self, preferred_route: Optional[RouteId], hop_count: int, stability: Stability, sequencing: Sequencing): def __init__(
self,
preferred_route: Optional[RouteId],
hop_count: int,
stability: Stability,
sequencing: Sequencing,
):
self.preferred_route = preferred_route self.preferred_route = preferred_route
self.hop_count = hop_count self.hop_count = hop_count
self.stability = stability self.stability = stability
@ -417,10 +439,12 @@ class SafetySpec:
@classmethod @classmethod
def from_json(cls, j: dict) -> Self: def from_json(cls, j: dict) -> Self:
return cls(RouteId(j["preferred_route"]) if "preferred_route" in j else None, return cls(
j["hop_count"], RouteId(j["preferred_route"]) if "preferred_route" in j else None,
Stability(j["stability"]), j["hop_count"],
Sequencing(j["sequencing"])) Stability(j["stability"]),
Sequencing(j["sequencing"]),
)
def to_json(self) -> dict: def to_json(self) -> dict:
return self.__dict__ return self.__dict__