diff --git a/GnuPG/__init__.py b/GnuPG/__init__.py index eda5c2c..f873e2c 100644 --- a/GnuPG/__init__.py +++ b/GnuPG/__init__.py @@ -137,15 +137,18 @@ def confirm_key(content, email: str): expected_email = email.lower() tmpkeyhome = tempfile.mkdtemp() + LOG.debug('Importing into temporary directory: %s', tmpkeyhome) result = _import_key(tmpkeyhome, content) confirmed = False for line in result.splitlines(): + LOG.debug('Line from GnuPG: %s', line) found = RX_CONFIRM.search(line) if found: (_, extracted_email) = parseaddr(found.group(1).decode()) confirmed = (extracted_email == expected_email) + LOG.debug('Confirmed email %s: %s', extracted_email, confirmed) # cleanup shutil.rmtree(tmpkeyhome) diff --git a/Makefile b/Makefile index ab1ea20..a298cbd 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,7 @@ clean-db: # Run unit tests # unittest: - $(PYTHON) -m unittest discover -s test/modules + GPG_MAILGATE_CONFIG=test/gpg-mailgate.conf $(PYTHON) -m unittest discover -s test/modules pre-clean: rm -fv test/gpg-mailgate.conf diff --git a/gpg-mailgate.conf.sample b/gpg-mailgate.conf.sample index 42dd7bc..7ed7bbb 100644 --- a/gpg-mailgate.conf.sample +++ b/gpg-mailgate.conf.sample @@ -101,6 +101,17 @@ starttls = true enabled = yes url = sqlite:///test.db +# Pooling mode: pessimistic or optimistic (required parameter). +# +# - Pessimistic disconnect-handling: pre_ping. Connection pool will try using +# connection before it executes a SQL query to find out if the connection is +# still alive. If not, it'll just establish a new connection. +# +# - Optimistic distonnect-handling: just avoid using connections after some +# time. +# +pooling_mode = optimistic + # For a MySQL database "gpgmw", user "gpgmw" and password "password", # use the following URL: # @@ -109,6 +120,19 @@ url = sqlite:///test.db # For other RDBMS backends, see: # https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls +# Number of seconds after which an idle connection is recycled. This is +# useful with MySQL servers. This is only used with pooling_mode=optimistic. +# For more information, see: +# https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.pool_recycle +#max_connection_age = 3600 + +# Number of connections stored in the pool. +#pool_size = 5 + +# If the pool size is not enough for current traffic, some connections can be +# made and closed after use, to avoid pool growth and connection rejections. +#max_overflow = 10 + [enc_keymap] # You can find these by running the following command: # gpg --list-keys --keyid-format long user@example.com diff --git a/lacre/admin.py b/lacre/admin.py index 2a37742..2850b71 100644 --- a/lacre/admin.py +++ b/lacre/admin.py @@ -18,7 +18,7 @@ lacre.init_logging(conf.get_item('logging', 'config')) import lacre.repositories as repo import lacre.dbschema as db -LOG = logging.getLogger('lacre.admin') +LOG = logging.getLogger(__name__) def _no_database(): @@ -30,23 +30,25 @@ def sub_queue(args): """Sub-command to inspect queue contents.""" LOG.debug('Inspecting queue...') - conn = repo.connect(conf.get_item('database', 'url')) - queue = repo.KeyConfirmationQueue(conn) + eng = repo.init_engine(conf.get_item('database', 'url')) + queue = repo.KeyConfirmationQueue(engine=eng) - cnt = queue.count_keys() + if args.delete: + queue.delete_key_by_email(args.delete) + else: + cnt = queue.count_keys() + if cnt is None: + _no_database() - if cnt is None: - _no_database() - - print(f'Keys in the queue: {cnt}') + print(f'Keys in the queue: {cnt}') def sub_identities(args): """Sub-command to inspect identity database.""" LOG.debug('Inspecting identities...') - conn = repo.connect(conf.get_item('database', 'url')) - identities = repo.IdentityRepository(conn) + eng = repo.init_engine(conf.get_item('database', 'url')) + identities = repo.IdentityRepository(engine=eng) all_identities = identities.freeze_identities() if all_identities is None: @@ -67,8 +69,8 @@ def sub_import(args): public = GnuPG.public_keys(source_dir) - conn = repo.connect(conf.get_item('database', 'url')) - identities = repo.IdentityRepository(conn) + eng = repo.init_engine(conf.get_item('database', 'url')) + identities = repo.IdentityRepository(engine=eng) if args.reload: identities.delete_all() @@ -110,6 +112,8 @@ def main(): help='Inspect key queue', aliases=['q'] ) + cmd_queue.add_argument('-D', '--delete', + help='delete specified email from the queue') cmd_queue.set_defaults(operation=sub_queue) cmd_identities = sub_commands.add_parser('identities', diff --git a/lacre/config.py b/lacre/config.py index 9db96ee..0eeac32 100644 --- a/lacre/config.py +++ b/lacre/config.py @@ -4,6 +4,7 @@ Routines defined here are responsible for processing and validating configuration. """ +from enum import Enum, auto from configparser import RawConfigParser import os @@ -23,10 +24,12 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"), ("gpg", "keyhome")] SCRIPT_REQUIRED = [('database', 'enabled'), - ('database', 'url')] + ('database', 'url'), + ('database', 'pooling_mode')] CRON_REQUIRED = [('database', 'enabled'), ('database', 'url'), + ('database', 'pooling_mode'), ('cron', 'mail_templates')] # Global dict to keep configuration parameters. It's hidden behind several @@ -142,3 +145,43 @@ def daemon_params(): def strict_mode(): """Check if Lacre is configured to support only a fixed list of keys.""" return ("default" in cfg and cfg["default"]["enc_keymap_only"] == "yes") + + +class FromStrMixin: + """Additional operations for configuration enums.""" + + @classmethod + def from_str(cls, name, *, required=False): + if name is None: + return None + + name = name.upper() + + if name in cls.__members__: + return cls.__members__[name] + + if required: + raise NameError('Unsupported or missing value') + else: + return None + + @classmethod + def from_config(cls, section, key, *, required=False): + param = get_item(section, key) + return cls.from_str(param, required=required) + + +class PGPStyle(FromStrMixin, Enum): + """PGP message structure: PGP/Inline or PGP/MIME.""" + MIME = auto() + INLINE = auto() + + +class PoolingMode(FromStrMixin, Enum): + """Database connection pool behaviour. + + - Optimistic - recycles connections. + - Pessimistic - checks connection before usage. + """ + OPTIMISTIC = auto() + PESSIMISTIC = auto() diff --git a/lacre/core.py b/lacre/core.py index 06cee5a..c29e496 100644 --- a/lacre/core.py +++ b/lacre/core.py @@ -88,17 +88,18 @@ def _sort_gpg_recipients(gpg_to) -> Tuple[recpt.RecipientList, recpt.RecipientLi for rcpt in gpg_to: # Checking pre defined styles in settings first - if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'): + style = conf.PGPStyle.from_config('pgp_style', rcpt.email()) + if style is conf.PGPStyle.MIME: recipients_mime.append(rcpt.email()) keys_mime.extend(rcpt.key().split(',')) - elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'): + elif style is conf.PGPStyle.INLINE: recipients_inline.append(rcpt.email()) keys_inline.extend(rcpt.key().split(',')) else: # Log message only if an unknown style is defined if conf.config_item_set('pgp_style', rcpt.email()): - LOG.debug("Style %s for recipient %s is not known. Use default as fallback." - % (conf.get_item("pgp_style", rcpt.email()), rcpt.email())) + LOG.debug("Style %s for recipient %s is not known. Use default as fallback.", + conf.get_item("pgp_style", rcpt.email()), rcpt.email()) # If no style is in settings defined for recipient, use default from settings if default_to_pgp_mime: diff --git a/lacre/keyring.py b/lacre/keyring.py index f7ac9f0..5b84db9 100644 --- a/lacre/keyring.py +++ b/lacre/keyring.py @@ -6,7 +6,7 @@ module. import lacre.config as conf from lacre._keyringcommon import KeyRing, KeyCache -from lacre.repositories import IdentityRepository +from lacre.repositories import IdentityRepository, init_engine import logging LOG = logging.getLogger(__name__) @@ -15,7 +15,8 @@ LOG = logging.getLogger(__name__) def init_keyring() -> KeyRing: """Initialise appropriate type of keyring.""" url = conf.get_item('database', 'url') - return IdentityRepository(db_url=url) + db_engine = init_engine(url) + return IdentityRepository(engine=db_engine) def freeze_and_load_keys() -> KeyCache: diff --git a/lacre/notify.py b/lacre/notify.py index bf05d6c..ca81f39 100644 --- a/lacre/notify.py +++ b/lacre/notify.py @@ -8,8 +8,6 @@ from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import markdown -# Read configuration from /etc/gpg-mailgate.conf -conf.load_config() LOG = logging.getLogger(__name__) @@ -50,6 +48,7 @@ def notify(mailsubject, messagefile, recipients = None): (host, port) = conf.relay_params() smtp = smtplib.SMTP(host, port) _authenticate_maybe(smtp) + LOG.info('Delivering notification: %s', recipients) smtp.sendmail(conf.get_item('cron', 'notification_email'), recipients, msg.as_string()) else: - LOG.info("Could not send mail due to wrong configuration") + LOG.warning("Could not send mail due to wrong configuration") diff --git a/lacre/repositories.py b/lacre/repositories.py index b4c3ad0..52f21c9 100644 --- a/lacre/repositories.py +++ b/lacre/repositories.py @@ -4,31 +4,55 @@ from sqlalchemy import create_engine, select, delete, and_, func from sqlalchemy.exc import OperationalError import logging -from lacre.config import flag_enabled +from lacre.config import flag_enabled, config_item_set, get_item, PoolingMode from lacre._keyringcommon import KeyRing, KeyCache import lacre.dbschema as db LOG = logging.getLogger(__name__) +_HOUR_IN_SECONDS = 3600 + # Internal state _engine = None -def connect(url): +def init_engine(url): global _engine if not _engine: - _engine = create_engine(url) + config = _conn_config() + _engine = create_engine(url, **config) + + return _engine + + +def _conn_config(): + config = dict() + + mode = PoolingMode.from_config('database', 'pooling_mode', required=True) + if mode is PoolingMode.OPTIMISTIC: + # Optimistic distonnect-handling: recycle connections. + config['pool_recycle'] = int(get_item('database', 'max_connection_age', _HOUR_IN_SECONDS)) + elif mode is PoolingMode.PESSIMISTIC: + # Pessimistic disconnect-handling: pre_ping. + config['pool_pre_ping'] = True + + # Additional pool settings + if config_item_set('database', 'pool_size'): + config['pool_size'] = int(get_item('database', 'pool_size')) + + if config_item_set('database', 'max_overflow'): + config['max_overflow'] = int(get_item('database', 'max_overflow')) + + LOG.debug('Database engine configuration: %s', config) + return config - return _engine.connect() class IdentityRepository(KeyRing): - def __init__(self, /, connection=None, db_url=None): + def __init__(self, /, connection=None, *, engine): self._identities = db.LACRE_IDENTITIES - self._conn = connection - self._url = db_url - self._initialised = connection is not None + self._engine = engine def register_or_update(self, email, fprint): assert email, "email is mandatory" @@ -40,44 +64,39 @@ class IdentityRepository(KeyRing): self._insert(email, fprint) def _exists(self, email: str) -> bool: - self._ensure_connected() selq = select(self._identities.c.email).where(self._identities.c.email == email) - return [e for e in self._conn.execute(selq)] + with self._engine.connect() as conn: + return [e for e in conn.execute(selq)] def _insert(self, email, fprint): - self._ensure_connected() insq = self._identities.insert().values(email=email, fingerprint=fprint) LOG.debug('Registering identity %s: %s', email, insq) - self._conn.execute(insq) + with self._engine.connect() as conn: + conn.execute(insq) def _update(self, email, fprint): - self._ensure_connected() upq = self._identities.update() \ .values(fingerprint=fprint) \ .where(self._identities.c.email == email) LOG.debug('Updating identity %s: %s', email, upq) - self._conn.execute(upq) - - def _ensure_connected(self): - if not self._initialised: - LOG.debug('Connecting with %s', self._url) - self._conn = connect(self._url) + with self._engine.connect() as conn: + conn.execute(upq) def delete(self, email): - self._ensure_connected() - delq = delete(self._identities).where(self._identities.c.email == email) LOG.debug('Deleting keys assigned to %s', email) - self._conn.execute(delq) + with self._engine.connect() as conn: + conn.execute(delq) def delete_all(self): LOG.warn('Deleting all identities from the database') delq = delete(self._identities) - self._conn.execute(delq) + with self._engine.connect() as conn: + conn.execute(delq) def freeze_identities(self) -> KeyCache: """Return a static, async-safe copy of the identity map. @@ -86,7 +105,6 @@ class IdentityRepository(KeyRing): if we get a database exception, this method will either return empty collection or let the exception be propagated. """ - self._ensure_connected() try: return self._load_identities() except OperationalError: @@ -98,9 +116,10 @@ class IdentityRepository(KeyRing): def _load_identities(self) -> KeyCache: all_identities = select(self._identities.c.fingerprint, self._identities.c.email) - result = self._conn.execute(all_identities) - LOG.debug('Retrieving all keys') - return KeyCache({key_id: email for key_id, email in result}) + with self._engine.connect() as conn: + result = conn.execute(all_identities) + LOG.debug('Retrieving all keys') + return KeyCache({key_id: email for key_id, email in result}) class KeyConfirmationQueue: @@ -109,9 +128,9 @@ class KeyConfirmationQueue: # Default number of items retrieved from the database. keys_read_max = 100 - def __init__(self, connection): + def __init__(self, /, engine): self._keys = db.LACRE_KEYS - self._conn = connection + self._engine = engine def fetch_keys(self, /, max_keys=None): """Runs a query to retrieve at most `keys_read_max` keys and returns db result.""" @@ -122,37 +141,52 @@ class KeyConfirmationQueue: .limit(max_keys) LOG.debug('Retrieving keys to be processed: %s', selq) - return self._conn.execute(selq) + with self._engine.connect() as conn: + return conn.execute(selq) def count_keys(self): selq = select(func.count(self._keys.c.id)) LOG.debug('Counting all keys: %s', selq) try: - c = [cnt for cnt in self._conn.execute(selq)] + with self._engine.connect() as conn: + c = [cnt for cnt in conn.execute(selq)] - # Result is an iterable of tuples: - return c[0][0] + # Result is an iterable of tuples: + return c[0][0] except OperationalError: LOG.exception('Cannot count keys') return None def fetch_keys_to_delete(self): seldel = select(self._keys.c.email, self._keys.c.id).where(self._keys.c.status == db.ST_TO_BE_DELETED).limit(self.keys_read_max) - return self._conn.execute(seldel) + with self._engine.connect() as conn: + return conn.execute(seldel) def delete_keys(self, row_id, /, email=None): """Remove key from the database.""" if email is not None: - delq = delete(self._keys).where(and_(self._keys.c.email == email, self._keys.c.id != row_id)) + LOG.debug('Deleting key: id=%s, email=%s', row_id, email) + delq = delete(self._keys).where(and_(self._keys.c.email == email, self._keys.c.id == row_id)) else: - delq = delete(self._keys).where(self._keys.c.id != row_id) + LOG.debug('Deleting key: id=%s', row_id) + delq = delete(self._keys).where(self._keys.c.id == row_id) - LOG.debug('Deleting public keys associated with confirmed email: %s', delq) - self._conn.execute(delq) + with self._engine.connect() as conn: + LOG.debug('Deleting public keys associated with confirmed email: %s', delq) + return conn.execute(delq) + + def delete_key_by_email(self, email): + """Remove keys linked to the given email from the database.""" + delq = delete(self._keys).where(self._keys.c.email == email) + + LOG.debug('Deleting email for: %s', email) + with self._engine.connect() as conn: + conn.execute(delq) def mark_accepted(self, row_id): modq = self._keys.update().where(self._keys.c.id == row_id).values(status=db.ST_IMPORTED) LOG.debug("Key imported, updating key: %s", modq) - self._conn.execute(modq) + with self._engine.connect() as conn: + conn.execute(modq) diff --git a/lacre/stats.py b/lacre/stats.py index e11cad7..f4d62f0 100644 --- a/lacre/stats.py +++ b/lacre/stats.py @@ -13,6 +13,7 @@ class ExecutionTimeLogger: def __enter__(self): self._start = time.process_time() + self._log.info('Start: %s', self._message) def __exit__(self, exc_type=None, exc_value=None, traceback=None): end = time.process_time() diff --git a/test/e2e_test.py b/test/e2e_test.py index 2255ea5..a88cc68 100644 --- a/test/e2e_test.py +++ b/test/e2e_test.py @@ -42,6 +42,7 @@ def _build_config(config): cp.add_section('database') cp.set('database', 'enabled', 'yes') cp.set('database', 'url', 'sqlite:///test/lacre.db') + cp.set('database', 'pooling_mode', 'optimistic') cp.add_section("smime") cp.set("smime", "cert_path", config["smime_certpath"]) diff --git a/test/gpg-mailgate-daemon-test.conf b/test/gpg-mailgate-daemon-test.conf index 396e105..68279a3 100644 --- a/test/gpg-mailgate-daemon-test.conf +++ b/test/gpg-mailgate-daemon-test.conf @@ -13,6 +13,7 @@ cert_path = test/certs [database] enabled = yes url = sqlite:///test/lacre.db +pooling_mode = optimistic [relay] host = localhost diff --git a/test/modules/test_lacre_repositories.py b/test/modules/test_lacre_repositories.py index bd99855..df970a3 100644 --- a/test/modules/test_lacre_repositories.py +++ b/test/modules/test_lacre_repositories.py @@ -2,13 +2,23 @@ import unittest +import lacre.config as conf import lacre.repositories as r import lacre.dbschema as s +def ignore_sql(sql, *args, **kwargs): + pass + class IdentityRepositoryTest(unittest.TestCase): - def test_x(self): - ir = r.IdentityRepository(db_url='sqlite:///test/lacre.db') + def setUpClass(): + # required for init_engine to work + conf.load_config() + + def test_freeze_identities(self): + eng = r.init_engine('sqlite:///test/lacre.db') + + ir = r.IdentityRepository(engine=eng) identities = ir.freeze_identities() self.assertTrue(identities) diff --git a/webgate-cron.py b/webgate-cron.py index 88cd947..7fa6af5 100755 --- a/webgate-cron.py +++ b/webgate-cron.py @@ -19,6 +19,7 @@ # along with gpg-mailgate source code. If not, see . # +import sys import logging import lacre import lacre.config as conf @@ -31,7 +32,7 @@ lacre.init_logging(conf.get_item('logging', 'config')) LOG = logging.getLogger('webgate-cron.py') import GnuPG -from lacre.repositories import KeyConfirmationQueue, IdentityRepository, connect +from lacre.repositories import KeyConfirmationQueue, IdentityRepository, init_engine def _validate_config(): @@ -43,11 +44,17 @@ def _validate_config(): _validate_config() -if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url'): - conn = connect(conf.get_item('database', 'url')) +if not (conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url')): + print("Warning: doing nothing since database settings are not configured!") + LOG.error("Warning: doing nothing since database settings are not configured!") + sys.exit(lacre.EX_CONFIG) - identities = IdentityRepository(conn) - key_queue = KeyConfirmationQueue(conn) + +try: + db_engine = init_engine(conf.get_item('database', 'url')) + + identities = IdentityRepository(engine=db_engine) + key_queue = KeyConfirmationQueue(engine=db_engine) key_dir = conf.get_item('gpg', 'keyhome') LOG.debug('Using GnuPG with home directory in %s', key_dir) @@ -80,15 +87,21 @@ if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', notify("PGP key registration failed", "registrationError.md", email) else: # delete key so we don't continue processing it + LOG.debug('Empty key received, just deleting') + key_queue.delete_keys(row_id) if conf.flag_enabled('cron', 'send_email'): notify("PGP key deleted", "keyDeleted.md", email) + LOG.info('Cleaning up after a round of key confirmation') stat2_result_set = key_queue.fetch_keys_to_delete() for email, row_id in stat2_result_set: + LOG.debug('Removing key from keyring: %s', email) GnuPG.delete_key(key_dir, email) + + LOG.debug('Removing key from identity store: %s', row_id) key_queue.delete_keys(row_id) + LOG.info('Deleted key for <%s>', email) -else: - print("Warning: doing nothing since database settings are not configured!") - LOG.error("Warning: doing nothing since database settings are not configured!") +except: + LOG.exception('Unexpected issue during key confirmation')