When encryption fails, revert to cleartext delivery

When GnuPG refuses to encrypt a message (e.g. when key has expired), record
information about the failure and send to logs, then deliver cleartext.  This
way we won't bounce email that could be delivered without encryption.

Also: add more E2E tests.
This commit is contained in:
Piotr F. Mieszkowski 2022-12-27 18:05:27 +01:00
parent d47d91c174
commit 17f782519e
9 changed files with 195 additions and 21 deletions

View File

@ -37,6 +37,20 @@ POS_FINGERPRINT = 9
LOG = logging.getLogger(__name__)
class EncryptionException(Exception):
"""Represents a failure to encrypt a payload."""
def __init__(self, issue: str, recipient: str, cause: str):
"""Initialise an exception."""
self._issue = issue
self._recipient = recipient
self._cause = cause
def __str__(self):
"""Return human-readable string representation."""
return f"issue: {self._issue}; to: {self._recipient}; cause: {self._cause}"
def _build_command(key_home, *args, **kwargs):
cmd = ["gpg", '--homedir', key_home] + list(args)
return cmd
@ -152,11 +166,15 @@ class GPGEncryptor:
def encrypt(self):
"""Feed GnuPG with the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
encdata = p.communicate(input=self._message)[0]
encdata, err = p.communicate(input=self._message)
if p.returncode != 0:
LOG.debug('Errors: %s', err)
details = parse_status(err)
raise EncryptionException(details['issue'], details['recipient'], details['cause'])
return (encdata, p.returncode)
def _command(self):
cmd = _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
cmd = _build_command(self._keyhome, "--trust-model", "always", "--status-fd", "2", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
# add recipients
for recipient in self._recipients:
@ -192,3 +210,62 @@ class GPGDecryptor:
def _command(self):
return _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
STATUS_FD_PREFIX = b'[GNUPG:] '
STATUS_FD_PREFIX_LEN = len(STATUS_FD_PREFIX)
KEY_EXPIRED = b'KEYEXPIRED'
KEY_REVOKED = b'KEYREVOKED'
NO_RECIPIENTS = b'NO_RECP'
INVALID_RECIPIENT = b'INV_RECP'
# INV_RECP reason code descriptions.
INVALID_RECIPIENT_CAUSES = [
'No specific reason given',
'Not Found',
'Ambiguous specification',
'Wrong key usage',
'Key revoked',
'Key expired',
'No CRL known',
'CRL too old',
'Policy mismatch',
'Not a secret key',
'Key not trusted',
'Missing certificate',
'Missing issuer certificate',
'Key disabled',
'Syntax error in specification'
]
def parse_status(status_buffer: str) -> dict:
"""Parse --status-fd output and return important information."""
return parse_status_lines(status_buffer.splitlines())
def parse_status_lines(lines: list) -> dict:
"""Parse --status-fd output and return important information."""
result = {'issue': 'n/a', 'recipient': 'n/a', 'cause': 'Unknown'}
LOG.debug('Processing stderr lines %s', lines)
for line in lines:
LOG.debug('At gnupg stderr line %s', line)
if not line.startswith(STATUS_FD_PREFIX):
continue
if line.startswith(KEY_EXPIRED, STATUS_FD_PREFIX_LEN):
result['issue'] = KEY_EXPIRED
elif line.startswith(KEY_REVOKED, STATUS_FD_PREFIX_LEN):
result['issue'] = KEY_REVOKED
elif line.startswith(NO_RECIPIENTS, STATUS_FD_PREFIX_LEN):
result['issue'] = NO_RECIPIENTS
elif line.startswith(INVALID_RECIPIENT, STATUS_FD_PREFIX_LEN):
words = line.split(b' ')
reason_code = int(words[2])
result['recipient'] = words[3]
result['cause'] = INVALID_RECIPIENT_CAUSES[reason_code]
return result

View File

@ -357,9 +357,6 @@ def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
gpg.update(raw_payload)
encrypted_data, returncode = gpg.encrypt()
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
if returncode != 0:
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
return payload
payload.set_payload(encrypted_data)
isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None

View File

@ -4,6 +4,7 @@ import logging
import lacre
from lacre.text import DOUBLE_EOL_BYTES
import lacre.config as conf
from GnuPG import EncryptionException
import sys
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope
@ -26,6 +27,7 @@ LOG = logging.getLogger('lacre.daemon')
import lacre.core as gate
import lacre.keyring as kcache
from lacre.mailop import KeepIntact
class MailEncryptionProxy:
@ -55,8 +57,16 @@ class MailEncryptionProxy:
for operation in gate.delivery_plan(envelope.rcpt_tos, message, keys):
LOG.debug(f"Sending mail via {operation!r}")
new_message = operation.perform(message)
gate.send_msg_bytes(new_message, operation.recipients(), envelope.mail_from)
try:
new_message = operation.perform(message)
gate.send_msg_bytes(new_message, operation.recipients(), envelope.mail_from)
except EncryptionException:
# If the message can't be encrypted, deliver cleartext.
LOG.exception('Unable to encrypt message, delivering in cleartext')
if not isinstance(operation, KeepIntact):
self._send_unencrypted(operation, message, envelope)
else:
LOG.error(f'Cannot perform {operation}')
except:
LOG.exception('Unexpected exception caught, bouncing message')
@ -67,6 +77,11 @@ class MailEncryptionProxy:
return RESULT_OK
def _send_unencrypted(self, operation, message, envelope):
keep = KeepIntact(operation.recipients())
new_message = keep.perform(message)
gate.send_msg_bytes(new_message, operation.recipients(), envelope.mail_from)
def _beginning(self, e: Envelope) -> bytes:
double_eol_pos = e.original_content.find(DOUBLE_EOL_BYTES)
if double_eol_pos < 0:

View File

@ -28,6 +28,7 @@ def _spawn(cmd):
env_dict = {
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getcwd(),
"LANG": 'en_US.UTF-8',
"GPG_MAILGATE_CONFIG": "test/gpg-mailgate-daemon-test.conf"
}
logging.debug(f"Spawning command: {cmd} with environment: {env_dict!r}")

View File

@ -30,7 +30,7 @@ certs: test/certs
[tests]
# Number of "test-*" sections in this file, describing test cases.
cases: 12
cases: 15
e2e_log: test/logs/e2e.log
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
e2e_log_datefmt: %Y-%m-%d %H:%M:%S
@ -80,13 +80,13 @@ in: test/msgin/clear2rsa2.msg
out: -----BEGIN PGP MESSAGE-----
[case-8]
descr: Clear text message to address with delimiter and a user with an Ed25519 key.
descr: Clear text message to address with delimiter and a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/clear2ed-delim.msg
out: -----BEGIN PGP MESSAGE-----
[case-9]
descr: Clear text message with inline PGP markers to recipient without a key.
descr: Clear text message with inline PGP markers to recipient without a key
to: carlos@disposlab
in: test/msgin/with-markers2clear.msg
out-not: This message includes inline PGP markers.
@ -104,7 +104,25 @@ in: test/msgin/nonascii.msg
out: =A3=A1CZNO=A6=C6.
[case-12]
descr: Russian, with UTF-8, producing Base64
descr: multipart/alternative with UTF-8, not encrypted
to: carlos@disposlab
in: test/msgin/Lorem_ipsum_ru.msg
out: 0KHQvtCy0YDQtdC80
in: test/msgin/utf8-alternative.msg
out-not: -----BEGIN PGP MESSAGE-----
[case-13]
descr: multipart/alternative with UTF-8, encrypted
to: evan@disposlab
in: test/msgin/utf8-alternative.msg
out: -----BEGIN PGP MESSAGE-----
[case-14]
descr: Clear text with UTF-8, PGP/MIME
to: evan@disposlab
in: test/msgin/utf8-plain.msg
out: -----BEGIN PGP MESSAGE-----
[case-15]
descr: Clear text with UTF-8, PGP/Inline
to: carlos@disposlab
in: test/msgin/utf8-plain.msg
out-not: -----BEGIN PGP MESSAGE-----

View File

@ -27,6 +27,9 @@ log_headers = yes
[cron]
send_email = no
[pgp_style]
evan@disposlab = mime
[enc_keymap]
alice@disposlab = 1CD245308F0963D038E88357973CF4D9387C44D7
bob@disposlab = 19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67

View File

@ -3,14 +3,26 @@ import GnuPG
import unittest
class GnuPGUtilitiesTest(unittest.TestCase):
def test_build_default_command(self):
cmd = GnuPG._build_command("test/keyhome")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
def test_build_default_command(self):
cmd = GnuPG._build_command("test/keyhome")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
def test_build_command_extended_with_args(self):
cmd = GnuPG._build_command("test/keyhome", "--foo", "--bar")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
def test_build_command_extended_with_args(self):
cmd = GnuPG._build_command("test/keyhome", "--foo", "--bar")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
def test_parse_statusfd_key_expired(self):
key_expired = b"""
[GNUPG:] KEYEXPIRED 1668272263
[GNUPG:] KEY_CONSIDERED XXXXXXXXXXXXX 0
[GNUPG:] INV_RECP 0 name@domain
[GNUPG:] FAILURE encrypt 1
"""
result = GnuPG.parse_status(key_expired)
self.assertEqual(result['issue'], b'KEYEXPIRED')
self.assertEqual(result['recipient'], b'name@domain')
self.assertEqual(result['cause'], 'No specific reason given')
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@ -3,7 +3,7 @@ Content-Type: multipart/alternative;
Date: Wed, 23 Nov 2022 08:06:29 +0100
MIME-Version: 1.0
Content-Language: pl-PL
From: Sender <lukas@lacre.io>
From: Dave <dave@localhost>
To: carlos@disposlab
Subject: Lorem ipsum...

51
test/msgin/utf8-plain.msg Normal file
View File

@ -0,0 +1,51 @@
Date: Thu, 15 Dec 2022 21:40:51 +0100
MIME-Version: 1.0
User-Agent: Mozilla/5.0 (X11; FreeBSD amd64; rv:102.0) Gecko/20100101
Thunderbird/102.5.1
Subject: Lorem_ipsum, text/plain
Content-Language: pl
To: somebody@disposlab
From: Dave <dave@localhost>
Content-Type: text/plain; charset=UTF-8; format=flowed
Let's try once again to inspect headers.
--- Treść przekazanej wiadomości ---
Temat: Lorem ipsum...
Data: Wed, 23 Nov 2022 08:06:29 +0100
Nadawca: Lukas Discrust <lukaso.dasein@riseup.net>
Adresat: pfm@disroot.org
siema :)
poniżej tekst, o który prosiłeś. o coś takiego chodziło? :) jeśli trzeba
poprawić, daj znać!
pzdr!
łukasz
***
Современная литература - это всемирное культурное богатство, наследие
всех людей, которые находят вдохновение в книгах. Читать - значит жить,
вникать в поток мыслей других.
// tłumaczenie:
Współczesna literatura to światowe bogactwo kulturowe, dziedzictwo
wszystkich ludzi, którzy znajdują inspirację w książkach. Czytać to żyć,
zagłębiać się w przepływ myśli innych.
--
Lukas
website → discrust.pl
gemini →gemini://nuclear.discrust.pl
microblog → chaos.social/@lukaso666
XMPP →discrust@colloquy.ca (OMEMO)
IRC → ##discrust (Libera.Chat)
Tox → ask me for my ID