Make disconnect handling configuration explicit

- Provide a new reuqired parameter: [database]pooling_mode and use it during
  SQLAlchemy engine initialisation.

- Update tests and configuration (including sample configuration).

- Adjust repository unit test to load config during setup.

- Pass an engine instance to repository constructors instead of connections.
  Engine keeps a connection pool and we rely on it.
This commit is contained in:
Piotr F. Mieszkowski 2023-12-17 14:03:20 +01:00
parent 86cc27e918
commit 90da933bf9
10 changed files with 133 additions and 54 deletions

View File

@ -64,7 +64,7 @@ clean-db:
# Run unit tests
#
unittest:
$(PYTHON) -m unittest discover -s test/modules
GPG_MAILGATE_CONFIG=test/gpg-mailgate.conf $(PYTHON) -m unittest discover -s test/modules
pre-clean:
rm -fv test/gpg-mailgate.conf

View File

@ -101,6 +101,17 @@ starttls = true
enabled = yes
url = sqlite:///test.db
# Pooling mode: pessimistic or optimistic (required parameter).
#
# - Pessimistic disconnect-handling: pre_ping. Connection pool will try using
# connection before it executes a SQL query to find out if the connection is
# still alive. If not, it'll just establish a new connection.
#
# - Optimistic distonnect-handling: just avoid using connections after some
# time.
#
pooling_mode = optimistic
# For a MySQL database "gpgmw", user "gpgmw" and password "password",
# use the following URL:
#
@ -110,7 +121,8 @@ url = sqlite:///test.db
# https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
# Number of seconds after which an idle connection is recycled. This is
# useful with MySQL servers. For more information, see:
# useful with MySQL servers. This is only used with pooling_mode=optimistic.
# For more information, see:
# https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.pool_recycle
#max_connection_age = 3600

View File

