Merge pull request 'Fix encoding issues' (#123) from post-test-fixes into main

Reviewed-on: #123
This commit is contained in:
pfm 2023-05-11 20:22:23 +00:00
commit 07fb8d6ae8
30 changed files with 1605 additions and 680 deletions

View File

@ -37,8 +37,23 @@ POS_FINGERPRINT = 9
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class EncryptionException(Exception):
"""Represents a failure to encrypt a payload."""
def __init__(self, issue: str, recipient: str, cause: str):
"""Initialise an exception."""
self._issue = issue
self._recipient = recipient
self._cause = cause
def __str__(self):
"""Return human-readable string representation."""
return f"issue: {self._issue}; to: {self._recipient}; cause: {self._cause}"
def _build_command(key_home, *args, **kwargs): def _build_command(key_home, *args, **kwargs):
cmd = ["gpg", '--homedir', key_home] + list(args) cmd = ["gpg", '--homedir', key_home]
cmd.extend(args)
return cmd return cmd
@ -130,6 +145,7 @@ def delete_key(keyhome, email):
if result[1]: if result[1]:
# delete all keys matching this email address # delete all keys matching this email address
p = subprocess.Popen(_build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p = subprocess.Popen(_build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.communicate()
p.wait() p.wait()
return True return True
@ -142,7 +158,7 @@ class GPGEncryptor:
def __init__(self, keyhome, recipients=None, charset=None): def __init__(self, keyhome, recipients=None, charset=None):
"""Initialise the wrapper.""" """Initialise the wrapper."""
self._keyhome = keyhome self._keyhome = keyhome
self._message = b'' self._message = None
self._recipients = list() self._recipients = list()
self._charset = charset self._charset = charset
if recipients is not None: if recipients is not None:
@ -150,16 +166,39 @@ class GPGEncryptor:
def update(self, message): def update(self, message):
"""Append MESSAGE to buffer about to be encrypted.""" """Append MESSAGE to buffer about to be encrypted."""
self._message += message if self._message is None:
self._message = message
else:
self._message += message
def encrypt(self): def encrypt(self):
"""Feed GnuPG with the message.""" """Feed GnuPG with the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p = self._popen()
encdata = p.communicate(input=self._message)[0] encdata, err = p.communicate(input=self._message)
if p.returncode != 0:
LOG.debug('Errors: %s', err)
details = parse_status(err)
raise EncryptionException(details['issue'], details['recipient'], details['cause'])
return (encdata, p.returncode) return (encdata, p.returncode)
def _popen(self):
if self._charset:
return subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding=self._charset)
else:
return subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def _command(self): def _command(self):
cmd = _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e") cmd = _build_command(self._keyhome,
"--trust-model", "always",
"--status-fd", "2",
"--batch",
"--yes",
"--pgp7",
"--no-secmem-warning",
"-a", "-e")
# add recipients # add recipients
for recipient in self._recipients: for recipient in self._recipients:
@ -171,7 +210,7 @@ class GPGEncryptor:
cmd.append("--comment") cmd.append("--comment")
cmd.append('Charset: ' + self._charset) cmd.append('Charset: ' + self._charset)
LOG.debug(f'Built command: {cmd!r}') LOG.debug('Built command: %s', cmd)
return cmd return cmd
@ -195,3 +234,62 @@ class GPGDecryptor:
def _command(self): def _command(self):
return _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d") return _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
STATUS_FD_PREFIX = b'[GNUPG:] '
STATUS_FD_PREFIX_LEN = len(STATUS_FD_PREFIX)
KEY_EXPIRED = b'KEYEXPIRED'
KEY_REVOKED = b'KEYREVOKED'
NO_RECIPIENTS = b'NO_RECP'
INVALID_RECIPIENT = b'INV_RECP'
# INV_RECP reason code descriptions.
INVALID_RECIPIENT_CAUSES = [
'No specific reason given',
'Not Found',
'Ambiguous specification',
'Wrong key usage',
'Key revoked',
'Key expired',
'No CRL known',
'CRL too old',
'Policy mismatch',
'Not a secret key',
'Key not trusted',
'Missing certificate',
'Missing issuer certificate',
'Key disabled',
'Syntax error in specification'
]
def parse_status(status_buffer: str) -> dict:
"""Parse --status-fd output and return important information."""
return parse_status_lines(status_buffer.splitlines())
def parse_status_lines(lines: list) -> dict:
"""Parse --status-fd output and return important information."""
result = {'issue': 'n/a', 'recipient': 'n/a', 'cause': 'Unknown'}
LOG.debug('Processing stderr lines %s', lines)
for line in lines:
LOG.debug('At gnupg stderr line %s', line)
if not line.startswith(STATUS_FD_PREFIX):
continue
if line.startswith(KEY_EXPIRED, STATUS_FD_PREFIX_LEN):
result['issue'] = KEY_EXPIRED
elif line.startswith(KEY_REVOKED, STATUS_FD_PREFIX_LEN):
result['issue'] = KEY_REVOKED
elif line.startswith(NO_RECIPIENTS, STATUS_FD_PREFIX_LEN):
result['issue'] = NO_RECIPIENTS
elif line.startswith(INVALID_RECIPIENT, STATUS_FD_PREFIX_LEN):
words = line.split(b' ')
reason_code = int(words[2])
result['recipient'] = words[3]
result['cause'] = INVALID_RECIPIENT_CAUSES[reason_code]
return result

View File

@ -59,6 +59,15 @@ config = /etc/gpg-lacre-logging.conf
host = 127.0.0.1 host = 127.0.0.1
port = 10025 port = 10025
# Maximum size (in bytes) of message body, i.e. data provided after DATA
# message. Following value comes from aiosmtpd module's default for this
# setting.
max_data_bytes = 33554432
# Sometimes it may make sense to log additional information from mail headers.
# This should never be PII, but information like encoding, content types, etc.
log_headers = no
[relay] [relay]
# the relay settings to use for Postfix # the relay settings to use for Postfix
# gpg-mailgate will submit email to this relay after it is done processing # gpg-mailgate will submit email to this relay after it is done processing

View File

@ -19,6 +19,7 @@
# #
import email import email
from email.policy import SMTPUTF8
import sys import sys
import time import time
import logging import logging
@ -40,15 +41,29 @@ if missing_params:
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}") LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
sys.exit(lacre.EX_CONFIG) sys.exit(lacre.EX_CONFIG)
# Read e-mail from stdin, parse it delivered = False
raw = sys.stdin.read() try:
raw_message = email.message_from_string(raw) # Read e-mail from stdin, parse it
from_addr = raw_message['From'] raw = sys.stdin.read()
# Read recipients from the command-line raw_message = email.message_from_string(raw, policy=SMTPUTF8)
to_addrs = sys.argv[1:] from_addr = raw_message['From']
# Read recipients from the command-line
to_addrs = sys.argv[1:]
# Let's start # Let's start
core.deliver_message(raw_message, from_addr, to_addrs) core.deliver_message(raw_message, from_addr, to_addrs)
process_t = (time.process_time() - start) * 1000 process_t = (time.process_time() - start) * 1000
LOG.info("Message delivered in {process:.2f} ms".format(process=process_t)) LOG.info("Message delivered in {process:.2f} ms".format(process=process_t))
delivered = True
except:
LOG.exception('Could not handle message')
if not delivered:
# It seems we weren't able to deliver the message. In case it was some
# silly message-encoding issue that shouldn't bounce the message, we just
# try recoding the message body and delivering it.
try:
core.failover_delivery(raw_message, to_addrs, from_addr)
except:
LOG.exception('Failover delivery failed too')

View File

@ -26,21 +26,20 @@ module.
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
import copy import copy
import email import email
import email.message from email.message import EmailMessage, MIMEPart
import email.utils import email.utils
from email.policy import SMTPUTF8
import GnuPG import GnuPG
import os
import smtplib
import sys
import asyncio import asyncio
from typing import Tuple
# imports for S/MIME
from M2Crypto import BIO, SMIME, X509
import logging import logging
import lacre.text as text import lacre.text as text
import lacre.config as conf import lacre.config as conf
import lacre.keyring as kcache import lacre.keyring as kcache
import lacre.recipients as recpt
import lacre.smime as smime
from lacre.transport import send_msg, register_sender, SendFrom
from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt
@ -52,50 +51,49 @@ def _gpg_encrypt(raw_message, recipients):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.") LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients return recipients
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, _load_keys()) gpg_recipients, cleartext_recipients = \
recpt.identify_gpg_recipients(recipients, kcache.freeze_and_load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}") LOG.info(f"Got addresses: gpg_to={gpg_recipients!r}, ungpg_to={cleartext_recipients!r}")
if gpg_to: if gpg_recipients:
LOG.info("Encrypting email to: %s" % ' '.join(x.email() for x in gpg_to)) LOG.info("Encrypting email to: %s", gpg_recipients)
gpg_to_smtp_mime, gpg_to_cmdline_mime, \ mime, inline = _sort_gpg_recipients(gpg_recipients)
gpg_to_smtp_inline, gpg_to_cmdline_inline = \
_sort_gpg_recipients(gpg_to)
if gpg_to_smtp_mime: if mime:
# Encrypt mail with PGP/MIME # Encrypt mail with PGP/MIME
_gpg_encrypt_and_deliver(raw_message, _gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_mime, gpg_to_smtp_mime, mime.keys(), mime.emails(),
_encrypt_all_payloads_mime) _encrypt_all_payloads_mime)
if gpg_to_smtp_inline: if inline:
# Encrypt mail with PGP/INLINE # Encrypt mail with PGP/INLINE
_gpg_encrypt_and_deliver(raw_message, _gpg_encrypt_and_deliver(raw_message,
gpg_to_cmdline_inline, gpg_to_smtp_inline, inline.keys(), inline.emails(),
_encrypt_all_payloads_inline) _encrypt_all_payloads_inline)
LOG.info(f"Not processed emails: {ungpg_to}") LOG.info('Not processed emails: %s', cleartext_recipients)
return ungpg_to return cleartext_recipients
def _sort_gpg_recipients(gpg_to): def _sort_gpg_recipients(gpg_to) -> Tuple[recpt.RecipientList, recpt.RecipientList]:
gpg_to_smtp_mime = list() recipients_mime = list()
gpg_to_cmdline_mime = list() keys_mime = list()
gpg_to_smtp_inline = list() recipients_inline = list()
gpg_to_cmdline_inline = list() keys_inline = list()
default_to_pgp_mime = conf.config_item_equals('default', 'mime_conversion', 'yes') default_to_pgp_mime = conf.flag_enabled('default', 'mime_conversion')
for rcpt in gpg_to: for rcpt in gpg_to:
# Checking pre defined styles in settings first # Checking pre defined styles in settings first
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'): if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
gpg_to_smtp_mime.append(rcpt.email()) recipients_mime.append(rcpt.email())
gpg_to_cmdline_mime.extend(rcpt.key().split(',')) keys_mime.extend(rcpt.key().split(','))
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'): elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
gpg_to_smtp_inline.append(rcpt.email()) recipients_inline.append(rcpt.email())
gpg_to_cmdline_inline.extend(rcpt.key().split(',')) keys_inline.extend(rcpt.key().split(','))
else: else:
# Log message only if an unknown style is defined # Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()): if conf.config_item_set('pgp_style', rcpt.email()):
@ -104,172 +102,49 @@ def _sort_gpg_recipients(gpg_to):
# If no style is in settings defined for recipient, use default from settings # If no style is in settings defined for recipient, use default from settings
if default_to_pgp_mime: if default_to_pgp_mime:
gpg_to_smtp_mime.append(rcpt.email()) recipients_mime.append(rcpt.email())
gpg_to_cmdline_mime.extend(rcpt.key().split(',')) keys_mime.extend(rcpt.key().split(','))
else: else:
gpg_to_smtp_inline.append(rcpt.email()) recipients_inline.append(rcpt.email())
gpg_to_cmdline_inline.extend(rcpt.key().split(',')) keys_inline.extend(rcpt.key().split(','))
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline mime = recpt.RecipientList(recipients_mime, keys_mime)
inline = recpt.RecipientList(recipients_inline, keys_inline)
LOG.debug('Loaded recipients: MIME %s; Inline %s', repr(mime), repr(inline))
return mime, inline
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) -> str: def _gpg_encrypt_copy(message: EmailMessage, keys, recipients, encrypt_f):
msg_copy = copy.deepcopy(message) msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy) _customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline) encrypted_payloads = encrypt_f(msg_copy, keys)
msg_copy.set_payload(encrypted_payloads) msg_copy.set_payload(encrypted_payloads)
return msg_copy.as_string() return msg_copy
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f): def _gpg_encrypt_to_bytes(message: EmailMessage, keys, recipients, encrypt_f) -> bytes:
out = _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
send_msg(out, to) return msg_copy.as_bytes(policy=SMTPUTF8)
def _customise_headers(msg_copy): def _gpg_encrypt_to_str(message: EmailMessage, keys, recipients, encrypt_f) -> str:
if conf.config_item_equals('default', 'add_header', 'yes'): msg_copy = _gpg_encrypt_copy(message, keys, recipients, encrypt_f)
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' return msg_copy.as_string(policy=SMTPUTF8)
if 'Content-Transfer-Encoding' in msg_copy:
msg_copy.replace_header('Content-Transfer-Encoding', '8BIT')
else:
msg_copy['Content-Transfer-Encoding'] = '8BIT'
def _load_keys(): def _gpg_encrypt_and_deliver(message: EmailMessage, keys, recipients, encrypt_f):
"""Return a map from a key's fingerprint to email address.""" out = _gpg_encrypt_to_str(message, keys, recipients, encrypt_f)
keyring = kcache.KeyRing(conf.get_item('gpg', 'keyhome')) send_msg(out, recipients)
return asyncio.run(keyring.freeze_identities())
class GpgRecipient: def _customise_headers(message: EmailMessage):
"""A tuple-like object that contains GPG recipient data.""" if conf.flag_enabled('default', 'add_header'):
message['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
def __init__(self, left, right):
"""Initialise a tuple-like object that contains GPG recipient data."""
self._left = left
self._right = right
def __getitem__(self, index):
"""Pretend this object is a tuple by returning an indexed tuple element."""
if index == 0:
return self._left
elif index == 1:
return self._right
else:
raise IndexError()
def __repr__(self):
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def email(self):
"""Return this recipient's email address."""
return self._left
def key(self):
"""Return this recipient's key ID."""
return self._right
def _identify_gpg_recipients(recipients, keys: kcache.KeyCache): def _encrypt_all_payloads_inline(message: EmailMessage, gpg_to_cmdline):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
gpg_to = list()
# This will be the list of recipients that haven't provided us with their
# public keys.
ungpg_to = list()
# In "strict mode", only keys included in configuration are used to encrypt
# email.
strict_mode = conf.strict_mode()
# GnuPG keys found in our keyring.
for to in recipients:
own_key = _try_configured_key(to, keys)
if own_key is not None:
gpg_to.append(GpgRecipient(own_key[0], own_key[1]))
continue
direct_key = _try_direct_key_lookup(to, keys, strict_mode)
if direct_key is not None:
gpg_to.append(GpgRecipient(direct_key[0], direct_key[1]))
continue
domain_key = _try_configured_domain_key(to, keys)
if domain_key is not None:
gpg_to.append(GpgRecipient(domain_key[0], domain_key[1]))
continue
ungpg_to.append(to)
LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}')
return gpg_to, ungpg_to
def _find_key(recipient, keys, strict_mode):
own_key = _try_configured_key(recipient, keys)
if own_key is not None:
return own_key
direct_key = _try_direct_key_lookup(recipient, keys, strict_mode)
if direct_key is not None:
return direct_key
domain_key = _try_configured_domain_key(recipient, keys)
if domain_key is not None:
return domain_key
return None
def _try_configured_key(recipient, keys):
if conf.config_item_set('enc_keymap', recipient):
key = conf.get_item('enc_keymap', recipient)
if key in keys:
LOG.debug(f"Found key {key} configured for {recipient}")
return (recipient, key)
LOG.debug(f"No configured key found for {recipient}")
return None
def _try_direct_key_lookup(recipient, keys, strict_mode):
if strict_mode:
return None
if keys.has_email(recipient):
LOG.info(f"Found key for {recipient}")
return recipient, recipient
(newto, topic) = text.parse_delimiter(recipient)
if keys.has_email(newto):
LOG.info(f"Found key for {newto}, stripped {recipient}")
return recipient, newto
return None
def _try_configured_domain_key(recipient, keys):
parts = recipient.split('@')
if len(parts) != 2:
return None
domain = parts[1]
if conf.config_item_set('enc_domain_keymap', domain):
domain_key = conf.get_item('enc_domain_keymap', domain)
if domain_key in keys:
LOG.debug(f"Found domain key {domain_key} for {recipient}")
return recipient, domain_key
LOG.debug(f"No domain key for {recipient}")
return None
def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE. # This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list() encrypted_payloads = list()
@ -285,172 +160,125 @@ def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
return encrypted_payloads return encrypted_payloads
def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline): def _encrypt_all_payloads_mime(message: EmailMessage, gpg_to_cmdline):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail. # Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.Message() pgp_ver_part = MIMEPart()
pgp_ver_part.set_payload("Version: 1"+text.EOL) pgp_ver_part.set_content('Version: 1' + text.EOL_S)
pgp_ver_part.set_type("application/pgp-encrypted") pgp_ver_part.set_type("application/pgp-encrypted")
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description') pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description')
encrypted_part = email.message.Message() encrypted_part = MIMEPart()
encrypted_part.set_type("application/octet-stream") encrypted_part.set_type("application/octet-stream")
encrypted_part.set_param('name', "encrypted.asc") encrypted_part.set_param('name', "encrypted.asc")
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description') encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description')
encrypted_part.set_param('inline', "", 'Content-Disposition') encrypted_part.set_param('inline', "", 'Content-Disposition')
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition') encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition')
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
boundary = _make_boundary()
if isinstance(message.get_payload(), str): if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps LOG.debug('Rewrapping a flat, text-only message')
# it's skipping an imaginary blank line someplace. (ie skipping a header) wrapped_payload = _rewrap_payload(message)
# Workaround it here by prepending a blank line. encrypted_part.set_payload(wrapped_payload.as_string())
# This happens only on text only messages.
additionalSubHeader = "" _set_type_and_boundary(message, boundary)
encoding = sys.getdefaultencoding()
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL
encoding = message.get_content_charset(sys.getdefaultencoding())
LOG.debug(f"Identified encoding as {encoding}")
encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding))
check_nested = True check_nested = True
else: else:
processed_payloads = _generate_message_from_payloads(message) processed_payloads = _generate_message_from_payloads(message)
encrypted_part.set_payload(processed_payloads.as_string()) encrypted_part.set_payload(processed_payloads.as_string())
_set_type_and_boundary(message, boundary)
check_nested = False check_nested = False
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
# Use this just to generate a MIME boundary string.
junk_msg = MIMEMultipart()
junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
boundary = junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed.
if 'Content-Type' in message:
message.replace_header('Content-Type', f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL)
else:
message['Content-Type'] = f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)] return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)]
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True): def _rewrap_payload(message: EmailMessage) -> MIMEPart:
# In PGP/MIME (RFC 3156), the payload has to be a valid MIME entity. In
# other words, we need to wrap text/* message's payload in a new MIME
# entity.
wrapper = MIMEPart(policy=SMTPUTF8)
content = message.get_content()
wrapper.set_content(content)
wrapper.set_type(message.get_content_type())
# Copy all Content-Type parameters.
for (pname, pvalue) in message.get_params():
# Skip MIME type that's also returned by get_params().
if not '/' in pname:
wrapper.set_param(pname, pvalue)
return wrapper
def _make_boundary():
junk_msg = MIMEMultipart()
# XXX See EmailTest.test_boundary_generated_after_as_string_call.
_ = junk_msg.as_string()
return junk_msg.get_boundary()
def _set_type_and_boundary(message: EmailMessage, boundary):
message.set_type('multipart/encrypted')
message.set_param('protocol', 'application/pgp-encrypted')
message.set_param('boundary', boundary)
def _encrypt_payload(payload: EmailMessage, recipients, check_nested=True, **kwargs):
raw_payload = payload.get_payload(decode=True) raw_payload = payload.get_payload(decode=True)
LOG.debug('About to encrypt raw payload: %s', raw_payload)
LOG.debug('Original message: %s', payload)
if check_nested and text.is_payload_pgp_inline(raw_payload): if check_nested and text.is_payload_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.") LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload return payload
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already gpg = _make_encryptor(raw_payload, recipients)
# done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor(conf.get_item('gpg', 'keyhome'), gpg_to_cmdline,
payload.get_content_charset())
gpg.update(raw_payload) gpg.update(raw_payload)
encrypted_data, returncode = gpg.encrypt() encrypted_data, exit_code = gpg.encrypt()
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
if returncode != 0:
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
return payload
payload.set_payload(encrypted_data) payload.set_payload(encrypted_data)
isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None
if isAttachment: if isAttachment:
filename = payload.get_filename() _append_gpg_extension(payload)
if filename:
pgpFilename = filename + ".pgp"
if not (payload.get('Content-Disposition') is None):
payload.set_param('filename', pgpFilename, 'Content-Disposition')
if not (payload.get('Content-Type') is None) and not (payload.get_param('name') is None):
payload.set_param('name', pgpFilename)
if not (payload.get('Content-Transfer-Encoding') is None):
payload.replace_header('Content-Transfer-Encoding', "7bit")
return payload return payload
def _smime_encrypt(raw_message, recipients): def _make_encryptor(raw_data, recipients):
global LOG # No check is needed for conf.get_item('gpg', 'keyhome') as this is already
global from_addr # done in method gpg_encrypt
keyhome = conf.get_item('gpg', 'keyhome')
if not conf.config_item_set('smime', 'cert_path'): if isinstance(raw_data, str):
LOG.info("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.") return GnuPG.GPGEncryptor(keyhome, recipients, 'utf-8')
return recipients
cert_path = conf.get_item('smime', 'cert_path')+"/"
s = SMIME.SMIME()
sk = X509.X509_Stack()
smime_to = list()
unsmime_to = list()
for addr in recipients:
cert_and_email = _get_cert_for_email(addr, cert_path)
if not (cert_and_email is None):
(to_cert, normal_email) = cert_and_email
LOG.debug("Found cert " + to_cert + " for " + addr + ": " + normal_email)
smime_to.append(addr)
x509 = X509.load_cert(to_cert, format=X509.FORMAT_PEM)
sk.push(x509)
else:
unsmime_to.append(addr)
if smime_to:
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('aes_192_cbc'))
p7 = s.encrypt(BIO.MemoryBuffer(raw_message.as_string()))
# Output p7 in mail-friendly format.
out = BIO.MemoryBuffer()
out.write('From: ' + from_addr + text.EOL)
out.write('To: ' + raw_message['To'] + text.EOL)
if raw_message['Cc']:
out.write('Cc: ' + raw_message['Cc'] + text.EOL)
if raw_message['Bcc']:
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL)
if raw_message['Subject']:
out.write('Subject: ' + raw_message['Subject'] + text.EOL)
if conf.config_item_equals('default', 'add_header', 'yes'):
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL)
s.write(out, p7)
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
send_msg(out.read(), smime_to)
if unsmime_to:
LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}")
return unsmime_to
def _get_cert_for_email(to_addr, cert_path):
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
LOG.info(f'Retrieving certificate for {to_addr!r} from {cert_path!r}, sensitivity={insensitive!r}')
files_in_directory = os.listdir(cert_path)
for filename in files_in_directory:
file_path = os.path.join(cert_path, filename)
if not os.path.isfile(file_path):
continue
if insensitive:
if filename.casefold() == to_addr:
return (file_path, to_addr)
else:
if filename == to_addr:
return (file_path, to_addr)
# support foo+ignore@bar.com -> foo@bar.com
LOG.info(f"An email with topic? {to_addr}")
(fixed_up_email, topic) = text.parse_delimiter(to_addr)
LOG.info(f'Got {fixed_up_email!r} and {topic!r}')
if topic is None:
# delimiter not used
LOG.info('Topic not found')
return None
else: else:
LOG.info(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}") return GnuPG.GPGEncryptor(keyhome, recipients)
return _get_cert_for_email(fixed_up_email, cert_path)
def _append_gpg_extension(attachment):
filename = attachment.get_filename()
if not filename:
return
pgpFilename = filename + ".pgp"
# Attachment name can come from one of two places: Content-Disposition or
# Content-Type header, hence the two cases below.
if not (attachment.get('Content-Disposition') is None):
attachment.set_param('filename', pgpFilename, 'Content-Disposition')
if not (attachment.get('Content-Type') is None) and not (attachment.get_param('name') is None):
attachment.set_param('name', pgpFilename)
def _generate_message_from_payloads(payloads, message=None): def _generate_message_from_payloads(payloads, message=None):
@ -473,26 +301,33 @@ def _get_first_payload(payloads):
return payloads return payloads
def send_msg(message: str, recipients, fromaddr=None): def _recode(m: EmailMessage):
"""Send MESSAGE to RECIPIENTS to the mail relay.""" payload = m.get_payload()
global from_addr m.set_content(payload)
if fromaddr is not None:
from_addr = fromaddr
recipients = [_f for _f in recipients if _f] def failover_delivery(message: EmailMessage, recipients, from_address):
if recipients: """Try delivering message just one last time."""
LOG.info(f"Sending email to: {recipients!r}") LOG.debug('Failover delivery')
relay = conf.relay_params()
smtp = smtplib.SMTP(relay[0], relay[1]) send = SendFrom(from_address)
if conf.flag_enabled('relay', 'starttls'): if message.get_content_maintype() == 'text':
smtp.starttls() LOG.debug('Flat text message, adjusting coding')
smtp.sendmail(from_addr, recipients, message) _recode(message)
b = message.as_bytes(policy=SMTPUTF8)
send(b, recipients)
elif message.get_content_maintype() == 'multipart':
LOG.debug('Multipart message, adjusting coding of text entities')
for part in message.iter_parts():
if part.get_content_maintype() == 'text':
_recode(part)
b = message.as_bytes(policy=SMTPUTF8)
send(b, recipients)
else: else:
LOG.info("No recipient found") LOG.warning('No failover strategy, giving up')
def _is_encrypted(raw_message: email.message.Message): def _is_encrypted(raw_message: EmailMessage):
if raw_message.get_content_type() == 'multipart/encrypted': if raw_message.get_content_type() == 'multipart/encrypted':
return True return True
@ -503,45 +338,44 @@ def _is_encrypted(raw_message: email.message.Message):
return text.is_message_pgp_inline(first_part) return text.is_message_pgp_inline(first_part)
def delivery_plan(recipients, message: email.message.Message, key_cache: kcache.KeyCache): def delivery_plan(recipients, message: EmailMessage, key_cache: kcache.KeyCache):
"""Generate a sequence of delivery strategies.""" """Generate a sequence of delivery strategies."""
if _is_encrypted(message): if _is_encrypted(message):
LOG.debug(f'Message is already encrypted: {message!r}') LOG.debug('Message is already encrypted: %s', message)
return [KeepIntact(recipients)] return [KeepIntact(recipients)]
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache) gpg_recipients, cleartext_recipients = recpt.identify_gpg_recipients(recipients, key_cache)
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \ mime, inline = _sort_gpg_recipients(gpg_recipients)
_sort_gpg_recipients(gpg_to)
keyhome = conf.get_item('gpg', 'keyhome') keyhome = conf.get_item('gpg', 'keyhome')
plan = [] plan = []
if gpg_mime_to: if mime:
plan.append(MimeOpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome)) plan.append(MimeOpenPGPEncrypt(mime.emails(), mime.keys(), keyhome))
if gpg_inline_to: if inline:
plan.append(InlineOpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome)) plan.append(InlineOpenPGPEncrypt(inline.emails(), inline.keys(), keyhome))
if ungpg_to: if cleartext_recipients:
plan.append(KeepIntact(ungpg_to)) plan.append(KeepIntact(cleartext_recipients))
return plan return plan
def deliver_message(raw_message: email.message.Message, from_address, to_addrs): def deliver_message(raw_message: EmailMessage, from_address, to_addrs):
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available.""" """Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
global from_addr
# Ugly workaround to keep the code working without too many changes. # Ugly workaround to keep the code working without too many changes.
from_addr = from_address register_sender(from_address)
sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive')) sanitize = text.choose_sanitizer(conf.get_item('default', 'mail_case_insensitive'))
recipients_left = [sanitize(recipient) for recipient in to_addrs] recipients_left = [sanitize(recipient) for recipient in to_addrs]
send = SendFrom(from_address)
# There is no need for nested encryption # There is no need for nested encryption
LOG.debug("Seeing if it's already encrypted") LOG.debug("Seeing if it's already encrypted")
if _is_encrypted(raw_message): if _is_encrypted(raw_message):
LOG.debug("Message is already encrypted. Encryption aborted.") LOG.debug("Message is already encrypted. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left) send(raw_message.as_string(), recipients_left)
return return
# Encrypt mails for recipients with known public PGP keys # Encrypt mails for recipients with known public PGP keys
@ -552,10 +386,10 @@ def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
# Encrypt mails for recipients with known S/MIME certificate # Encrypt mails for recipients with known S/MIME certificate
LOG.debug("Encrypting with S/MIME") LOG.debug("Encrypting with S/MIME")
recipients_left = _smime_encrypt(raw_message, recipients_left) recipients_left = smime.encrypt(raw_message, recipients_left, from_address)
if not recipients_left: if not recipients_left:
return return
# Send out mail to recipients which are left # Send out mail to recipients which are left
LOG.debug("Sending the rest as text/plain") LOG.debug("Sending the rest as text/plain")
send_msg(raw_message.as_string(), recipients_left) send(raw_message.as_bytes(policy=SMTPUTF8), recipients_left)

View File

@ -2,29 +2,28 @@
import logging import logging
import lacre import lacre
from lacre.text import DOUBLE_EOL_BYTES
import lacre.config as conf import lacre.config as conf
import sys import sys
from aiosmtpd.controller import Controller from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope from aiosmtpd.smtp import Envelope
import asyncio import asyncio
import email import email
from email.policy import SMTPUTF8
import time import time
from watchdog.observers import Observer from watchdog.observers import Observer
# Mail status constants.
#
# These are the only values that our mail handler is allowed to return.
RESULT_OK = '250 OK'
RESULT_ERROR = '500 Could not process your message'
# Load configuration and init logging, in this order. Only then can we load # Load configuration and init logging, in this order. Only then can we load
# the last Lacre module, i.e. lacre.mailgate. # the last Lacre module, i.e. lacre.mailgate.
conf.load_config() conf.load_config()
lacre.init_logging(conf.get_item("logging", "config")) lacre.init_logging(conf.get_item("logging", "config"))
LOG = logging.getLogger('lacre.daemon') LOG = logging.getLogger('lacre.daemon')
from GnuPG import EncryptionException
import lacre.core as gate import lacre.core as gate
import lacre.keyring as kcache import lacre.keyring as kcache
import lacre.transport as xport
from lacre.mailop import KeepIntact
class MailEncryptionProxy: class MailEncryptionProxy:
@ -39,25 +38,72 @@ class MailEncryptionProxy:
start = time.process_time() start = time.process_time()
try: try:
keys = await self._keyring.freeze_identities() keys = await self._keyring.freeze_identities()
message = email.message_from_bytes(envelope.content) LOG.debug('Parsing message: %s', self._beginning(envelope))
message = email.message_from_bytes(envelope.original_content, policy=SMTPUTF8)
LOG.debug('Parsed into %s: %s', type(message), repr(message))
if message.defects:
# Sometimes a weird message cannot be encoded back and
# delivered, so before bouncing such messages we at least
# record information about the issues. Defects are identified
# by email.* package.
LOG.warning("Issues found: %d; %s", len(message.defects), repr(message.defects))
if conf.flag_enabled('daemon', 'log_headers'):
LOG.info('Message headers: %s', self._extract_headers(message))
send = xport.SendFrom(envelope.mail_from)
for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys): for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys):
LOG.debug(f"Sending mail via {operation!r}") LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message) try:
gate.send_msg(new_message, operation.recipients(), envelope.mail_from) new_message = operation.perform(message)
except TypeError as te: send(new_message, operation.recipients())
LOG.exception("Got exception while processing", exc_info=te) except EncryptionException:
return RESULT_ERROR # If the message can't be encrypted, deliver cleartext.
LOG.exception('Unable to encrypt message, delivering in cleartext')
if not isinstance(operation, KeepIntact):
self._send_unencrypted(operation, message, envelope, send)
else:
LOG.error(f'Cannot perform {operation}')
except:
LOG.exception('Unexpected exception caught, bouncing message')
return xport.RESULT_ERROR
ellapsed = (time.process_time() - start) * 1000 ellapsed = (time.process_time() - start) * 1000
LOG.info(f'Message delivered in {ellapsed:.2f} ms') LOG.info(f'Message delivered in {ellapsed:.2f} ms')
return RESULT_OK
return xport.RESULT_OK
def _send_unencrypted(self, operation, message, envelope, send: xport.SendFrom):
keep = KeepIntact(operation.recipients())
new_message = keep.perform(message)
send(new_message, operation.recipients(), envelope.mail_from)
def _beginning(self, e: Envelope) -> bytes:
double_eol_pos = e.original_content.find(DOUBLE_EOL_BYTES)
if double_eol_pos < 0:
limit = len(e.original_content)
else:
limit = double_eol_pos
end = min(limit, 2560)
return e.original_content[0:end]
def _extract_headers(self, message: email.message.Message):
return {
'mime' : message.get_content_type(),
'charsets' : message.get_charsets(),
'cte' : message['Content-Transfer-Encoding']
}
def _init_controller(keys: kcache.KeyRing, tout: float = 5): def _init_controller(keys: kcache.KeyRing, max_body_bytes=None, tout: float = 5):
proxy = MailEncryptionProxy(keys) proxy = MailEncryptionProxy(keys)
host, port = conf.daemon_params() host, port = conf.daemon_params()
LOG.info(f"Initialising a mail Controller at {host}:{port}") LOG.info(f"Initialising a mail Controller at {host}:{port}")
return Controller(proxy, hostname=host, port=port, ready_timeout=tout) return Controller(proxy, hostname=host, port=port,
ready_timeout=tout,
data_size_limit=max_body_bytes)
def _init_reloader(keyring_dir: str, reloader) -> kcache.KeyringModificationListener: def _init_reloader(keyring_dir: str, reloader) -> kcache.KeyringModificationListener:
@ -88,8 +134,12 @@ def _main():
_validate_config() _validate_config()
keyring_path = conf.get_item('gpg', 'keyhome') keyring_path = conf.get_item('gpg', 'keyhome')
keyring = kcache.KeyRing(keyring_path) max_data_bytes = int(conf.get_item('daemon', 'max_data_bytes', 2**25))
controller = _init_controller(keyring)
loop = asyncio.get_event_loop()
keyring = kcache.KeyRing(keyring_path, loop)
controller = _init_controller(keyring, max_data_bytes)
reloader = _init_reloader(keyring_path, keyring) reloader = _init_reloader(keyring_path, keyring)
LOG.info(f'Watching keyring directory {keyring_path}...') LOG.info(f'Watching keyring directory {keyring_path}...')
@ -99,7 +149,7 @@ def _main():
controller.start() controller.start()
try: try:
asyncio.run(_sleep()) loop.run_until_complete(_sleep())
except KeyboardInterrupt: except KeyboardInterrupt:
LOG.info("Finishing...") LOG.info("Finishing...")
except: except:

