Extract parts of cron script to modules

Introduce new Python modules:

- lacre.notify -- to send notifications from the cron script;

- lacre.dbschema -- to keep database schema definition as code (SQLAlchemy);

- lacre.repositories -- to define key and identity repositories with high
  level APIs that we can then use elsewhere.

Also:

- rework GnuPG.add_key to return the imported key's fingerprint, so that we can
  use it in the cron script (see the usage sketch below);

- rename the cron job's logger, replacing the dash with an underscore, as the
  logging module doesn't like dashes.
Piotr F. Mieszkowski 2023-11-01 21:30:26 +01:00
parent bf677585be
commit 9bbc86bc53
6 changed files with 233 additions and 109 deletions
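
For orientation, here is a rough sketch of how the new pieces are meant to fit together. It is illustrative only: the key file and address are placeholders, and the real processing loop is in the cron-script diff at the end of this page.

import GnuPG
import lacre.config as conf
from lacre.notify import notify

conf.load_config()
key_dir = conf.get_item('gpg', 'keyhome')

# Placeholders standing in for one row of the gpgmw_keys queue.
armored_key = open('/tmp/submitted-key.asc').read()
email = 'alice@example.com'

# add_key now reports the fingerprint of the imported key (None on failure).
fingerprint, _ = GnuPG.add_key(key_dir, armored_key)
if fingerprint and conf.flag_enabled('cron', 'send_email'):
    notify("PGP key registration successful", "registrationSuccess.md", email)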


@@ -62,9 +62,12 @@ def _build_command(key_home, *args, **kwargs):
     return cmd


-def public_keys(keyhome):
+def public_keys(keyhome, *, key_id=None):
     """List public keys from keyring KEYHOME."""
     cmd = _build_command(keyhome, '--list-keys', '--with-colons')
+
+    if key_id is not None:
+        cmd.append(key_id)

     p = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     p.wait()
@@ -106,13 +109,10 @@ def confirm_key(content, email: str):
     tmpkeyhome = tempfile.mkdtemp()

-    localized_env = os.environ.copy()
-    localized_env["LANG"] = "C"
-
-    p = subprocess.Popen(_build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env)
-    result = p.communicate(input=content)[1]
+    result = _import_key(tmpkeyhome, content)

     confirmed = False

-    for line in result.split(b'\n'):
+    for line in result.splitlines():
         found = RX_CONFIRM.search(line)
         if found:
             (_, extracted_email) = parseaddr(found.group(1).decode())
@@ -124,14 +124,40 @@ def confirm_key(content, email: str):
     return confirmed


+def _import_key(keyhome, content):
+    content = _to_bytes(content)
+
+    # Ensure we get expected output regardless of the system locale.
+    localized_env = os.environ.copy()
+    localized_env["LANG"] = "C"
+
+    p = subprocess.Popen(_build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env)
+    output = p.communicate(input=content)[1]
+    p.wait()
+    return output
+
+
 # adds a key and ensures it has the given email address
 def add_key(keyhome, content):
     """Register new key CONTENT in the keyring KEYHOME."""
-    if isinstance(content, str):
-        content = bytes(content, sys.getdefaultencoding())
-
-    p = subprocess.Popen(_build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    p.communicate(input=content)
-    p.wait()
+    output = _import_key(keyhome, content)
+
+    email = None
+    for line in output.splitlines():
+        found = RX_CONFIRM.search(line)
+        if found:
+            (_, extracted_email) = parseaddr(found.group(1).decode())
+            email = extracted_email
+
+    # Find imported key to get its fingerprint
+    imported = public_keys(keyhome, key_id=email)
+
+    if len(imported.keys()) == 1:
+        fingerprint = list(imported.keys())[0]
+        return fingerprint, imported[fingerprint]
+    else:
+        return None, None


 def delete_key(keyhome, email):
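
To illustrate the new contract (a sketch, not part of the commit; the keyring path and key file are placeholders): add_key now returns a (fingerprint, key data) pair and (None, None) when the import cannot be matched to exactly one key, while public_keys accepts an optional key_id to narrow the listing.

import GnuPG

keyhome = '/var/lib/lacre/keyhome'         # placeholder keyring location
armored = open('alice-public.asc').read()  # placeholder ASCII-armored key

fingerprint, key_data = GnuPG.add_key(keyhome, armored)
if fingerprint is None:
    raise SystemExit('import failed or matched more than one key')

# List only the key that was just imported instead of the whole keyring.
print(GnuPG.public_keys(keyhome, key_id=fingerprint))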


@@ -22,6 +22,10 @@ MANDATORY_CONFIG_ITEMS = [("relay", "host"),
                           ("daemon", "port"),
                           ("gpg", "keyhome")]

+CRON_REQUIRED = [('database', 'enabled'),
+                 ('database', 'url'),
+                 ('cron', 'mail_templates')]
+
 # Global dict to keep configuration parameters. It's hidden behind several
 # utility functions to make it easy to replace it with ConfigParser object in
 # the future.
@@ -90,16 +94,23 @@ def flag_enabled(section, key) -> bool:
     return config_item_equals(section, key, 'yes')


-def validate_config():
+def validate_config(*, additional=None):
     """Check if configuration is complete.

     Returns a list of missing parameters, so an empty list means
     configuration is complete.
+
+    If 'additional' parameter is specified, it should be a list of
+    tuples (section, param).
     """
     missing = []
     for (section, param) in MANDATORY_CONFIG_ITEMS:
         if not config_item_set(section, param):
             missing.append((section, param))

+    if additional:
+        for (section, param) in additional:
+            if not config_item_set(section, param):
+                missing.append((section, param))
+
     return missing
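
A minimal example of the extended check, roughly as the cron script below uses it (the SystemExit is only for illustration):

import lacre.config as conf

conf.load_config()
missing = conf.validate_config(additional=conf.CRON_REQUIRED)
if missing:
    # Each entry is a (section, parameter) tuple absent from the configuration.
    raise SystemExit(f'Missing config parameters: {missing}')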

lacre/dbschema.py (new file, +32 lines)

@@ -0,0 +1,32 @@
"""Database schema for Lacre.

This definition includes:

- 'gpgmw_keys' -- temporary key storage, used by the frontend to submit keys and
  by webgate-cron script to import submitted keys.

- 'gpgmw_identities' -- identity catalogue, used by encryption logic to match
  emails with corresponding keys.
"""

import sqlalchemy

# Values for gpgmw_keys.status column:
ST_DEFAULT, ST_IMPORTED, ST_TO_BE_DELETED = range(3)

_meta = sqlalchemy.MetaData()

GPGMW_KEYS = sqlalchemy.Table('gpgmw_keys', _meta,
                              sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
                              sqlalchemy.Column('email', sqlalchemy.String(256)),
                              # ASCII-armored key
                              sqlalchemy.Column('publickey', sqlalchemy.Text),
                              sqlalchemy.Column('confirm', sqlalchemy.String(32)),
                              # Status: see ST_* constants at the top of the file.
                              sqlalchemy.Column('status', sqlalchemy.Integer),
                              sqlalchemy.Column('time', sqlalchemy.DateTime))

GPGMW_IDENTITIES = sqlalchemy.Table('gpgmw_identities', _meta,
                                    sqlalchemy.Column('email', sqlalchemy.String(256), index=True),
                                    # Key fingerprint
                                    sqlalchemy.Column('key_id', sqlalchemy.String(64), index=True))
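
Since both tables are registered on the same MetaData object, a test database can be created in one go. A sketch, with an example SQLite URL that is not part of the commit:

import sqlalchemy
from lacre.dbschema import GPGMW_KEYS, ST_DEFAULT

engine = sqlalchemy.create_engine('sqlite:///lacre-example.db')  # example URL only
GPGMW_KEYS.metadata.create_all(engine)  # creates gpgmw_keys and gpgmw_identities

# Queue a fake key submission the way the frontend would.
with engine.begin() as conn:
    conn.execute(GPGMW_KEYS.insert().values(email='alice@example.com',
                                            publickey='-----BEGIN PGP PUBLIC KEY BLOCK-----...',
                                            confirm='',
                                            status=ST_DEFAULT))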

lacre/notify.py (new file, +52 lines)

@@ -0,0 +1,52 @@
"""Lacre notification sender."""

import logging
import smtplib
import markdown

from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import lacre
import lacre.config as conf

# Read configuration from /etc/gpg-mailgate.conf
conf.load_config()
lacre.init_logging(conf.get_item('logging', 'config'))
LOG = logging.getLogger('webgate-cron.py')


def _load_file(name):
    f = open(name)
    data = f.read()
    f.close()
    return data


def _authenticate_maybe(smtp):
    if conf.config_item_equals('smtp', 'enabled', 'true'):
        LOG.debug(f"Connecting to {conf.get_item('smtp', 'host')}:{conf.get_item('smtp', 'port')}")
        smtp.connect(conf.get_item('smtp', 'host'), conf.get_item('smtp', 'port'))
        smtp.ehlo()
        if conf.config_item_equals('smtp', 'starttls', 'true'):
            LOG.debug("StartTLS enabled")
            smtp.starttls()
            smtp.ehlo()
        smtp.login(conf.get_item('smtp', 'username'), conf.get_item('smtp', 'password'))


def notify(mailsubject, messagefile, recipients=None):
    """Send notification email."""
    mailbody = _load_file(conf.get_item('cron', 'mail_templates') + "/" + messagefile)

    msg = MIMEMultipart("alternative")
    msg["From"] = conf.get_item('cron', 'notification_email')
    msg["To"] = recipients
    msg["Subject"] = mailsubject

    msg.attach(MIMEText(mailbody, 'plain'))
    msg.attach(MIMEText(markdown.markdown(mailbody), 'html'))

    if conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'enc_port'):
        (host, port) = conf.relay_params()
        smtp = smtplib.SMTP(host, port)
        _authenticate_maybe(smtp)
        smtp.sendmail(conf.get_item('cron', 'notification_email'), recipients, msg.as_string())
    else:
        LOG.info("Could not send mail due to wrong configuration")

lacre/repositories.py (new file, +58 lines)

@@ -0,0 +1,58 @@
"""Lacre identity and key repositories."""

from sqlalchemy import select, delete, and_
import logging

import lacre.dbschema as db

LOG = logging.getLogger(__name__)


class IdentityRepository:
    """Provides access to the gpgmw_identities table."""

    def __init__(self, identity_table, connection):
        self._identities = identity_table
        self._conn = connection

    def register(self, email, fingerprint):
        # TODO: upsert
        insq = self._identities.insert().values(email=email, key_id=fingerprint)
        self._conn.execute(insq)


class KeyConfirmationQueue:
    """Encapsulates access to gpgmw_keys table."""

    keys_read_max = 100

    def __init__(self, keys_table, connection):
        self._keys = keys_table
        self._conn = connection

    def fetch_keys(self, /, max_keys=None):
        """Run a query to retrieve at most `max_keys` keys (default: `keys_read_max`) and return the db result."""
        max_keys = max_keys or self.keys_read_max

        selq = select(self._keys.c.publickey, self._keys.c.id, self._keys.c.email)\
            .where(and_(self._keys.c.status == db.ST_DEFAULT, self._keys.c.confirm == ""))\
            .limit(max_keys)

        LOG.debug('Retrieving keys to be processed: %s', selq)
        return self._conn.execute(selq)

    def fetch_keys_to_delete(self):
        seldel = select(self._keys.c.email, self._keys.c.id).where(self._keys.c.status == db.ST_TO_BE_DELETED).limit(self.keys_read_max)
        return self._conn.execute(seldel)

    def delete_keys(self, row_id, email=None):
        if email is not None:
            # Delete all other keys submitted for this (confirmed) email address.
            delq = delete(self._keys).where(and_(self._keys.c.email == email, self._keys.c.id != row_id))
        else:
            # Delete just this key.
            delq = delete(self._keys).where(self._keys.c.id == row_id)

        LOG.debug('Deleting public keys: %s', delq)
        self._conn.execute(delq)

    def mark_accepted(self, row_id):
        modq = self._keys.update().where(self._keys.c.id == row_id).values(status=db.ST_IMPORTED)
        LOG.debug("Key imported, updating key: %s", modq)
        self._conn.execute(modq)
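
A sketch of how a caller might wire these repositories up; the engine and connection handling is illustrative, and the cron script in this commit does essentially the same:

import sqlalchemy
import lacre.config as conf
from lacre.dbschema import GPGMW_KEYS, GPGMW_IDENTITIES
from lacre.repositories import IdentityRepository, KeyConfirmationQueue

conf.load_config()
engine = sqlalchemy.create_engine(conf.get_item('database', 'url'))

with engine.connect() as conn:
    key_queue = KeyConfirmationQueue(GPGMW_KEYS, conn)
    identities = IdentityRepository(GPGMW_IDENTITIES, conn)

    for armored_key, row_id, email in key_queue.fetch_keys():
        ...  # confirm and import the key, then:
        # identities.register(email, fingerprint)
        # key_queue.mark_accepted(row_id)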


@@ -20,7 +20,6 @@
 #

 import sqlalchemy
-from sqlalchemy.sql import select, delete, and_
 import smtplib
 import markdown
 from email.mime.text import MIMEText
@@ -29,137 +28,83 @@ from email.mime.multipart import MIMEMultipart
 import logging

 import lacre
 import lacre.config as conf
+import lacre.dbschema as db
+from lacre.repositories import KeyConfirmationQueue, IdentityRepository
+from lacre.notify import notify

 # Read configuration from /etc/gpg-mailgate.conf
 conf.load_config()
 lacre.init_logging(conf.get_item('logging', 'config'))
-LOG = logging.getLogger('webgate-cron.py')
+LOG = logging.getLogger('webgate_cron.py')

 import GnuPG


-def _load_file(name):
-    f = open(name)
-    data = f.read()
-    f.close()
-    return data
-
-
-def _authenticate_maybe(smtp):
-    if conf.config_item_equals('smtp', 'enabled', 'true'):
-        LOG.debug(f"Connecting to {conf.get_item('smtp', 'host')}:{conf.get_item('smtp', 'port')}")
-        smtp.connect(conf.get_item('smtp', 'host'), conf.get_item('smtp', 'port'))
-        smtp.ehlo()
-        if conf.config_item_equals('smtp', 'starttls', 'true'):
-            LOG.debug("StartTLS enabled")
-            smtp.starttls()
-            smtp.ehlo()
-        smtp.login(conf.get_item('smtp', 'username'), conf.get_item('smtp', 'password'))
-
-
-def _send_msg(mailsubject, messagefile, recipients = None):
-    mailbody = _load_file(conf.get_item('cron', 'mail_templates') + "/" + messagefile)
-
-    msg = MIMEMultipart("alternative")
-    msg["From"] = conf.get_item('cron', 'notification_email')
-    msg["To"] = recipients
-    msg["Subject"] = mailsubject
-
-    msg.attach(MIMEText(mailbody, 'plain'))
-    msg.attach(MIMEText(markdown.markdown(mailbody), 'html'))
-
-    if conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'enc_port'):
-        relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'enc_port')))
-        smtp = smtplib.SMTP(relay[0], relay[1])
-        _authenticate_maybe(smtp)
-        smtp.sendmail(conf.get_item('cron', 'notification_email'), recipients, msg.as_string())
-    else:
-        LOG.info("Could not send mail due to wrong configuration")
-
-
 def _setup_db_connection(url):
     engine = sqlalchemy.create_engine(url)
     LOG.debug('Initialised database engine: %s', engine)
     return (engine, engine.connect())


 def _define_db_schema():
-    meta = sqlalchemy.MetaData()
-
-    gpgmw_keys = sqlalchemy.Table('gpgmw_keys', meta,
-                                  sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
-                                  sqlalchemy.Column('email', sqlalchemy.String(256)),
-                                  sqlalchemy.Column('publickey', sqlalchemy.Text),
-                                  sqlalchemy.Column('confirm', sqlalchemy.String(32)),
-                                  sqlalchemy.Column('status', sqlalchemy.Integer),
-                                  sqlalchemy.Column('time', sqlalchemy.DateTime))
-
-    identities = sqlalchemy.Table('gpgmw_identities', meta,
-                                  sqlalchemy.Column('email', sqlalchemy.String(256), index=True),
-                                  sqlalchemy.Column('key_id', sqlalchemy.String(64), index=True))
-
-    return (gpgmw_keys, identities)
+    return (db.GPGMW_KEYS, db.GPGMW_IDENTITIES)


-if conf.config_item_equals('database', 'enabled', 'yes') and conf.config_item_set('database', 'url'):
+def _validate_config():
+    missing = conf.validate_config(additional=conf.CRON_REQUIRED)
+    if missing:
+        LOG.error('Missing config parameters: %s', missing)
+
+
+_validate_config()
+
+if conf.flag_enabled('database', 'enabled') and conf.config_item_set('database', 'url'):
     (engine, conn) = _setup_db_connection(conf.get_item("database", "url"))
     (gpgmw_keys, gpgmw_identities) = _define_db_schema()

-    selq = select(gpgmw_keys.c.publickey, gpgmw_keys.c.id, gpgmw_keys.c.email)\
-        .where(and_(gpgmw_keys.c.status == 0, gpgmw_keys.c.confirm == ""))\
-        .limit(100)
-    LOG.debug(f"Retrieving keys to be processed: {selq}")
-    result_set = conn.execute(selq)
+    identities = IdentityRepository(gpgmw_identities, conn)
+    key_queue = KeyConfirmationQueue(gpgmw_keys, conn)
+
+    key_dir = conf.get_item('gpg', 'keyhome')
+    LOG.debug('Using GnuPG with home directory in %s', key_dir)
+
+    result_set = key_queue.fetch_keys()

     for armored_key, row_id, email in result_set:
         # delete any other public keys associated with this confirmed email address
-        delq = delete(gpgmw_keys).where(and_(gpgmw_keys.c.email == email, gpgmw_keys.c.id != row_id))
-        LOG.debug(f"Deleting public keys associated with confirmed email: {delq}")
-        conn.execute(delq)
-        GnuPG.delete_key(conf.get_item('gpg', 'keyhome'), email)
+        key_queue.delete_keys(row_id, email)
+        GnuPG.delete_key(key_dir, email)
         LOG.info('Deleted key for <%s> via import request', email)

         if armored_key.strip():  # we have this so that user can submit blank key to remove any encryption
             if GnuPG.confirm_key(armored_key, email):
                 # import the key to gpg
-                GnuPG.add_key(conf.get_item('gpg', 'keyhome'), armored_key)
+                (fingerprint, _) = GnuPG.add_key(key_dir, armored_key)

                 # mark key as accepted
-                modq = gpgmw_keys.update().where(gpgmw_keys.c.id == row_id).values(status=1)
-                LOG.debug("Key imported, updating key: %s", modq)
-                conn.execute(modq)
+                key_queue.mark_accepted(row_id)
+                identities.register(email, fingerprint)

-                # add to identity database
-                update_identity = gpgmw_identities.insert().values(email=email, key_id=armored_key)
-                conn.execute(update_identity)
-
-                LOG.warning('Imported key from <%s>', email)
-
-                if conf.config_item_equals('cron', 'send_email', 'yes'):
-                    _send_msg("PGP key registration successful", "registrationSuccess.md", email)
+                LOG.info('Imported key from <%s>', email)
+
+                if conf.flag_enabled('cron', 'send_email'):
+                    notify("PGP key registration successful", "registrationSuccess.md", email)
             else:
-                delq = delete(gpgmw_keys).where(gpgmw_keys.c.id == row_id)
-                LOG.debug(f"Cannot confirm key, deleting it: {delq}")
-                conn.execute(delq)  # delete key
-
-                LOG.warning('Import confirmation failed for <' + email + '>')
-
-                if conf.config_item_equals('cron', 'send_email', 'yes'):
-                    _send_msg("PGP key registration failed", "registrationError.md", email)
+                key_queue.delete_keys(row_id)
+
+                LOG.warning('Import confirmation failed for <%s>', email)
+
+                if conf.flag_enabled('cron', 'send_email'):
+                    notify("PGP key registration failed", "registrationError.md", email)
         else:
             # delete key so we don't continue processing it
-            delq = delete(gpgmw_keys).where(gpgmw_keys.c.id == row_id)
-            LOG.debug(f"Deleting key: {delq}")
-            conn.execute(delq)
-
-            if conf.config_item_equals('cron', 'send_email', 'yes'):
-                _send_msg("PGP key deleted", "keyDeleted.md", email)
-
-    # delete keys
-    stat2q = select(gpgmw_keys.c.email, gpgmw_keys.c.id).where(gpgmw_keys.c.status == 2).limit(100)
-    stat2_result_set = conn.execute(stat2q)
+            key_queue.delete_keys(row_id)
+
+            if conf.flag_enabled('cron', 'send_email'):
+                notify("PGP key deleted", "keyDeleted.md", email)
+
+    stat2_result_set = key_queue.fetch_keys_to_delete()

     for email, row_id in stat2_result_set:
-        GnuPG.delete_key(conf.get_item('gpg', 'keyhome'), email)
-        delq = delete(gpgmw_keys).where(gpgmw_keys.c.id == row_id)
-        LOG.debug(f"Deleting keys that have already been processed: {delq}")
-        conn.execute(delq)
-        LOG.info('Deleted key for <' + email + '>')
+        GnuPG.delete_key(key_dir, email)
+        key_queue.delete_keys(row_id)
+        LOG.info('Deleted key for <%s>', email)
 else:
-    print("Warning: doing nothing since database settings are not configured!")
+    LOG.error("Warning: doing nothing since database settings are not configured!")