From fc85cdb8415e36bb97cb073d997e809adab32056 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Sat, 22 Oct 2022 19:21:25 +0200 Subject: [PATCH] Rework PGP-Inline verification/recognition --- lacre/core.py | 19 +++++++++++-------- lacre/daemon.py | 3 +-- lacre/keyring.py | 4 +++- lacre/text.py | 47 +++++++++++++++++++++++++++++++++-------------- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/lacre/core.py b/lacre/core.py index d8b3707..0b1d01e 100644 --- a/lacre/core.py +++ b/lacre/core.py @@ -285,7 +285,7 @@ def _encrypt_all_payloads_inline(message, gpg_to_cmdline): return encrypted_payloads -def _encrypt_all_payloads_mime(message, gpg_to_cmdline): +def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline): # Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail. pgp_ver_part = email.message.Message() pgp_ver_part.set_payload("Version: 1"+text.EOL) @@ -308,7 +308,7 @@ def _encrypt_all_payloads_mime(message, gpg_to_cmdline): encoding = sys.getdefaultencoding() if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'): additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL - (base, encoding) = text.parse_content_type(message['Content-Type']) + encoding = message.get_content_charset(sys.getdefaultencoding()) LOG.debug(f"Identified encoding as {encoding}") encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding)) check_nested = True @@ -335,7 +335,7 @@ def _encrypt_all_payloads_mime(message, gpg_to_cmdline): def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True): raw_payload = payload.get_payload(decode=True) - if check_nested and text.is_pgp_inline(raw_payload): + if check_nested and text.is_payload_pgp_inline(raw_payload): LOG.debug("Message is already pgp encrypted. No nested encryption needed.") return payload @@ -492,7 +492,7 @@ def send_msg(message: str, recipients, fromaddr=None): LOG.info("No recipient found") -def _is_encrypted(raw_message): +def _is_encrypted(raw_message: email.message.Message): if raw_message.get_content_type() == 'multipart/encrypted': return True @@ -500,12 +500,14 @@ def _is_encrypted(raw_message): if first_part.get_content_type() == 'application/pkcs7-mime': return True - first_payload = first_part.get_payload(decode=True) - return text.is_pgp_inline(first_payload) + return text.is_message_pgp_inline(first_part) -def delivery_plan(recipients, key_cache: kcache.KeyCache): +def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.KeyCache): """Generate a sequence of delivery strategies.""" + if _is_encrypted(message): + return [KeepIntact(recipients)] + gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache) gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \ @@ -531,7 +533,8 @@ def deliver_message(raw_message: email.message.Message, from_address, to_addrs): # Ugly workaround to keep the code working without too many changes. from_addr = from_address - recipients_left = [text.sanitize_case_sense(recipient) for recipient in to_addrs] + sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive')) + recipients_left = [sanitize(recipient) for recipient in to_addrs] # There is no need for nested encryption LOG.debug("Seeing if it's already encrypted") diff --git a/lacre/daemon.py b/lacre/daemon.py index c1e8e4e..9607ecb 100644 --- a/lacre/daemon.py +++ b/lacre/daemon.py @@ -16,7 +16,6 @@ from watchdog.observers import Observer # These are the only values that our mail handler is allowed to return. RESULT_OK = '250 OK' RESULT_ERROR = '500 Could not process your message' -RESULT_NOT_IMPLEMENTED = '500 Not implemented yet' # Load configuration and init logging, in this order. Only then can we load # the last Lacre module, i.e. lacre.mailgate. @@ -41,7 +40,7 @@ class MailEncryptionProxy: try: keys = await self._keyring.freeze_identities() message = email.message_from_bytes(envelope.content) - for operation in gate.delivery_plan(envelope.rcpt_tos, keys): + for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys): LOG.debug(f"Sending mail via {operation!r}") new_message = operation.perform(message) gate.send_msg(new_message, operation.recipients(), envelope.mail_from) diff --git a/lacre/keyring.py b/lacre/keyring.py index 6387e9b..6a37c96 100644 --- a/lacre/keyring.py +++ b/lacre/keyring.py @@ -5,6 +5,7 @@ module. """ import lacre.text as text +import lacre.config as conf import logging from os import stat from watchdog.events import FileSystemEventHandler @@ -17,7 +18,8 @@ LOG = logging.getLogger(__name__) def _sanitize(keys): - return {fingerprint: text.sanitize_case_sense(keys[fingerprint]) for fingerprint in keys} + sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive')) + return {fingerprint: sanitize(keys[fingerprint]) for fingerprint in keys} class KeyCacheMisconfiguration(Exception): diff --git a/lacre/text.py b/lacre/text.py index 3f9cc6a..bdbc3e0 100644 --- a/lacre/text.py +++ b/lacre/text.py @@ -1,8 +1,10 @@ +"""Basic payload-processing routines.""" + import sys import re import logging +from email.message import Message -import lacre.config as conf # The standard way to encode line-ending in email: EOL = "\r\n" @@ -14,7 +16,7 @@ PGP_INLINE_END = b"-----END PGP MESSAGE-----" LOG = logging.getLogger(__name__) -def parse_content_type(content_type): +def parse_content_type(content_type: str): """Analyse Content-Type email header. Return a pair: type and sub-type. @@ -49,19 +51,36 @@ def parse_delimiter(address: str): return (address, None) -def sanitize_case_sense(address): - """Sanitize email case.""" - # TODO: find a way to make it more unit-testable - if conf.flag_enabled('default', 'mail_case_insensitive'): - address = address.lower() +def _lowercase_whole_address(address: str): + return address.lower() + + +def _lowercase_domain_only(address: str): + parts = address.split('@', maxsplit=2) + if len(parts) > 1: + return parts[0] + '@' + parts[1].lower() else: - splitted_address = address.split('@') - if len(splitted_address) > 1: - address = splitted_address[0] + '@' + splitted_address[1].lower() - - return address + return address -def is_pgp_inline(payload) -> bool: - """Find out if the payload (bytes) contains PGP/INLINE markers.""" +def choose_sanitizer(mail_case_insensitive: bool): + """Return a function to sanitize email case sense.""" + if mail_case_insensitive: + return _lowercase_whole_address + else: + return _lowercase_domain_only + + +def is_payload_pgp_inline(payload: bytes) -> bool: + """Find out if the payload (bytes) contains PGP/inline markers.""" return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload + + +def is_message_pgp_inline(message: Message) -> bool: + """Find out if a message is already PGP-Inline encrypted.""" + if message.is_multipart() or isinstance(message.get_payload(), list): + # more than one payload, check each one of them + return any(is_message_pgp_inline(m.payload()) for m in message.iter_parts()) + else: + # one payload, check it + return is_payload_pgp_inline(message.get_payload(decode=True))