Add header-only parser and minor test improvements

This commit is contained in:
Piotr F. Mieszkowski 2024-05-24 14:22:55 +02:00
parent 474d20f32b
commit 4977185ba1
Signed by: pfm
GPG key ID: BDE5BC1FA5DC53D5
3 changed files with 26 additions and 9 deletions

View file

@ -15,13 +15,18 @@ There are 3 operations available:
import logging
import lacre.core as core
from email.message import Message
from email.message import Message, EmailMessage
from email.parser import BytesHeaderParser
from email.policy import SMTP, SMTPUTF8
LOG = logging.getLogger(__name__)
def parse_headers(message: bytes) -> EmailMessage:
return BytesHeaderParser(policy=SMTPUTF8).parsebytes(message)
class MailSerialisationException(BaseException):
"""We can't turn an EmailMessage into sequence of bytes."""
pass

View file

@ -242,6 +242,7 @@ class EmailParsingTest(unittest.TestCase):
self.assertIsNone(msg_headers_only.get_body())
self.assertEqual(msg_headers_only.get_payload(), message_body)
self.assertRaises(KeyError, lambda: msg_headers_only.get_content())
self.assertFalse(msg_headers_only.is_multipart())
def test_headersonly_multipart_alternative(self):
rawmsg = b"From: eva@lacre.io\r\n" \
@ -282,6 +283,7 @@ class EmailParsingTest(unittest.TestCase):
self.assertIsNone(msg_headers_only.get_body())
self.assertEqual(msg_headers_only.get_payload(), message_body)
self.assertRaises(KeyError, lambda: msg_headers_only.get_content())
self.assertFalse(msg_headers_only.is_multipart())
class EmailTest(unittest.TestCase):

View file

@ -2,6 +2,7 @@ import logging
import smtplib
import sys
import getopt
from contextlib import contextmanager
from email import message_from_binary_file
from email.policy import SMTPUTF8
@ -16,11 +17,20 @@ def _load_message(name):
return message_from_binary_file(f, policy=SMTPUTF8)
@contextmanager
def smtp_connection(host, port):
smtp = smtplib.SMTP(host, port)
try:
yield smtp
finally:
smtp.close()
def _send_message(host, port, from_addr, recipients, message):
logging.info(f"From {from_addr} to {recipients} at {host}:{port}")
try:
smtp = smtplib.SMTP(host, port)
return smtp.sendmail(from_addr, recipients, message.as_bytes())
with smtp_connection(host, port) as smtp:
return smtp.sendmail(from_addr, recipients, message.as_bytes())
except smtplib.SMTPDataError as e:
logging.error(f"Couldn't deliver message. Got error: {e}")
return None
@ -37,12 +47,12 @@ def _send_message(host, port, from_addr, recipients, message):
# messages.
def _send_bytes(host: str, port, from_addr: str, recipients, message: bytes):
try:
smtp = smtplib.SMTP(host, port)
smtp.ehlo_or_helo_if_needed()
smtp.mail(from_addr)
for r in recipients:
smtp.rcpt(r)
smtp.data(message)
with smtp_connection(host, port) as smtp:
smtp.ehlo_or_helo_if_needed()
smtp.mail(from_addr)
for r in recipients:
smtp.rcpt(r)
smtp.data(message)
except:
logging.exception('Unexpected exception was thrown')