diff --git a/lacre/daemon.py b/lacre/daemon.py index bbc7bb3..aca0d21 100644 --- a/lacre/daemon.py +++ b/lacre/daemon.py @@ -3,6 +3,7 @@ import logging import lacre import lacre.config as conf +import lacre.keycache as kcache import sys from aiosmtpd.controller import Controller from aiosmtpd.smtp import Envelope @@ -28,11 +29,15 @@ import lacre.mailgate as gate class MailEncryptionProxy: """A mail handler dispatching to appropriate mail operation.""" + def __init__(self, keys: kcache.KeyCache): + """Initialise the mail proxy with a reference to the key cache.""" + self._keys = keys + async def handle_DATA(self, server, session, envelope: Envelope): """Accept a message and either encrypt it or forward as-is.""" try: message = email.message_from_bytes(envelope.content) - for operation in gate.delivery_plan(envelope.rcpt_tos): + for operation in gate.delivery_plan(envelope.rcpt_tos, self._keys): LOG.debug(f"Sending mail via {operation!r}") new_message = operation.perform(message) gate.send_msg(new_message, operation.recipients(), envelope.mail_from) @@ -43,8 +48,8 @@ class MailEncryptionProxy: return RESULT_NOT_IMPLEMENTED -def _init_controller(): - proxy = MailEncryptionProxy() +def _init_controller(keys: kcache.KeyCache): + proxy = MailEncryptionProxy(keys) host, port = conf.daemon_params() LOG.info(f"Initialising a mail Controller at {host}:{port}") return Controller(proxy, hostname=host, port=port) @@ -70,7 +75,9 @@ async def _sleep(): def _main(): _validate_config() - controller = _init_controller() + keys = kcache.KeyCache() + keys.load_keyring(conf.get_item('gpg', 'keyhome')) + controller = _init_controller(keys) LOG.info("Starting the daemon...") # starts the controller in a new thread diff --git a/lacre/keycache.py b/lacre/keycache.py new file mode 100644 index 0000000..7ec3de5 --- /dev/null +++ b/lacre/keycache.py @@ -0,0 +1,51 @@ +"""A cache of OpenPGP keys known to Lacre.""" + +import lacre.text as text + +import GnuPG + + +class KeyCache: + """A store for OpenPGP keys. + + Key case is sanitised while loading from GnuPG if so + configured. See mail_case_insensitive parameter in section + [default]. + """ + + def __init__(self): + """Initialise an empty cache.""" + self._keys = dict() + + def __getitem__(self, email): + return self._keys[email] + + def __setitem__(self, email, fingerprint): + self._keys[email] = fingerprint + + def __contains__(self, fingerprint): + """Check if the given fingerprint is assigned to an email.""" + # This method has to be present for KeyCache to be a dict substitute. + # See mailgate, function _identify_gpg_recipients. + return fingerprint in self._keys + + def has_email(self, email): + return email in self._keys.values() + + def load_keyring(self, keyhome: str): + """Add all identities from keyring stored in KEYHOME.""" + self.extend_keyring(GnuPG.public_keys(keyhome)) + + def replace_keyring(self, keys: dict): + """Overwrite previously stored key cache with KEYS.""" + for fingerprint in keys: + keys[fingerprint] = text.sanitize_case_sense(keys[fingerprint]) + + self._keys = keys + + def extend_keyring(self, keys: dict): + """Add all keys from KEYS.""" + for fingerprint in keys: + keys[fingerprint] = text.sanitize_case_sense(keys[fingerprint]) + + self._keys.update(keys) diff --git a/lacre/mailgate.py b/lacre/mailgate.py index 7e634e9..35ae9ec 100644 --- a/lacre/mailgate.py +++ b/lacre/mailgate.py @@ -34,6 +34,7 @@ from M2Crypto import BIO, SMIME, X509 import logging import lacre.text as text import lacre.config as conf +import lacre.keycache as kcache from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt @@ -45,9 +46,7 @@ def _gpg_encrypt(raw_message, recipients): LOG.error("No valid entry for gpg keyhome. Encryption aborted.") return recipients - # TODO: gpg_to contains objects, not tuples, so we need to access them - # appropriately - gpg_to, ungpg_to = _identify_gpg_recipients(recipients) + gpg_to, ungpg_to = _identify_gpg_recipients(recipients, _load_keys()) LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}") @@ -132,10 +131,8 @@ def _customise_headers(msg_copy): def _load_keys(): """Return a map from a key's fingerprint to email address.""" - keys = GnuPG.public_keys(conf.get_item('gpg', 'keyhome')) - for fingerprint in keys: - keys[fingerprint] = _sanitize_case_sense(keys[fingerprint]) - + keys = kcache.KeyCache() + keys.load_keyring(conf.get_item('gpg', 'keyhome')) return keys @@ -167,7 +164,7 @@ class GpgRecipient: return self._right -def _identify_gpg_recipients(recipients): +def _identify_gpg_recipients(recipients, keys: kcache.KeyCache): # This list will be filled with pairs (M, N), where M is the destination # address we're going to deliver the message to and N is the identity we're # going to encrypt it for. @@ -182,7 +179,6 @@ def _identify_gpg_recipients(recipients): strict_mode = conf.strict_mode() # GnuPG keys found in our keyring. - keys = _load_keys() LOG.info(f'Processisng recipients: {recipients!r}; keys: {keys!r}') for to in recipients: @@ -239,12 +235,12 @@ def _try_direct_key_lookup(recipient, keys, strict_mode): if strict_mode: return None - if recipient in keys.values(): + if keys.has_email(recipient): LOG.info(f"Found key for {recipient}") return recipient, recipient (newto, topic) = text.parse_delimiter(recipient) - if newto in keys.values(): + if keys.has_email(newto): LOG.info(f"Found key for {newto}, stripped {recipient}") return recipient, newto @@ -449,17 +445,6 @@ def _get_cert_for_email(to_addr, cert_path): return _get_cert_for_email(fixed_up_email, cert_path) -def _sanitize_case_sense(address): - if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'): - address = address.lower() - else: - splitted_address = address.split('@') - if len(splitted_address) > 1: - address = splitted_address[0] + '@' + splitted_address[1].lower() - - return address - - def _generate_message_from_payloads(payloads, message=None): if message is None: message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) @@ -511,9 +496,9 @@ def _is_encrypted(raw_message): return text.is_pgp_inline(first_payload) -def delivery_plan(recipients): +def delivery_plan(recipients, key_cache: kcache.KeyCache): """Generate a sequence of delivery strategies.""" - gpg_to, ungpg_to = _identify_gpg_recipients(recipients) + gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache) gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \ _sort_gpg_recipients(gpg_to) @@ -538,7 +523,7 @@ def deliver_message(raw_message: email.message.Message, from_address, to_addrs): # Ugly workaround to keep the code working without too many changes. from_addr = from_address - recipients_left = [_sanitize_case_sense(recipient) for recipient in to_addrs] + recipients_left = [text.sanitize_case_sense(recipient) for recipient in to_addrs] # There is no need for nested encryption LOG.debug("Seeing if it's already encrypted") diff --git a/lacre/text.py b/lacre/text.py index 6980b3a..3f9cc6a 100644 --- a/lacre/text.py +++ b/lacre/text.py @@ -2,6 +2,8 @@ import sys import re import logging +import lacre.config as conf + # The standard way to encode line-ending in email: EOL = "\r\n" @@ -47,6 +49,19 @@ def parse_delimiter(address: str): return (address, None) +def sanitize_case_sense(address): + """Sanitize email case.""" + # TODO: find a way to make it more unit-testable + if conf.flag_enabled('default', 'mail_case_insensitive'): + address = address.lower() + else: + splitted_address = address.split('@') + if len(splitted_address) > 1: + address = splitted_address[0] + '@' + splitted_address[1].lower() + + return address + + def is_pgp_inline(payload) -> bool: """Find out if the payload (bytes) contains PGP/INLINE markers.""" return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload diff --git a/test/modules/test_lacre_keycache.py b/test/modules/test_lacre_keycache.py new file mode 100644 index 0000000..2f0be90 --- /dev/null +++ b/test/modules/test_lacre_keycache.py @@ -0,0 +1,42 @@ +from lacre.keycache import KeyCache + +import unittest + +class LacreKeyCacheTest(unittest.TestCase): + def test_extend_keyring(self): + kc = KeyCache() + + self.assertFalse('FINGERPRINT' in kc) + + kc.extend_keyring({'FINGERPRINT': 'john.doe@example.com'}) + + self.assertTrue('FINGERPRINT' in kc) + + def test_membership_methods(self): + kc = KeyCache() + + kc.extend_keyring({ + 'FINGERPRINT': 'alice@example.com', + 'OTHERPRINT': 'bob@example.com' + }) + + self.assertTrue('FINGERPRINT' in kc) + self.assertTrue(kc.has_email('bob@example.com')) + + def test_keyring_replacement(self): + kc = KeyCache() + + kc.extend_keyring({ + 'FINGERPRINT': 'alice@example.com', + 'OTHERPRINT': 'bob@example.com' + }) + + self.assertTrue('FINGERPRINT' in kc) + + kc.replace_keyring({ + 'FOO': 'foo@example.com', + 'BAR': 'bar@example.com' + }) + + self.assertFalse('FINGERPRINT' in kc) + self.assertTrue('FOO' in kc)