@ -30,8 +30,8 @@ def sub_queue(args):
"""Sub-command to inspect queue contents."""
LOG.debug('Inspecting queue...')
conn = repo.connect(conf.get_item('database', 'url'))
queue = repo.KeyConfirmationQueue(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
queue = repo.KeyConfirmationQueue(engine=eng)
cnt = queue.count_keys()
@ -45,8 +45,8 @@ def sub_identities(args):
"""Sub-command to inspect identity database."""
LOG.debug('Inspecting identities...')
conn = repo.connect(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(engine=eng)
all_identities = identities.freeze_identities()
if all_identities is None:
@ -67,8 +67,8 @@ def sub_import(args):
public = GnuPG.public_keys(source_dir)
conn = repo.connect(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(engine=eng)
if args.reload:
identities.delete_all()

View File

@ -4,6 +4,7 @@ Routines defined here are responsible for processing and validating
configuration.
"""
from enum import Enum, auto
from configparser import RawConfigParser
import os
@ -23,10 +24,12 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"),
("gpg", "keyhome")]
SCRIPT_REQUIRED = [('database', 'enabled'),
('database', 'url')]
('database', 'url'),
('database', 'pooling_mode')]
CRON_REQUIRED = [('database', 'enabled'),
('database', 'url'),
('database', 'pooling_mode'),
('cron', 'mail_templates')]
# Global dict to keep configuration parameters. It's hidden behind several
@ -142,3 +145,47 @@ def daemon_params():
def strict_mode():
"""Check if Lacre is configured to support only a fixed list of keys."""
return ("default" in cfg and cfg["default"]["enc_keymap_only"] == "yes")
class FromStrMixin:
"""Additional operations for configuration enums."""
@classmethod
def from_str(cls, name, *, required=False):
# If name conversion is available, use it.
if hasattr(cls, 'conv'):
name = cls.conv(name)
if name in cls.__members__:
return cls.__members__[name]
if required:
raise NameError('Unsupported or missing value')
else:
return None
@classmethod
def from_config(cls, section, key, *, required=False):
param = get_item(section, key)
# print(f'from config {param}, {section}:{key}*{required}')
return cls.from_str(param, required=required)
class PGPStyle(FromStrMixin, Enum):
"""PGP message structure: PGP/Inline or PGP/MIME."""
MIME = auto()
INLINE = auto()
class PoolingMode(FromStrMixin, Enum):
"""Database connection pool behaviour.
- Optimistic - recycles connections.
- Pessimistic - checks connection before usage.
"""
OPTIMISTIC = auto()
PESSIMISTIC = auto()
@classmethod
def conv(cls, name):
return name.upper()

View File

@ -6,7 +6,7 @@ module.
import lacre.config as conf
from lacre._keyringcommon import KeyRing, KeyCache
from lacre.repositories import IdentityRepository
from lacre.repositories import IdentityRepository, init_engine
import logging
LOG = logging.getLogger(__name__)
@ -15,7 +15,8 @@ LOG = logging.getLogger(__name__)
def init_keyring() -> KeyRing:
"""Initialise appropriate type of keyring."""
url = conf.get_item('database', 'url')
return IdentityRepository(db_url=url)
db_engine = init_engine(url)
return IdentityRepository(engine=db_engine)
def freeze_and_load_keys() -> KeyCache:

View File

@ -4,48 +4,55 @@ from sqlalchemy import create_engine, select, delete, and_, func
from sqlalchemy.exc import OperationalError
import logging
from lacre.config import flag_enabled, config_item_set, get_item
from lacre.config import flag_enabled, config_item_set, get_item, PoolingMode
from lacre._keyringcommon import KeyRing, KeyCache
import lacre.dbschema as db
LOG = logging.getLogger(__name__)
_HOUR_IN_SECONDS = 3600
# Internal state
_engine = None
def connect(url):
def init_engine(url):
global _engine
if not _engine:
config = _conn_config()
_engine = create_engine(url, **config)
return _engine.connect()
return _engine
def _conn_config():
config = dict()
if config_item_set('database', 'max_connection_age'):
config['pool_recycle'] = get_item('database', 'max_connection_age')
mode = PoolingMode.from_config('database', 'pooling_mode', required=True)
if mode is PoolingMode.OPTIMISTIC:
# Optimistic distonnect-handling: recycle connections.
config['pool_recycle'] = get_item('database', 'max_connection_age', _HOUR_IN_SECONDS)
elif mode is PoolingMode.PESSIMISTIC:
# Pessimistic disconnect-handling: pre_ping.
config['pool_pre_ping'] = True
# Additional pool settings
if config_item_set('database', 'pool_size'):
config['pool_size'] = get_item('database', 'pool_size')
if config_item_set('database', 'max_overflow'):
config['max_overflow'] = get_item('database', 'max_overflow')
LOG.debug('Database engine configuration: %s', config)
return config
class IdentityRepository(KeyRing):
def __init__(self, /, connection=None, db_url=None):
def __init__(self, /, connection=None, *, engine):
self._identities = db.LACRE_IDENTITIES
self._conn = connection
self._url = db_url
self._initialised = connection is not None
self._engine = engine
def register_or_update(self, email, fprint):
assert email, "email is mandatory"
@ -57,44 +64,39 @@ class IdentityRepository(KeyRing):
self._insert(email, fprint)
def _exists(self, email: str) -> bool:
self._ensure_connected()
selq = select(self._identities.c.email).where(self._identities.c.email == email)
return [e for e in self._conn.execute(selq)]
with self._engine.connect() as conn:
return [e for e in conn.execute(selq)]
def _insert(self, email, fprint):
self._ensure_connected()
insq = self._identities.insert().values(email=email, fingerprint=fprint)
LOG.debug('Registering identity %s: %s', email, insq)
self._conn.execute(insq)
with self._engine.connect() as conn:
conn.execute(insq)
def _update(self, email, fprint):
self._ensure_connected()
upq = self._identities.update() \
.values(fingerprint=fprint) \
.where(self._identities.c.email == email)
LOG.debug('Updating identity %s: %s', email, upq)
self._conn.execute(upq)
def _ensure_connected(self):
if not self._initialised:
LOG.debug('Connecting with %s', self._url)
self._conn = connect(self._url)
with self._engine.connect() as conn:
conn.execute(upq)
def delete(self, email):
self._ensure_connected()
delq = delete(self._identities).where(self._identities.c.email == email)
LOG.debug('Deleting keys assigned to %s', email)
self._conn.execute(delq)
with self._engine.connect() as conn:
conn.execute(delq)
def delete_all(self):
LOG.warn('Deleting all identities from the database')
delq = delete(self._identities)
self._conn.execute(delq)
with self._engine.connect() as conn:
conn.execute(delq)
def freeze_identities(self) -> KeyCache:
"""Return a static, async-safe copy of the identity map.
@ -103,7 +105,6 @@ class IdentityRepository(KeyRing):
if we get a database exception, this method will either return
empty collection or let the exception be propagated.
"""
self._ensure_connected()
try:
return self._load_identities()
except OperationalError:
@ -115,9 +116,10 @@ class IdentityRepository(KeyRing):
def _load_identities(self) -> KeyCache:
all_identities = select(self._identities.c.fingerprint, self._identities.c.email)
result = self._conn.execute(all_identities)
LOG.debug('Retrieving all keys')
return KeyCache({key_id: email for key_id, email in result})
with self._engine.connect() as conn:
result = conn.execute(all_identities)
LOG.debug('Retrieving all keys')
return KeyCache({key_id: email for key_id, email in result})
class KeyConfirmationQueue:
@ -126,9 +128,9 @@ class KeyConfirmationQueue:
# Default number of items retrieved from the database.
keys_read_max = 100
def __init__(self, connection):
def __init__(self, /, engine):
self._keys = db.LACRE_KEYS
self._conn = connection
self._engine = engine
def fetch_keys(self, /, max_keys=None):
"""Runs a query to retrieve at most `keys_read_max` keys and returns db result."""
@ -139,24 +141,27 @@ class KeyConfirmationQueue:
.limit(max_keys)
LOG.debug('Retrieving keys to be processed: %s', selq)
return self._conn.execute(selq)
with self._engine.connect() as conn:
return conn.execute(selq)
def count_keys(self):
selq = select(func.count(self._keys.c.id))
LOG.debug('Counting all keys: %s', selq)
try:
c = [cnt for cnt in self._conn.execute(selq)]
with self._engine.connect() as conn:
c = [cnt for cnt in conn.execute(selq)]
# Result is an iterable of tuples:
return c[0][0]
# Result is an iterable of tuples:
return c[0][0]
except OperationalError:
LOG.exception('Cannot count keys')
return None
def fetch_keys_to_delete(self):
seldel = select(self._keys.c.email, self._keys.c.id).where(self._keys.c.status == db.ST_TO_BE_DELETED).limit(self.keys_read_max)
return self._conn.execute(seldel)
with self._engine.connect() as conn:
return conn.execute(seldel)
def delete_keys(self, row_id, /, email=None):
"""Remove key from the database."""
@ -166,10 +171,12 @@ class KeyConfirmationQueue:
delq = delete(self._keys).where(self._keys.c.id != row_id)
LOG.debug('Deleting public keys associated with confirmed email: %s', delq)
self._conn.execute(delq)
with self._engine.connect() as conn:
conn.execute(delq)
def mark_accepted(self, row_id):
modq = self._keys.update().where(self._keys.c.id == row_id).values(status=db.ST_IMPORTED)
LOG.debug("Key imported, updating key: %s", modq)
self._conn.execute(modq)
with self._engine.connect() as conn:
conn.execute(modq)

View File

@ -42,6 +42,7 @@ def _build_config(config):
cp.add_section('database')
cp.set('database', 'enabled', 'yes')
cp.set('database', 'url', 'sqlite:///test/lacre.db')
cp.set('database', 'pooling_mode', 'optimistic')
cp.add_section("smime")
cp.set("smime", "cert_path", config["smime_certpath"])

View File

@ -13,6 +13,7 @@ cert_path = test/certs
[database]
enabled = yes
url = sqlite:///test/lacre.db
pooling_mode = optimistic
[relay]
host = localhost

View File

@ -2,13 +2,23 @@
import unittest
import lacre.config as conf
import lacre.repositories as r
import lacre.dbschema as s
def ignore_sql(sql, *args, **kwargs):
pass
class IdentityRepositoryTest(unittest.TestCase):
def test_x(self):
ir = r.IdentityRepository(db_url='sqlite:///test/lacre.db')
def setUpClass():
# required for init_engine to work
conf.load_config()
def test_freeze_identities(self):
eng = r.init_engine('sqlite:///test/lacre.db')
ir = r.IdentityRepository(engine=eng)
identities = ir.freeze_identities()
self.assertTrue(identities)

View File

@ -31,7 +31,7 @@ lacre.init_logging(conf.get_item('logging', 'config'))
LOG = logging.getLogger('webgate-cron.py')
import GnuPG
from lacre.repositories import KeyConfirmationQueue, IdentityRepository, connect
from lacre.repositories import KeyConfirmationQueue, IdentityRepository, init_engine
def _validate_config():
@ -44,10 +44,10 @@ def _validate_config():
_validate_config()
if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url'):
conn = connect(conf.get_item('database', 'url'))
db_engine = init_engine(conf.get_item('database', 'url'))
identities = IdentityRepository(conn)
key_queue = KeyConfirmationQueue(conn)
identities = IdentityRepository(engine=db_engine)
key_queue = KeyConfirmationQueue(engine=db_engine)
key_dir = conf.get_item('gpg', 'keyhome')
LOG.debug('Using GnuPG with home directory in %s', key_dir)