Fix a bug introduced by refactoring, clean up code

- Fix certificate retrieval.

- Store recipients within MailOperation objects.

- Log more information.

- Fix some warnings.
This commit is contained in:
Piotr F. Mieszkowski 2022-09-12 22:32:54 +02:00 committed by Gitea
parent ce2e55e90c
commit ddcef93abb
4 changed files with 109 additions and 36 deletions

View file

@ -30,10 +30,10 @@ class MailEncryptionProxy:
"""Accept a message and either encrypt it or forward as-is."""
# for now, just return an error because we're not ready to handle mail
for recipient, operation in gate.delivery_plan(envelope.rcpt_tos):
LOG.debug(f"Sending mail to {recipient} via {operation}")
for operation in gate.delivery_plan(envelope.rcpt_tos):
LOG.debug(f"Sending mail via {operation}")
new_message = operation.perform(envelope.content)
gate.send_msg(new_message, [recipient], envelope.mail_from)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
return RESULT_NOT_IMPLEMENTED
@ -48,11 +48,15 @@ def _init_controller():
def _validate_config():
missing = conf.validate_config()
if missing:
params = ", ".join([f"[{tup[0]}]{tup[1]}" for tup in missing])
params = ", ".join([_full_param_name(tup) for tup in missing])
LOG.error(f"Following mandatory parameters are missing: {params}")
sys.exit(lacre.EX_CONFIG)
def _full_param_name(tup):
return f"[{tup[0]}]{tup[1]}"
async def _sleep():
while True:
await asyncio.sleep(5)

View file

