Use bytes instead of str to hold message bodies

smtplib.SMTP expects ASCII-only message bodies when message body is provided
as a 'str'.  If we pass a 'bytes', we need to choose encoding earlier and we
do this by calling 'as_bytes' on messages with SMTP policy, which takes care
of formatting the body properly.

As a result, ISO-8859-x messages are converted to Quoted Printable and UTF-8
messages are Base64-encoded.

Testing this behaviour is tricky, because we use the same SMTP client to send
test data.  For this reason, test code has become a bit ugly, but it does
exactly what we need.
This commit is contained in:
Piotr F. Mieszkowski 2022-11-03 22:37:21 +01:00
parent 0fac54a29a
commit b6bd36a460
5 changed files with 78 additions and 27 deletions

View File

@ -28,6 +28,7 @@ import copy
import email
import email.message
import email.utils
from email.policy import SMTP
import GnuPG
import os
import smtplib
@ -113,16 +114,26 @@ def _sort_gpg_recipients(gpg_to):
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) -> str:
def _gpg_encrypt_copy(message, cmdline, to, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
msg_copy.set_payload(encrypted_payloads)
return msg_copy
def _gpg_encrypt_to_bytes(message, cmdline, to, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_bytes(policy=SMTP)
def _gpg_encrypt_to_str(message, cmdline, to, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_string()
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f):
out = _gpg_encrypt_and_return(message, cmdline, to, encrypt_f)
out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f)
send_msg(out, to)
@ -492,6 +503,25 @@ def send_msg(message: str, recipients, fromaddr=None):
LOG.info("No recipient found")
def send_msg_bytes(message: bytes, recipients, fromaddr=None):
"""Send MESSAGE to RECIPIENTS to the mail relay."""
global from_addr
if fromaddr is not None:
from_addr = fromaddr
recipients = [_f for _f in recipients if _f]
if recipients:
LOG.info(f"Sending email to: {recipients!r}")
relay = conf.relay_params()
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.flag_enabled('relay', 'starttls'):
smtp.starttls()
smtp.sendmail(from_addr, recipients, message)
else:
LOG.info("No recipient found")
def _is_encrypted(raw_message: email.message.Message):
if raw_message.get_content_type() == 'multipart/encrypted':
return True

View File

@ -43,7 +43,7 @@ class MailEncryptionProxy:
for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys):
LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message)
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
gate.send_msg_bytes(new_message, operation.recipients(), envelope.mail_from)
except TypeError as te:
LOG.exception("Got exception while processing", exc_info=te)
return RESULT_ERROR

View File

