From d01865d21ce4ee3fe663191852ca6a2c0eed6bc3 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Sat, 17 Sep 2022 14:22:22 +0200 Subject: [PATCH] Refactor into smaller functions and objects --- lacre/mailgate.py | 127 +++++++++++++++++++++++++++------------------- lacre/mailop.py | 12 ++--- 2 files changed, 80 insertions(+), 59 deletions(-) diff --git a/lacre/mailgate.py b/lacre/mailgate.py index 2fa727a..0d10fe0 100644 --- a/lacre/mailgate.py +++ b/lacre/mailgate.py @@ -34,7 +34,7 @@ from M2Crypto import BIO, SMIME, X509 import logging import lacre.text as text import lacre.config as conf -from lacre.mailop import KeepIntact +from lacre.mailop import KeepIntact, OpenPGPEncrypt LOG = logging.getLogger(__name__) @@ -45,69 +45,77 @@ def _gpg_encrypt(raw_message, recipients): LOG.error("No valid entry for gpg keyhome. Encryption aborted.") return recipients - gpg_to, ungpg_to = _sort_gpg_recipients(recipients) + # TODO: gpg_to contains objects, not tuples, so we need to access them + # appropriately + gpg_to, ungpg_to = _identify_gpg_recipients(recipients) LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}") if gpg_to: - LOG.info("Encrypting email to: %s" % ' '.join(x[0] for x in gpg_to)) + LOG.info("Encrypting email to: %s" % ' '.join(x.email() for x in gpg_to)) - # Getting PGP style for recipient - gpg_to_smtp_mime = list() - gpg_to_cmdline_mime = list() - - gpg_to_smtp_inline = list() - gpg_to_cmdline_inline = list() - - for rcpt in gpg_to: - # Checking pre defined styles in settings first - if conf.config_item_equals('pgp_style', rcpt[0], 'mime'): - gpg_to_smtp_mime.append(rcpt[0]) - gpg_to_cmdline_mime.extend(rcpt[1].split(',')) - elif conf.config_item_equals('pgp_style', rcpt[0], 'inline'): - gpg_to_smtp_inline.append(rcpt[0]) - gpg_to_cmdline_inline.extend(rcpt[1].split(',')) - else: - # Log message only if an unknown style is defined - if conf.config_item_set('pgp_style', rcpt[0]): - LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0])) - - # If no style is in settings defined for recipient, use default from settings - if conf.config_item_equals('default', 'mime_conversion', 'yes'): - gpg_to_smtp_mime.append(rcpt[0]) - gpg_to_cmdline_mime.extend(rcpt[1].split(',')) - else: - gpg_to_smtp_inline.append(rcpt[0]) - gpg_to_cmdline_inline.extend(rcpt[1].split(',')) + gpg_to_smtp_mime, gpg_to_cmdline_mime, \ + gpg_to_smtp_inline, gpg_to_cmdline_inline = \ + _sort_gpg_recipients(gpg_to) if gpg_to_smtp_mime: # Encrypt mail with PGP/MIME - raw_message_mime = copy.deepcopy(raw_message) - - _customise_headers(raw_message_mime) - - encrypted_payloads = _encrypt_all_payloads_mime(raw_message_mime, gpg_to_cmdline_mime) - raw_message_mime.set_payload(encrypted_payloads) - - send_msg(raw_message_mime.as_string(), gpg_to_smtp_mime) + _gpg_encrypt_and_deliver(raw_message, + gpg_to_cmdline_mime, gpg_to_smtp_mime, + _encrypt_all_payloads_mime) if gpg_to_smtp_inline: # Encrypt mail with PGP/INLINE - raw_message_inline = copy.deepcopy(raw_message) - - _customise_headers(raw_message_inline) - - encrypted_payloads = _encrypt_all_payloads_inline(raw_message_inline, gpg_to_cmdline_inline) - raw_message_inline.set_payload(encrypted_payloads) - - send_msg(raw_message_inline.as_string(), gpg_to_smtp_inline) + _gpg_encrypt_and_deliver(raw_message, + gpg_to_cmdline_inline, gpg_to_smtp_inline, + _encrypt_all_payloads_inline) LOG.info(f"Not processed emails: {ungpg_to}") return ungpg_to -def _customise_headers(message): +def _sort_gpg_recipients(gpg_to): + gpg_to_smtp_mime = list() + gpg_to_cmdline_mime = list() + + gpg_to_smtp_inline = list() + gpg_to_cmdline_inline = list() + + default_to_pgp_mime = conf.config_item_equals('default', 'mime_conversion', 'yes') + + for rcpt in gpg_to: + # Checking pre defined styles in settings first + if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'): + gpg_to_smtp_mime.append(rcpt.email()) + gpg_to_cmdline_mime.extend(rcpt.key().split(',')) + elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'): + gpg_to_smtp_inline.append(rcpt.email()) + gpg_to_cmdline_inline.extend(rcpt.key().split(',')) + else: + # Log message only if an unknown style is defined + if conf.config_item_set('pgp_style', rcpt[0]): + LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0])) + + # If no style is in settings defined for recipient, use default from settings + if default_to_pgp_mime: + gpg_to_smtp_mime.append(rcpt[0]) + gpg_to_cmdline_mime.extend(rcpt[1].split(',')) + else: + gpg_to_smtp_inline.append(rcpt[0]) + gpg_to_cmdline_inline.extend(rcpt[1].split(',')) + + return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline + + +def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f): msg_copy = copy.deepcopy(message) + _customise_headers(msg_copy) + encrypted_payloads = encrypt_f(msg_copy, cmdline) + msg_copy.set_payload(encrypted_payloads) + send_msg(msg_copy.as_string(), to) + + +def _customise_headers(msg_copy): if conf.config_item_equals('default', 'add_header', 'yes'): msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' @@ -116,8 +124,6 @@ def _customise_headers(message): else: msg_copy['Content-Transfer-Encoding'] = '8BIT' - return - def _load_keys(): """Return a map from a key's fingerprint to email address.""" @@ -149,8 +155,14 @@ class GpgRecipient: """Return textual representation of this GPG Recipient.""" return f"GpgRecipient({self._left!r}, {self._right!r})" + def email(self): + return self._left -def _sort_gpg_recipients(recipients): + def key(self): + return self._right + + +def _identify_gpg_recipients(recipients): # This list will be filled with pairs (M, N), where M is the destination # address we're going to deliver the message to and N is the identity we're # going to encrypt it for. @@ -187,6 +199,7 @@ def _sort_gpg_recipients(recipients): ungpg_to.append((to, to)) + LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}') return gpg_to, ungpg_to @@ -494,9 +507,17 @@ def _is_encrypted(raw_message): def delivery_plan(recipients): - """Generate a sequence of pairs: a recipient and their delivery strategy.""" - for recipient in recipients: - yield KeepIntact(recipient) + """Generate a sequence of delivery strategies.""" + gpg_to, ungpg_to = _identify_gpg_recipients(recipients) + + gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \ + _sort_gpg_recipients(gpg_to) + + keyhome = conf.get_item('gpg', 'keyhome') + + return [OpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome), + OpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome), + KeepIntact(ungpg_to)] def deliver_message(raw_message, from_address, to_addrs): diff --git a/lacre/mailop.py b/lacre/mailop.py index 51e3c33..179b05b 100644 --- a/lacre/mailop.py +++ b/lacre/mailop.py @@ -65,7 +65,7 @@ class OpenPGPEncrypt(MailOperation): def __repr__(self): """Generate a representation with just method and key.""" - return f"" + return f"" class SMimeEncrypt(MailOperation): @@ -79,12 +79,12 @@ class SMimeEncrypt(MailOperation): def perform(self, message): """Encrypt with a certificate.""" - LOG.warning(f"Delivering clear-text to {self._recipient}") + LOG.warning(f"Delivering clear-text to {self._recipients}") return message def __repr__(self): """Generate a representation with just method and key.""" - return f"" + return f"" class KeepIntact(MailOperation): @@ -93,9 +93,9 @@ class KeepIntact(MailOperation): This operation should be used for mail that's already encrypted. """ - def __init__(self, recipient): + def __init__(self, recipients): """Initialise pass-through operation for a given recipient.""" - super().__init__(recipient) + super().__init__(recipients) def perform(self, message): """Return MESSAGE unmodified.""" @@ -103,4 +103,4 @@ class KeepIntact(MailOperation): def __repr__(self): """Return representation with just method and email.""" - return f"" + return f""