Handle messages as EmailMessage

In the daemon, specify policy as SMTPUTF8.  That sets the deafult message type
to EmailMessage.

EmailMessage class is richer, including support for Content Managers, giving
it the capability to properly handle textual data and its encodings.

Also: add another contract test.
This commit is contained in:
Piotr F. Mieszkowski 2023-03-04 20:38:50 +01:00
parent ace2ce6b06
commit d342f206de
3 changed files with 37 additions and 20 deletions

View File

@ -113,7 +113,7 @@ def _sort_gpg_recipients(gpg_to):
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
def _gpg_encrypt_copy(message: email.message.Message, cmdline, to, encrypt_f):
def _gpg_encrypt_copy(message: email.message.EmailMessage, cmdline, to, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
@ -121,22 +121,22 @@ def _gpg_encrypt_copy(message: email.message.Message, cmdline, to, encrypt_f):
return msg_copy
def _gpg_encrypt_to_bytes(message: email.message.Message, cmdline, to, encrypt_f) -> bytes:
def _gpg_encrypt_to_bytes(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_bytes(policy=SMTPUTF8)
def _gpg_encrypt_to_str(message: email.message.Message, cmdline, to, encrypt_f) -> str:
def _gpg_encrypt_to_str(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_string()
def _gpg_encrypt_and_deliver(message: email.message.Message, cmdline, to, encrypt_f):
def _gpg_encrypt_and_deliver(message: email.message.EmailMessage, cmdline, to, encrypt_f):
out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f)
send_msg(out, to)
def _customise_headers(msg_copy: email.message.Message):
def _customise_headers(msg_copy: email.message.EmailMessage):
if conf.config_item_equals('default', 'add_header', 'yes'):
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
@ -279,7 +279,7 @@ def _try_configured_domain_key(recipient, keys):
return None
def _encrypt_all_payloads_inline(message: email.message.Message, gpg_to_cmdline):
def _encrypt_all_payloads_inline(message: email.message.EmailMessage, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list()
@ -295,7 +295,7 @@ def _encrypt_all_payloads_inline(message: email.message.Message, gpg_to_cmdline)
return encrypted_payloads
def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline):
def _encrypt_all_payloads_mime(message: email.message.EmailMessage, gpg_to_cmdline):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.MIMEPart()
pgp_ver_part.set_payload('Version: 1' + text.EOL)
@ -329,18 +329,18 @@ def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline):
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, False)]
def _rewrap_payload(message: email.message.Message) -> email.message.MIMEPart:
def _rewrap_payload(message: email.message.EmailMessage) -> email.message.MIMEPart:
# In PGP/MIME (RFC 3156), the payload has to be a valid MIME entity. In
# other words, we need to wrap text/plain message's payload in a new MIME
# entity.
pld = email.message.MIMEPart()
pld.set_payload(message.get_payload())
pld.set_type(pld.get_content_type())
pld.set_type(message.get_content_type())
pld.set_content(message.get_content())
cs = message.get_param('charset', None, 'Content-Type')
if cs:
pld.set_param('charset', cs)
# Make sure all Content-Type parameters are included.
for (k, v) in message.get_params():
pld.set_param(k, v)
return pld
@ -352,13 +352,13 @@ def _make_boundary():
return junk_msg.get_boundary()
def _set_type_and_boundary(message: email.message.Message, boundary):
def _set_type_and_boundary(message: email.message.EmailMessage, boundary):
message.set_type('multipart/encrypted')
message.set_param('protocol', 'application/pgp-encrypted')
message.set_param('boundary', boundary)
def _encrypt_payload(payload: email.message.Message, recipients, check_nested=True, **kwargs):
def _encrypt_payload(payload: email.message.EmailMessage, recipients, check_nested=True, **kwargs):
raw_payload = payload.get_payload(decode=True)
LOG.debug('About to encrypt raw payload: %s', raw_payload)
LOG.debug('Original message: %s', payload)
@ -555,7 +555,7 @@ def send_msg_bytes(message: bytes, recipients, fromaddr=None):
LOG.info("No recipient found")
def _is_encrypted(raw_message: email.message.Message):
def _is_encrypted(raw_message: email.message.EmailMessage):
if raw_message.get_content_type() == 'multipart/encrypted':
return True
@ -566,7 +566,7 @@ def _is_encrypted(raw_message: email.message.Message):
return text.is_message_pgp_inline(first_part)
def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.KeyCache):
def delivery_plan(recipients, message: email.message.EmailMessage, key_cache: kcache.KeyCache):
"""Generate a sequence of delivery strategies."""
if _is_encrypted(message):
LOG.debug(f'Message is already encrypted: {message!r}')
@ -590,7 +590,7 @@ def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.
return plan
def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
def deliver_message(raw_message: email.message.EmailMessage, from_address, to_addrs):
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
global from_addr

View File

@ -9,6 +9,7 @@ from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope
import asyncio
import email
from email.policy import SMTPUTF8
import time
from watchdog.observers import Observer
@ -43,7 +44,8 @@ class MailEncryptionProxy:
try:
keys = await self._keyring.freeze_identities()
LOG.debug('Parsing message: %s', self._beginning(envelope))
message = email.message_from_bytes(envelope.original_content)
message = email.message_from_bytes(envelope.original_content, policy=SMTPUTF8)
LOG.debug('Parsed into %s: %s', type(message), repr(message))
if message.defects:
# Sometimes a weird message cannot be encoded back and

View File

@ -55,6 +55,7 @@ class EmailParsingTest(unittest.TestCase):
parsed = email.message_from_bytes(rawmsg)
self.assertEqual(parsed["From"], "alice@lacre.io")
self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n")
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n")
@ -71,7 +72,7 @@ class EmailParsingTest(unittest.TestCase):
self.assertEqual(parsed["To"], "bob@lacre.io")
self.assertEqual(parsed["Subject"], "Test message")
def test_str_message_payload_decoded_produces_bytes(self):
def test_str_base64_payload(self):
rawmsg = "From: alice@lacre.io\r\n" \
+ "To: bob@lacre.io\r\n" \
+ "Subject: Test message\r\n" \
@ -102,6 +103,20 @@ class EmailParsingTest(unittest.TestCase):
self.assertEqual(parsed.get_payload(decode=False), "SGVsbG8sIFdvcmxkIQo=\r\n")
self.assertEqual(parsed.get_payload(decode=True), b"Hello, World!\n")
def test_headers_only_produces_single_payload_for_multipart(self):
msg = None
with open('test/msgin/utf8-alternative.msg', 'rb') as f:
p = email.parser.BytesHeaderParser()
msg = p.parse(f)
payload = msg.get_payload()
# Taken from test/msgin/utf8-alternative.msg:
message_boundary = '6s7R3c0y2W8qiD7cU3iWyXcw'
self.assertIsInstance(payload, str)
self.assertTrue(message_boundary in payload)
class EmailTest(unittest.TestCase):
def test_boundary_generated_after_as_string_call(self):