From 558872d9d0bc559e76aaaf91c8ac70113bb236ba Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Mon, 16 May 2022 20:57:12 +0200 Subject: [PATCH 1/7] Start documenting dependency contracts with unit tests Implement some unit tests for 'email' package so we know precisely how this package behaves. --- test/test_contracts.py | 62 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 test/test_contracts.py diff --git a/test/test_contracts.py b/test/test_contracts.py new file mode 100644 index 0000000..3d9cfe1 --- /dev/null +++ b/test/test_contracts.py @@ -0,0 +1,62 @@ +# +# gpg-mailgate +# +# This file is part of the gpg-mailgate source code. +# +# gpg-mailgate is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# gpg-mailgate source code is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gpg-mailgate source code. If not, see . +# + +"""Unit-tests as contracts for external dependencies. + +Unit tests defined here are our contracts for the dependencies used by Lacre. +Since not all software is documented thoroughly, they are also a form of +documentation. +""" + +import email +import unittest + +class EmailParsingTest(unittest.TestCase): + """This test serves as a package contract and documentation of its behaviour.""" + + def test_message_from_bytes_produces_message_with_str_headers(self): + rawmsg = b"From: alice@lacre.io\r\n" \ + + b"To: bob@lacre.io\r\n" \ + + b"Subject: Test message\r\n" \ + + b"\r\n" \ + + b"Test message from Alice to Bob.\r\n" + + parsed = email.message_from_bytes(rawmsg) + + self.assertEqual(parsed["From"], "alice@lacre.io") + self.assertEqual(parsed["To"], "bob@lacre.io") + self.assertEqual(parsed["Subject"], "Test message") + self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") + + def test_message_from_string_produces_message_with_str_headers(self): + rawmsg = "From: alice@lacre.io\r\n" \ + + "To: bob@lacre.io\r\n" \ + + "Subject: Test message\r\n" \ + + "\r\n" \ + + "Test message from Alice to Bob.\r\n" + + parsed = email.message_from_string(rawmsg) + + self.assertEqual(parsed["From"], "alice@lacre.io") + self.assertEqual(parsed["To"], "bob@lacre.io") + self.assertEqual(parsed["Subject"], "Test message") + self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") + +if __name__ == '__main__': + unittest.main() From 707fc96234b869e5a15532df9e4fcddb3da10686 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Wed, 25 May 2022 22:13:40 +0200 Subject: [PATCH 2/7] Add more contract tests - Verify that Message.get_payload() returns str, unless passed decode=True, when it returns bytes. - Verify that RawConfigParser returns str. --- test/sample.ini | 3 +++ test/test_contracts.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 test/sample.ini diff --git a/test/sample.ini b/test/sample.ini new file mode 100644 index 0000000..6caa202 --- /dev/null +++ b/test/sample.ini @@ -0,0 +1,3 @@ +[foo] +bar: quux +baz: 14 diff --git a/test/test_contracts.py b/test/test_contracts.py index 3d9cfe1..fd6521d 100644 --- a/test/test_contracts.py +++ b/test/test_contracts.py @@ -26,6 +26,7 @@ documentation. import email import unittest +from configparser import RawConfigParser class EmailParsingTest(unittest.TestCase): """This test serves as a package contract and documentation of its behaviour.""" @@ -42,7 +43,18 @@ class EmailParsingTest(unittest.TestCase): self.assertEqual(parsed["From"], "alice@lacre.io") self.assertEqual(parsed["To"], "bob@lacre.io") self.assertEqual(parsed["Subject"], "Test message") + + def test_bytes_message_payload_decoded_produces_bytes(self): + rawmsg = b"From: alice@lacre.io\r\n" \ + + b"To: bob@lacre.io\r\n" \ + + b"Subject: Test message\r\n" \ + + b"\r\n" \ + + b"Test message from Alice to Bob.\r\n" + + parsed = email.message_from_bytes(rawmsg) + self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") + self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n") def test_message_from_string_produces_message_with_str_headers(self): rawmsg = "From: alice@lacre.io\r\n" \ @@ -56,7 +68,25 @@ class EmailParsingTest(unittest.TestCase): self.assertEqual(parsed["From"], "alice@lacre.io") self.assertEqual(parsed["To"], "bob@lacre.io") self.assertEqual(parsed["Subject"], "Test message") + + def test_str_message_payload_decoded_produces_bytes(self): + rawmsg = "From: alice@lacre.io\r\n" \ + + "To: bob@lacre.io\r\n" \ + + "Subject: Test message\r\n" \ + + "\r\n" \ + + "Test message from Alice to Bob.\r\n" + + parsed = email.message_from_string(rawmsg) + self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n") + self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n") + +class RawConfigParserTest(unittest.TestCase): + def test_config_parser_returns_str(self): + cp = RawConfigParser() + cp.read("test/sample.ini") + self.assertEqual(cp.get("foo", "bar"), "quux") + self.assertEqual(cp.get("foo", "baz"), "14") if __name__ == '__main__': unittest.main() From 3bcc1151e5e9dc59320b0691500b52599d93b508 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Mon, 30 May 2022 00:45:33 +0200 Subject: [PATCH 3/7] Add E2E case: a user with a key and PGP/MIME configured - Add a new test input message for a new test identity, test scenario configuration and a test key. - While retrieving message payload, determine charset based on the Content-Type header. When missing, default to UTF-8. - Use more comprehensible variables names. - Adjust logging levels. --- GnuPG/__init__.py | 16 +------------- gpg-mailgate.py | 43 ++++++++++++++++++++++++-------------- test/e2e.ini | 8 ++++++- test/e2e_test.py | 6 ++++++ test/keyhome/pubring.kbx | Bin 3088 -> 4246 bytes test/msgin/clear2rsa2.msg | 6 ++++++ 6 files changed, 47 insertions(+), 32 deletions(-) create mode 100644 test/msgin/clear2rsa2.msg diff --git a/GnuPG/__init__.py b/GnuPG/__init__.py index 15a47af..3a9c2c2 100644 --- a/GnuPG/__init__.py +++ b/GnuPG/__init__.py @@ -35,20 +35,6 @@ def build_command(key_home, *args, **kwargs): cmd = ["gpg", '--homedir', key_home] + list(args) return cmd -def private_keys( keyhome ): - cmd = build_command(keyhome, '--list-secret-keys', '--with-colons') - p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - p.wait() - keys = dict() - for line in p.stdout.readlines(): - if line[0:3] == 'uid' or line[0:3] == 'sec': - if ('<' not in line or '>' not in line): - continue - email = line.split('<')[1].split('>')[0] - fingerprint = line.split(':')[4] - keys[fingerprint] = email - return keys - def public_keys( keyhome ): cmd = build_command(keyhome, '--list-keys', '--with-colons') p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) @@ -58,7 +44,7 @@ def public_keys( keyhome ): fingerprint = None email = None for line in p.stdout.readlines(): - line = line.decode('utf-8') + line = line.decode(sys.getdefaultencoding()) if line[0:3] == LINE_FINGERPRINT: fingerprint = line.split(':')[POS_FINGERPRINT] if line[0:3] == LINE_USER_ID: diff --git a/gpg-mailgate.py b/gpg-mailgate.py index 7d45855..f4cace7 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -113,7 +113,7 @@ def gpg_encrypt( raw_message, recipients ): else: # Log message only if an unknown style is defined if conf.config_item_set('pgp_style', rcpt[0]): - LOG.info("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0])) + LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0])) # If no style is in settings defined for recipient, use default from settings if conf.config_item_equals('default', 'mime_conversion', 'yes'): @@ -177,17 +177,17 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ): def encrypt_all_payloads_mime( message, gpg_to_cmdline ): # Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail. - submsg1 = email.message.Message() - submsg1.set_payload("Version: 1\n") - submsg1.set_type("application/pgp-encrypted") - submsg1.set_param('PGP/MIME version identification', "", 'Content-Description' ) + pgp_ver_part = email.message.Message() + pgp_ver_part.set_payload("Version: 1\n") + pgp_ver_part.set_type("application/pgp-encrypted") + pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description' ) - submsg2 = email.message.Message() - submsg2.set_type("application/octet-stream") - submsg2.set_param('name', "encrypted.asc") - submsg2.set_param('OpenPGP encrypted message', "", 'Content-Description' ) - submsg2.set_param('inline', "", 'Content-Disposition' ) - submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' ) + encrypted_part = email.message.Message() + encrypted_part.set_type("application/octet-stream") + encrypted_part.set_param('name', "encrypted.asc") + encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description' ) + encrypted_part.set_param('inline', "", 'Content-Disposition' ) + encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition' ) if isinstance(message.get_payload(), str): # WTF! It seems to swallow the first line. Not sure why. Perhaps @@ -195,13 +195,16 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): # Workaround it here by prepending a blank line. # This happens only on text only messages. additionalSubHeader="" + encoding = sys.getdefaultencoding() if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'): additionalSubHeader="Content-Type: "+message['Content-Type']+"\n" - submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True)) + (base, encoding) = parse_content_type(message['Content-Type']) + LOG.debug(f"Identified encoding as {encoding}") + encrypted_part.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True).decode(encoding)) check_nested = True else: processed_payloads = generate_message_from_payloads(message) - submsg2.set_payload(processed_payloads.as_string()) + encrypted_part.set_payload(processed_payloads.as_string()) check_nested = False message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)" @@ -217,7 +220,15 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): else: message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary - return [ submsg1, encrypt_payload(submsg2, gpg_to_cmdline, check_nested) ] + return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ] + +def parse_content_type(content_type): + split_at = content_type.index(';') + second_part = content_type[split_at+1 : ].strip() + if second_part.startswith('charset'): + return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip()) + else: + return (content_type[0 : split_at], sys.getdefaultencoding()) def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): global LOG @@ -297,11 +308,11 @@ def smime_encrypt( raw_message, recipients ): s.write(out, p7) - LOG.debug("Sending message from " + from_addr + " to " + str(smime_to)) + LOG.debug(f"Sending message from {from_addr} to {smime_to}") send_msg(out.read(), smime_to) if unsmime_to: - LOG.debug("Unable to find valid S/MIME certificates for " + str(unsmime_to)) + LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}") return unsmime_to diff --git a/test/e2e.ini b/test/e2e.ini index ba32999..e6ba7c7 100644 --- a/test/e2e.ini +++ b/test/e2e.ini @@ -30,7 +30,7 @@ certs: test/certs [tests] # Number of "test-*" sections in this file, describing test cases. -cases: 6 +cases: 7 e2e_log: test/logs/e2e.log e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s e2e_log_datefmt: %Y-%m-%d %H:%M:%S @@ -72,3 +72,9 @@ descr: Multipart encrypted message to a user with an Ed25519 key. to: bob@disposlab in: test/msgin/multipart2rsa.msg out: -----BEGIN PGP MESSAGE----- + +[case-7] +descr: Clear text message to a user with an RSA key and PGP/MIME enabled in configuration +to: evan@disposlab +in: test/msgin/clear2rsa2.msg +out: -----BEGIN PGP MESSAGE----- diff --git a/test/e2e_test.py b/test/e2e_test.py index 2898528..2ba8905 100644 --- a/test/e2e_test.py +++ b/test/e2e_test.py @@ -51,6 +51,12 @@ def build_config(config): cp.add_section("enc_keymap") cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7") cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67") + cp.set("enc_keymap", "evan@disposlab", "530B1BB2D0CC7971648198BBA4774E507D3AF5BC") + + cp.add_section("pgp_style") + # Default style is PGP/Inline, so to cover more branches, one test identity + # uses PGP/MIME. + cp.set("pgp_style", "evan@disposlab", "mime") logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}") return cp diff --git a/test/keyhome/pubring.kbx b/test/keyhome/pubring.kbx index 83e1be16f71dfa5c39e580f11efc92404693ebe4..36d2ff0a0338d52e4ca9289c29d6d87feed4a7b5 100644 GIT binary patch delta 1066 zcmbOrF->uU08c#w14|ndBLfJmV_;xD!obWR6U;5W>B5=H!j#4tyO)&v1=L!7-2;?Y z04ZAXf71&+Yg4r#9V-P3_1>%vI|Oe6g>67`m#!|8b&l?y|5~rv>bOhIwh5^RIDx_u zAUPmlV&DOipMjVU#ARS$+{nPqz@Q0cvoNqsoG3bR0b4yo(&P_yAh8+7EJ>5!d=lf- zW@BVwyyec%$jI)n^Mm!0wXJ1q1J2xfxwInlX<=|4`_fO`8FF>iz5&TQk8S|kkzSAv z@e#uozSOeBJcpFb;)49*oWvxkEECvDhK^}0!eSf@c4CSw2rmmD{LIP(vVZbKCe?aa zFl3)#_#gOj*{oACH@Hs8rM_cGTOV1qNJsP3HR<`L{5wCfGZ$JiGQ8^BVi1|Q#_C_< zt~)wX0xY@N=`$qX`7A5p_*-+v>m}m`HU?&(BN?`6x&oc20K#EFtWi=_nyLU2(p0cP zarop3T+&hqH=;W6-C-0bzHMjtA5t9N`t+G>cdJwW^oK{BdG)xhqQe_(&RvzPUa@_G zxevqt_z;;*e)(E|T%Ybbv+?+$2XQ+KJsC>4};>1MV-6A`*&(<@E?k+w#y;KZIKhmIa#!GyVH~YUB4q$IC+}+pk-?r=5}E!i7~u<#msj%!>QJc?s9Y zCpK|1r>o|KtE|^fwv3v0iDyGSG){L|fC7wPhzp!*r-w4Kf|9L^Nn|nSsb|{Db?lQ8 zoVX7+%+8K!*?lW-{j$~x`wQ*DnHf3QJ1PhV8b6aX4=4;^fp)B!;jduF+Eo+xMyxvf z>hbRfFIJspo;mq3Oo&;y7`=Lcn5#Cqz0Rt +Subject: Test +Content-Type: text/plain; charset="utf-8" + +Body of the message. From 4c6fdc52ec0c3a5d8ac1c576975b46e6104b8f73 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Tue, 31 May 2022 22:09:10 +0200 Subject: [PATCH 4/7] Check mandatory config early, add tests Also: extend failover logging configuration with file-based handler to make sure that the user gets _some_ logs even if they do not configure Lacre at all. --- gpg-mailgate.py | 23 ++++++++++------------- lacre/__init__.py | 8 +++++++- lacre/config.py | 17 +++++++++++++++++ lacre/text.py | 11 +++++++++++ test/test_lacre_text.py | 18 ++++++++++++++++++ 5 files changed, 63 insertions(+), 14 deletions(-) create mode 100644 lacre/text.py create mode 100644 test/test_lacre_text.py diff --git a/gpg-mailgate.py b/gpg-mailgate.py index f4cace7..0b1714e 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -41,8 +41,12 @@ from email.mime.message import MIMEMessage import logging import lacre +import lacre.text as text import lacre.config as conf +# Exit code taken from : +EX_CONFIG = 78 + def gpg_encrypt( raw_message, recipients ): global LOG @@ -198,7 +202,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): encoding = sys.getdefaultencoding() if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'): additionalSubHeader="Content-Type: "+message['Content-Type']+"\n" - (base, encoding) = parse_content_type(message['Content-Type']) + (base, encoding) = text.parse_content_type(message['Content-Type']) LOG.debug(f"Identified encoding as {encoding}") encrypted_part.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True).decode(encoding)) check_nested = True @@ -222,14 +226,6 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ] -def parse_content_type(content_type): - split_at = content_type.index(';') - second_part = content_type[split_at+1 : ].strip() - if second_part.startswith('charset'): - return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip()) - else: - return (content_type[0 : split_at], sys.getdefaultencoding()) - def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): global LOG @@ -380,9 +376,6 @@ def send_msg( message, recipients ): recipients = [_f for _f in recipients if _f] if recipients: - if not (conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'port')): - LOG.warning("Missing settings for relay. Sending email aborted.") - return None LOG.info("Sending email to: <%s>" % '> <'.join( recipients )) relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port'))) smtp = smtplib.SMTP(relay[0], relay[1]) @@ -434,9 +427,13 @@ def sort_recipients( raw_message, from_addr, to_addrs ): conf.load_config() lacre.init_logging(conf.get_item('logging', 'config')) - LOG = logging.getLogger(__name__) +missing_params = conf.validate_config() +if missing_params: + LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}") + sys.exit(EX_CONFIG) + # Read e-mail from stdin raw = sys.stdin.read() raw_message = email.message_from_string( raw ) diff --git a/lacre/__init__.py b/lacre/__init__.py index 44fb255..15175a9 100644 --- a/lacre/__init__.py +++ b/lacre/__init__.py @@ -20,11 +20,17 @@ FAIL_OVER_LOGGING_CONFIG = { 'class': 'logging.handlers.SysLogHandler', 'level': 'INFO', 'formatter': 'sysfmt' + }, + 'lacrelog': { + 'class': 'logging.FileHandler', + 'level': 'INFO', + 'formatter': 'sysfmt', + 'filename': 'lacre.log' } }, 'root': { 'level': 'INFO', - 'handlers': ['syslog'] + 'handlers': ['syslog', 'lacrelog'] } } diff --git a/lacre/config.py b/lacre/config.py index 0b5fb77..2d2c089 100644 --- a/lacre/config.py +++ b/lacre/config.py @@ -13,6 +13,11 @@ import os # testable. CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG" +# List of mandatory configuration parameters. Each item on this list should be +# a pair: a section name and a parameter name. +MANDATORY_CONFIG_ITEMS = [("relay", "host"), + ("relay", "port")] + # Global dict to keep configuration parameters. It's hidden behind several # utility functions to make it easy to replace it with ConfigParser object in # the future. @@ -68,3 +73,15 @@ def config_item_set(section, key) -> bool: def config_item_equals(section, key, value) -> bool: global cfg return section in cfg and key in cfg[section] and cfg[section][key] == value + +def validate_config(): + """Checks whether the configuration is complete. + + Returns a list of missing parameters, so an empty list means + configuration is complete. + """ + missing = [] + for (section, param) in MANDATORY_CONFIG_ITEMS: + if not config_item_set(section, param): + missing.append((section, param)) + return missing diff --git a/lacre/text.py b/lacre/text.py new file mode 100644 index 0000000..41f2166 --- /dev/null +++ b/lacre/text.py @@ -0,0 +1,11 @@ +import sys + +def parse_content_type(content_type): + split_at = content_type.find(';') + if split_at < 0: + return (content_type, sys.getdefaultencoding()) + second_part = content_type[split_at+1 : ].strip() + if second_part.startswith('charset'): + return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip()) + else: + return (content_type[0 : split_at], sys.getdefaultencoding()) diff --git a/test/test_lacre_text.py b/test/test_lacre_text.py new file mode 100644 index 0000000..5ada013 --- /dev/null +++ b/test/test_lacre_text.py @@ -0,0 +1,18 @@ +import lacre.text +import sys + +import unittest + +class LacreTextTest(unittest.TestCase): + def test_parse_content_type(self): + (mtype, mcharset) = lacre.text.parse_content_type('text/plain') + self.assertEqual(mtype, 'text/plain') + self.assertEqual(mcharset, sys.getdefaultencoding()) + + (mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"') + self.assertEqual(mtype, 'text/plain') + self.assertEqual(mcharset, '"UTF-8"') + + (mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"') + self.assertEqual(mtype, 'text/plain') + self.assertEqual(mcharset, sys.getdefaultencoding()) From d3b17172901e3add22362de3a3fc6feb4d7a97bf Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Wed, 1 Jun 2022 23:00:05 +0200 Subject: [PATCH 5/7] Extract PGP/INLINE checks, remove unnecessary byte-check --- gpg-mailgate.py | 24 +++++++----------------- lacre/text.py | 7 +++++++ 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/gpg-mailgate.py b/gpg-mailgate.py index 0b1714e..13e2a38 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -179,7 +179,6 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ): return encrypted_payloads def encrypt_all_payloads_mime( message, gpg_to_cmdline ): - # Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail. pgp_ver_part = email.message.Message() pgp_ver_part.set_payload("Version: 1\n") @@ -230,7 +229,7 @@ def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): global LOG raw_payload = payload.get_payload(decode=True) - if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload: + if check_nested and text.is_pgp_inline(raw_payload): LOG.debug("Message is already pgp encrypted. No nested encryption needed.") return payload @@ -337,22 +336,16 @@ def get_cert_for_email( to_addr, cert_path ): return None def sanitize_case_sense( address ): - if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'): address = address.lower() else: - if isinstance(address, str): - sep = '@' - else: - sep = b'@' - splitted_address = address.split(sep) + splitted_address = address.split('@') if len(splitted_address) > 1: address = splitted_address[0] + sep + splitted_address[1].lower() return address def generate_message_from_payloads( payloads, message = None ): - if message == None: message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) @@ -365,7 +358,6 @@ def generate_message_from_payloads( payloads, message = None ): return message def get_first_payload( payloads ): - if payloads.is_multipart(): return get_first_payload(payloads.get_payload(0)) else: @@ -376,7 +368,7 @@ def send_msg( message, recipients ): recipients = [_f for _f in recipients if _f] if recipients: - LOG.info("Sending email to: <%s>" % '> <'.join( recipients )) + LOG.info(f"Sending email to: {recipients!r}") relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port'))) smtp = smtplib.SMTP(relay[0], relay[1]) if conf.config_item_equals('relay', 'starttls', 'yes'): @@ -388,9 +380,7 @@ def send_msg( message, recipients ): def sort_recipients( raw_message, from_addr, to_addrs ): global LOG - recipients_left = list() - for recipient in to_addrs: - recipients_left.append(sanitize_case_sense(recipient)) + recipients_left = [sanitize_case_sense(recipient) for recipient in to_addrs] # There is no need for nested encryption first_payload = get_first_payload(raw_message) @@ -400,7 +390,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ): return first_payload = first_payload.get_payload(decode=True) - if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload: + if text.is_pgp_inline(first_payload): LOG.debug("Message is already encrypted as PGP/INLINE. Encryption aborted.") send_msg(raw_message.as_string(), recipients_left) return @@ -431,8 +421,8 @@ LOG = logging.getLogger(__name__) missing_params = conf.validate_config() if missing_params: - LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}") - sys.exit(EX_CONFIG) + LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}") + sys.exit(EX_CONFIG) # Read e-mail from stdin raw = sys.stdin.read() diff --git a/lacre/text.py b/lacre/text.py index 41f2166..ad85daf 100644 --- a/lacre/text.py +++ b/lacre/text.py @@ -1,5 +1,8 @@ import sys +PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----" +PGP_INLINE_END = b"-----END PGP MESSAGE-----" + def parse_content_type(content_type): split_at = content_type.find(';') if split_at < 0: @@ -9,3 +12,7 @@ def parse_content_type(content_type): return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip()) else: return (content_type[0 : split_at], sys.getdefaultencoding()) + +def is_pgp_inline(payload): + """Finds out if the payload (bytes) contains PGP/INLINE markers.""" + return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload From 55b58d25bc78d714695d569032e53b0ea6e03293 Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Wed, 1 Jun 2022 23:23:51 +0200 Subject: [PATCH 6/7] Use literal separator '@' in sanitize_case_sense --- gpg-mailgate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gpg-mailgate.py b/gpg-mailgate.py index 13e2a38..879a27a 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -341,7 +341,7 @@ def sanitize_case_sense( address ): else: splitted_address = address.split('@') if len(splitted_address) > 1: - address = splitted_address[0] + sep + splitted_address[1].lower() + address = splitted_address[0] + '@' + splitted_address[1].lower() return address From 46be24670cafe8f8137bfc0ec4e6eebe6b1a137f Mon Sep 17 00:00:00 2001 From: "Piotr F. Mieszkowski" Date: Wed, 1 Jun 2022 23:44:41 +0200 Subject: [PATCH 7/7] Fix charset resolution in Content-Type parser --- lacre/text.py | 18 ++++++++++++------ test/test_lacre_text.py | 9 ++++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lacre/text.py b/lacre/text.py index ad85daf..4459670 100644 --- a/lacre/text.py +++ b/lacre/text.py @@ -4,14 +4,20 @@ PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----" PGP_INLINE_END = b"-----END PGP MESSAGE-----" def parse_content_type(content_type): - split_at = content_type.find(';') - if split_at < 0: + parts = [p.strip() for p in content_type.split(';')] + if len(parts) == 1: + # No additional attributes provided. Use default encoding. return (content_type, sys.getdefaultencoding()) - second_part = content_type[split_at+1 : ].strip() - if second_part.startswith('charset'): - return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip()) + + # At least one attribute provided. Find out if any of them is named + # 'charset' and if so, use it. + ctype = parts[0] + encoding = [p for p in parts[1:] if p.startswith('charset=') ] + if encoding: + eq_idx = encoding[0].index('=') + return (ctype, encoding[0][eq_idx+1:]) else: - return (content_type[0 : split_at], sys.getdefaultencoding()) + return (ctype, sys.getdefaultencoding()) def is_pgp_inline(payload): """Finds out if the payload (bytes) contains PGP/INLINE markers.""" diff --git a/test/test_lacre_text.py b/test/test_lacre_text.py index 5ada013..943ad28 100644 --- a/test/test_lacre_text.py +++ b/test/test_lacre_text.py @@ -4,15 +4,22 @@ import sys import unittest class LacreTextTest(unittest.TestCase): - def test_parse_content_type(self): + def test_parse_content_type_without_charset(self): (mtype, mcharset) = lacre.text.parse_content_type('text/plain') self.assertEqual(mtype, 'text/plain') self.assertEqual(mcharset, sys.getdefaultencoding()) + def test_parse_content_type_with_charset(self): (mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"') self.assertEqual(mtype, 'text/plain') self.assertEqual(mcharset, '"UTF-8"') + def test_parse_content_type_with_other_attributes(self): (mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"') self.assertEqual(mtype, 'text/plain') self.assertEqual(mcharset, sys.getdefaultencoding()) + + def test_parse_content_type_with_several_attributes(self): + (mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"; some-param="Some Value"') + self.assertEqual(mtype, 'text/plain') + self.assertEqual(mcharset, '"UTF-8"')