Wrap recipient lists

Instead of passing pairs of lists (emails and keys) separately, implement a
class RecipientList to wrap such pair of lists.
This commit is contained in:
Piotr F. Mieszkowski 2023-03-26 14:26:55 +02:00
parent c5e788b2a0
commit a5f79c1ae7
1 changed files with 139 additions and 125 deletions

View File

@ -33,6 +33,7 @@ import GnuPG
import os
import smtplib
import asyncio
from typing import Tuple
# imports for S/MIME
from M2Crypto import BIO, SMIME, X509
@ -47,120 +48,6 @@ from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt
LOG = logging.getLogger(__name__)
def _gpg_encrypt(raw_message, recipients):
if not conf.config_item_set('gpg', 'keyhome'):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, _load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_recipients!r}, ungpg_to={cleartext_recipients!r}")
if gpg_recipients:
LOG.info("Encrypting email to: %s", gpg_recipients)
recipients_mime, keys_mime, recipients_inline, keys_inline = \
_sort_gpg_recipients(gpg_recipients)
if recipients_mime:
# Encrypt mail with PGP/MIME
_gpg_encrypt_and_deliver(raw_message,
keys_mime, recipients_mime,
_encrypt_all_payloads_mime)
if recipients_inline:
# Encrypt mail with PGP/INLINE
_gpg_encrypt_and_deliver(raw_message,
keys_inline, recipients_inline,
_encrypt_all_payloads_inline)
LOG.info('Not processed emails: %s', cleartext_recipients)
return cleartext_recipients
def _sort_gpg_recipients(gpg_to):
mime = RecipientList()
inline = RecipientList()
recipients_mime = list()
keys_mime = list()
recipients_inline = list()
keys_inline = list()
default_to_pgp_mime = conf.flag_enabled('default', 'mime_conversion')
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()):
LOG.debug("Style %s for recipient %s is not known. Use default as fallback."
% (conf.get_item("pgp_style", rcpt.email()), rcpt.email()))
# If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime:
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
else:
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
# mime = RecipientList(recipients_mime, keys_mime)
# inline = RecipientList(recipients_inline, keys_inline)
return recipients_mime, keys_mime, recipients_inline, keys_inline
def _gpg_encrypt_copy(message: EmailMessage, keys, recipients, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, keys)
msg_copy.set_payload(encrypted_payloads)
return msg_copy
def _gpg_encrypt_to_bytes(message: EmailMessage, keys, recipients, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_bytes(policy=SMTPUTF8)
def _gpg_encrypt_to_str(message: EmailMessage, keys, recipients, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_string(policy=SMTPUTF8)
def _gpg_encrypt_and_deliver(message: EmailMessage, keys, recipients, encrypt_f):
out = _gpg_encrypt_to_str(message, keys, recipients, encrypt_f)
send_msg(out, recipients)
def _customise_headers(message: EmailMessage):
if conf.config_item_equals('default', 'add_header', 'yes'):
message['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in message:
message.replace_header('Content-Transfer-Encoding', '8BIT')
else:
message['Content-Transfer-Encoding'] = '8BIT'
def _load_keys():
"""Return a map from a key's fingerprint to email address."""
keyring = kcache.KeyRing(conf.get_item('gpg', 'keyhome'))
return asyncio.run(keyring.freeze_identities())
class GpgRecipient:
"""A tuple-like object that contains GPG recipient data."""
@ -199,23 +86,151 @@ class RecipientList:
def __init__(self, recipients=[], keys=[]):
"""Initialise lists of recipients and identities."""
self._recipients = recipients
self._recipients = [GpgRecipient(email, key) for (email, key) in zip(recipients, keys)]
def emails(self):
"""Return list of recipients."""
for r in self._recipients:
yield r.email()
LOG.debug('Recipient emails from: %s', self._recipients)
return [r.email() for r in self._recipients]
def keys(self):
"""Return list of GPG identities."""
for r in self._recipients:
yield r.key()
LOG.debug('Recipient keys from: %s', self._recipients)
return [r.key() for r in self._recipients]
def __iadd__(self, recipient: GpgRecipient):
"""Append a recipient."""
LOG.debug('Adding %s to %s', recipient, self._recipients)
self._recipients.append(recipient)
LOG.debug('Added; got: %s', self._recipients)
return self
def __len__(self):
"""Provide len().
With this method, it is possible to write code like:
rl = RecipientList()
if rl:
# do something
"""
return len(self._recipients)
def _gpg_encrypt(raw_message, recipients):
if not conf.config_item_set('gpg', 'keyhome'):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, _load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_recipients!r}, ungpg_to={cleartext_recipients!r}")
if gpg_recipients:
LOG.info("Encrypting email to: %s", gpg_recipients)
mime, inline = _sort_gpg_recipients(gpg_recipients)
if mime:
# Encrypt mail with PGP/MIME
_gpg_encrypt_and_deliver(raw_message,
mime.keys(), mime.emails(),
_encrypt_all_payloads_mime)
if inline:
# Encrypt mail with PGP/INLINE
_gpg_encrypt_and_deliver(raw_message,
inline.keys(), inline.emails(),
_encrypt_all_payloads_inline)
LOG.info('Not processed emails: %s', cleartext_recipients)
return cleartext_recipients
def _sort_gpg_recipients(gpg_to) -> Tuple[RecipientList, RecipientList]:
mime = RecipientList()
inline = RecipientList()
recipients_mime = list()
keys_mime = list()
recipients_inline = list()
keys_inline = list()
default_to_pgp_mime = conf.flag_enabled('default', 'mime_conversion')
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()):
LOG.debug("Style %s for recipient %s is not known. Use default as fallback."
% (conf.get_item("pgp_style", rcpt.email()), rcpt.email()))
# If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime:
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
else:
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
mime = RecipientList(recipients_mime, keys_mime)
inline = RecipientList(recipients_inline, keys_inline)
LOG.debug('Loaded recipients: MIME %s; Inline %s', repr(mime), repr(inline))
return mime, inline
def _gpg_encrypt_copy(message: EmailMessage, keys, recipients, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, keys)
msg_copy.set_payload(encrypted_payloads)
return msg_copy
def _gpg_encrypt_to_bytes(message: EmailMessage, keys, recipients, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_bytes(policy=SMTPUTF8)
def _gpg_encrypt_to_str(message: EmailMessage, keys, recipients, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_string(policy=SMTPUTF8)
def _gpg_encrypt_and_deliver(message: EmailMessage, keys, recipients, encrypt_f):
out = _gpg_encrypt_to_str(message, keys, recipients, encrypt_f)
send_msg(out, recipients)
def _customise_headers(message: EmailMessage):
if conf.config_item_equals('default', 'add_header', 'yes'):
message['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in message:
message.replace_header('Content-Transfer-Encoding', '8BIT')
else:
message['Content-Transfer-Encoding'] = '8BIT'
def _load_keys():
"""Return a map from a key's fingerprint to email address."""
keyring = kcache.KeyRing(conf.get_item('gpg', 'keyhome'))
return asyncio.run(keyring.freeze_identities())
def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
# This list will be filled with pairs (M, N), where M is the destination
@ -609,16 +624,15 @@ def delivery_plan(recipients, message: EmailMessage, key_cache: kcache.KeyCache)
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, key_cache)
mime_recipients, mime_keys, inline_recipients, inline_keys = \
_sort_gpg_recipients(gpg_recipients)
mime, inline = _sort_gpg_recipients(gpg_recipients)
keyhome = conf.get_item('gpg', 'keyhome')
plan = []
if mime_recipients:
plan.append(MimeOpenPGPEncrypt(mime_recipients, mime_keys, keyhome))
if inline_recipients:
plan.append(InlineOpenPGPEncrypt(inline_recipients, inline_keys, keyhome))
if mime:
plan.append(MimeOpenPGPEncrypt(mime.emails(), mime.keys(), keyhome))
if inline:
plan.append(InlineOpenPGPEncrypt(inline.emails(), inline.keys(), keyhome))
if cleartext_recipients:
plan.append(KeepIntact(cleartext_recipients))