add eartly build

This commit is contained in:
John Smith 2022-01-09 15:17:36 -05:00
parent 5122fd8ebd
commit 8e20228529
21 changed files with 1390 additions and 0 deletions

8
.earthlyignore Normal file
View File

@ -0,0 +1,8 @@
.vscode
.git
external/keyring-manager/android_test/.gradle
external/keyring-manager/android_test/app/build
external/keyring-manager/android_test/build
external/keyring-manager/android_test/local.properties
target
veilid-core/pkg

83
Earthfile Normal file
View File

@ -0,0 +1,83 @@
VERSION 0.6
# Start with older Ubuntu to ensure GLIBC symbol versioning support for older linux
# Ensure we are using an amd64 platform because some of these targets use cross-platform tooling
FROM --platform amd64 ubuntu:16.04
# Choose where Rust ends up
# Install build prerequisites
deps-base:
RUN apt-get -y update
RUN apt-get install -y software-properties-common
RUN add-apt-repository -y ppa:deadsnakes/ppa
RUN apt-get -y update
RUN apt-get install -y iproute2 curl build-essential cmake libssl-dev openssl file git pkg-config python3.8 python3.8-distutils python3.8-dev libdbus-1-dev libdbus-glib-1-dev libgirepository1.0-dev libcairo2-dev
RUN apt-get remove -y python3.5
RUN curl https://bootstrap.pypa.io/get-pip.py | python3.8
# Install Rust
deps-rust:
FROM +deps-base
ENV RUSTUP_HOME=/usr/local/rustup
ENV CARGO_HOME=/usr/local/cargo
ENV PATH=/usr/local/cargo/bin:$PATH
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -c clippy --no-modify-path --profile minimal
RUN chmod -R a+w $RUSTUP_HOME $CARGO_HOME; \
rustup --version; \
cargo --version; \
rustc --version;
# ARM64 Linux
RUN rustup target add aarch64-unknown-linux-gnu
# Android
RUN rustup target add aarch64-linux-android
RUN rustup target add armv7-linux-androideabi
RUN rustup target add i686-linux-android
RUN rustup target add x86_64-linux-android
# WASM
RUN rustup target add wasm32-unknown-unknown
# Install Cap'n Proto
deps-capnp:
FROM +deps-rust
COPY scripts/earthly/install_capnproto.sh /
RUN /bin/bash /install_capnproto.sh; rm /install_capnproto.sh
# Install stub secrets daemon for keyring tests
deps-secretsd:
FROM +deps-capnp
COPY scripts/earthly/secretsd /secretsd
RUN pip install -r /secretsd/requirements.txt
RUN pip install keyring
RUN cp /secretsd/dbus/org.freedesktop.secrets.service /usr/share/dbus-1/services/org.freedesktop.secrets.service
# Clean up the apt cache to save space
deps:
FROM +deps-secretsd
RUN apt-get clean
code:
FROM +deps
COPY . .
# Clippy only
clippy:
FROM +code
RUN cargo clippy
# Build
build-linux-amd64:
FROM +code
RUN cargo build --target x86_64-unknown-linux-gnu --release
SAVE ARTIFACT ./target/x86_64-unknown-linux-gnu AS LOCAL ./target/artifacts/x86_64-unknown-linux-gnu
build-linux-arm64:
FROM +code
RUN cargo build --target aarch64-unknown-linux-gnu --release
SAVE ARTIFACT ./target/aarch64-unknown-linux-gnu AS LOCAL ./target/artifacts/aarch64-unknown-linux-gnu
# Unit tests
unit-test:
FROM +code
RUN cargo test --release

View File

@ -0,0 +1,11 @@
#!/bin/bash
mkdir /tmp/capnproto-install
cd /tmp/capnproto-install
curl -O https://capnproto.org/capnproto-c++-0.9.1.tar.gz
tar zxf capnproto-c++-0.9.1.tar.gz
cd capnproto-c++-0.9.1
./configure
make -j6 check
make install
cd /
rm -rf /tmp/capnproto-install

View File

