veilid/scripts/earthly/secretsd/secretsd/session.py
2022-01-09 15:17:36 -05:00

56 lines
1.8 KiB
Python

import dbus
import dbus.service
import os
from .crypto_backend import (
AES_BLOCK_BYTES,
aes_cbc_encrypt,
aes_cbc_decrypt,
dh_modp1024_exchange,
hkdf_sha256_derive,
pkcs7_pad,
pkcs7_unpad,
)
class SecretServiceSession(dbus.service.Object):
ROOT = "/org/freedesktop/secrets/session"
PATH = "/org/freedesktop/secrets/session/s%d"
def __init__(self, service, bus_path, algorithm):
self.bus_path = bus_path
self.algorithm = algorithm
self.kex_done = False
self.crypt_key = None
super().__init__(service.bus, bus_path)
def kex(self, input):
if self.algorithm == "plain":
return (dbus.ByteArray(b""), True)
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
peer_pubkey = int.from_bytes(input, "big")
our_pubkey, shared_key = dh_modp1024_exchange(peer_pubkey)
self.crypt_key = hkdf_sha256_derive(shared_key, 128 // 8)
output = our_pubkey.to_bytes(1024 // 8, "big")
return (dbus.ByteArray(output), True)
else:
raise dbus.DBusException("org.freedesktop.DBus.Error.NotSupported")
def encrypt(self, input):
if self.algorithm == "plain":
return input, None
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
key = self.crypt_key
iv = os.urandom(AES_BLOCK_BYTES)
ct = pkcs7_pad(input, AES_BLOCK_BYTES)
ct = aes_cbc_encrypt(ct, key, iv)
return ct, iv
def decrypt(self, input, iv):
if self.algorithm == "plain":
return input
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
key = self.crypt_key
pt = aes_cbc_decrypt(input, key, iv)
pt = pkcs7_unpad(pt, AES_BLOCK_BYTES)
return pt