@ -47,6 +47,8 @@ def _gpg_encrypt(raw_message, recipients):
gpg_to, ungpg_to = _sort_gpg_recipients(recipients)
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}")
if gpg_to:
LOG.info("Encrypting email to: %s" % ' '.join(x[0] for x in gpg_to))
@ -82,13 +84,7 @@ def _gpg_encrypt(raw_message, recipients):
# Encrypt mail with PGP/MIME
raw_message_mime = copy.deepcopy(raw_message)
if conf.config_item_equals('default', 'add_header', 'yes'):
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in raw_message_mime:
raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT')
else:
raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
_customise_headers(raw_message_mime)
encrypted_payloads = _encrypt_all_payloads_mime(raw_message_mime, gpg_to_cmdline_mime)
raw_message_mime.set_payload(encrypted_payloads)
@ -99,22 +95,30 @@ def _gpg_encrypt(raw_message, recipients):
# Encrypt mail with PGP/INLINE
raw_message_inline = copy.deepcopy(raw_message)
if conf.config_item_equals('default', 'add_header', 'yes'):
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in raw_message_inline:
raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT')
else:
raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
_customise_headers(raw_message_inline)
encrypted_payloads = _encrypt_all_payloads_inline(raw_message_inline, gpg_to_cmdline_inline)
raw_message_inline.set_payload(encrypted_payloads)
send_msg(raw_message_inline.as_string(), gpg_to_smtp_inline)
LOG.info(f"Not processed emails: {ungpg_to}")
return ungpg_to
def _customise_headers(message):
msg_copy = copy.deepcopy(message)
if conf.config_item_equals('default', 'add_header', 'yes'):
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in msg_copy:
msg_copy.replace_header('Content-Transfer-Encoding', '8BIT')
else:
msg_copy['Content-Transfer-Encoding'] = '8BIT'
return
def _load_keys():
"""Return a map from a key's fingerprint to email address."""
keys = GnuPG.public_keys(conf.get_item('gpg', 'keyhome'))
@ -124,6 +128,28 @@ def _load_keys():
return keys
class GpgRecipient:
"""A tuple-like object that contains GPG recipient data."""
def __init__(self, left, right):
"""Initialise a tuple-like object that contains GPG recipient data."""
self._left = left
self._right = right
def __getitem__(self, index):
"""Pretend this object is a tuple by returning an indexed tuple element."""
if index == 0:
return self._left
elif index == 1:
return self._right
else:
raise IndexError()
def __repr__(self):
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def _sort_gpg_recipients(recipients):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
@ -141,12 +167,25 @@ def _sort_gpg_recipients(recipients):
# GnuPG keys found in our keyring.
keys = _load_keys()
LOG.info(f'Processisng recipients: {recipients!r}')
for to in recipients:
key = _find_key(to, keys, strict_mode)
if key is not None:
gpg_to.append(key)
else:
ungpg_to.append(to)
LOG.info(f"At to={to!r}")
own_key = _try_configured_key(to, keys)
if own_key is not None:
gpg_to.append(GpgRecipient(own_key[0], own_key[1]))
continue
direct_key = _try_direct_key_lookup(to, keys, strict_mode)
if direct_key is not None:
gpg_to.append(GpgRecipient(direct_key[0], direct_key[1]))
continue
domain_key = _try_configured_domain_key(to, keys)
if domain_key is not None:
gpg_to.append(GpgRecipient(domain_key[0], domain_key[1]))
continue
ungpg_to.append((to, to))
return gpg_to, ungpg_to
@ -321,7 +360,7 @@ def _smime_encrypt(raw_message, recipients):
unsmime_to = list()
for addr in recipients:
cert_and_email = _get_cert_for_email(addr, cert_path)
cert_and_email = _get_cert_for_email(addr[0], cert_path)
if not (cert_and_email is None):
(to_cert, normal_email) = cert_and_email
@ -364,6 +403,8 @@ def _smime_encrypt(raw_message, recipients):
def _get_cert_for_email(to_addr, cert_path):
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
LOG.info(f'Retrieving certificate for {to_addr!r} from {cert_path!r}, sensitivity={insensitive!r}')
files_in_directory = os.listdir(cert_path)
for filename in files_in_directory:
file_path = os.path.join(cert_path, filename)
@ -378,12 +419,15 @@ def _get_cert_for_email(to_addr, cert_path):
return (file_path, to_addr)
# support foo+ignore@bar.com -> foo@bar.com
LOG.info(f"An email with topic? {to_addr}")
(fixed_up_email, topic) = text.parse_delimiter(to_addr)
LOG.info(f'Got {fixed_up_email!r} and {topic!r}')
if topic is None:
# delimiter not used
LOG.info('Topic not found')
return None
else:
LOG.debug(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
LOG.info(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
return _get_cert_for_email(fixed_up_email, cert_path)
@ -452,7 +496,7 @@ def _is_encrypted(raw_message):
def delivery_plan(recipients):
"""Generate a sequence of pairs: a recipient and their delivery strategy."""
for recipient in recipients:
yield recipient, KeepIntact(recipient)
yield KeepIntact(recipient)
def deliver_message(raw_message, from_address, to_addrs):
@ -465,22 +509,26 @@ def deliver_message(raw_message, from_address, to_addrs):
recipients_left = [_sanitize_case_sense(recipient) for recipient in to_addrs]
# There is no need for nested encryption
LOG.debug("Seeing if it's already encrypted")
if _is_encrypted(raw_message):
LOG.debug("Message is already encrypted. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
return
# Encrypt mails for recipients with known public PGP keys
LOG.debug("Encrypting with OpenPGP")
recipients_left = _gpg_encrypt(raw_message, recipients_left)
if not recipients_left:
return
# Encrypt mails for recipients with known S/MIME certificate
LOG.debug("Encrypting with S/MIME")
recipients_left = _smime_encrypt(raw_message, recipients_left)
if not recipients_left:
return
# Send out mail to recipients which are left
LOG.debug("Sending the rest as text/plain")
send_msg(raw_message.as_string(), recipients_left)

View file

@ -23,9 +23,9 @@ LOG = logging.getLogger(__name__)
class MailOperation:
"""Contract for an operation to be performed on a message."""
def __init__(self, recipient):
def __init__(self, recipients=[]):
"""Initialise the operation with a recipient."""
self._recipient = recipient
self._recipients = recipients
def perform(self, message):
"""Perform this operation on MESSAGE.
@ -34,9 +34,13 @@ class MailOperation:
"""
raise NotImplementedError(self.__class__())
def recipient(self):
"""Return recipient of the message."""
return self._recipient
def recipients(self):
"""Return list of recipients of the message."""
return self._recipients
def add_recipient(self, recipient):
"""Register another message recipient."""
self._recipients.append(recipient)
class OpenPGPEncrypt(MailOperation):

View file

@ -1,5 +1,6 @@
import sys
import re
import logging
# The standard way to encode line-ending in email:
EOL = "\r\n"
@ -7,7 +8,15 @@ EOL = "\r\n"
PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----"
PGP_INLINE_END = b"-----END PGP MESSAGE-----"
LOG = logging.getLogger(__name__)
def parse_content_type(content_type):
"""Analyse Content-Type email header.
Return a pair: type and sub-type.
"""
parts = [p.strip() for p in content_type.split(';')]
if len(parts) == 1:
# No additional attributes provided. Use default encoding.
@ -16,20 +25,28 @@ def parse_content_type(content_type):
# At least one attribute provided. Find out if any of them is named
# 'charset' and if so, use it.
ctype = parts[0]
encoding = [p for p in parts[1:] if p.startswith('charset=') ]
encoding = [p for p in parts[1:] if p.startswith('charset=')]
if encoding:
eq_idx = encoding[0].index('=')
return (ctype, encoding[0][eq_idx+1:])
else:
return (ctype, sys.getdefaultencoding())
def parse_delimiter(address):
withdelim = re.match('^([^\+]+)\+([^@]+)@(.*)$', address)
def parse_delimiter(address: str):
"""Parse an email with delimiter and topic.
Return destination emaili and topic as a tuple.
"""
withdelim = re.match('^([^\\+]+)\\+([^@]+)@(.*)$', address)
LOG.debug(f'Parsed email: {withdelim!r}')
if withdelim:
return (withdelim.group(1) + '@' + withdelim.group(3), withdelim.group(2))
else:
return (address, None)
def is_pgp_inline(payload):
"""Finds out if the payload (bytes) contains PGP/INLINE markers."""
def is_pgp_inline(payload) -> bool:
"""Find out if the payload (bytes) contains PGP/INLINE markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload