Replace file-based identity store with a dedicated db table
This commit is contained in:
10 changed files with 38 additions and 195 deletions
@ -28,15 +28,6 @@ mail_case_insensitive = no
keyhome = /var/gpgmailgate/.gnupg
# Two options available:
# - file -- use GnuPG's pubring.kbx file as the only key store and cache its
# contents, reloading it on file modifications.
# - database -- use a relational database to store identities (when used, you
# must specify url parameter to specify which database to connect to).
type = file
# Only required when type==DatabaseKeyRing, specifies the database URL.
#url = file:///path/to/sqlite.db
@ -44,10 +44,14 @@ class KeyRing:
"""Load keyring, replacing any previous contents of the cache."""
raise NotImplementedError('KeyRing.load not implemented')
async def freeze_identities(self) -> KeyCache:
def freeze_identities(self) -> KeyCache:
"""Return a static, async-safe copy of the identity map."""
raise NotImplementedError('KeyRing.load not implemented')
def register(self, email: str, key_id: str):
"""Add a new (email,key) pair to the keystore."""
raise NotImplementedError('KeyRing.register not implemented')
def post_init_hook(self):
"""Lets the keyring perform additional operations following its initialisation."""
@ -20,8 +20,7 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"),
("relay", "port"),
("daemon", "host"),
("daemon", "port"),
("gpg", "keyhome"),
("keyring", "type")]
("gpg", "keyhome")]
# Global dict to keep configuration parameters. It's hidden behind several
# utility functions to make it easy to replace it with ConfigParser object in
@ -36,7 +36,7 @@ class MailEncryptionProxy:
"""Accept a message and either encrypt it or forward as-is."""
start = time.process_time()
keys = await self._keyring.freeze_identities()
keys = self._keyring.freeze_identities()
LOG.debug('Parsing message: %s', self._beginning(envelope))
message = email.message_from_bytes(envelope.original_content, policy=SMTPUTF8)
LOG.debug('Parsed into %s: %s', type(message), repr(message))
@ -130,10 +130,8 @@ async def _main():
loop = asyncio.get_event_loop()
mode = conf.get_item('keyring', 'type')
keyring = kcache.init_keyring(mode, loop = loop)
keyring = kcache.init_keyring()
controller = _init_controller(keyring, max_data_bytes)
@ -14,7 +14,7 @@ class KeyRingSchema:
self._id_table = self._identities()
def _identities(self):
lacre_id = sqlalchemy.Table('identities', self._meta,
lacre_id = sqlalchemy.Table('gpgmw_identities', self._meta,
sqlalchemy.Column('email', sqlalchemy.String(256), index=True),
sqlalchemy.Column('key_id', sqlalchemy.String(64), index=True))
return lacre_id
@ -40,7 +40,7 @@ class DatabaseKeyRing(KeyRing):
"""Do nothing, database contents doesn't need to be cached."""
async def freeze_identities(self) -> KeyCache:
def freeze_identities(self) -> KeyCache:
"""Return a static, async-safe copy of the identity map."""
return self._load_identities()
@ -1,148 +0,0 @@
"""File-based keyring.
It's a wrapper over GnuPG module that just executes gpg command.
import logging
import GnuPG
import copy
from os import stat
from import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
from asyncio import Semaphore, create_task, get_event_loop, run
import lacre.text as text
import lacre.config as conf
from lacre._keyringcommon import KeyRing, KeyCache
LOG = logging.getLogger(__name__)
def _sanitize(keys):
sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive'))
return {fingerprint: sanitize(keys[fingerprint]) for fingerprint in keys}
class FileKeyRing(KeyRing):
"""A high-level adapter for GnuPG-maintained keyring directory.
Its role is to keep a cache of keys present in the keyring,
reload it when necessary and produce static copies of
fingerprint=>email maps.
def __init__(self, path: str, loop=None):
"""Initialise the adapter."""
self._path = path
self._keys = self._load_and_sanitize()
self._sema = Semaphore()
self._last_mod = None
self._loop = loop or get_event_loop()
def _load_and_sanitize(self):
keys = self._load_keyring_from(self._path)
return _sanitize(keys)
def _load_keyring_from(self, keyring_dir):
return GnuPG.public_keys(keyring_dir)
async def freeze_identities(self) -> KeyCache:
"""Return a static, async-safe copy of the identity map."""
async with self._sema:
keys = copy.deepcopy(self._keys)
return KeyCache(keys)
def load(self):
"""Load keyring, replacing any previous contents of the cache."""
LOG.debug('Reloading keys...')
return create_task(self._load(), 'LoadTask')
async def _load(self):
last_mod = self._read_mod_time()
LOG.debug(f'Keyring was last modified: {last_mod}')
if self._is_modified(last_mod):
LOG.debug('Keyring has been modified')
async with self._sema:
LOG.debug('About to re-load the keyring')
LOG.debug('Keyring not modified recently, continuing')
self._last_mod = self._read_mod_time()
reload = load
def replace_keyring(self, keys: dict):
"""Overwrite previously stored key cache with KEYS."""
keys = _sanitize(keys)
||||'Storing {len(keys)} keys')
self._keys = keys
def _read_mod_time(self) -> int:
# (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
# 0 1 2 3 4 5 6 7 8 9
st = stat(self._path)
return st[MTIME]
def _is_modified(self, last_mod):
if self._last_mod is None:
LOG.debug('Keyring not loaded before')
return True
elif self._last_mod != last_mod:
LOG.debug('Keyring directory mtime changed')
return True
LOG.debug('Keyring not modified ')
return False
def __repr__(self) -> str:
"""Return text representation of this keyring."""
return '<KeyRing path=%s last_mod=%d>' % (self._path, self._last_mod)
def post_init_hook(self):
self._reloader = init_reloader(self._path, self)
||||'Watching keyring directory {self._path}...')
def shutdown(self):
class KeyringModificationListener(FileSystemEventHandler):
"""A filesystem event listener that triggers key cache reload."""
def __init__(self, keyring: FileKeyRing):
"""Initialise a listener with a callback to be executed upon each change."""
self._keyring = keyring
def handle(self, event: FileSystemEvent):
"""Reload keys upon FS event."""
LOG.debug('FS event: %s, %s', event.event_type, event.src_path)
if 'pubring.kbx' in event.src_path:
||||'Reloading %s on event: %s', self._keyring, event)
# All methods should do the same: reload the key cache.
# on_created = handle
# on_deleted = handle
on_modified = handle
def init_reloader(keyring_dir: str, reloader) -> KeyringModificationListener:
"""Initialise a reloader for the keyring."""
listener = KeyringModificationListener(reloader)
observer = Observer()
observer.schedule(listener, keyring_dir, recursive=False)
return observer
def freeze_and_load_keys():
"""Load and return keys.
Doesn't refresh the keys when they change on disk.
keyring_dir = conf.get_item('gpg', 'keyhome')
keyring = FileKeyRing(keyring_dir)
return run(keyring.freeze_identities())
@ -6,23 +6,15 @@ module.
import lacre.config as conf
from lacre._keyringcommon import KeyRing, KeyCache
import lacre.filekeyring as fk
import lacre.dbkeyring as dbk
import logging
LOG = logging.getLogger(__name__)
def init_keyring(mode, **kwargs) -> KeyRing:
def init_keyring() -> KeyRing:
"""Initialise appropriate type of keyring."""
if mode == 'file' and 'loop' in kwargs:
path = conf.get_item('gpg', 'keyhome')
||||'Initialising pubring.kbx-based keyring from %s with cache', path)
return fk.FileKeyRing(path, kwargs['loop'])
elif mode == 'database':
url = conf.get_item('keyring', 'url')
schema = dbk.KeyRingSchema()
||||'Initialising database keyring from %s', url)
return dbk.DatabaseKeyRing(url, schema)
LOG.error('Unsupported type of keyring: %s', mode)
url = conf.get_item('database', 'url')
schema = dbk.KeyRingSchema()
||||'Initialising database keyring from %s', url)
return dbk.DatabaseKeyRing(url, schema)
@ -8,11 +8,6 @@ date_format = ISO
keyhome = test/keyhome
cache_refresh_minutes = 1
#type = database
url = sqlite:///test/lacre.db
type = file
cert_path = test/certs
@ -13,7 +13,7 @@ def define_db_schema():
sqlalchemy.Column('status', sqlalchemy.Integer),
sqlalchemy.Column('time', sqlalchemy.DateTime))
identities = sqlalchemy.Table('identities', meta,
identities = sqlalchemy.Table('gpgmw_identities', meta,
sqlalchemy.Column('email', sqlalchemy.String(256), index=True),
sqlalchemy.Column('key_id', sqlalchemy.String(64), index=True))
@ -93,12 +93,16 @@ def _define_db_schema():
sqlalchemy.Column('status', sqlalchemy.Integer),
sqlalchemy.Column('time', sqlalchemy.DateTime))
return (gpgmw_keys)
identities = sqlalchemy.Table('gpgmw_identities', meta,
sqlalchemy.Column('email', sqlalchemy.String(256), index=True),
sqlalchemy.Column('key_id', sqlalchemy.String(64), index=True))
return (gpgmw_keys, identities)
if conf.config_item_equals('database', 'enabled', 'yes') and conf.config_item_set('database', 'url'):
(engine, conn) = _setup_db_connection(conf.get_item("database", "url"))
(gpgmw_keys) = _define_db_schema()
(gpgmw_keys, gpgmw_identities) = _define_db_schema()
selq = select(gpgmw_keys.c.publickey,,\
.where(and_(gpgmw_keys.c.status == 0, gpgmw_keys.c.confirm == ""))\
@ -106,21 +110,29 @@ if conf.config_item_equals('database', 'enabled', 'yes') and conf.config_item_se
LOG.debug(f"Retrieving keys to be processed: {selq}")
result_set = conn.execute(selq)
for key_id, row_id, email in result_set:
for armored_key, row_id, email in result_set:
# delete any other public keys associated with this confirmed email address
delq = delete(gpgmw_keys).where(and_( == email, != row_id))
LOG.debug(f"Deleting public keys associated with confirmed email: {delq}")
GnuPG.delete_key(conf.get_item('gpg', 'keyhome'), email)
||||'Deleted key for <' + email + '> via import request')
||||'Deleted key for <%s> via import request', email)
if key_id.strip(): # we have this so that user can submit blank key to remove any encryption
if GnuPG.confirm_key(key_id, email):
GnuPG.add_key(conf.get_item('gpg', 'keyhome'), key_id) # import the key to gpg
if armored_key.strip(): # we have this so that user can submit blank key to remove any encryption
if GnuPG.confirm_key(armored_key, email):
# import the key to gpg
GnuPG.add_key(conf.get_item('gpg', 'keyhome'), armored_key)
# mark key as accepted
modq = gpgmw_keys.update().where( == row_id).values(status=1)
LOG.debug(f"Key imported, updating key: {modq}")
conn.execute(modq) # mark key as accepted
LOG.warning('Imported key from <' + email + '>')
LOG.debug("Key imported, updating key: %s", modq)
# add to identity database
update_identity = gpgmw_identities.insert().values(email=email, key_id=armored_key)
LOG.warning('Imported key from <%s>', email)
if conf.config_item_equals('cron', 'send_email', 'yes'):
_send_msg("PGP key registration successful", "", email)
Reference in a new issue