Implement a basic KeyCache

This commit is contained in:
Piotr F. Mieszkowski 2022-09-30 22:40:42 +02:00 committed by Gitea
parent 07263d5afa
commit 5f601fa50c
5 changed files with 129 additions and 29 deletions

View File

@ -3,6 +3,7 @@
import logging
import lacre
import lacre.config as conf
import lacre.keycache as kcache
import sys
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope
@ -28,11 +29,15 @@ import lacre.mailgate as gate
class MailEncryptionProxy:
"""A mail handler dispatching to appropriate mail operation."""
def __init__(self, keys: kcache.KeyCache):
"""Initialise the mail proxy with a reference to the key cache."""
self._keys = keys
async def handle_DATA(self, server, session, envelope: Envelope):
"""Accept a message and either encrypt it or forward as-is."""
try:
message = email.message_from_bytes(envelope.content)
for operation in gate.delivery_plan(envelope.rcpt_tos):
for operation in gate.delivery_plan(envelope.rcpt_tos, self._keys):
LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
@ -43,8 +48,8 @@ class MailEncryptionProxy:
return RESULT_NOT_IMPLEMENTED
def _init_controller():
proxy = MailEncryptionProxy()
def _init_controller(keys: kcache.KeyCache):
proxy = MailEncryptionProxy(keys)
host, port = conf.daemon_params()
LOG.info(f"Initialising a mail Controller at {host}:{port}")
return Controller(proxy, hostname=host, port=port)
@ -70,7 +75,9 @@ async def _sleep():
def _main():
_validate_config()
controller = _init_controller()
keys = kcache.KeyCache()
keys.load_keyring(conf.get_item('gpg', 'keyhome'))
controller = _init_controller(keys)
LOG.info("Starting the daemon...")
# starts the controller in a new thread

51
lacre/keycache.py Normal file
View File

@ -0,0 +1,51 @@
"""A cache of OpenPGP keys known to Lacre."""
import lacre.text as text
import GnuPG
class KeyCache:
"""A store for OpenPGP keys.
Key case is sanitised while loading from GnuPG if so
configured. See mail_case_insensitive parameter in section
[default].
"""
def __init__(self):
"""Initialise an empty cache."""
self._keys = dict()
def __getitem__(self, email):
return self._keys[email]
def __setitem__(self, email, fingerprint):
self._keys[email] = fingerprint
def __contains__(self, fingerprint):
"""Check if the given fingerprint is assigned to an email."""
# This method has to be present for KeyCache to be a dict substitute.
# See mailgate, function _identify_gpg_recipients.
return fingerprint in self._keys
def has_email(self, email):
return email in self._keys.values()
def load_keyring(self, keyhome: str):
"""Add all identities from keyring stored in KEYHOME."""
self.extend_keyring(GnuPG.public_keys(keyhome))
def replace_keyring(self, keys: dict):
"""Overwrite previously stored key cache with KEYS."""
for fingerprint in keys:
keys[fingerprint] = text.sanitize_case_sense(keys[fingerprint])
self._keys = keys
def extend_keyring(self, keys: dict):
"""Add all keys from KEYS."""
for fingerprint in keys:
keys[fingerprint] = text.sanitize_case_sense(keys[fingerprint])
self._keys.update(keys)

View File

