Polish the code

This commit is contained in:
Piotr F. Mieszkowski 2023-03-05 08:51:50 +01:00
parent ffd5f08ad9
commit 603a88489e
2 changed files with 82 additions and 83 deletions

View File

@ -26,7 +26,7 @@ module.
from email.mime.multipart import MIMEMultipart
import copy
import email
import email.message
from email.message import EmailMessage, MIMEPart
import email.utils
from email.policy import SMTPUTF8
import GnuPG
@ -52,50 +52,49 @@ def _gpg_encrypt(raw_message, recipients):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, _load_keys())
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, _load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}")
LOG.info(f"Got addresses: gpg_to={gpg_recipients!r}, ungpg_to={cleartext_recipients!r}")
if gpg_to:
LOG.info("Encrypting email to: %s" % ' '.join(x.email() for x in gpg_to))
if gpg_recipients:
LOG.info("Encrypting email to: %s", gpg_recipients)
gpg_to_smtp_mime, gpg_to_cmdline_mime, \
gpg_to_smtp_inline, gpg_to_cmdline_inline = \
_sort_gpg_recipients(gpg_to)
recipients_mime, keys_mime, recipients_inline, keys_inline = \
_sort_gpg_recipients(gpg_recipients)
if gpg_to_smtp_mime:
if recipients_mime:
# Encrypt mail with PGP/MIME
_gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_mime, gpg_to_smtp_mime,
keys_mime, recipients_mime,
_encrypt_all_payloads_mime)
if gpg_to_smtp_inline:
if recipients_inline:
# Encrypt mail with PGP/INLINE
_gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_inline, gpg_to_smtp_inline,
keys_inline, recipients_inline,
_encrypt_all_payloads_inline)
LOG.info(f"Not processed emails: {ungpg_to}")
return ungpg_to
LOG.info('Not processed emails: %s', cleartext_recipients)
return cleartext_recipients
def _sort_gpg_recipients(gpg_to):
gpg_to_smtp_mime = list()
gpg_to_cmdline_mime = list()
recipients_mime = list()
keys_mime = list()
gpg_to_smtp_inline = list()
gpg_to_cmdline_inline = list()
recipients_inline = list()
keys_inline = list()
default_to_pgp_mime = conf.config_item_equals('default', 'mime_conversion', 'yes')
default_to_pgp_mime = conf.flag_enabled('default', 'mime_conversion')
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
gpg_to_smtp_mime.append(rcpt.email())
gpg_to_cmdline_mime.extend(rcpt.key().split(','))
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
gpg_to_smtp_inline.append(rcpt.email())
gpg_to_cmdline_inline.extend(rcpt.key().split(','))
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()):
@ -104,46 +103,46 @@ def _sort_gpg_recipients(gpg_to):
# If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime:
gpg_to_smtp_mime.append(rcpt.email())
gpg_to_cmdline_mime.extend(rcpt.key().split(','))
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
else:
gpg_to_smtp_inline.append(rcpt.email())
gpg_to_cmdline_inline.extend(rcpt.key().split(','))
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
return recipients_mime, keys_mime, recipients_inline, keys_inline
def _gpg_encrypt_copy(message: email.message.EmailMessage, cmdline, to, encrypt_f):
def _gpg_encrypt_copy(message: EmailMessage, keys, recipients, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
encrypted_payloads = encrypt_f(msg_copy, keys)
msg_copy.set_payload(encrypted_payloads)
return msg_copy
def _gpg_encrypt_to_bytes(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
def _gpg_encrypt_to_bytes(message: EmailMessage, keys, recipients, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_bytes(policy=SMTPUTF8)
def _gpg_encrypt_to_str(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_string()
def _gpg_encrypt_to_str(message: EmailMessage, keys, recipients, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
return msg_copy.as_string(policy=SMTPUTF8)
def _gpg_encrypt_and_deliver(message: email.message.EmailMessage, cmdline, to, encrypt_f):
out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f)
send_msg(out, to)
def _gpg_encrypt_and_deliver(message: EmailMessage, keys, recipients, encrypt_f):
out = _gpg_encrypt_to_str(message, keys, recipients, encrypt_f)
send_msg(out, recipients)
def _customise_headers(msg_copy: email.message.EmailMessage):
def _customise_headers(message: EmailMessage):
if conf.config_item_equals('default', 'add_header', 'yes'):
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
message['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in msg_copy:
msg_copy.replace_header('Content-Transfer-Encoding', '8BIT')
if 'Content-Transfer-Encoding' in message:
message.replace_header('Content-Transfer-Encoding', '8BIT')
else:
msg_copy['Content-Transfer-Encoding'] = '8BIT'
message['Content-Transfer-Encoding'] = '8BIT'
def _load_keys():
@ -186,11 +185,11 @@ def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
gpg_to = list()
gpg_recipients = list()
# This will be the list of recipients that haven't provided us with their
# public keys.
ungpg_to = list()
cleartext_recipients = list()
# In "strict mode", only keys included in configuration are used to encrypt
# email.
@ -201,23 +200,23 @@ def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
for to in recipients:
own_key = _try_configured_key(to, keys)
if own_key is not None:
gpg_to.append(GpgRecipient(own_key[0], own_key[1]))
gpg_recipients.append(GpgRecipient(own_key[0], own_key[1]))
continue
direct_key = _try_direct_key_lookup(to, keys, strict_mode)
if direct_key is not None:
gpg_to.append(GpgRecipient(direct_key[0], direct_key[1]))
gpg_recipients.append(GpgRecipient(direct_key[0], direct_key[1]))
continue
domain_key = _try_configured_domain_key(to, keys)
if domain_key is not None:
gpg_to.append(GpgRecipient(domain_key[0], domain_key[1]))
gpg_recipients.append(GpgRecipient(domain_key[0], domain_key[1]))
continue
ungpg_to.append(to)
cleartext_recipients.append(to)
LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}')
return gpg_to, ungpg_to
LOG.debug('Collected recipients; GPG: %s; cleartext: %s', gpg_recipients, cleartext_recipients)
return gpg_recipients, cleartext_recipients
def _find_key(recipient, keys, strict_mode):
@ -279,7 +278,7 @@ def _try_configured_domain_key(recipient, keys):
return None
def _encrypt_all_payloads_inline(message: email.message.EmailMessage, gpg_to_cmdline):
def _encrypt_all_payloads_inline(message: EmailMessage, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list()
@ -295,14 +294,14 @@ def _encrypt_all_payloads_inline(message: email.message.EmailMessage, gpg_to_cmd
return encrypted_payloads
def _encrypt_all_payloads_mime(message: email.message.EmailMessage, gpg_to_cmdline):
def _encrypt_all_payloads_mime(message: EmailMessage, gpg_to_cmdline):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.MIMEPart()
pgp_ver_part.set_payload('Version: 1' + text.EOL)
pgp_ver_part = MIMEPart()
pgp_ver_part.set_payload('Version: 1' + text.EOL_S)
pgp_ver_part.set_type("application/pgp-encrypted")
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description')
encrypted_part = email.message.MIMEPart()
encrypted_part = MIMEPart()
encrypted_part.set_type("application/octet-stream")
encrypted_part.set_param('name', "encrypted.asc")
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description')
@ -329,12 +328,12 @@ def _encrypt_all_payloads_mime(message: email.message.EmailMessage, gpg_to_cmdli
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, False)]
def _rewrap_payload(message: email.message.EmailMessage) -> email.message.MIMEPart:
def _rewrap_payload(message: EmailMessage) -> MIMEPart:
# In PGP/MIME (RFC 3156), the payload has to be a valid MIME entity. In
# other words, we need to wrap text/plain message's payload in a new MIME
# entity.
pld = email.message.MIMEPart()
pld = MIMEPart()
pld.set_type(message.get_content_type())
pld.set_content(message.get_content())
@ -352,13 +351,13 @@ def _make_boundary():
return junk_msg.get_boundary()
def _set_type_and_boundary(message: email.message.EmailMessage, boundary):
def _set_type_and_boundary(message: EmailMessage, boundary):
message.set_type('multipart/encrypted')
message.set_param('protocol', 'application/pgp-encrypted')
message.set_param('boundary', boundary)
def _encrypt_payload(payload: email.message.EmailMessage, recipients, check_nested=True, **kwargs):
def _encrypt_payload(payload: EmailMessage, recipients, check_nested=True, **kwargs):
raw_payload = payload.get_payload(decode=True)
LOG.debug('About to encrypt raw payload: %s', raw_payload)
LOG.debug('Original message: %s', payload)
@ -443,17 +442,17 @@ def _smime_encrypt(raw_message, recipients):
p7 = s.encrypt(BIO.MemoryBuffer(raw_message.as_string()))
# Output p7 in mail-friendly format.
out = BIO.MemoryBuffer()
out.write('From: ' + from_addr + text.EOL)
out.write('To: ' + raw_message['To'] + text.EOL)
out.write('From: ' + from_addr + text.EOL_S)
out.write('To: ' + raw_message['To'] + text.EOL_S)
if raw_message['Cc']:
out.write('Cc: ' + raw_message['Cc'] + text.EOL)
out.write('Cc: ' + raw_message['Cc'] + text.EOL_S)
if raw_message['Bcc']:
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL)
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL_S)
if raw_message['Subject']:
out.write('Subject: ' + raw_message['Subject'] + text.EOL)
out.write('Subject: ' + raw_message['Subject'] + text.EOL_S)
if conf.config_item_equals('default', 'add_header', 'yes'):
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL)
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL_S)
s.write(out, p7)
@ -555,7 +554,7 @@ def send_msg_bytes(message: bytes, recipients, fromaddr=None):
LOG.info("No recipient found")
def _is_encrypted(raw_message: email.message.EmailMessage):
def _is_encrypted(raw_message: EmailMessage):
if raw_message.get_content_type() == 'multipart/encrypted':
return True
@ -566,31 +565,31 @@ def _is_encrypted(raw_message: email.message.EmailMessage):
return text.is_message_pgp_inline(first_part)
def delivery_plan(recipients, message: email.message.EmailMessage, key_cache: kcache.KeyCache):
def delivery_plan(recipients, message: EmailMessage, key_cache: kcache.KeyCache):
"""Generate a sequence of delivery strategies."""
if _is_encrypted(message):
LOG.debug(f'Message is already encrypted: {message!r}')
LOG.debug('Message is already encrypted: %s', message)
return [KeepIntact(recipients)]
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache)
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, key_cache)
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \
_sort_gpg_recipients(gpg_to)
mime_recipients, mime_keys, inline_recipients, inline_keys = \
_sort_gpg_recipients(gpg_recipients)
keyhome = conf.get_item('gpg', 'keyhome')
plan = []
if gpg_mime_to:
plan.append(MimeOpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome))
if gpg_inline_to:
plan.append(InlineOpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome))
if ungpg_to:
plan.append(KeepIntact(ungpg_to))
if mime_recipients:
plan.append(MimeOpenPGPEncrypt(mime_recipients, mime_keys, keyhome))
if inline_recipients:
plan.append(InlineOpenPGPEncrypt(inline_recipients, inline_keys, keyhome))
if cleartext_recipients:
plan.append(KeepIntact(cleartext_recipients))
return plan
def deliver_message(raw_message: email.message.EmailMessage, from_address, to_addrs):
def deliver_message(raw_message: EmailMessage, from_address, to_addrs):
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
global from_addr

View File

@ -1,12 +1,12 @@
import lacre.core
from email.message import Message
from email.message import EmailMessage
import unittest
class LacreCoreTest(unittest.TestCase):
def test_attachment_handling(self):
m = Message()
m = EmailMessage()
m.set_payload('This is a payload')
m.set_param('attachment', '', 'Content-Disposition')
m.set_param('filename', 'foo', 'Content-Disposition')
@ -16,7 +16,7 @@ class LacreCoreTest(unittest.TestCase):
self.assertEqual(m.get_filename(), 'foo.pgp')
def test_attachment_handling_2(self):
m = Message()
m = EmailMessage()
m.set_payload('This is a payload')
m.set_param('attachment', '', 'Content-Disposition')
m.set_param('name', 'quux', 'Content-Type')
@ -26,7 +26,7 @@ class LacreCoreTest(unittest.TestCase):
self.assertEqual(m.get_filename(), 'quux.pgp')
def test_payload_wrapping(self):
m = Message()
m = EmailMessage()
m.set_payload('This is a payload.\r\n'
+ '\r\n'
+ 'It has two paragraphs.\r\n')