Rework PGP-Inline verification/recognition

This commit is contained in:
Piotr F. Mieszkowski 2022-10-22 19:21:25 +02:00
parent ba7978b4a6
commit fc85cdb841
4 changed files with 48 additions and 25 deletions

View file

@ -285,7 +285,7 @@ def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
return encrypted_payloads
def _encrypt_all_payloads_mime(message, gpg_to_cmdline):
def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.Message()
pgp_ver_part.set_payload("Version: 1"+text.EOL)
@ -308,7 +308,7 @@ def _encrypt_all_payloads_mime(message, gpg_to_cmdline):
encoding = sys.getdefaultencoding()
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL
(base, encoding) = text.parse_content_type(message['Content-Type'])
encoding = message.get_content_charset(sys.getdefaultencoding())
LOG.debug(f"Identified encoding as {encoding}")
encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding))
check_nested = True
@ -335,7 +335,7 @@ def _encrypt_all_payloads_mime(message, gpg_to_cmdline):
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
raw_payload = payload.get_payload(decode=True)
if check_nested and text.is_pgp_inline(raw_payload):
if check_nested and text.is_payload_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload
@ -492,7 +492,7 @@ def send_msg(message: str, recipients, fromaddr=None):
LOG.info("No recipient found")
def _is_encrypted(raw_message):
def _is_encrypted(raw_message: email.message.Message):
if raw_message.get_content_type() == 'multipart/encrypted':
return True
@ -500,12 +500,14 @@ def _is_encrypted(raw_message):
if first_part.get_content_type() == 'application/pkcs7-mime':
return True
first_payload = first_part.get_payload(decode=True)
return text.is_pgp_inline(first_payload)
return text.is_message_pgp_inline(first_part)
def delivery_plan(recipients, key_cache: kcache.KeyCache):
def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.KeyCache):
"""Generate a sequence of delivery strategies."""
if _is_encrypted(message):
return [KeepIntact(recipients)]
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache)
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \
@ -531,7 +533,8 @@ def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
# Ugly workaround to keep the code working without too many changes.
from_addr = from_address
recipients_left = [text.sanitize_case_sense(recipient) for recipient in to_addrs]
sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive'))
recipients_left = [sanitize(recipient) for recipient in to_addrs]
# There is no need for nested encryption
LOG.debug("Seeing if it's already encrypted")

View file

@ -16,7 +16,6 @@ from watchdog.observers import Observer
# These are the only values that our mail handler is allowed to return.
RESULT_OK = '250 OK'
RESULT_ERROR = '500 Could not process your message'
RESULT_NOT_IMPLEMENTED = '500 Not implemented yet'
# Load configuration and init logging, in this order. Only then can we load
# the last Lacre module, i.e. lacre.mailgate.
@ -41,7 +40,7 @@ class MailEncryptionProxy:
try:
keys = await self._keyring.freeze_identities()
message = email.message_from_bytes(envelope.content)
for operation in gate.delivery_plan(envelope.rcpt_tos, keys):
for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys):
LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)

View file

@ -5,6 +5,7 @@ module.
"""
import lacre.text as text
import lacre.config as conf
import logging
from os import stat
from watchdog.events import FileSystemEventHandler
@ -17,7 +18,8 @@ LOG = logging.getLogger(__name__)
def _sanitize(keys):
return {fingerprint: text.sanitize_case_sense(keys[fingerprint]) for fingerprint in keys}
sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive'))
return {fingerprint: sanitize(keys[fingerprint]) for fingerprint in keys}
class KeyCacheMisconfiguration(Exception):

View file

@ -1,8 +1,10 @@
"""Basic payload-processing routines."""
import sys
import re
import logging
from email.message import Message
import lacre.config as conf
# The standard way to encode line-ending in email:
EOL = "\r\n"
@ -14,7 +16,7 @@ PGP_INLINE_END = b"-----END PGP MESSAGE-----"
LOG = logging.getLogger(__name__)
def parse_content_type(content_type):
def parse_content_type(content_type: str):
"""Analyse Content-Type email header.
Return a pair: type and sub-type.
@ -49,19 +51,36 @@ def parse_delimiter(address: str):
return (address, None)
def sanitize_case_sense(address):
"""Sanitize email case."""
# TODO: find a way to make it more unit-testable
if conf.flag_enabled('default', 'mail_case_insensitive'):
address = address.lower()
def _lowercase_whole_address(address: str):
return address.lower()
def _lowercase_domain_only(address: str):
parts = address.split('@', maxsplit=2)
if len(parts) > 1:
return parts[0] + '@' + parts[1].lower()
else:
splitted_address = address.split('@')
if len(splitted_address) > 1:
address = splitted_address[0] + '@' + splitted_address[1].lower()
return address
return address
def is_pgp_inline(payload) -> bool:
"""Find out if the payload (bytes) contains PGP/INLINE markers."""
def choose_sanitizer(mail_case_insensitive: bool):
"""Return a function to sanitize email case sense."""
if mail_case_insensitive:
return _lowercase_whole_address
else:
return _lowercase_domain_only
def is_payload_pgp_inline(payload: bytes) -> bool:
"""Find out if the payload (bytes) contains PGP/inline markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
def is_message_pgp_inline(message: Message) -> bool:
"""Find out if a message is already PGP-Inline encrypted."""
if message.is_multipart() or isinstance(message.get_payload(), list):
# more than one payload, check each one of them
return any(is_message_pgp_inline(m.payload()) for m in message.iter_parts())
else:
# one payload, check it
return is_payload_pgp_inline(message.get_payload(decode=True))