From fa92c9902d3622fb33bd63c16d090d435b725793 Mon Sep 17 00:00:00 2001 From: Teknique Date: Fri, 16 Jun 2023 23:33:10 -0700 Subject: [PATCH] Cleanup types --- veilid-python/veilid/types.py | 294 +++++++++++++++++++--------------- 1 file changed, 169 insertions(+), 125 deletions(-) diff --git a/veilid-python/veilid/types.py b/veilid-python/veilid/types.py index 684f3b17..fa85057f 100644 --- a/veilid-python/veilid/types.py +++ b/veilid-python/veilid/types.py @@ -1,12 +1,11 @@ -import time -import json import base64 - +import json from enum import StrEnum -from typing import Self, Optional, Any, Tuple +from typing import Any, Optional, Self, Tuple #################################################################### + def urlsafe_b64encode_no_pad(b: bytes) -> str: """ Removes any `=` used as padding from the encoded string. @@ -22,6 +21,7 @@ def urlsafe_b64decode_no_pad(s: str) -> bytes: s = s + ("=" * padding) return base64.urlsafe_b64decode(s) + class VeilidJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, bytes): @@ -29,173 +29,206 @@ class VeilidJSONEncoder(json.JSONEncoder): if hasattr(o, "to_json") and callable(o.to_json): return o.to_json() return json.JSONEncoder.default(self, o) - + @staticmethod def dumps(req: Any, *args, **kwargs) -> str: - return json.dumps(req, cls = VeilidJSONEncoder, *args, **kwargs) + return json.dumps(req, cls=VeilidJSONEncoder, *args, **kwargs) + #################################################################### + class VeilidLogLevel(StrEnum): - ERROR = 'Error' - WARN = 'Warn' - INFO = 'Info' - DEBUG = 'Debug' - TRACE = 'Trace' + ERROR = "Error" + WARN = "Warn" + INFO = "Info" + DEBUG = "Debug" + TRACE = "Trace" + class CryptoKind(StrEnum): CRYPTO_KIND_NONE = "NONE" CRYPTO_KIND_VLD0 = "VLD0" + class Stability(StrEnum): LOW_LATENCY = "LowLatency" RELIABLE = "Reliable" + class Sequencing(StrEnum): NO_PREFERENCE = "NoPreference" PREFER_ORDERED = "PreferOrdered" ENSURE_ORDERED = "EnsureOrdered" + class DHTSchemaKind(StrEnum): DFLT = "DFLT" SMPL = "SMPL" + #################################################################### + class Timestamp(int): pass + class TimestampDuration(int): pass + class ByteCount(int): pass + class OperationId(int): pass + class RouteId(str): pass -class CryptoKey: + +class EncodedString(str): def to_bytes(self) -> bytes: return urlsafe_b64decode_no_pad(self) -class CryptoKeyDistance(CryptoKey, str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return CryptoKeyDistance(urlsafe_b64encode_no_pad(b)) + @classmethod + def from_bytes(cls, b: bytes) -> Self: + return cls(urlsafe_b64encode_no_pad(b)) -class PublicKey(CryptoKey, str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return PublicKey(urlsafe_b64encode_no_pad(b)) -class SecretKey(CryptoKey, str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return SecretKey(urlsafe_b64encode_no_pad(b)) +class CryptoKey(EncodedString): + pass -class SharedSecret(CryptoKey, str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return SharedSecret(urlsafe_b64encode_no_pad(b)) -class HashDigest(CryptoKey, str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return HashDigest(urlsafe_b64encode_no_pad(b)) +class CryptoKeyDistance(CryptoKey): + pass -class Signature(str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return Signature(urlsafe_b64encode_no_pad(b)) - def to_bytes(self) -> bytes: - return urlsafe_b64decode_no_pad(self) -class Nonce(str): - @staticmethod - def from_bytes(b: bytes) -> Self: - return Signature(urlsafe_b64encode_no_pad(b)) - def to_bytes(self) -> bytes: - return urlsafe_b64decode_no_pad(self) +class PublicKey(CryptoKey): + pass + + +class SecretKey(CryptoKey): + pass + + +class SharedSecret(CryptoKey): + pass + + +class HashDigest(CryptoKey): + pass + + +class Signature(EncodedString): + pass + + +class Nonce(EncodedString): + pass + class KeyPair(str): - @staticmethod - def from_parts(key: PublicKey, secret: SecretKey) -> Self: - return KeyPair(key + ":" + secret) - def key(self) -> PublicKey: - return PublicKey(str.split(":", 1)[0]) - def secret(self) -> SecretKey: - return SecretKey(str.split(":", 1)[1]) - def to_parts(self) -> Tuple[PublicKey, SecretKey]: - parts = str.split(":", 1) - return (PublicKey(parts[0]), SecretKey(parts[1])) + @classmethod + def from_parts(cls, key: PublicKey, secret: SecretKey) -> Self: + return cls(f"{key}:{secret}") -class CryptoTyped: + def key(self) -> PublicKey: + return PublicKey(self.split(":", 1)[0]) + + def secret(self) -> SecretKey: + return SecretKey(self.split(":", 1)[1]) + + def to_parts(self) -> Tuple[PublicKey, SecretKey]: + public, secret = self.split(":", 1) + return (PublicKey(public), SecretKey(secret)) + + +class CryptoTyped(str): def kind(self) -> CryptoKind: - if self[4] != ':': + if self[4] != ":": raise ValueError("Not CryptoTyped") return CryptoKind(self[0:4]) + def _value(self) -> str: - if self[4] != ':': + if self[4] != ":": raise ValueError("Not CryptoTyped") return self[5:] -class TypedKey(CryptoTyped, str): - @staticmethod - def from_value(kind: CryptoKind, value: PublicKey) -> Self: - return TypedKey(kind + ":" + value) + +class TypedKey(CryptoTyped): + @classmethod + def from_value(cls, kind: CryptoKind, value: PublicKey) -> Self: + return cls(f"{kind}:{value}") + def value(self) -> PublicKey: - PublicKey(self._value()) - -class TypedSecret(CryptoTyped, str): - @staticmethod - def from_value(kind: CryptoKind, value: SecretKey) -> Self: - return TypedSecret(kind + ":" + value) + return PublicKey(self._value()) + + +class TypedSecret(CryptoTyped): + @classmethod + def from_value(cls, kind: CryptoKind, value: SecretKey) -> Self: + return cls(f"{kind}:{value}") + def value(self) -> SecretKey: - SecretKey(self._value()) + return SecretKey(self._value()) + + +class TypedKeyPair(CryptoTyped): + @classmethod + def from_value(cls, kind: CryptoKind, value: KeyPair) -> Self: + return cls(f"{kind}:{value}") -class TypedKeyPair(CryptoTyped, str): - @staticmethod - def from_value(kind: CryptoKind, value: KeyPair) -> Self: - return TypedKeyPair(kind + ":" + value) def value(self) -> KeyPair: - KeyPair(self._value()) + return KeyPair(self._value()) + + +class TypedSignature(CryptoTyped): + @classmethod + def from_value(cls, kind: CryptoKind, value: Signature) -> Self: + return cls(f"{kind}:{value}") -class TypedSignature(CryptoTyped, str): - @staticmethod - def from_value(kind: CryptoKind, value: Signature) -> Self: - return TypedSignature(kind + ":" + value) def value(self) -> Signature: - Signature(self._value()) + return Signature(self._value()) + class ValueSubkey(int): pass + class ValueSeqNum(int): pass + #################################################################### + class VeilidVersion: _major: int _minor: int _patch: int + def __init__(self, major: int, minor: int, patch: int): self._major = major self._minor = minor self._patch = patch + @property def major(self): return self._major + @property def minor(self): return self._minor + @property def patch(self): return self._patch + class NewPrivateRouteResult: route_id: RouteId blob: bytes @@ -207,95 +240,106 @@ class NewPrivateRouteResult: def to_tuple(self) -> Tuple[RouteId, bytes]: return (self.route_id, self.blob) - @staticmethod - def from_json(j: dict) -> Self: - return NewPrivateRouteResult( - RouteId(j['route_id']), - urlsafe_b64decode_no_pad(j['blob'])) + @classmethod + def from_json(cls, j: dict) -> Self: + return cls(RouteId(j["route_id"]), urlsafe_b64decode_no_pad(j["blob"])) + class DHTSchemaSMPLMember: m_key: PublicKey m_cnt: int + def __init__(self, m_key: PublicKey, m_cnt: int): self.m_key = m_key self.m_cnt = m_cnt - @staticmethod - def from_json(j: dict) -> Self: - return DHTSchemaSMPLMember( - PublicKey(j['m_key']), - j['m_cnt']) + + @classmethod + def from_json(cls, j: dict) -> Self: + return cls(PublicKey(j["m_key"]), j["m_cnt"]) + def to_json(self) -> dict: return self.__dict__ - + + class DHTSchema: kind: DHTSchemaKind - + def __init__(self, kind: DHTSchemaKind, **kwargs): self.kind = kind for k, v in kwargs.items(): setattr(self, k, v) - - @staticmethod - def dflt(o_cnt: int) -> Self: - Self(DHTSchemaKind.DFLT, o_cnt = o_cnt) - - @staticmethod - def smpl(o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self: - Self(DHTSchemaKind.SMPL, o_cnt = o_cnt, members = members) - @staticmethod - def from_json(j: dict) -> Self: - if DHTSchemaKind(j['kind']) == DHTSchemaKind.DFLT: - return DHTSchema.dflt(j['o_cnt']) - if DHTSchemaKind(j['kind']) == DHTSchemaKind.SMPL: - return DHTSchema.smpl( - j['o_cnt'], - list(map(lambda x: DHTSchemaSMPLMember.from_json(x), j['members']))) - raise Exception("Unknown DHTSchema kind", j['kind']) + @classmethod + def dflt(cls, o_cnt: int) -> Self: + return cls(DHTSchemaKind.DFLT, o_cnt=o_cnt) + + @classmethod + def smpl(cls, o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self: + return cls(DHTSchemaKind.SMPL, o_cnt=o_cnt, members=members) + + @classmethod + def from_json(cls, j: dict) -> Self: + if DHTSchemaKind(j["kind"]) == DHTSchemaKind.DFLT: + return cls.dflt(j["o_cnt"]) + if DHTSchemaKind(j["kind"]) == DHTSchemaKind.SMPL: + return cls.smpl( + j["o_cnt"], + [DHTSchemaSMPLMember.from_json(member) for member in j["members"]], + ) + raise Exception("Unknown DHTSchema kind", j["kind"]) def to_json(self) -> dict: return self.__dict__ + class DHTRecordDescriptor: key: TypedKey owner: PublicKey owner_secret: Optional[SecretKey] schema: DHTSchema - def __init__(self, key: TypedKey, owner: PublicKey, owner_secret: Optional[SecretKey], schema: DHTSchema): + def __init__( + self, + key: TypedKey, + owner: PublicKey, + owner_secret: Optional[SecretKey], + schema: DHTSchema, + ): self.key = key self.owner = owner self.owner_secret = owner_secret self.schema = schema - - @staticmethod - def from_json(j: dict) -> Self: - DHTRecordDescriptor( - TypedKey(j['key']), - PublicKey(j['owner']), - None if j['owner_secret'] is None else SecretKey(j['owner_secret']), - DHTSchema.from_json(j['schema'])) + + @classmethod + def from_json(cls, j: dict) -> Self: + return cls( + TypedKey(j["key"]), + PublicKey(j["owner"]), + None if j["owner_secret"] is None else SecretKey(j["owner_secret"]), + DHTSchema.from_json(j["schema"]), + ) def to_json(self) -> dict: return self.__dict__ + class ValueData: seq: ValueSeqNum data: bytes writer: PublicKey - + def __init__(self, seq: ValueSeqNum, data: bytes, writer: PublicKey): self.seq = seq self.data = data self.writer = writer - - @staticmethod - def from_json(j: dict) -> Self: - DHTRecordDescriptor( - ValueSeqNum(j['seq']), - urlsafe_b64decode_no_pad(j['data']), - PublicKey(j['writer'])) + + @classmethod + def from_json(cls, j: dict) -> Self: + return cls( + ValueSeqNum(j["seq"]), + urlsafe_b64decode_no_pad(j["data"]), + PublicKey(j["writer"]), + ) def to_json(self) -> dict: return self.__dict__ -