Use bytes instead of str to hold message bodies
smtplib.SMTP expects ASCII-only message bodies when message body is provided as a 'str'. If we pass a 'bytes', we need to choose encoding earlier and we do this by calling 'as_bytes' on messages with SMTP policy, which takes care of formatting the body properly. As a result, ISO-8859-x messages are converted to Quoted Printable and UTF-8 messages are Base64-encoded. Testing this behaviour is tricky, because we use the same SMTP client to send test data. For this reason, test code has become a bit ugly, but it does exactly what we need.
This commit is contained in:
parent
ae574a701b
commit
51863dda2b
|
@ -28,6 +28,7 @@ import copy
|
|||
import email
|
||||
import email.message
|
||||
import email.utils
|
||||
from email.policy import SMTP
|
||||
import GnuPG
|
||||
import os
|
||||
import smtplib
|
||||
|
@ -113,16 +114,26 @@ def _sort_gpg_recipients(gpg_to):
|
|||
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
|
||||
|
||||
|
||||
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) -> str:
|
||||
def _gpg_encrypt_copy(message, cmdline, to, encrypt_f):
|
||||
msg_copy = copy.deepcopy(message)
|
||||
_customise_headers(msg_copy)
|
||||
encrypted_payloads = encrypt_f(msg_copy, cmdline)
|
||||
msg_copy.set_payload(encrypted_payloads)
|
||||
return msg_copy
|
||||
|
||||
|
||||
def _gpg_encrypt_to_bytes(message, cmdline, to, encrypt_f) -> bytes:
|
||||
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
|
||||
return msg_copy.as_bytes(policy=SMTP)
|
||||
|
||||
|
||||
def _gpg_encrypt_to_str(message, cmdline, to, encrypt_f) -> str:
|
||||
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
|
||||
return msg_copy.as_string()
|
||||
|
||||
|
||||
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f):
|
||||
out = _gpg_encrypt_and_return(message, cmdline, to, encrypt_f)
|
||||
out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f)
|
||||
send_msg(out, to)
|
||||
|
||||
|
||||
|
@ -492,6 +503,25 @@ def send_msg(message: str, recipients, fromaddr=None):
|
|||
LOG.info("No recipient found")
|
||||
|
||||
|
||||
def send_msg_bytes(message: bytes, recipients, fromaddr=None):
|
||||
"""Send MESSAGE to RECIPIENTS to the mail relay."""
|
||||
global from_addr
|
||||
|
||||
if fromaddr is not None:
|
||||
from_addr = fromaddr
|
||||
|
||||
recipients = [_f for _f in recipients if _f]
|
||||
if recipients:
|
||||
LOG.info(f"Sending email to: {recipients!r}")
|
||||
relay = conf.relay_params()
|
||||
smtp = smtplib.SMTP(relay[0], relay[1])
|
||||
if conf.flag_enabled('relay', 'starttls'):
|
||||
smtp.starttls()
|
||||
smtp.sendmail(from_addr, recipients, message)
|
||||
else:
|
||||
LOG.info("No recipient found")
|
||||
|
||||
|
||||
def _is_encrypted(raw_message: email.message.Message):
|
||||
if raw_message.get_content_type() == 'multipart/encrypted':
|
||||
return True
|
||||
|
|
|
@ -43,7 +43,7 @@ class MailEncryptionProxy:
|
|||
for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys):
|
||||
LOG.debug(f"Sending mail via {operation!r}")
|
||||
new_message = operation.perform(message)
|
||||
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
|
||||
gate.send_msg_bytes(new_message, operation.recipients(), envelope.mail_from)
|
||||
except TypeError as te:
|
||||
LOG.exception("Got exception while processing", exc_info=te)
|
||||
return RESULT_ERROR
|
||||
|
|
|
@ -16,6 +16,7 @@ There are 3 operations available:
|
|||
import logging
|
||||
import lacre.core as core
|
||||
from email.message import Message
|
||||
from email.policy import SMTP
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -28,7 +29,7 @@ class MailOperation:
|
|||
"""Initialise the operation with a recipient."""
|
||||
self._recipients = recipients
|
||||
|
||||
def perform(self, message: Message):
|
||||
def perform(self, message: Message) -> bytes:
|
||||
"""Perform this operation on MESSAGE.
|
||||
|
||||
Return target message.
|
||||
|
@ -69,12 +70,12 @@ class InlineOpenPGPEncrypt(OpenPGPEncrypt):
|
|||
"""Initialise strategy object."""
|
||||
super().__init__(recipients, keys, keyhome)
|
||||
|
||||
def perform(self, msg: Message):
|
||||
def perform(self, msg: Message) -> bytes:
|
||||
"""Encrypt with PGP Inline."""
|
||||
LOG.debug('Sending PGP/Inline...')
|
||||
return core._gpg_encrypt_and_return(msg,
|
||||
self._keys, self._recipients,
|
||||
core._encrypt_all_payloads_inline)
|
||||
return core._gpg_encrypt_to_bytes(msg,
|
||||
self._keys, self._recipients,
|
||||
core._encrypt_all_payloads_inline)
|
||||
|
||||
|
||||
class MimeOpenPGPEncrypt(OpenPGPEncrypt):
|
||||
|
@ -84,12 +85,12 @@ class MimeOpenPGPEncrypt(OpenPGPEncrypt):
|
|||
"""Initialise strategy object."""
|
||||
super().__init__(recipients, keys, keyhome)
|
||||
|
||||
def perform(self, msg: Message):
|
||||
def perform(self, msg: Message) -> bytes:
|
||||
"""Encrypt with PGP MIME."""
|
||||
LOG.debug('Sending PGP/MIME...')
|
||||
return core._gpg_encrypt_and_return(msg,
|
||||
self._keys, self._recipients,
|
||||
core._encrypt_all_payloads_mime)
|
||||
return core._gpg_encrypt_to_bytes(msg,
|
||||
self._keys, self._recipients,
|
||||
core._encrypt_all_payloads_mime)
|
||||
|
||||
|
||||
class SMimeEncrypt(MailOperation):
|
||||
|
@ -101,10 +102,10 @@ class SMimeEncrypt(MailOperation):
|
|||
self._email = email
|
||||
self._cert = certificate
|
||||
|
||||
def perform(self, message: Message):
|
||||
def perform(self, message: Message) -> bytes:
|
||||
"""Encrypt with a certificate."""
|
||||
LOG.warning(f"Delivering clear-text to {self._recipients}")
|
||||
return message
|
||||
return message.as_bytes(policy=SMTP)
|
||||
|
||||
def __repr__(self):
|
||||
"""Generate a representation with just method and key."""
|
||||
|
@ -121,9 +122,9 @@ class KeepIntact(MailOperation):
|
|||
"""Initialise pass-through operation for a given recipient."""
|
||||
super().__init__(recipients)
|
||||
|
||||
def perform(self, message: Message):
|
||||
def perform(self, message: Message) -> bytes:
|
||||
"""Return MESSAGE unmodified."""
|
||||
return message.as_string()
|
||||
return message.as_bytes(policy=SMTP)
|
||||
|
||||
def __repr__(self):
|
||||
"""Return representation with just method and email."""
|
||||
|
|
|
@ -92,13 +92,13 @@ in: test/msgin/with-markers2clear.msg
|
|||
out-not: This message includes inline PGP markers.
|
||||
|
||||
[case-10]
|
||||
descr: UTF-8 message
|
||||
descr: UTF-8 message (yields Base64)
|
||||
to: carlos@disposlab
|
||||
in: test/msgin/utf8.msg
|
||||
out: ŁĄCZNOŚĆ
|
||||
out: xYHEhENaTk/FmsSGLiBaYcW6w7PFgsSHIGfEmcWbbMSFIGphxbrFhC4=
|
||||
|
||||
[case-11]
|
||||
descr: Non-ASCII message (ISO-8859-2)
|
||||
descr: Non-ASCII message (ISO-8859-2; yields quoted-printable)
|
||||
to: carlos@disposlab
|
||||
in: test/msgin/nonascii.msg
|
||||
out: ŁĄCZNOŚĆ
|
||||
out: =A3=A1CZNO=A6=C6.
|
||||
|
|
|
@ -2,20 +2,25 @@ import logging
|
|||
import smtplib
|
||||
import sys
|
||||
import getopt
|
||||
from email import policy, message_from_binary_file
|
||||
from email import message_from_binary_file
|
||||
from email.policy import SMTPUTF8
|
||||
|
||||
|
||||
def _load_file(name):
|
||||
def _load_file(name) -> bytes:
|
||||
with open(name, 'rb') as f:
|
||||
return message_from_binary_file(f, policy=policy.SMTP)
|
||||
return f.read()
|
||||
|
||||
|
||||
def _send(host, port, from_addr, recipients, message):
|
||||
def _load_message(name):
|
||||
with open(name, 'rb') as f:
|
||||
return message_from_binary_file(f, policy=SMTPUTF8)
|
||||
|
||||
|
||||
def _send_message(host, port, from_addr, recipients, message):
|
||||
logging.info(f"From {from_addr} to {recipients} at {host}:{port}")
|
||||
try:
|
||||
smtp = smtplib.SMTP(host, port)
|
||||
# smtp.starttls()
|
||||
return smtp.sendmail(from_addr, recipients, message.as_bytes(policy=policy.SMTP))
|
||||
return smtp.sendmail(from_addr, recipients, message.as_bytes())
|
||||
except smtplib.SMTPDataError as e:
|
||||
logging.error(f"Couldn't deliver message. Got error: {e}")
|
||||
return None
|
||||
|
@ -27,6 +32,21 @@ def _send(host, port, from_addr, recipients, message):
|
|||
return None
|
||||
|
||||
|
||||
# The poinf of this function is to do _almost_ what SMTP.sendmail does, but
|
||||
# without enforcing ASCII. We want to test Lacre with not necessarily valid
|
||||
# messages.
|
||||
def _send_bytes(host: str, port, from_addr: str, recipients, message: bytes):
|
||||
try:
|
||||
smtp = smtplib.SMTP(host, port)
|
||||
smtp.ehlo_or_helo_if_needed()
|
||||
smtp.mail(from_addr)
|
||||
for r in recipients:
|
||||
smtp.rcpt(r)
|
||||
smtp.data(message)
|
||||
except:
|
||||
logging.exception('Unexpected exception was thrown')
|
||||
|
||||
|
||||
logging.basicConfig(filename="test/logs/sendmail.log",
|
||||
format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
|
@ -49,4 +69,4 @@ for opt, value in opts:
|
|||
if message is None or sender is None or recipient is None:
|
||||
print('Use options to provide: -f sender -t recipient -m message')
|
||||
|
||||
_send('localhost', 10025, sender, [recipient], message)
|
||||
_send_bytes('localhost', 10025, sender, [recipient], message)
|
||||
|
|
Loading…
Reference in a new issue