@ -16,6 +16,7 @@ There are 3 operations available:
import logging
import lacre.core as core
from email.message import Message
from email.policy import SMTP
LOG = logging.getLogger(__name__)
@ -28,7 +29,7 @@ class MailOperation:
"""Initialise the operation with a recipient."""
self._recipients = recipients
def perform(self, message: Message):
def perform(self, message: Message) -> bytes:
"""Perform this operation on MESSAGE.
Return target message.
@ -69,12 +70,12 @@ class InlineOpenPGPEncrypt(OpenPGPEncrypt):
"""Initialise strategy object."""
super().__init__(recipients, keys, keyhome)
def perform(self, msg: Message):
def perform(self, msg: Message) -> bytes:
"""Encrypt with PGP Inline."""
LOG.debug('Sending PGP/Inline...')
return core._gpg_encrypt_and_return(msg,
self._keys, self._recipients,
core._encrypt_all_payloads_inline)
return core._gpg_encrypt_to_bytes(msg,
self._keys, self._recipients,
core._encrypt_all_payloads_inline)
class MimeOpenPGPEncrypt(OpenPGPEncrypt):
@ -84,12 +85,12 @@ class MimeOpenPGPEncrypt(OpenPGPEncrypt):
"""Initialise strategy object."""
super().__init__(recipients, keys, keyhome)
def perform(self, msg: Message):
def perform(self, msg: Message) -> bytes:
"""Encrypt with PGP MIME."""
LOG.debug('Sending PGP/MIME...')
return core._gpg_encrypt_and_return(msg,
self._keys, self._recipients,
core._encrypt_all_payloads_mime)
return core._gpg_encrypt_to_bytes(msg,
self._keys, self._recipients,
core._encrypt_all_payloads_mime)
class SMimeEncrypt(MailOperation):
@ -101,10 +102,10 @@ class SMimeEncrypt(MailOperation):
self._email = email
self._cert = certificate
def perform(self, message: Message):
def perform(self, message: Message) -> bytes:
"""Encrypt with a certificate."""
LOG.warning(f"Delivering clear-text to {self._recipients}")
return message
return message.as_bytes(policy=SMTP)
def __repr__(self):
"""Generate a representation with just method and key."""
@ -121,9 +122,9 @@ class KeepIntact(MailOperation):
"""Initialise pass-through operation for a given recipient."""
super().__init__(recipients)
def perform(self, message: Message):
def perform(self, message: Message) -> bytes:
"""Return MESSAGE unmodified."""
return message.as_string()
return message.as_bytes(policy=SMTP)
def __repr__(self):
"""Return representation with just method and email."""

View File

@ -92,13 +92,13 @@ in: test/msgin/with-markers2clear.msg
out-not: This message includes inline PGP markers.
[case-10]
descr: UTF-8 message
descr: UTF-8 message (yields Base64)
to: carlos@disposlab
in: test/msgin/utf8.msg
out: ŁĄCZNOŚĆ
out: xYHEhENaTk/FmsSGLiBaYcW6w7PFgsSHIGfEmcWbbMSFIGphxbrFhC4=
[case-11]
descr: Non-ASCII message (ISO-8859-2)
descr: Non-ASCII message (ISO-8859-2; yields quoted-printable)
to: carlos@disposlab
in: test/msgin/nonascii.msg
out: ŁĄCZNOŚĆ
out: =A3=A1CZNO=A6=C6.

View File

@ -2,20 +2,25 @@ import logging
import smtplib
import sys
import getopt
from email import policy, message_from_binary_file
from email import message_from_binary_file
from email.policy import SMTPUTF8
def _load_file(name):
def _load_file(name) -> bytes:
with open(name, 'rb') as f:
return message_from_binary_file(f, policy=policy.SMTP)
return f.read()
def _send(host, port, from_addr, recipients, message):
def _load_message(name):
with open(name, 'rb') as f:
return message_from_binary_file(f, policy=SMTPUTF8)
def _send_message(host, port, from_addr, recipients, message):
logging.info(f"From {from_addr} to {recipients} at {host}:{port}")
try:
smtp = smtplib.SMTP(host, port)
# smtp.starttls()
return smtp.sendmail(from_addr, recipients, message.as_bytes(policy=policy.SMTP))
return smtp.sendmail(from_addr, recipients, message.as_bytes())
except smtplib.SMTPDataError as e:
logging.error(f"Couldn't deliver message. Got error: {e}")
return None
@ -27,6 +32,21 @@ def _send(host, port, from_addr, recipients, message):
return None
# The poinf of this function is to do _almost_ what SMTP.sendmail does, but
# without enforcing ASCII. We want to test Lacre with not necessarily valid
# messages.
def _send_bytes(host: str, port, from_addr: str, recipients, message: bytes):
try:
smtp = smtplib.SMTP(host, port)
smtp.ehlo_or_helo_if_needed()
smtp.mail(from_addr)
for r in recipients:
smtp.rcpt(r)
smtp.data(message)
except:
logging.exception('Unexpected exception was thrown')
logging.basicConfig(filename="test/logs/sendmail.log",
format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
@ -49,4 +69,4 @@ for opt, value in opts:
if message is None or sender is None or recipient is None:
print('Use options to provide: -f sender -t recipient -m message')
_send('localhost', 10025, sender, [recipient], message)
_send_bytes('localhost', 10025, sender, [recipient], message)