View File

@ -9,7 +9,7 @@ import lacre.config as conf
import logging import logging
from os import stat from os import stat
from watchdog.events import FileSystemEventHandler, FileSystemEvent from watchdog.events import FileSystemEventHandler, FileSystemEvent
from asyncio import Semaphore, run from asyncio import Semaphore, create_task, get_event_loop, run
import copy import copy
import GnuPG import GnuPG
@ -62,7 +62,7 @@ class KeyCache:
def __repr__(self): def __repr__(self):
"""Return text representation of this object.""" """Return text representation of this object."""
details = ' '.join(self._keys.keys()) details = ' '.join(self._keys.keys())
return f'<KeyCache {details}>' return '<KeyCache %s>' % (details)
class KeyRing: class KeyRing:
@ -73,12 +73,13 @@ class KeyRing:
fingerprint=>email maps. fingerprint=>email maps.
""" """
def __init__(self, path: str): def __init__(self, path: str, loop=None):
"""Initialise the adapter.""" """Initialise the adapter."""
self._path = path self._path = path
self._keys = self._load_and_sanitize() self._keys = self._load_and_sanitize()
self._sema = Semaphore() self._sema = Semaphore()
self._last_mod = None self._last_mod = None
self._loop = loop or get_event_loop()
def _load_and_sanitize(self): def _load_and_sanitize(self):
keys = self._load_keyring_from(self._path) keys = self._load_keyring_from(self._path)
@ -96,13 +97,19 @@ class KeyRing:
def load(self): def load(self):
"""Load keyring, replacing any previous contents of the cache.""" """Load keyring, replacing any previous contents of the cache."""
LOG.debug('Reloading keys...') LOG.debug('Reloading keys...')
run(self._load()) tsk = create_task(self._load(), 'LoadTask')
self._loop.run_until_complete(tsk)
async def _load(self): async def _load(self):
last_mod = self._read_mod_time() last_mod = self._read_mod_time()
LOG.debug(f'Keyring was last modified: {last_mod}')
if self._is_modified(last_mod): if self._is_modified(last_mod):
LOG.debug('Keyring has been modified')
async with self._sema: async with self._sema:
LOG.debug('About to re-load the keyring')
self.replace_keyring(self._load_keyring_from(self._path)) self.replace_keyring(self._load_keyring_from(self._path))
else:
LOG.debug('Keyring not modified recently, continuing')
self._last_mod = self._read_mod_time() self._last_mod = self._read_mod_time()
@ -115,7 +122,7 @@ class KeyRing:
LOG.info(f'Storing {len(keys)} keys') LOG.info(f'Storing {len(keys)} keys')
self._keys = keys self._keys = keys
def _read_mod_time(self): def _read_mod_time(self) -> int:
# (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) # (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
# 0 1 2 3 4 5 6 7 8 9 # 0 1 2 3 4 5 6 7 8 9
MTIME = 8 MTIME = 8
@ -133,6 +140,10 @@ class KeyRing:
LOG.debug('Keyring not modified ') LOG.debug('Keyring not modified ')
return False return False
def __repr__(self) -> str:
"""Return text representation of this keyring."""
return '<KeyRing path=%s last_mod=%d>' % (self._path, self._last_mod)
class KeyringModificationListener(FileSystemEventHandler): class KeyringModificationListener(FileSystemEventHandler):
"""A filesystem event listener that triggers key cache reload.""" """A filesystem event listener that triggers key cache reload."""
@ -143,11 +154,22 @@ class KeyringModificationListener(FileSystemEventHandler):
def handle(self, event: FileSystemEvent): def handle(self, event: FileSystemEvent):
"""Reload keys upon FS event.""" """Reload keys upon FS event."""
LOG.debug('FS event: %s, %s', event.event_type, event.src_path)
if 'pubring.kbx' in event.src_path: if 'pubring.kbx' in event.src_path:
LOG.debug(f'Reloading on event {event!r}') LOG.info('Reloading %s on event: %s', self._keyring, event)
self._keyring.reload() self._keyring.reload()
# All methods should do the same: reload the key cache. # All methods should do the same: reload the key cache.
# on_created = handle # on_created = handle
# on_deleted = handle # on_deleted = handle
on_modified = handle on_modified = handle
def freeze_and_load_keys():
"""Load and return keys.
Doesn't refresh the keys when they change on disk.
'"""
keyring_dir = conf.get_item('gpg', 'keyhome')
keyring = KeyRing(keyring_dir)
return run(keyring.freeze_identities())