@ -34,6 +34,7 @@ from M2Crypto import BIO, SMIME, X509
import logging
import lacre.text as text
import lacre.config as conf
import lacre.keycache as kcache
from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt
@ -45,9 +46,7 @@ def _gpg_encrypt(raw_message, recipients):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
# TODO: gpg_to contains objects, not tuples, so we need to access them
# appropriately
gpg_to, ungpg_to = _identify_gpg_recipients(recipients)
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, _load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}")
@ -132,10 +131,8 @@ def _customise_headers(msg_copy):
def _load_keys():
"""Return a map from a key's fingerprint to email address."""
keys = GnuPG.public_keys(conf.get_item('gpg', 'keyhome'))
for fingerprint in keys:
keys[fingerprint] = _sanitize_case_sense(keys[fingerprint])
keys = kcache.KeyCache()
keys.load_keyring(conf.get_item('gpg', 'keyhome'))
return keys
@ -167,7 +164,7 @@ class GpgRecipient:
return self._right
def _identify_gpg_recipients(recipients):
def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
@ -182,7 +179,6 @@ def _identify_gpg_recipients(recipients):
strict_mode = conf.strict_mode()
# GnuPG keys found in our keyring.
keys = _load_keys()
LOG.info(f'Processisng recipients: {recipients!r}; keys: {keys!r}')
for to in recipients:
@ -239,12 +235,12 @@ def _try_direct_key_lookup(recipient, keys, strict_mode):
if strict_mode:
return None
if recipient in keys.values():
if keys.has_email(recipient):
LOG.info(f"Found key for {recipient}")
return recipient, recipient
(newto, topic) = text.parse_delimiter(recipient)
if newto in keys.values():
if keys.has_email(newto):
LOG.info(f"Found key for {newto}, stripped {recipient}")
return recipient, newto
@ -449,17 +445,6 @@ def _get_cert_for_email(to_addr, cert_path):
return _get_cert_for_email(fixed_up_email, cert_path)
def _sanitize_case_sense(address):
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
address = address.lower()
else:
splitted_address = address.split('@')
if len(splitted_address) > 1:
address = splitted_address[0] + '@' + splitted_address[1].lower()
return address
def _generate_message_from_payloads(payloads, message=None):
if message is None:
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
@ -511,9 +496,9 @@ def _is_encrypted(raw_message):
return text.is_pgp_inline(first_payload)
def delivery_plan(recipients):
def delivery_plan(recipients, key_cache: kcache.KeyCache):
"""Generate a sequence of delivery strategies."""
gpg_to, ungpg_to = _identify_gpg_recipients(recipients)
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache)
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \
_sort_gpg_recipients(gpg_to)
@ -538,7 +523,7 @@ def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
# Ugly workaround to keep the code working without too many changes.
from_addr = from_address
recipients_left = [_sanitize_case_sense(recipient) for recipient in to_addrs]
recipients_left = [text.sanitize_case_sense(recipient) for recipient in to_addrs]
# There is no need for nested encryption
LOG.debug("Seeing if it's already encrypted")

View File

@ -2,6 +2,8 @@ import sys
import re
import logging
import lacre.config as conf
# The standard way to encode line-ending in email:
EOL = "\r\n"
@ -47,6 +49,19 @@ def parse_delimiter(address: str):
return (address, None)
def sanitize_case_sense(address):
"""Sanitize email case."""
# TODO: find a way to make it more unit-testable
if conf.flag_enabled('default', 'mail_case_insensitive'):
address = address.lower()
else:
splitted_address = address.split('@')
if len(splitted_address) > 1:
address = splitted_address[0] + '@' + splitted_address[1].lower()
return address
def is_pgp_inline(payload) -> bool:
"""Find out if the payload (bytes) contains PGP/INLINE markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload

View File

@ -0,0 +1,42 @@
from lacre.keycache import KeyCache
import unittest
class LacreKeyCacheTest(unittest.TestCase):
def test_extend_keyring(self):
kc = KeyCache()
self.assertFalse('FINGERPRINT' in kc)
kc.extend_keyring({'FINGERPRINT': 'john.doe@example.com'})
self.assertTrue('FINGERPRINT' in kc)
def test_membership_methods(self):
kc = KeyCache()
kc.extend_keyring({
'FINGERPRINT': 'alice@example.com',
'OTHERPRINT': 'bob@example.com'
})
self.assertTrue('FINGERPRINT' in kc)
self.assertTrue(kc.has_email('bob@example.com'))
def test_keyring_replacement(self):
kc = KeyCache()
kc.extend_keyring({
'FINGERPRINT': 'alice@example.com',
'OTHERPRINT': 'bob@example.com'
})
self.assertTrue('FINGERPRINT' in kc)
kc.replace_keyring({
'FOO': 'foo@example.com',
'BAR': 'bar@example.com'
})
self.assertFalse('FINGERPRINT' in kc)
self.assertTrue('FOO' in kc)