From d342f206de7c0c5e683d874e333690af8050dba0 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Sat, 4 Mar 2023 20:38:50 +0100 Subject: [PATCH] Handle messages as EmailMessage In the daemon, specify policy as SMTPUTF8. That sets the deafult message type to EmailMessage. EmailMessage class is richer, including support for Content Managers, giving it the capability to properly handle textual data and its encodings. Also: add another contract test. --- lacre/core.py | 36 +++++++++++++++++----------------- lacre/daemon.py | 4 +++- test/modules/test_contracts.py | 17 +++++++++++++++- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/lacre/core.py b/lacre/core.py index 48a6586..61af1e2 100644 --- a/lacre/core.py +++ b/lacre/core.py @@ -113,7 +113,7 @@ def _sort_gpg_recipients(gpg_to): return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline -def _gpg_encrypt_copy(message: email.message.Message, cmdline, to, encrypt_f): +def _gpg_encrypt_copy(message: email.message.EmailMessage, cmdline, to, encrypt_f): msg_copy = copy.deepcopy(message) _customise_headers(msg_copy) encrypted_payloads = encrypt_f(msg_copy, cmdline) @@ -121,22 +121,22 @@ def _gpg_encrypt_copy(message: email.message.Message, cmdline, to, encrypt_f): return msg_copy -def _gpg_encrypt_to_bytes(message: email.message.Message, cmdline, to, encrypt_f) -> bytes: +def _gpg_encrypt_to_bytes(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> bytes: msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f) return msg_copy.as_bytes(policy=SMTPUTF8) -def _gpg_encrypt_to_str(message: email.message.Message, cmdline, to, encrypt_f) -> str: +def _gpg_encrypt_to_str(message: email.message.EmailMessage, cmdline, to, encrypt_f) -> str: msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f) return msg_copy.as_string() -def _gpg_encrypt_and_deliver(message: email.message.Message, cmdline, to, encrypt_f): +def _gpg_encrypt_and_deliver(message: email.message.EmailMessage, cmdline, to, encrypt_f): out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f) send_msg(out, to) -def _customise_headers(msg_copy: email.message.Message): +def _customise_headers(msg_copy: email.message.EmailMessage): if conf.config_item_equals('default', 'add_header', 'yes'): msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' @@ -279,7 +279,7 @@ def _try_configured_domain_key(recipient, keys): return None -def _encrypt_all_payloads_inline(message: email.message.Message, gpg_to_cmdline): +def _encrypt_all_payloads_inline(message: email.message.EmailMessage, gpg_to_cmdline): # This breaks cascaded MIME messages. Blame PGP/INLINE. encrypted_payloads = list() @@ -295,7 +295,7 @@ def _encrypt_all_payloads_inline(message: email.message.Message, gpg_to_cmdline) return encrypted_payloads -def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline): +def _encrypt_all_payloads_mime(message: email.message.EmailMessage, gpg_to_cmdline): # Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail. pgp_ver_part = email.message.MIMEPart() pgp_ver_part.set_payload('Version: 1' + text.EOL) @@ -329,18 +329,18 @@ def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline): return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, False)] -def _rewrap_payload(message: email.message.Message) -> email.message.MIMEPart: +def _rewrap_payload(message: email.message.EmailMessage) -> email.message.MIMEPart: # In PGP/MIME (RFC 3156), the payload has to be a valid MIME entity. In # other words, we need to wrap text/plain message's payload in a new MIME # entity. pld = email.message.MIMEPart() - pld.set_payload(message.get_payload()) - pld.set_type(pld.get_content_type()) + pld.set_type(message.get_content_type()) + pld.set_content(message.get_content()) - cs = message.get_param('charset', None, 'Content-Type') - if cs: - pld.set_param('charset', cs) + # Make sure all Content-Type parameters are included. + for (k, v) in message.get_params(): + pld.set_param(k, v) return pld @@ -352,13 +352,13 @@ def _make_boundary(): return junk_msg.get_boundary() -def _set_type_and_boundary(message: email.message.Message, boundary): +def _set_type_and_boundary(message: email.message.EmailMessage, boundary): message.set_type('multipart/encrypted') message.set_param('protocol', 'application/pgp-encrypted') message.set_param('boundary', boundary) -def _encrypt_payload(payload: email.message.Message, recipients, check_nested=True, **kwargs): +def _encrypt_payload(payload: email.message.EmailMessage, recipients, check_nested=True, **kwargs): raw_payload = payload.get_payload(decode=True) LOG.debug('About to encrypt raw payload: %s', raw_payload) LOG.debug('Original message: %s', payload) @@ -555,7 +555,7 @@ def send_msg_bytes(message: bytes, recipients, fromaddr=None): LOG.info("No recipient found") -def _is_encrypted(raw_message: email.message.Message): +def _is_encrypted(raw_message: email.message.EmailMessage): if raw_message.get_content_type() == 'multipart/encrypted': return True @@ -566,7 +566,7 @@ def _is_encrypted(raw_message: email.message.Message): return text.is_message_pgp_inline(first_part) -def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.KeyCache): +def delivery_plan(recipients, message: email.message.EmailMessage, key_cache: kcache.KeyCache): """Generate a sequence of delivery strategies.""" if _is_encrypted(message): LOG.debug(f'Message is already encrypted: {message!r}') @@ -590,7 +590,7 @@ def delivery_plan(recipients, message: email.message.Message, key_cache: kcache. return plan -def deliver_message(raw_message: email.message.Message, from_address, to_addrs): +def deliver_message(raw_message: email.message.EmailMessage, from_address, to_addrs): """Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available.""" global from_addr diff --git a/lacre/daemon.py b/lacre/daemon.py index 75dfe4c..50255ec 100644 --- a/lacre/daemon.py +++ b/lacre/daemon.py @@ -9,6 +9,7 @@ from aiosmtpd.controller import Controller from aiosmtpd.smtp import Envelope import asyncio import email +from email.policy import SMTPUTF8 import time from watchdog.observers import Observer @@ -43,7 +44,8 @@ class MailEncryptionProxy: try: keys = await self._keyring.freeze_identities() LOG.debug('Parsing message: %s', self._beginning(envelope)) - message = email.message_from_bytes(envelope.original_content) + message = email.message_from_bytes(envelope.original_content, policy=SMTPUTF8) + LOG.debug('Parsed into %s: %s', type(message), repr(message)) if message.defects: # Sometimes a weird message cannot be encoded back and diff --git a/test/modules/test_contracts.py b/test/modules/test_contracts.py index 907b864..a942297 100644 --- a/test/modules/test_contracts.py +++ b/test/modules/test_contracts.py @@ -55,6 +55,7 @@ class EmailParsingTest(unittest.TestCase): parsed = email.message_from_bytes(rawmsg) + self.assertEqual(parsed["From"], "alice@lacre.io") self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n") @@ -71,7 +72,7 @@ class EmailParsingTest(unittest.TestCase): self.assertEqual(parsed["To"], "bob@lacre.io") self.assertEqual(parsed["Subject"], "Test message") - def test_str_message_payload_decoded_produces_bytes(self): + def test_str_base64_payload(self): rawmsg = "From: alice@lacre.io\r\n" \ + "To: bob@lacre.io\r\n" \ + "Subject: Test message\r\n" \ @@ -102,6 +103,20 @@ class EmailParsingTest(unittest.TestCase): self.assertEqual(parsed.get_payload(decode=False), "SGVsbG8sIFdvcmxkIQo=\r\n") self.assertEqual(parsed.get_payload(decode=True), b"Hello, World!\n") + def test_headers_only_produces_single_payload_for_multipart(self): + msg = None + with open('test/msgin/utf8-alternative.msg', 'rb') as f: + p = email.parser.BytesHeaderParser() + msg = p.parse(f) + + payload = msg.get_payload() + + # Taken from test/msgin/utf8-alternative.msg: + message_boundary = '6s7R3c0y2W8qiD7cU3iWyXcw' + + self.assertIsInstance(payload, str) + self.assertTrue(message_boundary in payload) + class EmailTest(unittest.TestCase): def test_boundary_generated_after_as_string_call(self):