View File

@ -16,6 +16,7 @@ There are 3 operations available:
import logging import logging
import lacre.core as core import lacre.core as core
from email.message import Message from email.message import Message
from email.policy import SMTP, SMTPUTF8
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -28,7 +29,7 @@ class MailOperation:
"""Initialise the operation with a recipient.""" """Initialise the operation with a recipient."""
self._recipients = recipients self._recipients = recipients
def perform(self, message: Message): def perform(self, message: Message) -> bytes:
"""Perform this operation on MESSAGE. """Perform this operation on MESSAGE.
Return target message. Return target message.
@ -69,12 +70,12 @@ class InlineOpenPGPEncrypt(OpenPGPEncrypt):
"""Initialise strategy object.""" """Initialise strategy object."""
super().__init__(recipients, keys, keyhome) super().__init__(recipients, keys, keyhome)
def perform(self, msg: Message): def perform(self, msg: Message) -> bytes:
"""Encrypt with PGP Inline.""" """Encrypt with PGP Inline."""
LOG.debug('Sending PGP/Inline...') LOG.debug('Sending PGP/Inline...')
return core._gpg_encrypt_and_return(msg, return core._gpg_encrypt_to_bytes(msg,
self._keys, self._recipients, self._keys, self._recipients,
core._encrypt_all_payloads_inline) core._encrypt_all_payloads_inline)
class MimeOpenPGPEncrypt(OpenPGPEncrypt): class MimeOpenPGPEncrypt(OpenPGPEncrypt):
@ -84,12 +85,12 @@ class MimeOpenPGPEncrypt(OpenPGPEncrypt):
"""Initialise strategy object.""" """Initialise strategy object."""
super().__init__(recipients, keys, keyhome) super().__init__(recipients, keys, keyhome)
def perform(self, msg: Message): def perform(self, msg: Message) -> bytes:
"""Encrypt with PGP MIME.""" """Encrypt with PGP MIME."""
LOG.debug('Sending PGP/MIME...') LOG.debug('Sending PGP/MIME...')
return core._gpg_encrypt_and_return(msg, return core._gpg_encrypt_to_bytes(msg,
self._keys, self._recipients, self._keys, self._recipients,
core._encrypt_all_payloads_mime) core._encrypt_all_payloads_mime)
class SMimeEncrypt(MailOperation): class SMimeEncrypt(MailOperation):
@ -101,10 +102,10 @@ class SMimeEncrypt(MailOperation):
self._email = email self._email = email
self._cert = certificate self._cert = certificate
def perform(self, message: Message): def perform(self, message: Message) -> bytes:
"""Encrypt with a certificate.""" """Encrypt with a certificate."""
LOG.warning(f"Delivering clear-text to {self._recipients}") LOG.warning(f"Delivering clear-text to {self._recipients}")
return message return message.as_bytes(policy=SMTP)
def __repr__(self): def __repr__(self):
"""Generate a representation with just method and key.""" """Generate a representation with just method and key."""
@ -121,9 +122,9 @@ class KeepIntact(MailOperation):
"""Initialise pass-through operation for a given recipient.""" """Initialise pass-through operation for a given recipient."""
super().__init__(recipients) super().__init__(recipients)
def perform(self, message: Message): def perform(self, message: Message) -> bytes:
"""Return MESSAGE unmodified.""" """Return MESSAGE unmodified."""
return message.as_string() return message.as_bytes(policy=SMTPUTF8)
def __repr__(self): def __repr__(self):
"""Return representation with just method and email.""" """Return representation with just method and email."""

