Implement Advanced Filter flow for cleartext and OpenPGP

- Polish implementation of mail operations (lacre/mailop.py).  Add two
strategies: InlineOpenPGPEncrypt and MimeOpenPGPEncrypt, to support two modes
of OpenPGP encryption.

- In delivery_plan, only use those strategies that actually make sense with
the recipients we'd got.

- Add flag_enabled predicate (lacre/config.py) to make configuration checks
easier / simpler.

- Handle TypeError errors in Advanced Filter, indicating a delivery failure
when they appear.

- Add type hints to some of the functions.
This commit is contained in:
Piotr F. Mieszkowski 2022-09-29 22:05:45 +02:00
parent 33c325a992
commit e8b7a9fb44
4 changed files with 46 additions and 28 deletions

View File

@ -81,6 +81,10 @@ def config_item_equals(section, key, value) -> bool:
return section in cfg and key in cfg[section] and cfg[section][key] == value
def flag_enabled(section, key) -> bool:
return config_item_equals(section, key, 'yes')
def validate_config():
"""Check if configuration is complete.

View File

@ -5,7 +5,9 @@ import lacre
import lacre.config as conf
import sys
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope
import asyncio
import email
# Mail status constants.
#
@ -26,14 +28,17 @@ import lacre.mailgate as gate
class MailEncryptionProxy:
"""A mail handler dispatching to appropriate mail operation."""
async def handle_DATA(self, server, session, envelope):
async def handle_DATA(self, server, session, envelope: Envelope):
"""Accept a message and either encrypt it or forward as-is."""
# for now, just return an error because we're not ready to handle mail
for operation in gate.delivery_plan(envelope.rcpt_tos):
LOG.debug(f"Sending mail via {operation}")
new_message = operation.perform(envelope.content)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
try:
message = email.message_from_bytes(envelope.content)
for operation in gate.delivery_plan(envelope.rcpt_tos):
LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
except TypeError as te:
LOG.exception("Got exception while processing", exc_info=te)
return RESULT_ERROR
return RESULT_NOT_IMPLEMENTED

View File

@ -107,7 +107,7 @@ def _sort_gpg_recipients(gpg_to):
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f):
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) -> str:
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
@ -202,7 +202,7 @@ def _identify_gpg_recipients(recipients):
gpg_to.append(GpgRecipient(domain_key[0], domain_key[1]))
continue
ungpg_to.append((to, to))
ungpg_to.append(to)
LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}')
return gpg_to, ungpg_to
@ -480,7 +480,7 @@ def _get_first_payload(payloads):
return payloads
def send_msg(message, recipients, fromaddr=None):
def send_msg(message: str, recipients, fromaddr=None):
"""Send MESSAGE to RECIPIENTS to the mail relay."""
global from_addr
@ -492,7 +492,7 @@ def send_msg(message, recipients, fromaddr=None):
LOG.info(f"Sending email to: {recipients!r}")
relay = conf.relay_params()
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.config_item_equals('relay', 'starttls', 'yes'):
if conf.flag_enabled('relay', 'starttls'):
smtp.starttls()
smtp.sendmail(from_addr, recipients, message)
else:
@ -520,12 +520,18 @@ def delivery_plan(recipients):
keyhome = conf.get_item('gpg', 'keyhome')
return [MimeOpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome),
InlineOpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome),
KeepIntact(ungpg_to)]
plan = []
if gpg_mime_to:
plan.append(MimeOpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome))
if gpg_inline_to:
plan.append(InlineOpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome))
if ungpg_to:
plan.append(KeepIntact(ungpg_to))
return plan
def deliver_message(raw_message, from_address, to_addrs):
def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
global from_addr

View File

@ -15,6 +15,7 @@ There are 3 operations available:
import logging
import lacre.mailgate as mailgate
from email.message import Message
LOG = logging.getLogger(__name__)
@ -27,7 +28,7 @@ class MailOperation:
"""Initialise the operation with a recipient."""
self._recipients = recipients
def perform(self, message):
def perform(self, message: Message):
"""Perform this operation on MESSAGE.
Return target message.
@ -46,10 +47,10 @@ class MailOperation:
class OpenPGPEncrypt(MailOperation):
"""OpenPGP-encrypt the message."""
def __init__(self, recipient, key, keyhome):
def __init__(self, recipients, keys, keyhome):
"""Initialise encryption operation."""
super().__init__(recipient)
self._key = key
super().__init__(recipients)
self._keys = keys
self._keyhome = keyhome
def extend_keys(self, keys):
@ -58,13 +59,17 @@ class OpenPGPEncrypt(MailOperation):
def __repr__(self):
"""Generate a representation with just method and key."""
return f"<{type(self).__name__} {self._recipients} {self._key}>"
return f"<{type(self).__name__} {self._recipients} {self._keys}>"
class InlineOpenPGPEncrypt(OpenPGPEncrypt):
"""Inline encryption strategy."""
def perform(self, msg):
def __init__(self, recipients, keys, keyhome):
"""Initialise strategy object."""
super().__init__(recipients, keys, keyhome)
def perform(self, msg: Message):
"""Encrypt with PGP Inline."""
LOG.debug('Sending PGP/Inline...')
return mailgate._gpg_encrypt_and_return(msg,
@ -77,11 +82,9 @@ class MimeOpenPGPEncrypt(OpenPGPEncrypt):
def __init__(self, recipients, keys, keyhome):
"""Initialise strategy object."""
super().__init__(recipients)
self._keys = keys
self._keyhome = keyhome
super().__init__(recipients, keys, keyhome)
def perform(self, msg):
def perform(self, msg: Message):
"""Encrypt with PGP MIME."""
LOG.debug('Sending PGP/MIME...')
return mailgate._gpg_encrypt_and_return(msg,
@ -98,7 +101,7 @@ class SMimeEncrypt(MailOperation):
self._email = email
self._cert = certificate
def perform(self, message):
def perform(self, message: Message):
"""Encrypt with a certificate."""
LOG.warning(f"Delivering clear-text to {self._recipients}")
return message
@ -118,9 +121,9 @@ class KeepIntact(MailOperation):
"""Initialise pass-through operation for a given recipient."""
super().__init__(recipients)
def perform(self, message):
def perform(self, message: Message):
"""Return MESSAGE unmodified."""
return message
return message.as_string()
def __repr__(self):
"""Return representation with just method and email."""