@ -0,0 +1,22 @@
MIT License (also available at <https://spdx.org/licenses/MIT>)
(c) 20102016 Mantas Mikulėnas <grawity@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,41 @@
# secretsd
This is a generic backend for the libsecret API, used by various programs to store passwords and similar secrets. It mostly implements the [Secret Service API][api] specification, to act as an alternative to gnome-keyring-daemon and kwalletd.
![badge: "works on my machine"](https://img.shields.io/badge/works%20on%20my%20machine-yes-green.svg?style=flat)
[api]: https://specifications.freedesktop.org/secret-service/latest/
## Dependencies
* python-cryptography
* python-dbus
* python-gobject (3.x)
* python-xdg
## Installation
secretsd is a user-level daemon which uses your D-Bus "session bus". It could be manually started through `systemd --user`:
cp systemd/secretsd.service ~/.config/systemd/user/
systemctl --user start secretsd
or automatically started on demand through D-Bus activation:
cp systemd/secretsd.service ~/.config/systemd/user/
cp dbus/org.freedesktop.secrets.service ~/.local/share/dbus-1/services/
## Security
Secretsd does not aim to provide complete security like a modern password manager would; it only aims to allow using the libsecret API instead of ad-hoc loading of plaintext passwords from `~/.netrc` or similar files, but still relies on external protection for those files. In particular, item titles and attributes are **not** encrypted.
For now, all secrets are encrypted using a single "database key", which is stored in a regular file by default but can be provided through an environment variable, KWallet, or read from an external program. To specify the key source:
secretsd -k file:${CREDENTIALS_DIRECTORY}/secrets.key
secretsd -k env:DATABASE_KEY
secretsd -k kwallet:
secretsd -k exec:"pass Apps/secretsd"
(As secretsd is supposed to be a background service, it is strongly advised to _not_ use an external program which would show interactive prompts. And in particular avoid those which use GnuPG pinentry or otherwise make use of libsecret, for hopefuly obvious reasons.)
Individually encrypted collections are not yet supported, but planned in the future. (This will most likely be a fully separate layer of encryption, in addition to the database key.)

View File

@ -0,0 +1,3 @@
[D-BUS Service]
Name=org.freedesktop.secrets
Exec=/usr/bin/python3.8 /secretsd/secretsd.py

View File

@ -0,0 +1,6 @@
cryptography==36.0.0
pycryptodome==3.12.0
pycairo==1.19
PyGObject==3.30.5
pyxdg==0.27
dbus-python==1.2.18

View File

@ -0,0 +1,2 @@
#!/usr/bin/env python3
import secretsd.__main__

View File

@ -0,0 +1,32 @@
import argparse
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
import logging
import os
import xdg.BaseDirectory
from .database import SecretsDatabase
from .service import SecretService
os.umask(0o077)
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--db-path", metavar="PATH")
parser.add_argument("-k", "--key-location", metavar="PATH")
args = parser.parse_args()
db_dir = xdg.BaseDirectory.save_data_path("nullroute.eu.org/secretsd")
db_path = os.path.join(db_dir, "secrets.db")
db_path = args.db_path or db_path
key_path = os.path.join(db_dir, "secrets.key")
key_path = args.key_location or "file:%s" % key_path
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
sdb = SecretsDatabase(db_path, key_path)
svc = SecretService(bus, sdb)
loop = GLib.MainLoop()
loop.run()

View File

@ -0,0 +1,130 @@
import dbus
import dbus.service
from .exception import NoSuchObjectException
from .item import SecretServiceItemFallback
from .util import BusObjectWithProperties, NullObject
class SecretServiceCollectionFallback(dbus.service.FallbackObject, BusObjectWithProperties):
ROOT = "/org/freedesktop/secrets/collection"
PATH = "/org/freedesktop/secrets/collection/c%d"
def __init__(self, service, bus_path=ROOT):
self.service = service
self.bus_path = bus_path
super().__init__(self.service.bus, self.bus_path)
def resolve_path(self, path):
if path.startswith("/org/freedesktop/secrets/aliases/"):
orig = path
alias = path[len("/org/freedesktop/secrets/aliases/"):]
path = self.service.db.resolve_alias(alias)
if not path:
raise NoSuchObjectException(orig)
if not self.service.db.collection_exists(path):
raise NoSuchObjectException(path)
return path
def get_items(self, path):
path = self.resolve_path(path)
items = self.service.db.find_items({"xdg:collection": path})
return dbus.Array(items, "o")
def get_label(self, path):
path = self.resolve_path(path)
meta = self.service.db.get_collection_metadata(path)
return dbus.String(meta[0])
def set_label(self, path, value):
path = self.resolve_path(path)
label = str(value)
self.service.db.set_collection_label(path, label)
def get_created(self, path):
path = self.resolve_path(path)
meta = self.service.db.get_collection_metadata(path)
return dbus.UInt64(meta[1])
def get_modified(self, path):
path = self.resolve_path(path)
meta = self.service.db.get_collection_metadata(path)
return dbus.UInt64(meta[2])
INTERFACE = "org.freedesktop.Secret.Collection"
PROPERTIES = {
"Items": (get_items, None, dbus.Array([], "o")),
"Label": (get_label, set_label, dbus.String("")),
"Locked": (None, None, dbus.Boolean(False)),
"Created": (get_created, None, dbus.UInt64(0)),
"Modified": (get_modified, None, dbus.UInt64(0)),
}
@dbus.service.method("org.freedesktop.Secret.Collection", "a{sv}(oayays)b", "oo",
sender_keyword="sender", path_keyword="path", byte_arrays=True)
def CreateItem(self, properties, secret, replace,
sender=None, path=None):
path = self.resolve_path(path)
label = properties["org.freedesktop.Secret.Item.Label"]
attrs = properties["org.freedesktop.Secret.Item.Attributes"]
attrs.setdefault("xdg:collection", path)
attrs.setdefault("xdg:schema", "org.freedesktop.Secret.Generic")
sec_session, sec_param, sec_ct, sec_type = secret
sec_session = self.service.path_objects[sec_session]
sec_data = sec_session.decrypt(sec_ct, sec_param)
existing = self.service.db.find_items(attrs) if replace else []
if existing:
bus_path = existing[0]
self.service.db.set_item_attributes(bus_path, attrs)
self.service.db.set_secret(bus_path, sec_data, sec_type)
self.service.send_signal(path, "org.freedesktop.Secret.Collection",
"ItemChanged",
"o",
bus_path)
else:
bus_path = self.service.make_bus_path(True, "/org/freedesktop/secrets/item/i%d")
self.service.db.add_item(bus_path, label, attrs, sec_data, sec_type)
self.service.send_signal(path, "org.freedesktop.Secret.Collection",
"ItemCreated",
"o",
bus_path)
self.service.send_signal(path, "org.freedesktop.DBus.Properties",
"PropertiesChanged",
"sa{sv}as",
"org.freedesktop.Secret.Collection",
{"Items": self.get_items(path)},
[])
return (dbus.ObjectPath(bus_path), NullObject)
@dbus.service.method("org.freedesktop.Secret.Collection", "", "o",
path_keyword="path")
def Delete(self, path=None):
path = self.resolve_path(path)
self.service.db.delete_collection(path)
self.service.CollectionDeleted(path)
self.service.PropertiesChanged("org.freedesktop.Secret.Service",
{"Collections": self.service.get_collections()},
[])
return NullObject
@dbus.service.method("org.freedesktop.Secret.Collection", "a{ss}", "ao",
path_keyword="path")
def SearchItems(self, attributes, path=None):
path = self.resolve_path(path)
attributes["xdg:collection"] = path
items = self.service.db.find_items(attributes)
return items
@dbus.service.signal("org.freedesktop.Secret.Collection", "o")
def ItemCreated(self, bus_path):
pass
@dbus.service.signal("org.freedesktop.Secret.Collection", "o")
def ItemDeleted(self, bus_path):
pass
@dbus.service.signal("org.freedesktop.Secret.Collection", "o")
def ItemChanged(self, bus_path):
pass

View File

@ -0,0 +1,132 @@
import os
__all__ = [
"AES_BLOCK_BYTES",
"aes_cbc_encrypt",
"aes_cbc_decrypt",
"aes_cfb8_encrypt",
"aes_cfb8_decrypt",
"dh_modp1024_exchange",
"hkdf_sha256_derive",
"pkcs7_pad",
"pkcs7_unpad",
]
# Second Oakley group (RFC 2409), to be used as "dh-ietf1024" in the 'Secret
# Service' API. Don't look at me. I didn't write the spec.
MODP1024_PRIME = int("FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1"
"29024E088A67CC74020BBEA63B139B22514A08798E3404DD"
"EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245"
"E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
"EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381"
"FFFFFFFFFFFFFFFF", 16)
MODP1024_GENERATOR = 2
backend = os.environ.get("CRYPTO_BACKEND", "cryptography")
if backend == "cryptodome":
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Protocol.KDF import HKDF
from Crypto.Random.random import randint
from Crypto.Util import Padding
AES_BLOCK_BYTES = AES.block_size
def aes_cbc_encrypt(data, key, iv):
return AES.new(key, AES.MODE_CBC, iv).encrypt(data)
def aes_cbc_decrypt(data, key, iv):
return AES.new(key, AES.MODE_CBC, iv).decrypt(data)
def aes_cfb8_encrypt(data, key, iv):
return AES.new(key, AES.MODE_CFB, iv, segment_size=8).encrypt(data)
def aes_cfb8_decrypt(data, key, iv):
return AES.new(key, AES.MODE_CFB, iv, segment_size=8).decrypt(data)
def aes_cfb128_encrypt(data, key, iv):
return AES.new(key, AES.MODE_CFB, iv, segment_size=128).encrypt(data)
def aes_cfb128_decrypt(data, key, iv):
return AES.new(key, AES.MODE_CFB, iv, segment_size=128).decrypt(data)
def dh_modp1024_exchange(peer_pubkey):
prime = MODP1024_PRIME
generator = MODP1024_GENERATOR
our_privkey = randint(1, prime-1)
our_pubkey = pow(generator, our_privkey, prime)
shared_key = pow(peer_pubkey, our_privkey, prime)
shared_key = shared_key.to_bytes(1024 // 8, "big")
return our_pubkey, shared_key
def hkdf_sha256_derive(input, nbytes):
return HKDF(input, nbytes, b"", hashmod=SHA256)
def pkcs7_pad(data, size):
return Padding.pad(data, size, style="pkcs7")
def pkcs7_unpad(data, size):
return Padding.unpad(data, size, style="pkcs7")
print("using 'PyCryptodome' as crypto backend")
elif backend == "cryptography":
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import CBC, CFB, CFB8
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.padding import PKCS7
AES_BLOCK_BYTES = AES.block_size // 8
def aes_cbc_encrypt(data, key, iv):
c = Cipher(AES(key), CBC(iv)).encryptor()
return c.update(data) + c.finalize()
def aes_cbc_decrypt(data, key, iv):
c = Cipher(AES(key), CBC(iv)).decryptor()
return c.update(data) + c.finalize()
def aes_cfb8_encrypt(data, key, iv):
c = Cipher(AES(key), CFB8(iv)).encryptor()
return c.update(data) + c.finalize()
def aes_cfb8_decrypt(data, key, iv):
c = Cipher(AES(key), CFB8(iv)).decryptor()
return c.update(data) + c.finalize()
def aes_cfb128_encrypt(data, key, iv):
c = Cipher(AES(key), CFB(iv)).encryptor()
return c.update(data) + c.finalize()
def aes_cfb128_decrypt(data, key, iv):
c = Cipher(AES(key), CFB(iv)).decryptor()
return c.update(data) + c.finalize()
def dh_modp1024_exchange(peer_pubkey):
group = dh.DHParameterNumbers(p=MODP1024_PRIME, g=MODP1024_GENERATOR)
peer_pubkey = dh.DHPublicNumbers(peer_pubkey, group).public_key()
our_privkey = group.parameters().generate_private_key()
shared_key = our_privkey.exchange(peer_pubkey)
our_pubkey = our_privkey.public_key().public_numbers().y
return our_pubkey, shared_key
def hkdf_sha256_derive(input, nbytes):
k = HKDF(algorithm=SHA256(), length=nbytes, salt=b"", info=b"")
return k.derive(input)
def pkcs7_pad(data, size):
p = PKCS7(size * 8).padder()
return p.update(data) + p.finalize()
def pkcs7_unpad(data, size):
p = PKCS7(size * 8).unpadder()
return p.update(data) + p.finalize()
print("using 'python-cryptography' as crypto backend")
else:
raise RuntimeError("unsupported crypto backend %r" % backend)

View File

@ -0,0 +1,377 @@
import base64
import sqlite3
import time
from .encryption import (generate_key,
aes_cfb8_wrap, aes_cfb8_unwrap,
aes_cfb128_wrap, aes_cfb128_unwrap)
from .external_keys import load_ext_key, store_ext_key
class SecretsDatabase():
def __init__(self, path, key_path):
self.db = sqlite3.connect(path)
self.kp = key_path
self.mk = None
self.dk = None
self.ver = 0
self.initialize()
self.upgrade()
self.load_keys()
def initialize(self):
cur = self.db.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS version (" \
" version INTEGER" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS sequence (" \
" next INTEGER" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS parameters (" \
" name TEXT," \
" value TEXT" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS collections (" \
" object TEXT," \
" label TEXT," \
" created INTEGER," \
" modified INTEGER" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS aliases (" \
" alias TEXT," \
" target TEXT" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS items (" \
" object TEXT," \
" label TEXT," \
" created INTEGER," \
" modified INTEGER" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS attributes (" \
" object TEXT," \
" attribute TEXT," \
" value TEXT" \
")")
cur.execute("CREATE TABLE IF NOT EXISTS secrets (" \
" object TEXT," \
" secret TEXT," \
" type TEXT" \
")")
self.db.commit()
# Encryption keys
def _store_mkey(self, key):
print("DB: storing master key to %r" % (self.kp))
store_ext_key(self.kp, base64.b64encode(key).decode())
def _load_mkey(self):
print("DB: loading master key from %r" % (self.kp))
try:
mkey = base64.b64decode(load_ext_key(self.kp))
if len(mkey) != 32:
raise IOError("wrong mkey length (expected 32 bytes)")
except (KeyError, FileNotFoundError):
raise RuntimeError("could not load the database key from %r" % (self.kp))
self.mk = mkey
def _load_dkey(self, *, v=0):
if (v or self.ver) == 3:
cur = self.db.cursor()
cur.execute("SELECT value FROM parameters WHERE name = 'dkey'")
dkey, = cur.fetchone()
try:
dkey = self._decrypt_buf(dkey, with_mkey=True, v=3)
except IOError as e:
raise IOError("wrong mkey (%s)" % e)
if len(dkey) != 32:
raise IOError("wrong dkey length (expected 32 bytes)")
self.dk = dkey
else:
raise NotImplementedError("unknown schema version %r" % (v or self.ver))
def load_keys(self):
if self.ver >= 2:
self._load_mkey()
self._load_dkey()
def _encrypt_buf(self, buf, *, with_mkey=False, v=0):
key = self.mk if with_mkey else self.dk
if (v or self.ver) >= 3:
return aes_cfb128_wrap(buf, key)
elif (v or self.ver) == 2:
return aes_cfb8_wrap(buf, key)
else:
raise NotImplementedError("unknown schema version %r" % (v or self.ver))
def _decrypt_buf(self, buf, *, with_mkey=None, v=0):
key = self.mk if with_mkey else self.dk
if (v or self.ver) >= 3:
return aes_cfb128_unwrap(buf, key)
elif (v or self.ver) == 2:
return aes_cfb8_unwrap(buf, key)
else:
raise NotImplementedError("unknown schema version %r" % (v or self.ver))
# Schema upgrades
def _upgrade_v0_to_v1(self):
# Undo commit affc514 "make items use bus paths underneath their collection"
cur = self.db.cursor()
cur.execute("SELECT object FROM items" \
" WHERE object LIKE '/org/freedesktop/secrets/collection/c%/i%'")
res = cur.fetchall()
for (old_object,) in res:
item_id = old_object.split("/")[-1]
new_object = "/org/freedesktop/secrets/item/%s" % item_id
print("DB: moving object %r => %r" % (old_object, new_object))
cur.execute("UPDATE items SET object = ? WHERE object = ?",
(new_object, old_object))
cur.execute("UPDATE secrets SET object = ? WHERE object = ?",
(new_object, old_object))
cur.execute("UPDATE attributes SET object = ? WHERE object = ?",
(new_object, old_object))
def _upgrade_v1_to_v3(self):
# Version 2 encrypts all secrets using the database master key
cur = self.db.cursor()
# Generate a "master key"
print("DB: generating a master key")
mkey = generate_key()
self._store_mkey(mkey)
self.mk = mkey
# Generate a "data key"
print("DB: generating a data key")
dkey = generate_key()
blob = self._encrypt_buf(dkey, with_mkey=True, v=3)
cur.execute("INSERT INTO parameters VALUES ('dkey', ?)", (blob,))
self.dk = dkey
# Encrypt all currently stored secrets
cur.execute("SELECT object, secret FROM secrets")
res = cur.fetchall()
for object, blob in res:
print("DB: encrypting secret %r" % (object,))
blob = self._encrypt_buf(blob, v=3)
cur.execute("UPDATE secrets SET secret = ? WHERE object = ?", (blob, object))
def _upgrade_v2_to_v3(self):
# Version 3 uses AES-CFB128 instead of (badly chosen) AES-CFB8
cur = self.db.cursor()
# Re-encrypt the data key
self._load_mkey()
cur.execute("SELECT value FROM parameters WHERE name = 'dkey'")
blob, = cur.fetchone()
blob = self._decrypt_buf(blob, with_mkey=True, v=2)
blob = self._encrypt_buf(blob, with_mkey=True, v=3)
cur.execute("UPDATE parameters SET value = ? WHERE name = 'dkey'", (blob,))
# Re-encrypt all currently stored secrets
self._load_dkey(v=3)
cur.execute("SELECT object, secret FROM secrets")
res = cur.fetchall()
for object, blob in res:
print("DB: re-encrypting secret %r" % (object,))
blob = self._decrypt_buf(blob, v=2)
blob = self._encrypt_buf(blob, v=3)
cur.execute("UPDATE secrets SET secret = ? WHERE object = ?", (blob, object))
def upgrade(self):
print("DB: current database version is %d" % self.get_version())
if self.get_version() == 0:
print("DB: upgrading to version %d" % (1,))
self._upgrade_v0_to_v1()
self.db.cursor().execute("UPDATE version SET version = ?", (1,))
self.db.commit()
if self.get_version() == 1:
print("DB: upgrading to version %d" % (3,))
self._upgrade_v1_to_v3()
self.db.cursor().execute("UPDATE version SET version = ?", (3,))
self.db.commit()
print("DB: vacuuming database")
self.db.cursor().execute("VACUUM")
if self.get_version() == 2:
print("DB: upgrading to version %d" % (3,))
self._upgrade_v2_to_v3()
self.db.cursor().execute("UPDATE version SET version = ?", (3,))
self.db.commit()
self.ver = self.get_version()
print("DB: new database version is %d" % self.ver)
def get_version(self):
cur = self.db.cursor()
cur.execute("SELECT version FROM version")
res = cur.fetchone()
if res:
version = res[0]
else:
version = 0
cur.execute("INSERT INTO version VALUES (?)", (version,))
self.db.commit()
return version
def get_next_object_id(self):
cur = self.db.cursor()
cur.execute("SELECT next FROM sequence")
res = cur.fetchone()
if res:
oid = res[0]
cur.execute("UPDATE sequence SET next = next + 1")
else:
oid = 0
cur.execute("INSERT INTO sequence VALUES (?)", (oid + 1,))
self.db.commit()
print("DB: allocated new object ID %r" % oid)
return oid
# Collections
def add_collection(self, object, label):
print("DB: adding collection %r with label %r" % (object, label))
now = int(time.time())
cur = self.db.cursor()
cur.execute("INSERT INTO collections VALUES (?,?,?,?)", (object, label, now, now))
self.db.commit()
def list_collections(self):
cur = self.db.cursor()
cur.execute("SELECT object FROM collections")
return [r[0] for r in cur.fetchall()]
def collection_exists(self, object):
return bool(self.get_collection_metadata(object))
def get_collection_metadata(self, object):
print("DB: getting collection metadata for %r" % (object,))
cur = self.db.cursor()
cur.execute("SELECT label, created, modified FROM collections WHERE object = ?",
(object,))
return cur.fetchone()
def set_collection_label(self, object, label):
print("DB: setting label for %r to %r" % (object, label))
now = int(time.time())
cur = self.db.cursor()
cur.execute("UPDATE collections SET label = ?, modified = ? WHERE object = ?",
(label, now, object))
self.db.commit()
def delete_collection(self, object):
print("DB: deleting collection %r" % (object,))
cur = self.db.cursor()
subquery = "SELECT object FROM attributes" \
" WHERE attribute = 'xdg:collection' AND value = ?"
cur.execute("DELETE FROM items WHERE object IN (" + subquery + ")", (object,))
cur.execute("DELETE FROM secrets WHERE object IN (" + subquery + ")", (object,))
cur.execute("DELETE FROM attributes WHERE object IN (" + subquery + ")", (object,))
cur.execute("DELETE FROM aliases WHERE target = ?", (object,))
cur.execute("DELETE FROM collections WHERE object = ?", (object,))
self.db.commit()
# Aliases
def add_alias(self, alias, target):
print("DB: adding alias %r -> %r" % (alias, target))
cur = self.db.cursor()
cur.execute("DELETE FROM aliases WHERE alias = ?", (alias,))
cur.execute("INSERT INTO aliases VALUES (?,?)", (alias, target))
self.db.commit()
def get_aliases(self):
cur = self.db.cursor()
cur.execute("SELECT alias, target FROM aliases")
return cur.fetchall()
def resolve_alias(self, alias):
print("DB: resolving alias %r" % (alias,))
cur = self.db.cursor()
cur.execute("SELECT target FROM aliases WHERE alias = ?", (alias,))
r = cur.fetchone()
return r[0] if r else None
def delete_alias(self, alias):
print("DB: deleting alias %r" % (alias,))
cur = self.db.cursor()
cur.execute("DELETE FROM aliases WHERE alias = ?", (alias,))
self.db.commit()
# Items
def add_item(self, object, label, attrs, secret, sec_type):
now = int(time.time())
cur = self.db.cursor()
cur.execute("INSERT INTO items VALUES (?,?,?,?)", (object, label, now, now))
for key, val in attrs.items():
cur.execute("INSERT INTO attributes VALUES (?,?,?)", (object, key, val))
cur.execute("INSERT INTO secrets VALUES (?,?,?)", (object, self._encrypt_buf(secret),
sec_type))
self.db.commit()
def find_items(self, match_attrs):
qry = "SELECT object FROM attributes WHERE attribute = ? AND value = ?"
qry = " INTERSECT ".join([qry] * len(match_attrs))
parvs = []
for k, v in match_attrs.items():
parvs += [k, v]
print("DB: searching for %r" % parvs)
cur = self.db.cursor()
cur.execute(qry, parvs)
return [r[0] for r in cur.fetchall()]
def item_exists(self, object):
return bool(self.get_item_metadata(object))
def get_item_metadata(self, object):
print("DB: getting metadata for %r" % object)
cur = self.db.cursor()
cur.execute("SELECT label, created, modified FROM items WHERE object = ?",
(object,))
return cur.fetchone()
def set_item_label(self, object, label):
print("DB: setting label for %r to %r" % (object, label))
now = int(time.time())
cur = self.db.cursor()
cur.execute("UPDATE items SET label = ?, modified = ? WHERE object = ?",
(label, now, object))
self.db.commit()
def get_item_attributes(self, object):
print("DB: getting attrs for %r" % object)
cur = self.db.cursor()
cur.execute("SELECT attribute, value FROM attributes WHERE object = ?", (object,))
return {k: v for k, v in cur.fetchall()}
def set_item_attributes(self, object, attrs):
print("DB: setting attrs for %r to %r" % (object, attrs))
now = int(time.time())
cur = self.db.cursor()
cur.execute("DELETE FROM attributes WHERE object = ?", (object,))
for key, val in attrs.items():
cur.execute("INSERT INTO attributes VALUES (?,?,?)", (object, key, val))
cur.execute("UPDATE items SET modified = ? WHERE object = ?", (now, object))
self.db.commit()
def get_secret(self, object):
print("DB: getting secret for %r" % object)
cur = self.db.cursor()
cur.execute("SELECT secret, type FROM secrets WHERE object = ?", (object,))
secret, sec_type = cur.fetchone()
return self._decrypt_buf(secret), sec_type
def set_secret(self, object, secret, sec_type):
print("DB: updating secret for %r" % object)
if hasattr(secret, "encode"):
raise ValueError("secret needs to be bytes, not str")
now = int(time.time())
cur = self.db.cursor()
cur.execute("UPDATE secrets SET secret = ?, type = ? WHERE object = ?",
(self._encrypt_buf(secret), sec_type, object))
cur.execute("UPDATE items SET modified = ? WHERE object = ?",
(now, object))
self.db.commit()
def delete_item(self, object):
print("DB: deleting item %r" % object)
cur = self.db.cursor()
cur.execute("DELETE FROM attributes WHERE object = ?", (object,))
cur.execute("DELETE FROM secrets WHERE object = ?", (object,))
cur.execute("DELETE FROM items WHERE object = ?", (object,))
self.db.commit()

View File

@ -0,0 +1,55 @@
import base64
import hmac
import os
from .crypto_backend import (
AES_BLOCK_BYTES,
aes_cfb8_encrypt,
aes_cfb8_decrypt,
aes_cfb128_encrypt,
aes_cfb128_decrypt,
)
KEY_SIZE_BYTES = 32
SHA256_HMAC_BYTES = 32
def _xor_bytes(a, b):
assert(len(a) == len(b))
return bytes([ax ^ bx for ax, bx in zip(a, b)])
def _fold_key(buf):
assert(len(buf) == 32)
return _xor_bytes(buf[:16], buf[16:])
def generate_key():
return os.urandom(KEY_SIZE_BYTES)
def sha256_hmac(buf, key):
return hmac.new(key, buf, digestmod="sha256").digest()
def aes_cfb8_wrap(data, key):
iv = os.urandom(AES_BLOCK_BYTES)
ct = aes_cfb8_encrypt(data, key, iv)
buf = iv + ct
return sha256_hmac(buf, key) + buf
def aes_cfb8_unwrap(buf, key):
mac, buf = buf[:SHA256_HMAC_BYTES], buf[SHA256_HMAC_BYTES:]
if sha256_hmac(buf, key) != mac:
raise IOError("MAC verification failed")
iv, ct = buf[:AES_BLOCK_BYTES], buf[AES_BLOCK_BYTES:]
return aes_cfb8_decrypt(ct, key, iv)
def aes_cfb128_wrap(data, key):
iv = os.urandom(AES_BLOCK_BYTES)
ct = aes_cfb128_encrypt(data, key, iv)
buf = iv + ct
return sha256_hmac(buf, key) + buf
def aes_cfb128_unwrap(buf, key):
mac, buf = buf[:SHA256_HMAC_BYTES], buf[SHA256_HMAC_BYTES:]
if sha256_hmac(buf, key) != mac:
raise IOError("MAC verification failed")
iv, ct = buf[:AES_BLOCK_BYTES], buf[AES_BLOCK_BYTES:]
return aes_cfb128_decrypt(ct, key, iv)

View File

@ -0,0 +1,18 @@
import dbus
class InvalidArgsException(dbus.DBusException):
_dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs"
class NotYetImplementedException(dbus.DBusException):
_dbus_error_name = "org.freedesktop.DBus.Error.NotSupported"
def __init__(self):
super().__init__("TODO: Not implemented")
class IsLockedException(dbus.DBusException):
_dbus_error_name = "org.freedesktop.Secret.Error.IsLocked"
class NoSessionException(dbus.DBusException):
_dbus_error_name = "org.freedesktop.Secret.Error.NoSession"
class NoSuchObjectException(dbus.DBusException):
_dbus_error_name = "org.freedesktop.Secret.Error.NoSuchObject"

View File

@ -0,0 +1,106 @@
# This module can load passwords from external sources, such as files or
# environment variables or KWallet (and no, *not* libsecret -- as much as I
# want to use the nifty gi-based libsecret API...).
#
# It deals with strings (the lowest common denominator), but it's really meant
# to be used for storing Base64-encoded keys and not raw passwords.
import dbus
import os
import re
import subprocess
class KWalletClient():
app_id = "org.eu.nullroute.Secretsd"
folder = "Passwords"
def __init__(self):
self.bus = dbus.SessionBus()
self.mgr = self.bus.get_object("org.kde.kwalletd5", "/modules/kwalletd5")
self.mgr = dbus.Interface(self.mgr, "org.kde.KWallet")
def __enter__(self):
self.wallet = self.mgr.localWallet()
self.handle = self.mgr.open(self.wallet, 0, self.app_id)
return self
def __exit__(self, *argv):
self.mgr.disconnectApplication(self.wallet, self.app_id)
def get_password(self, name):
if self.mgr.hasEntry(self.handle, self.folder, name, self.app_id):
return str(self.mgr.readPassword(self.handle, self.folder, name, self.app_id))
else:
raise KeyError(name)
def set_password(self, name, value):
self.mgr.writePassword(self.handle, self.folder, name, value, self.app_id)
def _parse_specifier(source):
m = re.match(r"^(\w+):(.*)", source)
if m:
return m.groups()
else:
# Too easy to end up saving keys in a file named 'kwallet'...
#return "file", source
raise ValueError("key location must be specified as 'type:rest'")
def load_ext_key(source):
kind, rest = _parse_specifier(source)
if kind == "env":
return os.environ[rest]
elif kind == "exec":
res = subprocess.run(rest, shell=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
check=True)
return res.stdout.decode().strip()
elif kind == "file":
with open(rest, "r") as fh:
return fh.read().strip()
elif kind == "kwallet":
with KWalletClient() as kw:
return kw.get_password(rest or "secretsd master key")
elif kind == "libsecret":
raise ValueError("cannot load external key from myself")
else:
raise ValueError("unknown external key source %r" % kind)
def store_ext_key(source, key):
kind, rest = _parse_specifier(source)
if kind == "env":
raise ValueError("environment is volatile storage, cannot store keys there")
elif kind == "exec":
# XXX: Should there be a way to distinguish whether the command is
# invoked for load vs store, or should I just remove storing to exec:
# because this is an advanced operation and the user can just emulate
# it with a temporary file?
res = subprocess.run(rest, shell=True,
input=key.encode(),
check=True)
elif kind == "file":
with open(rest, "w", opener=lambda p, f: os.open(p, f, 0o400)) as fh:
fh.write(key)
elif kind == "kwallet":
with KWalletClient() as kw:
kw.set_password(rest or "secretsd master key", key)
elif kind == "libsecret":
raise ValueError("cannot store external key in myself")
else:
raise ValueError("unknown external key source %r" % kind)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("specifier")
parser.add_argument("--store-key", metavar="KEY", help="store the specified key (as text)")
args = parser.parse_args()
arg = args.specifier
if key := args.store_key:
print(f"Storing key {key!r} to {arg!r}")
store_ext_key(arg, key)
print(f"Retrieving key from {arg!r}")
key = load_ext_key(arg)
print(f"The key is {key!r}")

View File

@ -0,0 +1,96 @@
import dbus
import dbus.service
from .util import BusObjectWithProperties, NullObject
class SecretServiceItemFallback(dbus.service.FallbackObject, BusObjectWithProperties):
ROOT = "/org/freedesktop/secrets/item"
PATH = "/org/freedesktop/secrets/item/i%d"
def __init__(self, service, bus_path=ROOT):
self.service = service
self.bus_path = bus_path
super().__init__(self.service.bus, self.bus_path)
def get_collection(self, path):
attrs = self.service.db.get_item_attributes(path)
if attrs is None:
raise NoSuchObjectException(path)
return attrs["xdg:collection"]
def get_attributes(self, path):
attrs = self.service.db.get_item_attributes(path)
if attrs is None:
raise NoSuchObjectException(path)
attrs.setdefault("xdg:schema", "org.freedesktop.Secret.Generic")
return attrs
def set_attributes(self, path, value):
if not self.service.db.item_exists(path):
raise NoSuchObjectException(path)
attrs["xdg:collection"] = self.get_collection(path)
self.service.db.set_item_attributes(path, value)
def get_label(self, path):
meta = self.service.db.get_item_metadata(path)
if not meta:
raise NoSuchObjectException(path)
return meta[0]
def set_label(self, path, value):
if not self.service.db.item_exists(path):
raise NoSuchObjectException(path)
self.service.db.set_item_label(path, value)
def get_created(self, path):
meta = self.service.db.get_item_metadata(path)
if not meta:
raise NoSuchObjectException(path)
return dbus.UInt64(meta[1])
def get_modified(self, path):
meta = self.service.db.get_item_metadata(path)
if not meta:
raise NoSuchObjectException(path)
return dbus.UInt64(meta[2])
INTERFACE = "org.freedesktop.Secret.Item"
PROPERTIES = {
"Attributes": (get_attributes, set_attributes, None),
"Created": (get_created, None, None),
"Label": (get_label, set_label, None),
"Locked": (None, None, False),
"Modified": (get_modified, None, None),
}
@dbus.service.method("org.freedesktop.Secret.Item", "", "o",
path_keyword="path")
def Delete(self, path=None):
coll = self.get_collection(path)
self.service.db.delete_item(path)
self.service.send_signal(coll, "org.freedesktop.Secret.Collection",
"ItemDeleted",
"o",
path)
self.service.send_signal(coll, "org.freedesktop.DBus.Properties",
"PropertiesChanged",
"sa{sv}as",
"org.freedesktop.Secret.Collection",
{"Items": self.service.fallback_collection.get_items(coll)},
[])
return NullObject
@dbus.service.method("org.freedesktop.Secret.Item", "o", "(oayays)",
path_keyword="path")
def GetSecret(self, session, path=None):
session = self.service.path_objects[session]
sec_data, sec_type = self.service.db.get_secret(path)
sec_ct, sec_iv = session.encrypt(sec_data)
return (session.bus_path, sec_iv, sec_ct, sec_type)
@dbus.service.method("org.freedesktop.Secret.Item", "(oayays)", "",
path_keyword="path")
def SetSecret(self, secret, path=None):
session, sec_param, sec_ct, sec_type = secret
session = self.service.path_objects[session]
secret = session.decrypt(sec_ct, sec_param)
self.service.db.set_secret(path, secret)

View File

@ -0,0 +1,147 @@
from collections import defaultdict
import dbus
import dbus.lowlevel
import dbus.service
import time
from .collection import SecretServiceCollectionFallback
from .exception import *
from .item import SecretServiceItemFallback
from .session import SecretServiceSession
from .util import BusObjectWithProperties, NullObject
def encode_path_component(value):
return "".join([c if c.isalnum() else "_%02x" % ord(c) for c in value])
class SecretService(dbus.service.Object, BusObjectWithProperties):
def __init__(self, bus, sdb):
self.bus = bus
self.db = sdb
self.bus_name = dbus.service.BusName("org.freedesktop.secrets", self.bus)
self.path_objects = {}
self.next_object = 0
self.client_objects = defaultdict(list)
super().__init__(self.bus, "/org/freedesktop/secrets")
self.fallback_item = SecretServiceItemFallback(self)
self.fallback_collection = SecretServiceCollectionFallback(self)
self.fallback_alias = SecretServiceCollectionFallback(self, "/org/freedesktop/secrets/aliases")
def get_collections(self, path=None):
collections = self.db.list_collections()
return dbus.Array(collections, "o")
INTERFACE = "org.freedesktop.Secret.Service"
PROPERTIES = {
"Collections": (get_collections, None, None),
}
def make_bus_path(self, persist, template):
if persist:
bus_path = template % self.db.get_next_object_id()
else:
bus_path = template % self.next_object
self.next_object += 1
return bus_path
def make_object(self, sender, persist, type, *args, **kwargs):
bus_path = self.make_bus_path(persist, type.PATH)
object = type(self, bus_path, *args, **kwargs)
self.path_objects[bus_path] = object
if sender:
self.client_objects[sender].append(bus_path)
return object
def gc_client(self, sender):
# TODO: hook this up
if sender in self.client_objects:
for path in self.client_objects[sender]:
del self.path_objects[path]
del self.client_objects[sender]
def send_signal(self, path, interface, member, signature, *args):
msg = dbus.lowlevel.SignalMessage(path, interface, member)
msg.append(*args, signature=signature)
self.bus.send_message(msg)
## bus methods
@dbus.service.method("org.freedesktop.Secret.Service", "a{sv}s", "oo",
sender_keyword="sender")
def CreateCollection(self, properties, alias,
sender=None):
label = properties["org.freedesktop.Secret.Collection.Label"]
bus_path = self.make_bus_path(True, "/org/freedesktop/secrets/collection/c%d")
self.db.add_collection(bus_path, label)
if alias:
self.db.add_alias(alias, bus_path)
self.CollectionCreated(bus_path)
self.PropertiesChanged("org.freedesktop.Secret.Service",
{"Collections": self.get_collections()},
[])
return (dbus.ObjectPath(bus_path), NullObject)
@dbus.service.method("org.freedesktop.Secret.Service", "aoo", "a{o(oayays)}")
def GetSecrets(self, items, session):
session = self.path_objects[session]
out = {}
for item_path in items:
sec_data, sec_type = self.db.get_secret(item_path)
sec_ct, sec_iv = session.encrypt(sec_data)
out[item_path] = (session.bus_path, sec_iv, sec_ct, sec_type)
return out
@dbus.service.method("org.freedesktop.Secret.Service", "sv", "vo",
sender_keyword="sender",
byte_arrays=True)
def OpenSession(self, algorithm, input, sender=None):
session = self.make_object(sender, False, SecretServiceSession, algorithm)
output, done = session.kex(input)
if done:
return (output, session.bus_path)
else:
return (output, NullObject)
@dbus.service.method("org.freedesktop.Secret.Service", "s", "o")
def ReadAlias(self, alias):
path = self.db.resolve_alias(alias)
return dbus.ObjectPath(path or NullObject)
@dbus.service.method("org.freedesktop.Secret.Service", "so", "")
def SetAlias(self, alias, collection):
if alias != "default":
raise dbus.DBusException("Only the 'default' alias is supported",
name="org.freedesktop.DBus.Error.NotSupported")
if not self.db.collection_exists(collection):
raise dbus.DBusException("Collection with path %r not found" % (str(collection),),
name="org.freedesktop.DBus.Error.InvalidArgs")
self.db.add_alias(alias, collection)
@dbus.service.method("org.freedesktop.Secret.Service", "a{ss}", "aoao")
def SearchItems(self, attributes):
unlocked_items = self.db.find_items(attributes)
locked_items = []
return unlocked_items, locked_items
@dbus.service.method("org.freedesktop.Secret.Service", "ao", "aoo")
def Lock(self, objects):
print("TODO: Service.Lock(%r)" % objects)
raise NotYetImplementedException()
@dbus.service.method("org.freedesktop.Secret.Service", "ao", "aoo")
def Unlock(self, objects):
return (objects, NullObject)
@dbus.service.signal("org.freedesktop.Secret.Service", "o")
def CollectionCreated(self, bus_path):
pass
@dbus.service.signal("org.freedesktop.Secret.Service", "o")
def CollectionDeleted(self, bus_path):
pass
@dbus.service.signal("org.freedesktop.Secret.Service", "o")
def CollectionChanged(self, bus_path):
pass

View File

@ -0,0 +1,55 @@
import dbus
import dbus.service
import os
from .crypto_backend import (
AES_BLOCK_BYTES,
aes_cbc_encrypt,
aes_cbc_decrypt,
dh_modp1024_exchange,
hkdf_sha256_derive,
pkcs7_pad,
pkcs7_unpad,
)
class SecretServiceSession(dbus.service.Object):
ROOT = "/org/freedesktop/secrets/session"
PATH = "/org/freedesktop/secrets/session/s%d"
def __init__(self, service, bus_path, algorithm):
self.bus_path = bus_path
self.algorithm = algorithm
self.kex_done = False
self.crypt_key = None
super().__init__(service.bus, bus_path)
def kex(self, input):
if self.algorithm == "plain":
return (dbus.ByteArray(b""), True)
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
peer_pubkey = int.from_bytes(input, "big")
our_pubkey, shared_key = dh_modp1024_exchange(peer_pubkey)
self.crypt_key = hkdf_sha256_derive(shared_key, 128 // 8)
output = our_pubkey.to_bytes(1024 // 8, "big")
return (dbus.ByteArray(output), True)
else:
raise dbus.DBusException("org.freedesktop.DBus.Error.NotSupported")
def encrypt(self, input):
if self.algorithm == "plain":
return input, None
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
key = self.crypt_key
iv = os.urandom(AES_BLOCK_BYTES)
ct = pkcs7_pad(input, AES_BLOCK_BYTES)
ct = aes_cbc_encrypt(ct, key, iv)
return ct, iv
def decrypt(self, input, iv):
if self.algorithm == "plain":
return input
elif self.algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
key = self.crypt_key
pt = aes_cbc_decrypt(input, key, iv)
pt = pkcs7_unpad(pt, AES_BLOCK_BYTES)
return pt

View File

@ -0,0 +1,55 @@
import dbus
import dbus.service
from .exception import InvalidArgsException
NullObject = dbus.ObjectPath("/")
class BusObjectWithProperties():
PROPERTIES = {}
@dbus.service.method("org.freedesktop.DBus.Properties", "ss", "v",
path_keyword="path")
def Get(self, interface, property, path=None):
if interface == self.INTERFACE:
if property in self.PROPERTIES:
getter, setter, value = self.PROPERTIES[property]
if getter:
value = getter(self, path)
return value
else:
raise InvalidArgsException("No such property %r" % str(property))
else:
raise InvalidArgsException("No such interface %r" % str(interface))
@dbus.service.method("org.freedesktop.DBus.Properties", "s", "a{sv}",
path_keyword="path")
def GetAll(self, interface, path=None):
if interface == self.INTERFACE:
out = {}
for name, (getter, setter, value) in self.PROPERTIES.items():
if getter:
value = getter(self, path)
out[name] = value
return out
else:
raise InvalidArgsException("No such interface %r" % str(interface))
@dbus.service.method("org.freedesktop.DBus.Properties", "ssv", "",
path_keyword="path")
def Set(self, interface, property, value, path=None):
if interface == self.INTERFACE:
if property in self.PROPERTIES:
getter, setter, _ = self.PROPERTIES[property]
if setter:
setter(self, path, value)
else:
raise InvalidArgsException("Property %r is read-only" % str(property))
else:
raise InvalidArgsException("No such property %r" % str(property))
else:
raise InvalidArgsException("No such interface %r" % str(interface))
@dbus.service.signal("org.freedesktop.DBus.Properties", "sa{sv}as")
def PropertiesChanged(self, interface, changed_props, invalidated_props):
pass

View File

@ -0,0 +1,11 @@
# ~/.config/systemd/user/secretsd.service
[Unit]
Description=Secret Storage Service
[Service]
Type=dbus
BusName=org.freedesktop.secrets
ExecStart=/usr/bin/env %h/.local/bin/secretsd
[Install]
WantedBy=default.target