204
lacre/recipients.py Normal file
View File

@ -0,0 +1,204 @@
#
# gpg-mailgate
#
# This file is part of the gpg-mailgate source code.
#
# gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
"""Recipient processing package.
Defines:
- GpgRecipient, wrapper for user's email and identity.
- RecipientList, a wrapper for lists of GpgRecipient objects.
"""
import logging
import lacre.config as conf
import lacre.keyring as kcache
import lacre.text as text
LOG = logging.getLogger(__name__)
class Recipient:
"""Wraps recipient's email."""
def __init__(self, email):
"""Initialise the recipient."""
self._email = email
def email(self) -> str:
"""Return email address of this recipient."""
return self._email
def __str__(self):
"""Return string representation of this recipient: the email address."""
return self._email
class GpgRecipient(Recipient):
"""A tuple-like object that contains GPG recipient data."""
def __init__(self, left, right):
"""Initialise a tuple-like object that contains GPG recipient data."""
self._left = left
self._right = right
def __getitem__(self, index):
"""Pretend this object is a tuple by returning an indexed tuple element."""
if index == 0:
return self._left
elif index == 1:
return self._right
else:
raise IndexError()
def __repr__(self):
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def email(self) -> str:
"""Return this recipient's email address."""
return self._left
def key(self):
"""Return this recipient's key ID."""
return self._right
class RecipientList:
"""Encalsulates two lists of recipients.
First list contains addresses, the second - GPG identities.
"""
def __init__(self, recipients=[], keys=[]):
"""Initialise lists of recipients and identities."""
self._recipients = [GpgRecipient(email, key) for (email, key) in zip(recipients, keys)]
def emails(self):
"""Return list of recipients."""
return [r.email() for r in self._recipients]
def keys(self):
"""Return list of GPG identities."""
return [r.key() for r in self._recipients]
def __iadd__(self, recipient: GpgRecipient):
"""Append a recipient."""
LOG.debug('Adding %s to %s', recipient, self._recipients)
self._recipients.append(recipient)
LOG.debug('Added; got: %s', self._recipients)
return self
def __len__(self):
"""Provide len().
With this method, it is possible to write code like:
rl = RecipientList()
if rl:
# do something
"""
return len(self._recipients)
def __repr__(self):
"""Returns textual object representation."""
return '<RecipientList %d %s>' % (len(self._recipients), ','.join(self.emails()))
def identify_gpg_recipients(recipients, keys: kcache.KeyCache):
"""Split recipient list into GPG and non-GPG ones."""
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
gpg_recipients = list()
# This will be the list of recipients that haven't provided us with their
# public keys.
cleartext_recipients = list()
# In "strict mode", only keys included in configuration are used to encrypt
# email.
strict_mode = conf.strict_mode()
for recipient in recipients:
gpg_recipient = _find_key(recipient, keys, strict_mode)
if gpg_recipient is not None:
gpg_recipients.append(gpg_recipient)
else:
cleartext_recipients.append(recipient)
LOG.debug('Collected recipients; GPG: %s; cleartext: %s', gpg_recipients, cleartext_recipients)
return gpg_recipients, cleartext_recipients
def _find_key(recipient, keys: kcache.KeyCache, strict_mode):
own_key = _try_configured_key(recipient, keys)
if own_key is not None:
return GpgRecipient(own_key[0], own_key[1])
direct_key = _try_direct_key_lookup(recipient, keys, strict_mode)
if direct_key is not None:
return GpgRecipient(direct_key[0], direct_key[1])
domain_key = _try_configured_domain_key(recipient, keys)
if domain_key is not None:
return GpgRecipient(domain_key[0], domain_key[1])
return None
def _try_configured_key(recipient, keys: kcache.KeyCache):
if conf.config_item_set('enc_keymap', recipient):
key = conf.get_item('enc_keymap', recipient)
if key in keys:
LOG.debug(f"Found key {key} configured for {recipient}")
return (recipient, key)
LOG.debug(f"No configured key found for {recipient}")
return None
def _try_direct_key_lookup(recipient, keys: kcache.KeyCache, strict_mode):
if strict_mode:
return None
if keys.has_email(recipient):
LOG.info(f"Found key for {recipient}")
return recipient, recipient
(newto, topic) = text.parse_delimiter(recipient)
if keys.has_email(newto):
LOG.info(f"Found key for {newto}, stripped {recipient}")
return recipient, newto
return None
def _try_configured_domain_key(recipient, keys: kcache.KeyCache):
parts = recipient.split('@')
if len(parts) != 2:
return None
domain = parts[1]
if conf.config_item_set('enc_domain_keymap', domain):
domain_key = conf.get_item('enc_domain_keymap', domain)
if domain_key in keys:
LOG.debug(f"Found domain key {domain_key} for {recipient}")
return recipient, domain_key
LOG.debug(f"No domain key for {recipient}")
return None

126
lacre/smime.py Normal file
View File

