Merge pull request 'lacre.repositories: Configure SQLAlchemy connection pooling' (#136) from connection-pooling into main

Reviewed-on: #136
pfm 2024-01-04 18:52:03 +00:00
commit 748fd00957
14 changed files with 207 additions and 72 deletions

View File

@@ -137,15 +137,18 @@ def confirm_key(content, email: str):
expected_email = email.lower()
tmpkeyhome = tempfile.mkdtemp()
LOG.debug('Importing into temporary directory: %s', tmpkeyhome)
result = _import_key(tmpkeyhome, content)
confirmed = False
for line in result.splitlines():
LOG.debug('Line from GnuPG: %s', line)
found = RX_CONFIRM.search(line)
if found:
(_, extracted_email) = parseaddr(found.group(1).decode())
confirmed = (extracted_email == expected_email)
LOG.debug('Confirmed email %s: %s', extracted_email, confirmed)
# cleanup
shutil.rmtree(tmpkeyhome)

View File

@@ -64,7 +64,7 @@ clean-db:
# Run unit tests
#
unittest:
$(PYTHON) -m unittest discover -s test/modules
GPG_MAILGATE_CONFIG=test/gpg-mailgate.conf $(PYTHON) -m unittest discover -s test/modules
pre-clean:
rm -fv test/gpg-mailgate.conf

View File

@@ -101,6 +101,17 @@ starttls = true
enabled = yes
url = sqlite:///test.db
# Pooling mode: pessimistic or optimistic (required parameter).
#
# - Pessimistic disconnect-handling: pre_ping. The connection pool tests each
# connection before executing a SQL query, to find out whether the connection
# is still alive; if not, it transparently establishes a new connection.
#
# - Optimistic disconnect-handling: recycle connections after some time, so
# stale connections are never reused (see max_connection_age below).
#
pooling_mode = optimistic
# For a MySQL database "gpgmw", user "gpgmw" and password "password",
# use the following URL:
#
@@ -109,6 +120,19 @@ url = sqlite:///test.db
# For other RDBMS backends, see:
# https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
# Number of seconds after which an idle connection is recycled. Useful with
# MySQL servers, which drop connections idle past a timeout. Only used with
# pooling_mode=optimistic.
# For more information, see:
# https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.pool_recycle
#max_connection_age = 3600
# Number of connections stored in the pool.
#pool_size = 5
# If the pool size is not enough for current traffic, up to this many extra
# connections can be opened and then closed after use, avoiding both permanent
# pool growth and rejected connections.
#max_overflow = 10
[enc_keymap]
# You can find these by running the following command:
# gpg --list-keys --keyid-format long user@example.com
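For reference, the pooling settings above map directly onto SQLAlchemy `create_engine()` keyword arguments. A minimal sketch of that mapping, assuming SQLAlchemy 1.4 (the version the documentation URLs above point to) and the example MySQL URL from this file:

```python
from sqlalchemy import create_engine

# pooling_mode = pessimistic -> test each pooled connection before use.
engine = create_engine('mysql://gpgmw:password@localhost/gpgmw',
                       pool_pre_ping=True)

# pooling_mode = optimistic -> recycle connections after max_connection_age,
# with pool_size/max_overflow bounding the pool.
engine = create_engine('mysql://gpgmw:password@localhost/gpgmw',
                       pool_recycle=3600,   # max_connection_age
                       pool_size=5,         # pool_size
                       max_overflow=10)     # max_overflow
```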

View File

@@ -18,7 +18,7 @@ lacre.init_logging(conf.get_item('logging', 'config'))
import lacre.repositories as repo
import lacre.dbschema as db
LOG = logging.getLogger('lacre.admin')
LOG = logging.getLogger(__name__)
def _no_database():
@@ -30,23 +30,25 @@ def sub_queue(args):
"""Sub-command to inspect queue contents."""
LOG.debug('Inspecting queue...')
conn = repo.connect(conf.get_item('database', 'url'))
queue = repo.KeyConfirmationQueue(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
queue = repo.KeyConfirmationQueue(engine=eng)
cnt = queue.count_keys()
if args.delete:
queue.delete_key_by_email(args.delete)
else:
cnt = queue.count_keys()
if cnt is None:
_no_database()
if cnt is None:
_no_database()
print(f'Keys in the queue: {cnt}')
print(f'Keys in the queue: {cnt}')
def sub_identities(args):
"""Sub-command to inspect identity database."""
LOG.debug('Inspecting identities...')
conn = repo.connect(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(engine=eng)
all_identities = identities.freeze_identities()
if all_identities is None:
@@ -67,8 +69,8 @@ def sub_import(args):
public = GnuPG.public_keys(source_dir)
conn = repo.connect(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(conn)
eng = repo.init_engine(conf.get_item('database', 'url'))
identities = repo.IdentityRepository(engine=eng)
if args.reload:
identities.delete_all()
@@ -110,6 +112,8 @@ def main():
help='Inspect key queue',
aliases=['q']
)
cmd_queue.add_argument('-D', '--delete',
help='delete specified email from the queue')
cmd_queue.set_defaults(operation=sub_queue)
cmd_identities = sub_commands.add_parser('identities',

View File

@@ -4,6 +4,7 @@ Routines defined here are responsible for processing and validating
configuration.
"""
from enum import Enum, auto
from configparser import RawConfigParser
import os
@@ -23,10 +24,12 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"),
("gpg", "keyhome")]
SCRIPT_REQUIRED = [('database', 'enabled'),
('database', 'url')]
('database', 'url'),
('database', 'pooling_mode')]
CRON_REQUIRED = [('database', 'enabled'),
('database', 'url'),
('database', 'pooling_mode'),
('cron', 'mail_templates')]
# Global dict to keep configuration parameters. It's hidden behind several
@@ -142,3 +145,43 @@ def daemon_params():
def strict_mode():
"""Check if Lacre is configured to support only a fixed list of keys."""
return ("default" in cfg and cfg["default"]["enc_keymap_only"] == "yes")
class FromStrMixin:
"""Additional operations for configuration enums."""
@classmethod
def from_str(cls, name, *, required=False):
if name is None:
return None
name = name.upper()
if name in cls.__members__:
return cls.__members__[name]
if required:
raise NameError('Unsupported or missing value')
else:
return None
@classmethod
def from_config(cls, section, key, *, required=False):
param = get_item(section, key)
return cls.from_str(param, required=required)
class PGPStyle(FromStrMixin, Enum):
"""PGP message structure: PGP/Inline or PGP/MIME."""
MIME = auto()
INLINE = auto()
class PoolingMode(FromStrMixin, Enum):
"""Database connection pool behaviour.
- Optimistic - recycles connections.
- Pessimistic - checks connection before usage.
"""
OPTIMISTIC = auto()
PESSIMISTIC = auto()
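To illustrate the mixin's contract: unknown values yield `None` unless `required=True`, in which case they raise `NameError`; a missing value (`None`) always yields `None`. A short sketch of the behaviour defined above:

```python
from lacre.config import PoolingMode

assert PoolingMode.from_str('optimistic') is PoolingMode.OPTIMISTIC
assert PoolingMode.from_str('bogus') is None   # unknown, not required
assert PoolingMode.from_str(None) is None      # missing value

try:
    PoolingMode.from_str('bogus', required=True)  # unknown and required
except NameError:
    pass  # 'Unsupported or missing value'
```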

View File

@@ -88,17 +88,18 @@ def _sort_gpg_recipients(gpg_to) -> Tuple[recpt.RecipientList, recpt.RecipientLi
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
style = conf.PGPStyle.from_config('pgp_style', rcpt.email())
if style is conf.PGPStyle.MIME:
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
elif style is conf.PGPStyle.INLINE:
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()):
LOG.debug("Style %s for recipient %s is not known. Use default as fallback."
% (conf.get_item("pgp_style", rcpt.email()), rcpt.email()))
LOG.debug("Style %s for recipient %s is not known. Use default as fallback.",
conf.get_item("pgp_style", rcpt.email()), rcpt.email())
# If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime:

View File

@@ -6,7 +6,7 @@ module.
import lacre.config as conf
from lacre._keyringcommon import KeyRing, KeyCache
from lacre.repositories import IdentityRepository
from lacre.repositories import IdentityRepository, init_engine
import logging
LOG = logging.getLogger(__name__)
@@ -15,7 +15,8 @@ LOG = logging.getLogger(__name__)
def init_keyring() -> KeyRing:
"""Initialise appropriate type of keyring."""
url = conf.get_item('database', 'url')
return IdentityRepository(db_url=url)
db_engine = init_engine(url)
return IdentityRepository(engine=db_engine)
def freeze_and_load_keys() -> KeyCache:

View File

@@ -8,8 +8,6 @@ from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import markdown
# Read configuration from /etc/gpg-mailgate.conf
conf.load_config()
LOG = logging.getLogger(__name__)
@@ -50,6 +48,7 @@ def notify(mailsubject, messagefile, recipients = None):
(host, port) = conf.relay_params()
smtp = smtplib.SMTP(host, port)
_authenticate_maybe(smtp)
LOG.info('Delivering notification: %s', recipients)
smtp.sendmail(conf.get_item('cron', 'notification_email'), recipients, msg.as_string())
else:
LOG.info("Could not send mail due to wrong configuration")
LOG.warning("Could not send mail due to wrong configuration")

View File

@@ -4,31 +4,55 @@ from sqlalchemy import create_engine, select, delete, and_, func
from sqlalchemy.exc import OperationalError
import logging
from lacre.config import flag_enabled
from lacre.config import flag_enabled, config_item_set, get_item, PoolingMode
from lacre._keyringcommon import KeyRing, KeyCache
import lacre.dbschema as db
LOG = logging.getLogger(__name__)
_HOUR_IN_SECONDS = 3600
# Internal state
_engine = None
def connect(url):
def init_engine(url):
global _engine
if not _engine:
_engine = create_engine(url)
config = _conn_config()
_engine = create_engine(url, **config)
return _engine
def _conn_config():
config = dict()
mode = PoolingMode.from_config('database', 'pooling_mode', required=True)
if mode is PoolingMode.OPTIMISTIC:
# Optimistic disconnect-handling: recycle connections.
config['pool_recycle'] = int(get_item('database', 'max_connection_age', _HOUR_IN_SECONDS))
elif mode is PoolingMode.PESSIMISTIC:
# Pessimistic disconnect-handling: pre_ping.
config['pool_pre_ping'] = True
# Additional pool settings
if config_item_set('database', 'pool_size'):
config['pool_size'] = int(get_item('database', 'pool_size'))
if config_item_set('database', 'max_overflow'):
config['max_overflow'] = int(get_item('database', 'max_overflow'))
LOG.debug('Database engine configuration: %s', config)
return config
return _engine.connect()
class IdentityRepository(KeyRing):
def __init__(self, /, connection=None, db_url=None):
def __init__(self, /, connection=None, *, engine):
self._identities = db.LACRE_IDENTITIES
self._conn = connection
self._url = db_url
self._initialised = connection is not None
self._engine = engine
def register_or_update(self, email, fprint):
assert email, "email is mandatory"
@@ -40,44 +64,39 @@ class IdentityRepository(KeyRing):
self._insert(email, fprint)
def _exists(self, email: str) -> bool:
self._ensure_connected()
selq = select(self._identities.c.email).where(self._identities.c.email == email)
return [e for e in self._conn.execute(selq)]
with self._engine.connect() as conn:
return [e for e in conn.execute(selq)]
def _insert(self, email, fprint):
self._ensure_connected()
insq = self._identities.insert().values(email=email, fingerprint=fprint)
LOG.debug('Registering identity %s: %s', email, insq)
self._conn.execute(insq)
with self._engine.connect() as conn:
conn.execute(insq)
def _update(self, email, fprint):
self._ensure_connected()
upq = self._identities.update() \
.values(fingerprint=fprint) \
.where(self._identities.c.email == email)
LOG.debug('Updating identity %s: %s', email, upq)
self._conn.execute(upq)
def _ensure_connected(self):
if not self._initialised:
LOG.debug('Connecting with %s', self._url)
self._conn = connect(self._url)
with self._engine.connect() as conn:
conn.execute(upq)
def delete(self, email):
self._ensure_connected()
delq = delete(self._identities).where(self._identities.c.email == email)
LOG.debug('Deleting keys assigned to %s', email)
self._conn.execute(delq)
with self._engine.connect() as conn:
conn.execute(delq)
def delete_all(self):
LOG.warn('Deleting all identities from the database')
delq = delete(self._identities)
self._conn.execute(delq)
with self._engine.connect() as conn:
conn.execute(delq)
def freeze_identities(self) -> KeyCache:
"""Return a static, async-safe copy of the identity map.
@@ -86,7 +105,6 @@ class IdentityRepository(KeyRing):
if we get a database exception, this method will either return
empty collection or let the exception be propagated.
"""
self._ensure_connected()
try:
return self._load_identities()
except OperationalError:
@@ -98,9 +116,10 @@
def _load_identities(self) -> KeyCache:
all_identities = select(self._identities.c.fingerprint, self._identities.c.email)
result = self._conn.execute(all_identities)
LOG.debug('Retrieving all keys')
return KeyCache({key_id: email for key_id, email in result})
with self._engine.connect() as conn:
result = conn.execute(all_identities)
LOG.debug('Retrieving all keys')
return KeyCache({key_id: email for key_id, email in result})
class KeyConfirmationQueue:
@@ -109,9 +128,9 @@ class KeyConfirmationQueue:
# Default number of items retrieved from the database.
keys_read_max = 100
def __init__(self, connection):
def __init__(self, /, engine):
self._keys = db.LACRE_KEYS
self._conn = connection
self._engine = engine
def fetch_keys(self, /, max_keys=None):
"""Runs a query to retrieve at most `keys_read_max` keys and returns db result."""
@@ -122,37 +141,52 @@
.limit(max_keys)
LOG.debug('Retrieving keys to be processed: %s', selq)
return self._conn.execute(selq)
with self._engine.connect() as conn:
return conn.execute(selq)
def count_keys(self):
selq = select(func.count(self._keys.c.id))
LOG.debug('Counting all keys: %s', selq)
try:
c = [cnt for cnt in self._conn.execute(selq)]
with self._engine.connect() as conn:
c = [cnt for cnt in conn.execute(selq)]
# Result is an iterable of tuples:
return c[0][0]
# Result is an iterable of tuples:
return c[0][0]
except OperationalError:
LOG.exception('Cannot count keys')
return None
def fetch_keys_to_delete(self):
seldel = select(self._keys.c.email, self._keys.c.id).where(self._keys.c.status == db.ST_TO_BE_DELETED).limit(self.keys_read_max)
return self._conn.execute(seldel)
with self._engine.connect() as conn:
return conn.execute(seldel)
def delete_keys(self, row_id, /, email=None):
"""Remove key from the database."""
if email is not None:
delq = delete(self._keys).where(and_(self._keys.c.email == email, self._keys.c.id != row_id))
LOG.debug('Deleting key: id=%s, email=%s', row_id, email)
delq = delete(self._keys).where(and_(self._keys.c.email == email, self._keys.c.id == row_id))
else:
delq = delete(self._keys).where(self._keys.c.id != row_id)
LOG.debug('Deleting key: id=%s', row_id)
delq = delete(self._keys).where(self._keys.c.id == row_id)
LOG.debug('Deleting public keys associated with confirmed email: %s', delq)
self._conn.execute(delq)
with self._engine.connect() as conn:
LOG.debug('Deleting public keys associated with confirmed email: %s', delq)
return conn.execute(delq)
def delete_key_by_email(self, email):
"""Remove keys linked to the given email from the database."""
delq = delete(self._keys).where(self._keys.c.email == email)
LOG.debug('Deleting email for: %s', email)
with self._engine.connect() as conn:
conn.execute(delq)
def mark_accepted(self, row_id):
modq = self._keys.update().where(self._keys.c.id == row_id).values(status=db.ST_IMPORTED)
LOG.debug("Key imported, updating key: %s", modq)
self._conn.execute(modq)
with self._engine.connect() as conn:
conn.execute(modq)
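With this change, callers create one shared engine and each repository checks a connection out of its pool per operation. A minimal usage sketch, mirroring the test and lacre.admin code elsewhere in this PR (configuration must be loaded first, since init_engine() reads pooling_mode):

```python
import lacre.config as conf
import lacre.repositories as repo

conf.load_config()  # init_engine() needs database.pooling_mode
eng = repo.init_engine(conf.get_item('database', 'url'))

identities = repo.IdentityRepository(engine=eng)
queue = repo.KeyConfirmationQueue(engine=eng)
print('Keys in the queue:', queue.count_keys())
```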

View File

@@ -13,6 +13,7 @@ class ExecutionTimeLogger:
def __enter__(self):
self._start = time.process_time()
self._log.info('Start: %s', self._message)
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
end = time.process_time()

View File

@@ -42,6 +42,7 @@ def _build_config(config):
cp.add_section('database')
cp.set('database', 'enabled', 'yes')
cp.set('database', 'url', 'sqlite:///test/lacre.db')
cp.set('database', 'pooling_mode', 'optimistic')
cp.add_section("smime")
cp.set("smime", "cert_path", config["smime_certpath"])

View File

@@ -13,6 +13,7 @@ cert_path = test/certs
[database]
enabled = yes
url = sqlite:///test/lacre.db
pooling_mode = optimistic
[relay]
host = localhost

View File

@@ -2,13 +2,23 @@
import unittest
import lacre.config as conf
import lacre.repositories as r
import lacre.dbschema as s
def ignore_sql(sql, *args, **kwargs):
pass
class IdentityRepositoryTest(unittest.TestCase):
def test_x(self):
ir = r.IdentityRepository(db_url='sqlite:///test/lacre.db')
def setUpClass():
# required for init_engine to work
conf.load_config()
def test_freeze_identities(self):
eng = r.init_engine('sqlite:///test/lacre.db')
ir = r.IdentityRepository(engine=eng)
identities = ir.freeze_identities()
self.assertTrue(identities)

View File

@@ -19,6 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import logging
import lacre
import lacre.config as conf
@@ -31,7 +32,7 @@ lacre.init_logging(conf.get_item('logging', 'config'))
LOG = logging.getLogger('webgate-cron.py')
import GnuPG
from lacre.repositories import KeyConfirmationQueue, IdentityRepository, connect
from lacre.repositories import KeyConfirmationQueue, IdentityRepository, init_engine
def _validate_config():
@@ -43,11 +44,17 @@ def _validate_config():
_validate_config()
if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url'):
conn = connect(conf.get_item('database', 'url'))
if not (conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url')):
print("Warning: doing nothing since database settings are not configured!")
LOG.error("Warning: doing nothing since database settings are not configured!")
sys.exit(lacre.EX_CONFIG)
identities = IdentityRepository(conn)
key_queue = KeyConfirmationQueue(conn)
try:
db_engine = init_engine(conf.get_item('database', 'url'))
identities = IdentityRepository(engine=db_engine)
key_queue = KeyConfirmationQueue(engine=db_engine)
key_dir = conf.get_item('gpg', 'keyhome')
LOG.debug('Using GnuPG with home directory in %s', key_dir)
@@ -80,15 +87,21 @@ if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database',
notify("PGP key registration failed", "registrationError.md", email)
else:
# delete key so we don't continue processing it
LOG.debug('Empty key received, just deleting')
key_queue.delete_keys(row_id)
if conf.flag_enabled('cron', 'send_email'):
notify("PGP key deleted", "keyDeleted.md", email)
LOG.info('Cleaning up after a round of key confirmation')
stat2_result_set = key_queue.fetch_keys_to_delete()
for email, row_id in stat2_result_set:
LOG.debug('Removing key from keyring: %s', email)
GnuPG.delete_key(key_dir, email)
LOG.debug('Removing key from identity store: %s', row_id)
key_queue.delete_keys(row_id)
LOG.info('Deleted key for <%s>', email)
else:
print("Warning: doing nothing since database settings are not configured!")
LOG.error("Warning: doing nothing since database settings are not configured!")
except:
LOG.exception('Unexpected issue during key confirmation')