diff --git a/Makefile b/Makefile
index ab1ea20..a298cbd 100644
--- a/Makefile
+++ b/Makefile
@@ -64,7 +64,7 @@ clean-db:
 # Run unit tests
 #
 unittest:
-	$(PYTHON) -m unittest discover -s test/modules
+	GPG_MAILGATE_CONFIG=test/gpg-mailgate.conf $(PYTHON) -m unittest discover -s test/modules
 
 pre-clean:
 	rm -fv test/gpg-mailgate.conf
diff --git a/gpg-mailgate.conf.sample b/gpg-mailgate.conf.sample
index 46ea6af..7ed7bbb 100644
--- a/gpg-mailgate.conf.sample
+++ b/gpg-mailgate.conf.sample
@@ -101,6 +101,17 @@ starttls = true
 enabled = yes
 url = sqlite:///test.db
 
+# Pooling mode: pessimistic or optimistic (required parameter).
+#
+# - Pessimistic disconnect-handling: pre_ping. The connection pool tests each
+#   connection before it executes a SQL query, to find out whether the
+#   connection is still alive; if not, it establishes a new one.
+#
+# - Optimistic disconnect-handling: simply stop using connections once they
+#   reach a certain age.
+#
+pooling_mode = optimistic
+
 # For a MySQL database "gpgmw", user "gpgmw" and password "password",
 # use the following URL:
 #
@@ -110,7 +121,8 @@ url = sqlite:///test.db
 # https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
 
 # Number of seconds after which an idle connection is recycled. This is
-# useful with MySQL servers. For more information, see:
+# useful with MySQL servers. It is only used with pooling_mode=optimistic.
+# For more information, see:
 # https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.pool_recycle
 #max_connection_age = 3600
diff --git a/lacre/admin.py b/lacre/admin.py
index 2a37742..14dd590 100644
--- a/lacre/admin.py
+++ b/lacre/admin.py
@@ -30,8 +30,8 @@ def sub_queue(args):
     """Sub-command to inspect queue contents."""
     LOG.debug('Inspecting queue...')
 
-    conn = repo.connect(conf.get_item('database', 'url'))
-    queue = repo.KeyConfirmationQueue(conn)
+    eng = repo.init_engine(conf.get_item('database', 'url'))
+    queue = repo.KeyConfirmationQueue(engine=eng)
 
     cnt = queue.count_keys()
 
@@ -45,8 +45,8 @@ def sub_identities(args):
     """Sub-command to inspect identity database."""
     LOG.debug('Inspecting identities...')
 
-    conn = repo.connect(conf.get_item('database', 'url'))
-    identities = repo.IdentityRepository(conn)
+    eng = repo.init_engine(conf.get_item('database', 'url'))
+    identities = repo.IdentityRepository(engine=eng)
 
     all_identities = identities.freeze_identities()
     if all_identities is None:
@@ -67,8 +67,8 @@ def sub_import(args):
 
     public = GnuPG.public_keys(source_dir)
 
-    conn = repo.connect(conf.get_item('database', 'url'))
-    identities = repo.IdentityRepository(conn)
+    eng = repo.init_engine(conf.get_item('database', 'url'))
+    identities = repo.IdentityRepository(engine=eng)
 
     if args.reload:
         identities.delete_all()
diff --git a/lacre/config.py b/lacre/config.py
index 9db96ee..6846068 100644
--- a/lacre/config.py
+++ b/lacre/config.py
@@ -4,6 +4,7 @@ Routines defined here are responsible for processing and validating
 configuration.
 """
 
+from enum import Enum, auto
 from configparser import RawConfigParser
 import os
 
@@ -23,10 +24,12 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"),
                           ("gpg", "keyhome")]
 
 SCRIPT_REQUIRED = [('database', 'enabled'),
-                   ('database', 'url')]
+                   ('database', 'url'),
+                   ('database', 'pooling_mode')]
 
 CRON_REQUIRED = [('database', 'enabled'),
                  ('database', 'url'),
+                 ('database', 'pooling_mode'),
                  ('cron', 'mail_templates')]
 
 # Global dict to keep configuration parameters. It's hidden behind several
@@ -142,3 +145,47 @@ def daemon_params():
 def strict_mode():
     """Check if Lacre is configured to support only a fixed list of keys."""
     return ("default" in cfg and cfg["default"]["enc_keymap_only"] == "yes")
+
+
+class FromStrMixin:
+    """Additional operations for configuration enums."""
+
+    @classmethod
+    def from_str(cls, name, *, required=False):
+        # If a name-conversion method is available, use it.
+        if hasattr(cls, 'conv'):
+            name = cls.conv(name)
+
+        if name in cls.__members__:
+            return cls.__members__[name]
+
+        if required:
+            raise NameError('Unsupported or missing value')
+        else:
+            return None
+
+    @classmethod
+    def from_config(cls, section, key, *, required=False):
+        param = get_item(section, key)
+        return cls.from_str(param, required=required)
+
+
+class PGPStyle(FromStrMixin, Enum):
+    """PGP message structure: PGP/Inline or PGP/MIME."""
+    MIME = auto()
+    INLINE = auto()
+
+
+class PoolingMode(FromStrMixin, Enum):
+    """Database connection pool behaviour.
+
+    - Optimistic: recycle connections after a configured age.
+    - Pessimistic: check each connection before use.
+    """
+
+    OPTIMISTIC = auto()
+    PESSIMISTIC = auto()
+
+    @classmethod
+    def conv(cls, name):
+        return name.upper()
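
The FromStrMixin/PoolingMode additions above drive the engine setup in
lacre/repositories.py below. For illustration only (this snippet is not part
of the patch), the lookup is expected to behave as follows, since
PoolingMode.conv() upper-cases the configured value before the member lookup:

    from lacre.config import PoolingMode

    assert PoolingMode.from_str('optimistic') is PoolingMode.OPTIMISTIC
    assert PoolingMode.from_str('bogus') is None      # lenient by default
    try:
        PoolingMode.from_str('bogus', required=True)  # strict lookup
    except NameError:
        pass  # 'Unsupported or missing value'
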
diff --git a/lacre/keyring.py b/lacre/keyring.py
index f7ac9f0..5b84db9 100644
--- a/lacre/keyring.py
+++ b/lacre/keyring.py
@@ -6,7 +6,7 @@ module.
 
 import lacre.config as conf
 from lacre._keyringcommon import KeyRing, KeyCache
-from lacre.repositories import IdentityRepository
+from lacre.repositories import IdentityRepository, init_engine
 import logging
 
 LOG = logging.getLogger(__name__)
@@ -15,7 +15,8 @@ LOG = logging.getLogger(__name__)
 def init_keyring() -> KeyRing:
     """Initialise appropriate type of keyring."""
     url = conf.get_item('database', 'url')
-    return IdentityRepository(db_url=url)
+    db_engine = init_engine(url)
+    return IdentityRepository(engine=db_engine)
 
 
 def freeze_and_load_keys() -> KeyCache:
diff --git a/lacre/repositories.py b/lacre/repositories.py
index e1416aa..7ecfa44 100644
--- a/lacre/repositories.py
+++ b/lacre/repositories.py
@@ -4,48 +4,55 @@ from sqlalchemy import create_engine, select, delete, and_, func
 from sqlalchemy.exc import OperationalError
 import logging
 
-from lacre.config import flag_enabled, config_item_set, get_item
+from lacre.config import flag_enabled, config_item_set, get_item, PoolingMode
 from lacre._keyringcommon import KeyRing, KeyCache
 import lacre.dbschema as db
 
 LOG = logging.getLogger(__name__)
 
+_HOUR_IN_SECONDS = 3600
+
 # Internal state
 _engine = None
 
 
-def connect(url):
+def init_engine(url):
     global _engine
     if not _engine:
         config = _conn_config()
         _engine = create_engine(url, **config)
 
-    return _engine.connect()
+    return _engine
 
 
 def _conn_config():
     config = dict()
 
-    if config_item_set('database', 'max_connection_age'):
-        config['pool_recycle'] = get_item('database', 'max_connection_age')
+    mode = PoolingMode.from_config('database', 'pooling_mode', required=True)
+    if mode is PoolingMode.OPTIMISTIC:
+        # Optimistic disconnect-handling: recycle connections.
+        config['pool_recycle'] = get_item('database', 'max_connection_age', _HOUR_IN_SECONDS)
+    elif mode is PoolingMode.PESSIMISTIC:
+        # Pessimistic disconnect-handling: pre_ping.
+        config['pool_pre_ping'] = True
 
+    # Additional pool settings
     if config_item_set('database', 'pool_size'):
         config['pool_size'] = get_item('database', 'pool_size')
 
     if config_item_set('database', 'max_overflow'):
         config['max_overflow'] = get_item('database', 'max_overflow')
 
+    LOG.debug('Database engine configuration: %s', config)
     return config
 
 
 class IdentityRepository(KeyRing):
-    def __init__(self, /, connection=None, db_url=None):
+    def __init__(self, /, connection=None, *, engine):
         self._identities = db.LACRE_IDENTITIES
-        self._conn = connection
-        self._url = db_url
-        self._initialised = connection is not None
+        self._engine = engine
 
     def register_or_update(self, email, fprint):
         assert email, "email is mandatory"
@@ -57,44 +64,39 @@ class IdentityRepository(KeyRing):
             self._insert(email, fprint)
 
     def _exists(self, email: str) -> bool:
-        self._ensure_connected()
         selq = select(self._identities.c.email).where(self._identities.c.email == email)
-        return [e for e in self._conn.execute(selq)]
+        with self._engine.connect() as conn:
+            return [e for e in conn.execute(selq)]
 
     def _insert(self, email, fprint):
-        self._ensure_connected()
         insq = self._identities.insert().values(email=email, fingerprint=fprint)
         LOG.debug('Registering identity %s: %s', email, insq)
-        self._conn.execute(insq)
+        with self._engine.connect() as conn:
+            conn.execute(insq)
 
     def _update(self, email, fprint):
-        self._ensure_connected()
         upq = self._identities.update() \
             .values(fingerprint=fprint) \
             .where(self._identities.c.email == email)
         LOG.debug('Updating identity %s: %s', email, upq)
-        self._conn.execute(upq)
-
-    def _ensure_connected(self):
-        if not self._initialised:
-            LOG.debug('Connecting with %s', self._url)
-            self._conn = connect(self._url)
+        with self._engine.connect() as conn:
+            conn.execute(upq)
 
     def delete(self, email):
-        self._ensure_connected()
-
         delq = delete(self._identities).where(self._identities.c.email == email)
         LOG.debug('Deleting keys assigned to %s', email)
-        self._conn.execute(delq)
+        with self._engine.connect() as conn:
+            conn.execute(delq)
 
     def delete_all(self):
         LOG.warn('Deleting all identities from the database')
         delq = delete(self._identities)
-        self._conn.execute(delq)
+        with self._engine.connect() as conn:
+            conn.execute(delq)
 
     def freeze_identities(self) -> KeyCache:
         """Return a static, async-safe copy of the identity map.
@@ -103,7 +105,6 @@
         if we get a database exception, this method will either return
         empty collection or let the exception be propagated.
         """
-        self._ensure_connected()
         try:
             return self._load_identities()
         except OperationalError:
@@ -115,9 +116,10 @@
 
     def _load_identities(self) -> KeyCache:
         all_identities = select(self._identities.c.fingerprint, self._identities.c.email)
-        result = self._conn.execute(all_identities)
-        LOG.debug('Retrieving all keys')
-        return KeyCache({key_id: email for key_id, email in result})
+        with self._engine.connect() as conn:
+            result = conn.execute(all_identities)
+            LOG.debug('Retrieving all keys')
+            return KeyCache({key_id: email for key_id, email in result})
 
 
 class KeyConfirmationQueue:
@@ -126,9 +128,9 @@
     # Default number of items retrieved from the database.
     keys_read_max = 100
 
-    def __init__(self, connection):
+    def __init__(self, /, engine):
         self._keys = db.LACRE_KEYS
-        self._conn = connection
+        self._engine = engine
 
     def fetch_keys(self, /, max_keys=None):
         """Runs a query to retrieve at most `keys_read_max` keys and returns db result."""
@@ -139,24 +141,27 @@ class KeyConfirmationQueue:
             .limit(max_keys)
 
         LOG.debug('Retrieving keys to be processed: %s', selq)
-        return self._conn.execute(selq)
+        with self._engine.connect() as conn:
+            return conn.execute(selq).fetchall()
 
     def count_keys(self):
         selq = select(func.count(self._keys.c.id))
         LOG.debug('Counting all keys: %s', selq)
         try:
-            c = [cnt for cnt in self._conn.execute(selq)]
+            with self._engine.connect() as conn:
+                c = [cnt for cnt in conn.execute(selq)]
 
-            # Result is an iterable of tuples:
-            return c[0][0]
+                # Result is an iterable of tuples:
+                return c[0][0]
         except OperationalError:
             LOG.exception('Cannot count keys')
             return None
 
     def fetch_keys_to_delete(self):
         seldel = select(self._keys.c.email, self._keys.c.id).where(self._keys.c.status == db.ST_TO_BE_DELETED).limit(self.keys_read_max)
-        return self._conn.execute(seldel)
+        with self._engine.connect() as conn:
+            return conn.execute(seldel).fetchall()
 
     def delete_keys(self, row_id, /, email=None):
         """Remove key from the database."""
@@ -166,10 +171,12 @@
         delq = delete(self._keys).where(self._keys.c.id != row_id)
 
         LOG.debug('Deleting public keys associated with confirmed email: %s', delq)
-        self._conn.execute(delq)
+        with self._engine.connect() as conn:
+            conn.execute(delq)
 
     def mark_accepted(self, row_id):
         modq = self._keys.update().where(self._keys.c.id == row_id).values(status=db.ST_IMPORTED)
         LOG.debug("Key imported, updating key: %s", modq)
-        self._conn.execute(modq)
+        with self._engine.connect() as conn:
+            conn.execute(modq)
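
To summarise the refactoring above: connect() used to hand out a single
long-lived Connection, while init_engine() now returns the process-wide
Engine, and every repository method checks a connection out of the pool for
the duration of one query. A minimal sketch of the two pooling modes in plain
SQLAlchemy (the URL is a placeholder; note that fetch_keys() and
fetch_keys_to_delete() materialise rows with fetchall() before the connection
goes back to the pool, since an unread result does not outlive its
connection):

    from sqlalchemy import create_engine, text

    # Pessimistic: test each pooled connection with a ping before use.
    engine = create_engine('sqlite:///test/lacre.db', pool_pre_ping=True)
    # Optimistic: drop pooled connections older than max_connection_age.
    # engine = create_engine('sqlite:///test/lacre.db', pool_recycle=3600)

    with engine.connect() as conn:  # returned to the pool on exit
        rows = conn.execute(text('SELECT 1')).fetchall()
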
diff --git a/test/e2e_test.py b/test/e2e_test.py
index 2255ea5..a88cc68 100644
--- a/test/e2e_test.py
+++ b/test/e2e_test.py
@@ -42,6 +42,7 @@ def _build_config(config):
     cp.add_section('database')
     cp.set('database', 'enabled', 'yes')
     cp.set('database', 'url', 'sqlite:///test/lacre.db')
+    cp.set('database', 'pooling_mode', 'optimistic')
 
     cp.add_section("smime")
     cp.set("smime", "cert_path", config["smime_certpath"])
diff --git a/test/gpg-mailgate-daemon-test.conf b/test/gpg-mailgate-daemon-test.conf
index 396e105..68279a3 100644
--- a/test/gpg-mailgate-daemon-test.conf
+++ b/test/gpg-mailgate-daemon-test.conf
@@ -13,6 +13,7 @@ cert_path = test/certs
 [database]
 enabled = yes
 url = sqlite:///test/lacre.db
+pooling_mode = optimistic
 
 [relay]
 host = localhost
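
Both test configurations opt for the optimistic mode. Against a networked
MySQL server, the pessimistic mode can be the more robust choice, at the cost
of one extra round-trip per connection checkout. A hypothetical [database]
section for that case (URL and credentials invented for the example, patterned
on the sample configuration) would read:

    [database]
    enabled = yes
    url = mysql://gpgmw:password@localhost/gpgmw
    pooling_mode = pessimistic
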
diff --git a/test/modules/test_lacre_repositories.py b/test/modules/test_lacre_repositories.py
index bd99855..df970a3 100644
--- a/test/modules/test_lacre_repositories.py
+++ b/test/modules/test_lacre_repositories.py
@@ -2,13 +2,24 @@
 
 import unittest
 
+import lacre.config as conf
 import lacre.repositories as r
 import lacre.dbschema as s
 
 
+def ignore_sql(sql, *args, **kwargs):
+    pass
+
+
 class IdentityRepositoryTest(unittest.TestCase):
-    def test_x(self):
-        ir = r.IdentityRepository(db_url='sqlite:///test/lacre.db')
+    @classmethod
+    def setUpClass(cls):
+        # Required for init_engine to work.
+        conf.load_config()
+
+    def test_freeze_identities(self):
+        eng = r.init_engine('sqlite:///test/lacre.db')
+
+        ir = r.IdentityRepository(engine=eng)
         identities = ir.freeze_identities()
 
         self.assertTrue(identities)
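
setUpClass() has to load the configuration because init_engine() reads
database.pooling_mode via PoolingMode.from_config(), which is also why the
Makefile change at the top of this patch exports GPG_MAILGATE_CONFIG. Assuming
a stock Python interpreter in place of $(PYTHON), the module tests are run as:

    GPG_MAILGATE_CONFIG=test/gpg-mailgate.conf python -m unittest discover -s test/modules
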
diff --git a/webgate-cron.py b/webgate-cron.py
index 88cd947..ef7d8ca 100755
--- a/webgate-cron.py
+++ b/webgate-cron.py
@@ -31,7 +31,7 @@ lacre.init_logging(conf.get_item('logging', 'config'))
 LOG = logging.getLogger('webgate-cron.py')
 
 import GnuPG
-from lacre.repositories import KeyConfirmationQueue, IdentityRepository, connect
+from lacre.repositories import KeyConfirmationQueue, IdentityRepository, init_engine
 
 
 def _validate_config():
@@ -44,10 +44,10 @@ def _validate_config():
 _validate_config()
 
 if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url'):
-    conn = connect(conf.get_item('database', 'url'))
+    db_engine = init_engine(conf.get_item('database', 'url'))
 
-    identities = IdentityRepository(conn)
-    key_queue = KeyConfirmationQueue(conn)
+    identities = IdentityRepository(engine=db_engine)
+    key_queue = KeyConfirmationQueue(engine=db_engine)
 
     key_dir = conf.get_item('gpg', 'keyhome')
     LOG.debug('Using GnuPG with home directory in %s', key_dir)
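
Taken together, every caller now follows the same pattern: obtain the shared
Engine once, then hand it to the repositories. A sketch of that wiring
(assuming a loaded configuration whose [database] section defines url and
pooling_mode):

    import lacre.config as conf
    from lacre.repositories import init_engine, IdentityRepository, KeyConfirmationQueue

    conf.load_config()
    url = conf.get_item('database', 'url')

    # init_engine() memoises the Engine in module state,
    # so repeated calls share one connection pool.
    assert init_engine(url) is init_engine(url)

    identities = IdentityRepository(engine=init_engine(url))
    key_queue = KeyConfirmationQueue(engine=init_engine(url))
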