@ -0,0 +1,126 @@
#
# gpg-mailgate
#
# This file is part of the gpg-mailgate source code.
#
# gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
"""S/MIME handling module."""
import os
from M2Crypto import BIO, SMIME, X509
import logging
import lacre.text as text
import lacre.config as conf
import lacre.transport as xport
LOG = logging.getLogger(__name__)
#
# WARNING: This file is not covered with E2E tests.
#
def encrypt(raw_message, recipients, from_addr):
"""Encrypt with S/MIME."""
if not conf.config_item_set('smime', 'cert_path'):
LOG.info("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
return recipients
cert_path = conf.get_item('smime', 'cert_path')+"/"
s = SMIME.SMIME()
sk = X509.X509_Stack()
smime_to = list()
cleartext_to = list()
for addr in recipients:
cert_and_email = _get_cert_for_email(addr, cert_path)
if not (cert_and_email is None):
(to_cert, normal_email) = cert_and_email
LOG.debug("Found cert " + to_cert + " for " + addr + ": " + normal_email)
smime_to.append(addr)
x509 = X509.load_cert(to_cert, format=X509.FORMAT_PEM)
sk.push(x509)
else:
cleartext_to.append(addr)
if smime_to:
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('aes_192_cbc'))
p7 = s.encrypt(BIO.MemoryBuffer(raw_message.as_string()))
# Output p7 in mail-friendly format.
out = BIO.MemoryBuffer()
out.write('From: ' + from_addr + text.EOL_S)
out.write('To: ' + raw_message['To'] + text.EOL_S)
if raw_message['Cc']:
out.write('Cc: ' + raw_message['Cc'] + text.EOL_S)
if raw_message['Bcc']:
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL_S)
if raw_message['Subject']:
out.write('Subject: ' + raw_message['Subject'] + text.EOL_S)
if conf.config_item_equals('default', 'add_header', 'yes'):
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL_S)
s.write(out, p7)
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
send_msg = xport.SendFrom(from_addr)
send_msg(out.read(), smime_to)
if cleartext_to:
LOG.debug(f"Unable to find valid S/MIME certificates for {cleartext_to}")
return cleartext_to
def _path_comparator(insensitive: bool):
if insensitive:
return lambda filename, recipient: filename.casefold() == recipient
else:
return lambda filename, recipient: filename == recipient
def _get_cert_for_email(to_addr, cert_path):
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
paths_equal = _path_comparator(insensitive)
LOG.info('Retrieving certificate for %s from %s, insensitive=%s',
to_addr, cert_path, insensitive)
files_in_directory = os.listdir(cert_path)
for filename in files_in_directory:
file_path = os.path.join(cert_path, filename)
if not os.path.isfile(file_path):
continue
if paths_equal(file_path, to_addr):
return (file_path, to_addr)
# support foo+ignore@bar.com -> foo@bar.com
LOG.info(f"An email with topic? {to_addr}")
(fixed_up_email, topic) = text.parse_delimiter(to_addr)
LOG.info(f'Got {fixed_up_email!r} and {topic!r}')
if topic is None:
# delimiter not used
LOG.info('Topic not found')
return None
else:
LOG.info(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
return _get_cert_for_email(fixed_up_email, cert_path)

View File

@ -3,15 +3,19 @@
import sys import sys
import re import re
import logging import logging
from email.message import Message from email.message import EmailMessage
# The standard way to encode line-ending in email: # The standard way to encode line-ending in email:
EOL = "\r\n" EOL = b"\r\n"
EOL_BYTES = b"\r\n" EOL_S = EOL.decode()
DOUBLE_EOL_BYTES = EOL*2
PGP_INLINE_BEGIN = EOL_BYTES + b"-----BEGIN PGP MESSAGE-----" + EOL_BYTES PGP_BEGIN = b"-----BEGIN PGP MESSAGE-----"
PGP_INLINE_END = EOL_BYTES + b"-----END PGP MESSAGE-----" + EOL_BYTES PGP_END = b"-----END PGP MESSAGE-----"
PGP_BEGIN_S = PGP_BEGIN.decode()
PGP_END_S = PGP_END.decode()
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -72,12 +76,25 @@ def choose_sanitizer(mail_case_insensitive: bool):
return _lowercase_domain_only return _lowercase_domain_only
def is_payload_pgp_inline(payload: bytes) -> bool: def is_payload_pgp_inline(payload) -> bool:
"""Find out if the payload (bytes) contains PGP/inline markers.""" """Find out if the payload (bytes) contains PGP/inline markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload if isinstance(payload, bytes):
return payload.startswith(PGP_BEGIN) and _ends_with(payload, PGP_END)
elif isinstance(payload, str):
return payload.startswith(PGP_BEGIN_S) and _ends_with(payload, PGP_END_S)
else:
raise TypeError('Expected str or bytes')
def is_message_pgp_inline(message: Message) -> bool: def _ends_with(payload, marker) -> bool:
# Length of the span at the end of the payload we want to inspect should
# include CRLF, CR or LF, so make it slightly larger than the marker
# itself.
span = len(marker) + 2
return marker in payload[-span:]
def is_message_pgp_inline(message: EmailMessage) -> bool:
"""Find out if a message is already PGP-Inline encrypted.""" """Find out if a message is already PGP-Inline encrypted."""
if message.is_multipart() or isinstance(message.get_payload(), list): if message.is_multipart() or isinstance(message.get_payload(), list):
# more than one payload, check each one of them # more than one payload, check each one of them

71
lacre/transport.py Normal file
View File

@ -0,0 +1,71 @@
"""SMTP transport module."""
import smtplib
import logging
from typing import AnyStr, List
import lacre.config as conf
# Mail status constants.
#
# These are the only values that our mail handler is allowed to return.
RESULT_OK = '250 OK'
RESULT_ERROR = '500 Could not process your message'
LOG = logging.getLogger(__name__)
# This is a left-over from old architecture.
from_addr = None
def register_sender(fromaddr):
"""Set module state: message sender address."""
global from_addr
LOG.warning('Setting global recipient: %s', fromaddr)
from_addr = fromaddr
def send_msg(message: AnyStr, recipients: List[str]):
"""Send MESSAGE to RECIPIENTS to the mail relay."""
global from_addr
LOG.debug('Delivery from %s to %s', from_addr, recipients)
recipients = [_f for _f in recipients if _f]
if recipients:
LOG.info(f"Sending email to: {recipients!r}")
relay = conf.relay_params()
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.flag_enabled('relay', 'starttls'):
smtp.starttls()
smtp.sendmail(from_addr, recipients, message)
else:
LOG.info("No recipient found")
class SendFrom:
"""A class wrapping the transport process."""
def __init__(self, from_addr):
"""Initialise the transport."""
self._from_addr = from_addr
def __call__(self, message: AnyStr, recipients: List[str]):
"""Send the given message to all recipients from the list.
- Message is the email object serialised to str or bytes.
- Empty recipients are filtered out before communication.
"""
recipients = [_f for _f in recipients if _f]
if not recipients:
LOG.warning("No recipient found")
return
LOG.info("Sending email to: %s", recipients)
relay = conf.relay_params()
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.flag_enabled('relay', 'starttls'):
smtp.starttls()
smtp.sendmail(self._from_addr, recipients, message)

View File

@ -22,12 +22,14 @@ import logging
import subprocess import subprocess
import os import os
import time import time
import unittest
def _spawn(cmd): def _spawn(cmd):
env_dict = { env_dict = {
"PATH": os.getenv("PATH"), "PATH": os.getenv("PATH"),
"PYTHONPATH": os.getcwd(), "PYTHONPATH": os.getcwd(),
"LANG": 'en_US.UTF-8',
"GPG_MAILGATE_CONFIG": "test/gpg-mailgate-daemon-test.conf" "GPG_MAILGATE_CONFIG": "test/gpg-mailgate-daemon-test.conf"
} }
logging.debug(f"Spawning command: {cmd} with environment: {env_dict!r}") logging.debug(f"Spawning command: {cmd} with environment: {env_dict!r}")
@ -38,25 +40,19 @@ def _spawn(cmd):
def _interrupt(proc): def _interrupt(proc):
# proc.send_signal(signal.SIGINT)
proc.terminate() proc.terminate()
def _load(name):
logging.debug(f"Loading file {name}")
f = open(name, "r")
contents = f.read()
f.close()
return contents
def _send(host, port, mail_from, mail_to, message): def _send(host, port, mail_from, mail_to, message):
logging.debug(f"Sending message to {host}:{port}") logging.debug(f"Sending message to {host}:{port}")
_spawn([os.getenv("PYTHON") or "python", p = _spawn([os.getenv("PYTHON") or "python",
"test/utils/sendmail.py", "test/utils/sendmail.py",
"-f", mail_from, "-f", mail_from,
"-t", mail_to, "-t", mail_to,
"-m", message]) "-m", message])
# Perform subprocess's internal resource management:
p.communicate()
def _load_test_config(): def _load_test_config():
@ -65,66 +61,76 @@ def _load_test_config():
return cp return cp
def _identity(x): class AdvancedMailFilterE2ETest(unittest.TestCase):
return x """End-to-end tests for Advanced Mail Filter.
These tests are described by e2e.ini file, each case being a
separate section. All cases are executed following the same
procedure:
1. start up a mail relay mock;
2. load test message;
3. send the message to the daemon;
4. check if message received by relay mock meets criteria.
Before any case is executed, the daemon is started and finally it's
terminated by sending it a SIGINT signal."""
@classmethod
def setUpClass(cls):
"""Start up the daemon."""
cls.config = _load_test_config()
python = os.getenv("PYTHON", "python")
logging.info('Starting the server...')
cls.server = _spawn([python, '-m', 'lacre.daemon'])
@classmethod
def tearDownClass(cls):
"""Terminate the daemon."""
logging.info('Closing the server (SIGINT): %s', (cls.server))
_interrupt(cls.server)
def case_names(self):
"""A generator yielding a sequence of test case names."""
def is_test_case(case_name: str) -> bool:
return case_name.startswith('case-')
for tc in filter(is_test_case, self.config.sections()):
yield tc
def test_all_cases(self):
for case_name in self.case_names():
with self.subTest(case=case_name):
self._execute_case(self.config, case_name=case_name)
def _execute_case(self, config, case_name):
logging.info(f"Executing case {case_name}")
python = os.getenv("PYTHON", "python")
relay_mock = _spawn([python, "test/utils/relay.py", "2500"])
time.sleep(1) # Wait for the relay to start up.
_send("localhost", 10025, "dave@disposlab",
config.get(case_name, 'to'), config.get(case_name, 'in'))
(test_out, _) = relay_mock.communicate()
test_out = test_out.decode('utf-8')
logging.debug(f"Read {len(test_out)} characters of output: '{test_out}'")
if 'out' in config[case_name]:
expected = '\r\n' + self.config.get(case_name, 'out')
self.assertIn(expected, test_out, self.config.get(case_name, 'in'))
else:
unexpected = '\r\n' + self.config.get(case_name, 'out-not')
self.assertNotIn(unexpected, test_out, self.config.get(case_name, 'in'))
def _inversion(x): if __name__ == '__main__':
return not(x)
def _report_result(message_file, expected, test_output, boolean_func=_identity):
status = None
expected_line = "\r\n" + expected # + "\r\n"
cond_met = boolean_func(expected_line in test_output)
if cond_met:
status = "Success"
else:
status = "Failure"
print(message_file.ljust(35), status)
def _execute_case(config, case_name):
logging.info(f"Executing case {case_name}")
python = os.getenv("PYTHON", "python")
relay_mock = _spawn([python, "test/utils/relay.py", "2500"])
time.sleep(1) # Wait for the relay to start up.
_send("localhost", 10025, "dave@disposlab",
config.get(case_name, 'to'), config.get(case_name, 'in'))
relay_mock.wait()
(test_out, _) = relay_mock.communicate()
test_out = test_out.decode('utf-8')
logging.debug(f"Read {len(test_out)} characters of output: '{test_out}'")
if 'out' in config[case_name]:
_report_result(config.get(case_name, "in"), config.get(case_name, "out"), test_out)
else:
_report_result(config.get(case_name, "in"), config.get(case_name, "out-not"), test_out, boolean_func=_inversion)
def _main():
conf = _load_test_config()
logging.basicConfig(filename="test/logs/daemon-test.log", logging.basicConfig(filename="test/logs/daemon-test.log",
format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s", format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.DEBUG) level=logging.DEBUG)
logging.info("Starting Lacre Daemon tests...") unittest.main()
python = os.getenv("PYTHON", "python")
server = _spawn([python, "-m", "lacre.daemon"])
for case_no in range(1, conf.getint("tests", "cases") + 1):
_execute_case(conf, case_name=f"case-{case_no}")
_interrupt(server)
if __name__ == '__main__':
_main()

View File

@ -29,14 +29,20 @@ keys: test/keyhome
certs: test/certs certs: test/certs
[tests] [tests]
# Number of "test-*" sections in this file, describing test cases.
cases: 9
e2e_log: test/logs/e2e.log e2e_log: test/logs/e2e.log
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
e2e_log_datefmt: %Y-%m-%d %H:%M:%S e2e_log_datefmt: %Y-%m-%d %H:%M:%S
lacre_log: test/logs/gpg-mailgate.log lacre_log: test/logs/gpg-mailgate.log
log_config: test/gpg-lacre-log.ini log_config: test/gpg-lacre-log.ini
# TEST IDENTITIES AND SETTINGS:
#
# Email Key Style
# alice@disposlab RSA 3072 PGP/Inline
# bob@disposlab ED25519 PGP/Inline
# carlos@disposlab none PGP/Inline
# evan@disposlab ED25519 PGP/MIME
[case-1] [case-1]
descr: Clear text message to a user without a key descr: Clear text message to a user without a key
to: carlos@disposlab to: carlos@disposlab
@ -80,13 +86,91 @@ in: test/msgin/clear2rsa2.msg
out: -----BEGIN PGP MESSAGE----- out: -----BEGIN PGP MESSAGE-----
[case-8] [case-8]
descr: Clear text message to address with delimiter and a user with an Ed25519 key. descr: Clear text message to address with delimiter and a user with an Ed25519 key
to: bob@disposlab to: bob+foo@disposlab
in: test/msgin/clear2ed-delim.msg in: test/msgin/clear2ed-delim.msg
out: -----BEGIN PGP MESSAGE----- out: -----BEGIN PGP MESSAGE-----
[case-9] [case-9]
descr: Clear text message with inline PGP markers to recipient without a key. descr: Clear text message with inline PGP markers to recipient with a key
to: carlos@disposlab to: bob@disposlab
in: test/msgin/with-markers2clear.msg in: test/msgin/with-markers2clear.msg
out-not: This message includes inline PGP markers. out-not: This message includes inline PGP markers.
[case-10]
descr: UTF-8 message (yields Base64)
to: carlos@disposlab
in: test/msgin/utf8.msg
out: xYHEhENaTk/FmsSGLiBaYcW6w7PFgsSHIGfEmcWbbMSFIGphxbrFhC4=
[case-11]
descr: Non-ASCII message (ISO-8859-2; yields quoted-printable)
to: carlos@disposlab
in: test/msgin/nonascii.msg
out: =A3=A1CZNO=A6=C6.
[case-12]
descr: multipart/alternative with UTF-8, not encrypted
to: carlos@disposlab
in: test/msgin/utf8-alternative.msg
out-not: -----BEGIN PGP MESSAGE-----
[case-13]
descr: multipart/alternative with UTF-8, encrypted
to: evan@disposlab
in: test/msgin/utf8-alternative.msg
out: -----BEGIN PGP MESSAGE-----
[case-14]
descr: Clear text with UTF-8, PGP/MIME
to: evan@disposlab
in: test/msgin/utf8-plain.msg
out: Content-Type: application/pgp-encrypted
[case-15]
descr: Clear text with UTF-8, PGP/Inline
to: bob@disposlab
in: test/msgin/utf8-plain.msg
out: -----BEGIN PGP MESSAGE-----
[case-16]
descr: HTML, cleartext
to: carlos@disposlab
in: test/msgin/html-utf8.msg
out: PGh0bWw+DQo8aGVhZD4NCjwvaGVhZD4NCjxib2R5Pg0KWkHFu8OTxYHEhiBHxJjFmkzEhCBKQcW5
[case-17]
descr: HTML, PGP/MIME
to: evan@disposlab
in: test/msgin/html-utf8.msg
out: -----BEGIN PGP MESSAGE-----
[case-18]
descr: HTML, PGP/Inline
to: bob@disposlab
in: test/msgin/html-utf8.msg
out: -----BEGIN PGP MESSAGE-----
[case-19]
descr: US-ASCII HTML, cleartext
to: carlos@disposlab
in: test/msgin/html-ascii.msg
out: <html>
[case-20]
descr: US-ASCII HTML, PGP/Inline
to: bob@disposlab
in: test/msgin/html-ascii.msg
out: -----BEGIN PGP MESSAGE-----
[case-21]
descr: US-ASCII HTML, PGP/MIME
to: evan@disposlab
in: test/msgin/html-ascii.msg
out: -----BEGIN PGP MESSAGE-----
[case-22]
descr: HTML with emoji, PGP/MIME
to: evan@disposlab
in: test/msgin/emoji.msg
out: -----BEGIN PGP MESSAGE-----

View File

@ -23,6 +23,8 @@ import subprocess
import configparser import configparser
import logging import logging
import unittest
RELAY_SCRIPT = "test/utils/relay.py" RELAY_SCRIPT = "test/utils/relay.py"
CONFIG_FILE = "test/gpg-mailgate.conf" CONFIG_FILE = "test/gpg-mailgate.conf"
@ -58,7 +60,6 @@ def _build_config(config):
# uses PGP/MIME. # uses PGP/MIME.
cp.set("pgp_style", "evan@disposlab", "mime") cp.set("pgp_style", "evan@disposlab", "mime")
logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}")
return cp return cp
@ -74,77 +75,11 @@ def _write_test_config(outfile, **config):
def _load_file(name): def _load_file(name):
f = open(name, 'r') f = open(name, 'rb')
contents = f.read() contents = f.read()
f.close() f.close()
return bytes(contents, 'utf-8') return contents
def _identity(x):
return x
def _inversion(x):
return not(x)
def _report_result(message_file, expected, test_output, boolean_func=_identity):
status = None
expected_line = "\r\n" + expected # + "\r\n"
cond_met = boolean_func(expected_line in test_output)
if cond_met:
status = "Success"
else:
status = "Failure"
print(message_file.ljust(35), status)
def _execute_e2e_test(case_name, config, config_path):
"""Read test case configuration from config and run that test case.
Parameter case_name should refer to a section in test
config file. Each of these sections should contain
following properties: 'descr', 'to', 'in' and 'out'.
"""
# This environment variable is set in Makefile.
python_path = os.getenv('PYTHON', 'python3')
gpglacre_cmd = [python_path,
"gpg-mailgate.py",
config.get(case_name, "to")]
relay_cmd = [python_path,
config.get("relay", "script"),
config.get("relay", "port")]
logging.debug(f"Spawning relay: {relay_cmd}")
relay_proc = subprocess.Popen(relay_cmd,
stdin=None,
stdout=subprocess.PIPE)
logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {config.get(case_name, 'in')}")
# pass PATH because otherwise it would be dropped
gpglacre_proc = subprocess.run(gpglacre_cmd,
input=_load_file(config.get(case_name, "in")),
capture_output=True,
env={"GPG_MAILGATE_CONFIG": config_path,
"PATH": os.getenv("PATH")})
# Let the relay process the data.
relay_proc.wait()
(testout, _) = relay_proc.communicate()
testout = testout.decode('utf-8')
logging.debug(f"Read {len(testout)} characters of test output: '{testout}'")
if 'out' in config[case_name]:
_report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
else:
_report_result(config.get(case_name, "in"), config.get(case_name, "out-not"), testout, boolean_func=_inversion)
def _load_test_config(): def _load_test_config():
@ -154,28 +89,92 @@ def _load_test_config():
return cp return cp
config = _load_test_config() class SimpleMailFilterE2ETest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._e2e_config = _load_test_config()
cls._e2e_config_path = os.path.join(os.getcwd(), CONFIG_FILE)
logging.basicConfig(filename = config.get("tests", "e2e_log"), # This environment variable is set in Makefile.
# Get raw values of log and date formats because they cls._python_path = os.getenv('PYTHON', 'python3')
# contain %-sequences and we don't want them to be expanded
# by the ConfigParser.
format = config.get("tests", "e2e_log_format", raw=True),
datefmt = config.get("tests", "e2e_log_datefmt", raw=True),
level = logging.DEBUG)
config_path = os.getcwd() + "/" + CONFIG_FILE _write_test_config(cls._e2e_config_path,
port = cls._e2e_config.get("relay", "port"),
gpg_keyhome = cls._e2e_config.get("dirs", "keys"),
smime_certpath = cls._e2e_config.get("dirs", "certs"),
log_config = cls._e2e_config.get("tests", "log_config"))
_write_test_config(config_path, def case_names(self):
port = config.get("relay", "port"), def is_test_case(case_name: str) -> bool:
gpg_keyhome = config.get("dirs", "keys"), return case_name.startswith('case-')
smime_certpath = config.get("dirs", "certs"),
log_config = config.get("tests", "log_config"))
for case_no in range(1, config.getint("tests", "cases") + 1): for tc in filter(is_test_case, self._e2e_config.sections()):
case_name = f"case-{case_no}" yield tc
logging.info(f"Executing {case_name}: {config.get(case_name, 'descr')}")
_execute_e2e_test(case_name, config, config_path) def test_all_cases(self):
for case_name in self.case_names():
with self.subTest(case=case_name):
self._execute_e2e_test(case_name)
print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log"))) def _execute_e2e_test(self, case_name):
"""Read test case configuration from config and run that test case.
Parameter case_name should refer to a section in test
config file. Each of these sections should contain
following properties: 'descr', 'to', 'in' and 'out'.
"""
gpglacre_cmd = self._python_command(
'gpg-mailgate.py',
self._e2e_config.get(case_name, 'to'))
relay_cmd = self._python_command(
self._e2e_config.get("relay", "script"),
self._e2e_config.get("relay", "port"))
logging.debug(f"Spawning relay: {relay_cmd}")
relay_proc = subprocess.Popen(relay_cmd,
stdin=None,
stdout=subprocess.PIPE)
logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {self._e2e_config.get(case_name, 'in')}")
# pass PATH because otherwise it would be dropped
gpglacre_proc = subprocess.run(gpglacre_cmd,
input=_load_file(self._e2e_config.get(case_name, "in")),
capture_output=True,
env={"GPG_MAILGATE_CONFIG": self._e2e_config_path,
"PATH": os.getenv("PATH")})
# Let the relay process the data.
relay_proc.wait()
(testout, _) = relay_proc.communicate()
testout = testout.decode('utf-8')
logging.debug(f"Read {len(testout)} characters of test output: '{testout}'")
if 'out' in self._e2e_config[case_name]:
expected = "\r\n" + self._e2e_config.get(case_name, "out")
self.assertIn(expected, testout, self._e2e_config.get(case_name, "in"))
else:
unexpected = "\r\n" + self._e2e_config.get(case_name, "out-not")
self.assertNotIn(unexpected, testout, self._e2e_config.get(case_name, "in"))
def _python_command(self, script, *args):
command = [self._python_path, script]
command.extend(args)
return command
if __name__ == '__main__':
config = _load_test_config()
logging.basicConfig(filename = config.get("tests", "e2e_log"),
# Get raw values of log and date formats because they
# contain %-sequences and we don't want them to be
# expanded by the ConfigParser.
format = config.get("tests", "e2e_log_format", raw=True),
datefmt = config.get("tests", "e2e_log_datefmt", raw=True),
level = logging.DEBUG)
unittest.main()

View File

@ -22,11 +22,16 @@ port = 2500
[daemon] [daemon]
host = localhost host = localhost
port = 10025 port = 10025
log_headers = yes
[cron] [cron]
send_email = no send_email = no
[pgp_style]
# this recipient has PGP/MIME enabled, because the default approach is to use
# PGP/Inline
evan@disposlab = mime
[enc_keymap] [enc_keymap]
alice@disposlab = 1CD245308F0963D038E88357973CF4D9387C44D7 alice@disposlab = 1CD245308F0963D038E88357973CF4D9387C44D7
bob@disposlab = 19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67 bob@disposlab = 19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67

View File

@ -1,20 +1,20 @@
# #
# gpg-mailgate # gpg-mailgate
# #
# This file is part of the gpg-mailgate source code. # This file is part of the gpg-mailgate source code.
# #
# gpg-mailgate is free software: you can redistribute it and/or modify # gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. # (at your option) any later version.
# #
# gpg-mailgate source code is distributed in the hope that it will be useful, # gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. # GNU General Public License for more details.
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
"""Unit-tests as contracts for external dependencies. """Unit-tests as contracts for external dependencies.
@ -25,68 +25,170 @@ documentation.
""" """
import email import email
import email.mime.multipart
from email.message import EmailMessage
from email.policy import SMTP
import unittest import unittest
from configparser import RawConfigParser from configparser import RawConfigParser
class EmailParsingTest(unittest.TestCase): class EmailParsingTest(unittest.TestCase):
"""This test serves as a package contract and documentation of its behaviour.""" """This test serves as a package contract and documentation of its behaviour."""
def test_message_from_bytes_produces_message_with_str_headers(self): def test_message_from_bytes_produces_message_with_str_headers(self):
rawmsg = b"From: alice@lacre.io\r\n" \ rawmsg = b"From: alice@lacre.io\r\n" \
+ b"To: bob@lacre.io\r\n" \ + b"To: bob@lacre.io\r\n" \
+ b"Subject: Test message\r\n" \ + b"Subject: Test message\r\n" \
+ b"\r\n" \ + b"\r\n" \
+ b"Test message from Alice to Bob.\r\n" + b"Test message from Alice to Bob.\r\n"
parsed = email.message_from_bytes(rawmsg) parsed = email.message_from_bytes(rawmsg)
self.assertEqual(parsed["From"], "alice@lacre.io") self.assertEqual(parsed["From"], "alice@lacre.io")
self.assertEqual(parsed["To"], "bob@lacre.io") self.assertEqual(parsed["To"], "bob@lacre.io")
self.assertEqual(parsed["Subject"], "Test message") self.assertEqual(parsed["Subject"], "Test message")
def test_bytes_message_payload_decoded_produces_bytes(self): def test_bytes_message_payload_decoded_produces_bytes(self):
rawmsg = b"From: alice@lacre.io\r\n" \ rawmsg = b"From: alice@lacre.io\r\n" \
+ b"To: bob@lacre.io\r\n" \ + b"To: bob@lacre.io\r\n" \
+ b"Subject: Test message\r\n" \ + b"Subject: Test message\r\n" \
+ b"\r\n" \ + b"\r\n" \
+ b"Test message from Alice to Bob.\r\n" + b"Test message from Alice to Bob.\r\n"
parsed = email.message_from_bytes(rawmsg) parsed = email.message_from_bytes(rawmsg)
self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") self.assertEqual(parsed["From"], "alice@lacre.io")
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n") self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n")
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n")
def test_message_from_string_produces_message_with_str_headers(self): def test_message_from_string_produces_message_with_str_headers(self):
rawmsg = "From: alice@lacre.io\r\n" \ rawmsg = "From: alice@lacre.io\r\n" \
+ "To: bob@lacre.io\r\n" \ + "To: bob@lacre.io\r\n" \
+ "Subject: Test message\r\n" \ + "Subject: Test message\r\n" \
+ "\r\n" \ + "\r\n" \
+ "Test message from Alice to Bob.\r\n" + "Test message from Alice to Bob.\r\n"
parsed = email.message_from_string(rawmsg) parsed = email.message_from_string(rawmsg)
self.assertEqual(parsed["From"], "alice@lacre.io") self.assertEqual(parsed["From"], "alice@lacre.io")
self.assertEqual(parsed["To"], "bob@lacre.io") self.assertEqual(parsed["To"], "bob@lacre.io")
self.assertEqual(parsed["Subject"], "Test message") self.assertEqual(parsed["Subject"], "Test message")
def test_str_message_payload_decoded_produces_bytes(self): def test_str_base64_payload(self):
rawmsg = "From: alice@lacre.io\r\n" \ rawmsg = "From: alice@lacre.io\r\n" \
+ "To: bob@lacre.io\r\n" \ + "To: bob@lacre.io\r\n" \
+ "Subject: Test message\r\n" \ + "Subject: Test message\r\n" \
+ "\r\n" \ + "Content-Type: text/plain\r\n" \
+ "Test message from Alice to Bob.\r\n" + "Content-Transfer-Encoding: base64\r\n" \
+ "\r\n" \
+ "VGVzdCBtZXNzYWdlIGZyb20gQWxpY2UgdG8gQm9iLgo=\r\n"
parsed = email.message_from_string(rawmsg) parsed: EmailMessage = email.message_from_string(rawmsg, policy=SMTP)
self.assertEqual(parsed.get_payload(decode=False),
"VGVzdCBtZXNzYWdlIGZyb20gQWxpY2UgdG8gQm9iLgo=\r\n")
self.assertEqual(parsed.get_payload(decode=True),
b"Test message from Alice to Bob.\n")
self.assertEqual(parsed.get_content(),
"Test message from Alice to Bob.\n")
def test_bytes_base64_payload(self):
rawmsg = b"From: alice@lacre.io\r\n" \
+ b"To: bob@lacre.io\r\n" \
+ b"Subject: Test message\r\n" \
+ b"Content-Type: application/octet-stream\r\n" \
+ b"Content-Transfer-Encoding: base64\r\n" \
+ b"\r\n" \
+ b"VGVzdCBtZXNzYWdlIGZyb20gQWxpY2UgdG8gQm9iLgo=\r\n"
parsed: EmailMessage = email.message_from_bytes(rawmsg, policy=SMTP)
self.assertEqual(parsed.get_payload(decode=False),
"VGVzdCBtZXNzYWdlIGZyb20gQWxpY2UgdG8gQm9iLgo=\r\n")
self.assertEqual(parsed.get_payload(decode=True),
b"Test message from Alice to Bob.\n")
self.assertEqual(parsed.get_content(),
b"Test message from Alice to Bob.\n")
def test_multipart_parser(self):
rawmsg = b"Content-Type: multipart/mixed; boundary=XXXXXXXX\r\n" \
+ b"\r\n" \
+ b"--XXXXXXXX\r\n" \
+ b"Content-Type: application/octet-stream\r\n" \
+ b"Content-Transfer-Encoding: base64\r\n" \
+ b"\r\n" \
+ b"VGVzdCBtZXNzYWdlIGZyb20gQWxpY2UgdG8gQm9iLgo=\r\n" \
+ b"\r\n" \
+ b"--XXXXXXXX\r\n" \
+ b"Content-Type: application/octet-stream\r\n" \
+ b"Content-Transfer-Encoding: base64\r\n" \
+ b"\r\n" \
+ b"SGVsbG8sIFdvcmxkIQo=\r\n" \
+ b"\r\n" \
+ b"--XXXXXXXX--\r\n"
parsed: EmailMessage = email.message_from_bytes(rawmsg, policy=SMTP)
self.assertRaises(KeyError, parsed.get_content)
self.assertEqual(parsed.get_payload(0).get_content(),
b'Test message from Alice to Bob.\n')
self.assertEqual(parsed.get_payload(1).get_content(),
b'Hello, World!\n')
def test_headers_only_returns_bytes_payload(self):
rawmsg = b"From: alice@lacre.io\r\n" \
+ b"To: bob@lacre.io\r\n" \
+ b"Subject: Test message\r\n" \
+ b"Content-Type: text/plain\r\n" \
+ b"Content-Transfer-Encoding: base64\r\n" \
+ b"\r\n" \
+ b"SGVsbG8sIFdvcmxkIQo=\r\n"
parser = email.parser.BytesHeaderParser()
parsed = parser.parsebytes(rawmsg)
self.assertEqual(parsed.get_payload(decode=False), "SGVsbG8sIFdvcmxkIQo=\r\n")
self.assertEqual(parsed.get_payload(decode=True), b"Hello, World!\n")
def test_headers_only_produces_single_payload_for_multipart(self):
msg = None
with open('test/msgin/utf8-alternative.msg', 'rb') as f:
p = email.parser.BytesHeaderParser()
msg = p.parse(f)
payload = msg.get_payload()
# Taken from test/msgin/utf8-alternative.msg:
message_boundary = '6s7R3c0y2W8qiD7cU3iWyXcw'
self.assertIsInstance(payload, str)
self.assertTrue(message_boundary in payload)
class EmailTest(unittest.TestCase):
def test_boundary_generated_after_as_string_call(self):
mp = email.mime.multipart.MIMEMultipart()
self.assertTrue(mp.get_boundary() is None)
_ = mp.as_string()
self.assertFalse(mp.get_boundary() is None)
def test_content_type_params_include_mime_type(self):
p = email.message.MIMEPart()
p.set_type('text/plain')
p.set_param('charset', 'UTF-8')
p.set_param('format', 'flowed')
self.assertIn(('text/plain', ''), p.get_params())
self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n")
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n")
class RawConfigParserTest(unittest.TestCase): class RawConfigParserTest(unittest.TestCase):
def test_config_parser_returns_str(self): def test_config_parser_returns_str(self):
cp = RawConfigParser() cp = RawConfigParser()
cp.read("test/sample.ini") cp.read("test/sample.ini")
self.assertEqual(cp.get("foo", "bar"), "quux") self.assertEqual(cp.get("foo", "bar"), "quux")
self.assertEqual(cp.get("foo", "baz"), "14") self.assertEqual(cp.get("foo", "baz"), "14")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -2,7 +2,6 @@ import GnuPG
import unittest import unittest
class GnuPGUtilitiesTest(unittest.TestCase): class GnuPGUtilitiesTest(unittest.TestCase):
def test_build_default_command(self): def test_build_default_command(self):
cmd = GnuPG._build_command("test/keyhome") cmd = GnuPG._build_command("test/keyhome")
@ -13,14 +12,14 @@ class GnuPGUtilitiesTest(unittest.TestCase):
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"]) self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
def test_key_confirmation_with_matching_email(self): def test_key_confirmation_with_matching_email(self):
armored_key = self._load('test/keys/bob@disposlab') armored_key = self._load('test/keys/bob@disposlab.pub')
matching_email = 'bob@disposlab' matching_email = 'bob@disposlab'
is_confirmed = GnuPG.confirm_key(armored_key, matching_email) is_confirmed = GnuPG.confirm_key(armored_key, matching_email)
self.assertTrue(is_confirmed) self.assertTrue(is_confirmed)
def test_key_confirmation_email_mismatch(self): def test_key_confirmation_email_mismatch(self):
armored_key = self._load('test/keys/bob@disposlab') armored_key = self._load('test/keys/bob@disposlab.pub')
not_matching_email = 'lucy@disposlab' not_matching_email = 'lucy@disposlab'
is_confirmed = GnuPG.confirm_key(armored_key, not_matching_email) is_confirmed = GnuPG.confirm_key(armored_key, not_matching_email)
@ -50,6 +49,18 @@ class GnuPGUtilitiesTest(unittest.TestCase):
with open(filename) as f: with open(filename) as f:
return f.read() return f.read()
def test_parse_statusfd_key_expired(self):
key_expired = b"""
[GNUPG:] KEYEXPIRED 1668272263
[GNUPG:] KEY_CONSIDERED XXXXXXXXXXXXX 0
[GNUPG:] INV_RECP 0 name@domain
[GNUPG:] FAILURE encrypt 1
"""
result = GnuPG.parse_status(key_expired)
self.assertEqual(result['issue'], b'KEYEXPIRED')
self.assertEqual(result['recipient'], b'name@domain')
self.assertEqual(result['cause'], 'No specific reason given')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -0,0 +1,42 @@
import lacre.core
from email.message import EmailMessage
import unittest
class LacreCoreTest(unittest.TestCase):
def test_attachment_handling(self):
m = EmailMessage()
m.set_payload('This is a payload')
m.set_param('attachment', '', 'Content-Disposition')
m.set_param('filename', 'foo', 'Content-Disposition')
lacre.core._append_gpg_extension(m)
self.assertEqual(m.get_filename(), 'foo.pgp')
def test_attachment_handling_2(self):
m = EmailMessage()
m.set_payload('This is a payload')
m.set_param('attachment', '', 'Content-Disposition')
m.set_param('name', 'quux', 'Content-Type')
lacre.core._append_gpg_extension(m)
self.assertEqual(m.get_filename(), 'quux.pgp')
def test_payload_wrapping(self):
m = EmailMessage()
m.set_payload('This is a payload.\r\n'
+ '\r\n'
+ 'It has two paragraphs.\r\n')
m['Subject'] = 'Source message'
m.set_type('text/plain')
m.set_param('charset', 'utf-8')
rewrapped = lacre.core._rewrap_payload(m)
self.assertFalse('Subject' in rewrapped,
'only content and content-type should be copied')
self.assertEqual(rewrapped.get_content_type(), 'text/plain',
'rewrapped part should have initial message\'s content-type')

View File

@ -0,0 +1,17 @@
import lacre.recipients
import unittest
class RecipientListTest(unittest.TestCase):
def test_addition(self):
a_list = lacre.recipients.RecipientList()
a_list += lacre.recipients.GpgRecipient(
'alice@disposlab',
'1CD245308F0963D038E88357973CF4D9387C44D7')
emails = [x for x in a_list.emails()]
keys = [x for x in a_list.keys()]
self.assertSequenceEqual(emails, ['alice@disposlab'])
self.assertSequenceEqual(keys, ['1CD245308F0963D038E88357973CF4D9387C44D7'])

View File

@ -1,5 +1,8 @@
import lacre.text import lacre.text
import sys import sys
from email import message_from_binary_file
from email.message import EmailMessage
from email.policy import SMTPUTF8
import unittest import unittest
@ -35,3 +38,23 @@ class LacreTextTest(unittest.TestCase):
(addr2, topic) = lacre.text.parse_delimiter(addr) (addr2, topic) = lacre.text.parse_delimiter(addr)
self.assertEqual(addr2, "Some.Name@example.com") self.assertEqual(addr2, "Some.Name@example.com")
self.assertEqual(topic, "some-topic") self.assertEqual(topic, "some-topic")
def test_pgp_inline_recognised(self):
msg = None
with open('test/msgin/ed2ed.msg', 'rb') as f:
msg = message_from_binary_file(f, policy=SMTPUTF8)
body = msg.get_payload()
self.assertIn(lacre.text.PGP_BEGIN_S, body)
self.assertIn(lacre.text.PGP_END_S, body)
self.assertTrue(lacre.text.is_payload_pgp_inline(body))
def test_pgp_marker_mentioned(self):
msg = None
with open('test/msgin/with-markers2clear.msg', 'rb') as f:
msg = message_from_binary_file(f, policy=SMTPUTF8)
body = msg.get_payload()
self.assertFalse(lacre.text.is_payload_pgp_inline(body))

18
test/msgin/emoji.msg Normal file
View File

@ -0,0 +1,18 @@
Date: Sun, 16 Apr 2023 07:29:45 +0200
MIME-Version: 1.0
User-Agent: Mozilla/5.0 (X11; FreeBSD amd64; rv:102.0) Gecko/20100101
Thunderbird/102.9.0
Content-Language: pl
To: Carlos <carlos@localhost>
From: Dave <dave@localhost>
Subject: Emoji test
Content-Type: text/html; charset=UTF-8; format=flowed
Content-Transfer-Encoding: 8bit
<html>
<head>
</head>
<body>
àèéòìù ø Ø 🙂️ 👍️ 🚗️
</body>
</html>

18
test/msgin/html-ascii.msg Normal file
View File

@ -0,0 +1,18 @@
Date: Sun, 16 Apr 2023 07:29:45 +0200
MIME-Version: 1.0
User-Agent: Mozilla/5.0 (X11; FreeBSD amd64; rv:102.0) Gecko/20100101
Thunderbird/102.9.0
Content-Language: pl
To: Carlos <carlos@localhost>
From: Dave <dave@localhost>
Subject: HTML test
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: 7bit
<html>
<head>
</head>
<body>
This is just an HTML email.
</body>
</html>

19
test/msgin/html-utf8.msg Normal file
View File

@ -0,0 +1,19 @@
Date: Sun, 16 Apr 2023 07:29:45 +0200
MIME-Version: 1.0
User-Agent: Mozilla/5.0 (X11; FreeBSD amd64; rv:102.0) Gecko/20100101
Thunderbird/102.9.0
Content-Language: pl
To: Carlos <carlos@localhost>
From: Dave <dave@localhost>
Subject: HTML test
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: 8bit
<html>
<head>
</head>
<body>
ZAŻÓŁĆ GĘŚLĄ JAŹŃ.<br>
zażółć gęślą jaźń.
</body>
</html>

7
test/msgin/nonascii.msg Normal file
View File

@ -0,0 +1,7 @@
From: Dave <dave@localhost>
To: Carlos <carlos@localhost>
Content-Type: text/plain; charset="iso-8859-2"
Content-Transfer-Encoding: 8bit
Subject: Test
£¡CZNO¦Æ. Za¼ó³æ gê¶l± ja¼ñ.

View File

@ -0,0 +1,43 @@
Content-Type: multipart/alternative;
boundary="------------6s7R3c0y2W8qiD7cU3iWyXcw"
Date: Wed, 23 Nov 2022 08:06:29 +0100
MIME-Version: 1.0
Content-Language: pl-PL
From: Dave <dave@localhost>
To: carlos@disposlab
Subject: Lorem ipsum...
This is a multi-part message in MIME format.
--------------6s7R3c0y2W8qiD7cU3iWyXcw
Content-Type: text/plain; charset=UTF-8; format=flowed
Content-Transfer-Encoding: 8bit
Современная литература - это всемирное культурное богатство, наследие
всех людей, которые находят вдохновение в книгах. Читать - значит жить,
вникать в поток мыслей других.
Współczesna literatura to światowe bogactwo kulturowe, dziedzictwo
wszystkich ludzi, którzy znajdują inspirację w książkach. Czytać to żyć,
zagłębiać się w przepływ myśli innych.
--------------6s7R3c0y2W8qiD7cU3iWyXcw
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: 8bit
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
</head>
<body text="#dcdcdc" bgcolor="#3b3b3b">
<p>Современная литература - это всемирное культурное богатство,
наследие всех людей, которые находят вдохновение в книгах. Читать
- значит жить, вникать в поток мыслей других.<br>
<br>
Współczesna literatura to światowe bogactwo kulturowe, dziedzictwo
wszystkich ludzi, którzy znajdują inspirację w książkach. Czytać
to żyć, zagłębiać się w przepływ myśli innych.<br>
</p>
</body>
</html>
--------------6s7R3c0y2W8qiD7cU3iWyXcw--

29
test/msgin/utf8-plain.msg Normal file
View File

@ -0,0 +1,29 @@
Date: Thu, 15 Dec 2022 21:40:51 +0100
MIME-Version: 1.0
User-Agent: Mozilla/5.0 (X11; FreeBSD amd64; rv:102.0) Gecko/20100101
Thunderbird/102.5.1
Subject: Lorem_ipsum, text/plain
Content-Language: pl
To: somebody@disposlab
From: Dave <dave@localhost>
Content-Type: text/plain; charset=UTF-8; format=flowed
siema :)
poniżej tekst, o który prosiłeś. o coś takiego chodziło? :) jeśli trzeba
poprawić, daj znać!
pzdr!
łukasz
***
Современная литература - это всемирное культурное богатство, наследие
всех людей, которые находят вдохновение в книгах. Читать - значит жить,
вникать в поток мыслей других.
// tłumaczenie:
Współczesna literatura to światowe bogactwo kulturowe, dziedzictwo
wszystkich ludzi, którzy znajdują inspirację w książkach. Czytać to żyć,
zagłębiać się w przepływ myśli innych.

7
test/msgin/utf8.msg Normal file
View File

@ -0,0 +1,7 @@
From: Dave <dave@localhost>
To: Carlos <carlos@localhost>
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit
Subject: Test
ŁĄCZNOŚĆ. Zaźółć gęślą jaźń.

View File

@ -1,6 +1,6 @@
From: Dave <dave@localhost> From: Dave <dave@localhost>
To: Carlos <carlos@localhost> To: Carlos <carlos@localhost>
Subject: Test Subject: PGP markers
This message includes inline PGP markers. This message includes inline PGP markers.
It's enough to include these two lines: It's enough to include these two lines:

View File

@ -12,77 +12,100 @@ import sys
import socket import socket
import logging import logging
import email
import email.policy
EXIT_UNAVAILABLE = 1 EXIT_UNAVAILABLE = 1
EXIT_NETWORK = 2
EXIT_UNKNOWN = 3
ENCODING = 'utf-8' ENCODING = 'utf-8'
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
EOM = "\r\n.\r\n" EOM = b"\r\n.\r\n"
LAST_LINE = -3 LAST_LINE = -3
def welcome(msg): def _welcome(msg):
return b"220 %b\r\n" % (msg) return b"220 %b\r\n" % (msg)
def ok(msg = b"OK"):
def _ok(msg=b"OK"):
return b"250 %b\r\n" % (msg) return b"250 %b\r\n" % (msg)
def bye():
def _bye():
return b"251 Bye" return b"251 Bye"
def provide_message():
def _provide_message():
return b"354 Enter a message, ending it with a '.' on a line by itself\r\n" return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
def receive_and_confirm(session):
session.recv(BUFFER_SIZE)
session.sendall(ok())
def localhost_at(port): def _receive_and_confirm(session):
session.recv(BUFFER_SIZE)
session.sendall(_ok())
def _receive_and_ignore(session):
session.recv(BUFFER_SIZE)
def _localhost_at(port):
return ('127.0.0.1', port) return ('127.0.0.1', port)
def serve(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def _receive_bytes(conn) -> bytes:
return conn.recv(BUFFER_SIZE)
def _listen(port, sock):
try: try:
s.bind(localhost_at(port)) sock.bind(_localhost_at(port))
logging.info(f"Listening on localhost, port {port}") sock.listen(1)
s.listen(1)
logging.info("Listening...")
except socket.error as e: except socket.error as e:
print("Cannot connect", e) print("Cannot connect", e)
logging.error(f"Cannot connect {e}") logging.exception('Cannot connect')
sys.exit(EXIT_UNAVAILABLE) sys.exit(EXIT_NETWORK)
def _serve(port) -> bytes:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_listen(port, s)
logging.debug("About to accept a connection...") logging.debug("About to accept a connection...")
(conn, addr) = s.accept() (conn, addr) = s.accept()
logging.debug(f"Accepting connection from {conn}") logging.debug(f"Accepting connection from {conn}")
conn.sendall(welcome(b"TEST SERVER")) conn.sendall(_welcome(b"TEST SERVER"))
receive_and_confirm(conn) # Ignore HELO/EHLO _receive_and_confirm(conn) # Ignore HELO/EHLO
receive_and_confirm(conn) # Ignore sender address _receive_and_confirm(conn) # Ignore sender address
receive_and_confirm(conn) # Ignore recipient address _receive_and_confirm(conn) # Ignore recipient address
conn.recv(BUFFER_SIZE) _receive_and_ignore(conn)
conn.sendall(provide_message()) conn.sendall(_provide_message())
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker. # Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
message = '' message = b''
while not message.endswith(EOM): while not message.endswith(EOM):
message += conn.recv(BUFFER_SIZE).decode(ENCODING) buf = _receive_bytes(conn)
conn.sendall(ok(b"OK, id=test")) logging.debug('Received data: %s', buf)
message += buf
conn.sendall(_ok(b"OK, id=test"))
conn.recv(BUFFER_SIZE) conn.recv(BUFFER_SIZE)
conn.sendall(bye()) conn.sendall(_bye())
conn.close() conn.close()
logging.debug(f"Received {len(message)} characters of data") logging.debug('Received %d bytes of data', len(message))
# Trim EOM marker as we're only interested in the message body. # Trim EOM marker as we're only interested in the message body.
return message[:-len(EOM)] return message[:-len(EOM)]
def error(msg, exit_code):
def _error(msg, exit_code):
logging.error(msg) logging.error(msg)
print("ERROR: %s" % (msg)) print("ERROR: %s" % (msg))
sys.exit(exit_code) sys.exit(exit_code)
@ -96,9 +119,18 @@ logging.basicConfig(filename='test/logs/relay.log',
level=logging.DEBUG) level=logging.DEBUG)
if len(sys.argv) < 2: if len(sys.argv) < 2:
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE) _error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
port = int(sys.argv[1]) port = int(sys.argv[1])
body = serve(port)
print(body) try:
body = _serve(port)
logging.debug('Parsing message')
msg = email.message_from_bytes(body, policy=email.policy.SMTP)
print(msg)
except ConnectionResetError:
logging.exception('Communication issue')
_error('Could not receive complete message', EXIT_NETWORK)
except BrokenPipeError:
logging.exception('Pipe error')
_error('Pipe error', EXIT_UNKNOWN)

View File

@ -2,27 +2,49 @@ import logging
import smtplib import smtplib
import sys import sys
import getopt import getopt
from email import message_from_binary_file
from email.policy import SMTPUTF8
def _load_file(name): def _load_file(name) -> bytes:
f = open(name, 'r') with open(name, 'rb') as f:
contents = f.read() return f.read()
f.close()
return contents
def _send(host, port, from_addr, recipients, message): def _load_message(name):
with open(name, 'rb') as f:
return message_from_binary_file(f, policy=SMTPUTF8)
def _send_message(host, port, from_addr, recipients, message):
logging.info(f"From {from_addr} to {recipients} at {host}:{port}") logging.info(f"From {from_addr} to {recipients} at {host}:{port}")
try: try:
smtp = smtplib.SMTP(host, port) smtp = smtplib.SMTP(host, port)
# smtp.starttls() return smtp.sendmail(from_addr, recipients, message.as_bytes())
return smtp.sendmail(from_addr, recipients, message)
except smtplib.SMTPDataError as e: except smtplib.SMTPDataError as e:
logging.error(f"Couldn't deliver message. Got error: {e}") logging.error(f"Couldn't deliver message. Got error: {e}")
return None return None
except ConnectionRefusedError as e: except ConnectionRefusedError as e:
logging.exception(f"Connection refused: {e}") logging.exception(f"Connection refused: {e}")
return None return None
except:
logging.exception('Unexpected exception was thrown')
return None
# The poinf of this function is to do _almost_ what SMTP.sendmail does, but
# without enforcing ASCII. We want to test Lacre with not necessarily valid
# messages.
def _send_bytes(host: str, port, from_addr: str, recipients, message: bytes):
try:
smtp = smtplib.SMTP(host, port)
smtp.ehlo_or_helo_if_needed()
smtp.mail(from_addr)
for r in recipients:
smtp.rcpt(r)
smtp.data(message)
except:
logging.exception('Unexpected exception was thrown')
logging.basicConfig(filename="test/logs/sendmail.log", logging.basicConfig(filename="test/logs/sendmail.log",
@ -36,21 +58,15 @@ opts, _ = getopt.getopt(sys.argv[1:], "f:t:m:")
for opt, value in opts: for opt, value in opts:
if opt == "-f": if opt == "-f":
sender = value sender = value
logging.debug(f"Sender is {sender}") logging.debug(f"Sender is {sender!r}")
if opt == "-t": if opt == "-t":
recipient = value recipient = value
logging.debug(f"Recipient is {recipient}") logging.debug(f"Recipient is {recipient!r}")
if opt == "-m": if opt == "-m":
message = _load_file(value) message = _load_file(value)
logging.debug(f"Message is {message}") logging.debug(f"Message is {message}")
if message is None: if message is None or sender is None or recipient is None:
message = """\ print('Use options to provide: -f sender -t recipient -m message')
From: dave@disposlab
To: alice@disposlab
Subject: Test message
Lorem ipsum dolor sit amet. _send_bytes('localhost', 10025, sender, [recipient], message)
"""
_send('localhost', 10025, sender, [recipient], message)