Refactor into smaller functions and objects

This commit is contained in:
Piotr F. Mieszkowski 2022-09-17 14:22:22 +02:00 committed by Gitea
parent ddcef93abb
commit d01865d21c
2 changed files with 80 additions and 59 deletions

View File

@ -34,7 +34,7 @@ from M2Crypto import BIO, SMIME, X509
import logging
import lacre.text as text
import lacre.config as conf
from lacre.mailop import KeepIntact
from lacre.mailop import KeepIntact, OpenPGPEncrypt
LOG = logging.getLogger(__name__)
@ -45,69 +45,77 @@ def _gpg_encrypt(raw_message, recipients):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
gpg_to, ungpg_to = _sort_gpg_recipients(recipients)
# TODO: gpg_to contains objects, not tuples, so we need to access them
# appropriately
gpg_to, ungpg_to = _identify_gpg_recipients(recipients)
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}")
if gpg_to:
LOG.info("Encrypting email to: %s" % ' '.join(x[0] for x in gpg_to))
LOG.info("Encrypting email to: %s" % ' '.join(x.email() for x in gpg_to))
# Getting PGP style for recipient
gpg_to_smtp_mime = list()
gpg_to_cmdline_mime = list()
gpg_to_smtp_inline = list()
gpg_to_cmdline_inline = list()
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt[0], 'mime'):
gpg_to_smtp_mime.append(rcpt[0])
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
elif conf.config_item_equals('pgp_style', rcpt[0], 'inline'):
gpg_to_smtp_inline.append(rcpt[0])
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt[0]):
LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
# If no style is in settings defined for recipient, use default from settings
if conf.config_item_equals('default', 'mime_conversion', 'yes'):
gpg_to_smtp_mime.append(rcpt[0])
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
else:
gpg_to_smtp_inline.append(rcpt[0])
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
gpg_to_smtp_mime, gpg_to_cmdline_mime, \
gpg_to_smtp_inline, gpg_to_cmdline_inline = \
_sort_gpg_recipients(gpg_to)
if gpg_to_smtp_mime:
# Encrypt mail with PGP/MIME
raw_message_mime = copy.deepcopy(raw_message)
_customise_headers(raw_message_mime)
encrypted_payloads = _encrypt_all_payloads_mime(raw_message_mime, gpg_to_cmdline_mime)
raw_message_mime.set_payload(encrypted_payloads)
send_msg(raw_message_mime.as_string(), gpg_to_smtp_mime)
_gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_mime, gpg_to_smtp_mime,
_encrypt_all_payloads_mime)
if gpg_to_smtp_inline:
# Encrypt mail with PGP/INLINE
raw_message_inline = copy.deepcopy(raw_message)
_customise_headers(raw_message_inline)
encrypted_payloads = _encrypt_all_payloads_inline(raw_message_inline, gpg_to_cmdline_inline)
raw_message_inline.set_payload(encrypted_payloads)
send_msg(raw_message_inline.as_string(), gpg_to_smtp_inline)
_gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_inline, gpg_to_smtp_inline,
_encrypt_all_payloads_inline)
LOG.info(f"Not processed emails: {ungpg_to}")
return ungpg_to
def _customise_headers(message):
def _sort_gpg_recipients(gpg_to):
gpg_to_smtp_mime = list()
gpg_to_cmdline_mime = list()
gpg_to_smtp_inline = list()
gpg_to_cmdline_inline = list()
default_to_pgp_mime = conf.config_item_equals('default', 'mime_conversion', 'yes')
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
gpg_to_smtp_mime.append(rcpt.email())
gpg_to_cmdline_mime.extend(rcpt.key().split(','))
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
gpg_to_smtp_inline.append(rcpt.email())
gpg_to_cmdline_inline.extend(rcpt.key().split(','))
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt[0]):
LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
# If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime:
gpg_to_smtp_mime.append(rcpt[0])
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
else:
gpg_to_smtp_inline.append(rcpt[0])
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
msg_copy.set_payload(encrypted_payloads)
send_msg(msg_copy.as_string(), to)
def _customise_headers(msg_copy):
if conf.config_item_equals('default', 'add_header', 'yes'):
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
@ -116,8 +124,6 @@ def _customise_headers(message):
else:
msg_copy['Content-Transfer-Encoding'] = '8BIT'
return
def _load_keys():
"""Return a map from a key's fingerprint to email address."""
@ -149,8 +155,14 @@ class GpgRecipient:
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def email(self):
return self._left
def _sort_gpg_recipients(recipients):
def key(self):
return self._right
def _identify_gpg_recipients(recipients):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
@ -187,6 +199,7 @@ def _sort_gpg_recipients(recipients):
ungpg_to.append((to, to))
LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}')
return gpg_to, ungpg_to
@ -494,9 +507,17 @@ def _is_encrypted(raw_message):
def delivery_plan(recipients):
"""Generate a sequence of pairs: a recipient and their delivery strategy."""
for recipient in recipients:
yield KeepIntact(recipient)
"""Generate a sequence of delivery strategies."""
gpg_to, ungpg_to = _identify_gpg_recipients(recipients)
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \
_sort_gpg_recipients(gpg_to)
keyhome = conf.get_item('gpg', 'keyhome')
return [OpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome),
OpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome),
KeepIntact(ungpg_to)]
def deliver_message(raw_message, from_address, to_addrs):

View File

@ -65,7 +65,7 @@ class OpenPGPEncrypt(MailOperation):
def __repr__(self):
"""Generate a representation with just method and key."""
return f"<OpenPGP {self._recipient} {self._key}>"
return f"<OpenPGP {self._recipients} {self._key}>"
class SMimeEncrypt(MailOperation):
@ -79,12 +79,12 @@ class SMimeEncrypt(MailOperation):
def perform(self, message):
"""Encrypt with a certificate."""
LOG.warning(f"Delivering clear-text to {self._recipient}")
LOG.warning(f"Delivering clear-text to {self._recipients}")
return message
def __repr__(self):
"""Generate a representation with just method and key."""
return f"<S/MIME {self._recipient}, {self._cert}>"
return f"<S/MIME {self._recipients}, {self._cert}>"
class KeepIntact(MailOperation):
@ -93,9 +93,9 @@ class KeepIntact(MailOperation):
This operation should be used for mail that's already encrypted.
"""
def __init__(self, recipient):
def __init__(self, recipients):
"""Initialise pass-through operation for a given recipient."""
super().__init__(recipient)
super().__init__(recipients)
def perform(self, message):
"""Return MESSAGE unmodified."""
@ -103,4 +103,4 @@ class KeepIntact(MailOperation):
def __repr__(self):
"""Return representation with just method and email."""
return f"<KeepIntact {self._recipient}>"
return f"<KeepIntact {self._recipients}>"