Merge pull request 'Handle text data carefully' (#82) from 75-unify-types into master
Reviewed-on: #82
This commit is contained in:
commit
9820e42457
12 changed files with 222 additions and 53 deletions
|
@ -35,20 +35,6 @@ def build_command(key_home, *args, **kwargs):
|
|||
cmd = ["gpg", '--homedir', key_home] + list(args)
|
||||
return cmd
|
||||
|
||||
def private_keys( keyhome ):
|
||||
cmd = build_command(keyhome, '--list-secret-keys', '--with-colons')
|
||||
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
p.wait()
|
||||
keys = dict()
|
||||
for line in p.stdout.readlines():
|
||||
if line[0:3] == 'uid' or line[0:3] == 'sec':
|
||||
if ('<' not in line or '>' not in line):
|
||||
continue
|
||||
email = line.split('<')[1].split('>')[0]
|
||||
fingerprint = line.split(':')[4]
|
||||
keys[fingerprint] = email
|
||||
return keys
|
||||
|
||||
def public_keys( keyhome ):
|
||||
cmd = build_command(keyhome, '--list-keys', '--with-colons')
|
||||
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
|
@ -58,7 +44,7 @@ def public_keys( keyhome ):
|
|||
fingerprint = None
|
||||
email = None
|
||||
for line in p.stdout.readlines():
|
||||
line = line.decode('utf-8')
|
||||
line = line.decode(sys.getdefaultencoding())
|
||||
if line[0:3] == LINE_FINGERPRINT:
|
||||
fingerprint = line.split(':')[POS_FINGERPRINT]
|
||||
if line[0:3] == LINE_USER_ID:
|
||||
|
|
|
@ -41,8 +41,12 @@ from email.mime.message import MIMEMessage
|
|||
|
||||
import logging
|
||||
import lacre
|
||||
import lacre.text as text
|
||||
import lacre.config as conf
|
||||
|
||||
# Exit code taken from <sysexits.h>:
|
||||
EX_CONFIG = 78
|
||||
|
||||
def gpg_encrypt( raw_message, recipients ):
|
||||
global LOG
|
||||
|
||||
|
@ -113,7 +117,7 @@ def gpg_encrypt( raw_message, recipients ):
|
|||
else:
|
||||
# Log message only if an unknown style is defined
|
||||
if conf.config_item_set('pgp_style', rcpt[0]):
|
||||
LOG.info("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
|
||||
LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
|
||||
|
||||
# If no style is in settings defined for recipient, use default from settings
|
||||
if conf.config_item_equals('default', 'mime_conversion', 'yes'):
|
||||
|
@ -175,19 +179,18 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
|
|||
return encrypted_payloads
|
||||
|
||||
def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
|
||||
|
||||
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
|
||||
submsg1 = email.message.Message()
|
||||
submsg1.set_payload("Version: 1\n")
|
||||
submsg1.set_type("application/pgp-encrypted")
|
||||
submsg1.set_param('PGP/MIME version identification', "", 'Content-Description' )
|
||||
pgp_ver_part = email.message.Message()
|
||||
pgp_ver_part.set_payload("Version: 1\n")
|
||||
pgp_ver_part.set_type("application/pgp-encrypted")
|
||||
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description' )
|
||||
|
||||
submsg2 = email.message.Message()
|
||||
submsg2.set_type("application/octet-stream")
|
||||
submsg2.set_param('name', "encrypted.asc")
|
||||
submsg2.set_param('OpenPGP encrypted message', "", 'Content-Description' )
|
||||
submsg2.set_param('inline', "", 'Content-Disposition' )
|
||||
submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' )
|
||||
encrypted_part = email.message.Message()
|
||||
encrypted_part.set_type("application/octet-stream")
|
||||
encrypted_part.set_param('name', "encrypted.asc")
|
||||
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description' )
|
||||
encrypted_part.set_param('inline', "", 'Content-Disposition' )
|
||||
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition' )
|
||||
|
||||
if isinstance(message.get_payload(), str):
|
||||
# WTF! It seems to swallow the first line. Not sure why. Perhaps
|
||||
|
@ -195,13 +198,16 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
|
|||
# Workaround it here by prepending a blank line.
|
||||
# This happens only on text only messages.
|
||||
additionalSubHeader=""
|
||||
encoding = sys.getdefaultencoding()
|
||||
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
|
||||
additionalSubHeader="Content-Type: "+message['Content-Type']+"\n"
|
||||
submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True))
|
||||
(base, encoding) = text.parse_content_type(message['Content-Type'])
|
||||
LOG.debug(f"Identified encoding as {encoding}")
|
||||
encrypted_part.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True).decode(encoding))
|
||||
check_nested = True
|
||||
else:
|
||||
processed_payloads = generate_message_from_payloads(message)
|
||||
submsg2.set_payload(processed_payloads.as_string())
|
||||
encrypted_part.set_payload(processed_payloads.as_string())
|
||||
check_nested = False
|
||||
|
||||
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
|
||||
|
@ -217,13 +223,13 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
|
|||
else:
|
||||
message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary
|
||||
|
||||
return [ submsg1, encrypt_payload(submsg2, gpg_to_cmdline, check_nested) ]
|
||||
return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ]
|
||||
|
||||
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
|
||||
global LOG
|
||||
|
||||
raw_payload = payload.get_payload(decode=True)
|
||||
if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload:
|
||||
if check_nested and text.is_pgp_inline(raw_payload):
|
||||
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
|
||||
return payload
|
||||
|
||||
|
@ -297,11 +303,11 @@ def smime_encrypt( raw_message, recipients ):
|
|||
|
||||
s.write(out, p7)
|
||||
|
||||
LOG.debug("Sending message from " + from_addr + " to " + str(smime_to))
|
||||
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
|
||||
|
||||
send_msg(out.read(), smime_to)
|
||||
if unsmime_to:
|
||||
LOG.debug("Unable to find valid S/MIME certificates for " + str(unsmime_to))
|
||||
LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}")
|
||||
|
||||
return unsmime_to
|
||||
|
||||
|
@ -330,22 +336,16 @@ def get_cert_for_email( to_addr, cert_path ):
|
|||
return None
|
||||
|
||||
def sanitize_case_sense( address ):
|
||||
|
||||
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
|
||||
address = address.lower()
|
||||
else:
|
||||
if isinstance(address, str):
|
||||
sep = '@'
|
||||
else:
|
||||
sep = b'@'
|
||||
splitted_address = address.split(sep)
|
||||
splitted_address = address.split('@')
|
||||
if len(splitted_address) > 1:
|
||||
address = splitted_address[0] + sep + splitted_address[1].lower()
|
||||
address = splitted_address[0] + '@' + splitted_address[1].lower()
|
||||
|
||||
return address
|
||||
|
||||
def generate_message_from_payloads( payloads, message = None ):
|
||||
|
||||
if message == None:
|
||||
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
|
||||
|
||||
|
@ -358,7 +358,6 @@ def generate_message_from_payloads( payloads, message = None ):
|
|||
return message
|
||||
|
||||
def get_first_payload( payloads ):
|
||||
|
||||
if payloads.is_multipart():
|
||||
return get_first_payload(payloads.get_payload(0))
|
||||
else:
|
||||
|
@ -369,10 +368,7 @@ def send_msg( message, recipients ):
|
|||
|
||||
recipients = [_f for _f in recipients if _f]
|
||||
if recipients:
|
||||
if not (conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'port')):
|
||||
LOG.warning("Missing settings for relay. Sending email aborted.")
|
||||
return None
|
||||
LOG.info("Sending email to: <%s>" % '> <'.join( recipients ))
|
||||
LOG.info(f"Sending email to: {recipients!r}")
|
||||
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
|
||||
smtp = smtplib.SMTP(relay[0], relay[1])
|
||||
if conf.config_item_equals('relay', 'starttls', 'yes'):
|
||||
|
@ -384,9 +380,7 @@ def send_msg( message, recipients ):
|
|||
def sort_recipients( raw_message, from_addr, to_addrs ):
|
||||
global LOG
|
||||
|
||||
recipients_left = list()
|
||||
for recipient in to_addrs:
|
||||
recipients_left.append(sanitize_case_sense(recipient))
|
||||
recipients_left = [sanitize_case_sense(recipient) for recipient in to_addrs]
|
||||
|
||||
# There is no need for nested encryption
|
||||
first_payload = get_first_payload(raw_message)
|
||||
|
@ -396,7 +390,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
|
|||
return
|
||||
|
||||
first_payload = first_payload.get_payload(decode=True)
|
||||
if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload:
|
||||
if text.is_pgp_inline(first_payload):
|
||||
LOG.debug("Message is already encrypted as PGP/INLINE. Encryption aborted.")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
return
|
||||
|
@ -423,9 +417,13 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
|
|||
conf.load_config()
|
||||
lacre.init_logging(conf.get_item('logging', 'config'))
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
missing_params = conf.validate_config()
|
||||
if missing_params:
|
||||
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
|
||||
sys.exit(EX_CONFIG)
|
||||
|
||||
# Read e-mail from stdin
|
||||
raw = sys.stdin.read()
|
||||
raw_message = email.message_from_string( raw )
|
||||
|
|
|
@ -20,11 +20,17 @@ FAIL_OVER_LOGGING_CONFIG = {
|
|||
'class': 'logging.handlers.SysLogHandler',
|
||||
'level': 'INFO',
|
||||
'formatter': 'sysfmt'
|
||||
},
|
||||
'lacrelog': {
|
||||
'class': 'logging.FileHandler',
|
||||
'level': 'INFO',
|
||||
'formatter': 'sysfmt',
|
||||
'filename': 'lacre.log'
|
||||
}
|
||||
},
|
||||
'root': {
|
||||
'level': 'INFO',
|
||||
'handlers': ['syslog']
|
||||
'handlers': ['syslog', 'lacrelog']
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,11 @@ import os
|
|||
# testable.
|
||||
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
|
||||
|
||||
# List of mandatory configuration parameters. Each item on this list should be
|
||||
# a pair: a section name and a parameter name.
|
||||
MANDATORY_CONFIG_ITEMS = [("relay", "host"),
|
||||
("relay", "port")]
|
||||
|
||||
# Global dict to keep configuration parameters. It's hidden behind several
|
||||
# utility functions to make it easy to replace it with ConfigParser object in
|
||||
# the future.
|
||||
|
@ -68,3 +73,15 @@ def config_item_set(section, key) -> bool:
|
|||
def config_item_equals(section, key, value) -> bool:
|
||||
global cfg
|
||||
return section in cfg and key in cfg[section] and cfg[section][key] == value
|
||||
|
||||
def validate_config():
|
||||
"""Checks whether the configuration is complete.
|
||||
|
||||
Returns a list of missing parameters, so an empty list means
|
||||
configuration is complete.
|
||||
"""
|
||||
missing = []
|
||||
for (section, param) in MANDATORY_CONFIG_ITEMS:
|
||||
if not config_item_set(section, param):
|
||||
missing.append((section, param))
|
||||
return missing
|
||||
|
|
24
lacre/text.py
Normal file
24
lacre/text.py
Normal file
|
@ -0,0 +1,24 @@
|
|||
import sys
|
||||
|
||||
PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----"
|
||||
PGP_INLINE_END = b"-----END PGP MESSAGE-----"
|
||||
|
||||
def parse_content_type(content_type):
|
||||
parts = [p.strip() for p in content_type.split(';')]
|
||||
if len(parts) == 1:
|
||||
# No additional attributes provided. Use default encoding.
|
||||
return (content_type, sys.getdefaultencoding())
|
||||
|
||||
# At least one attribute provided. Find out if any of them is named
|
||||
# 'charset' and if so, use it.
|
||||
ctype = parts[0]
|
||||
encoding = [p for p in parts[1:] if p.startswith('charset=') ]
|
||||
if encoding:
|
||||
eq_idx = encoding[0].index('=')
|
||||
return (ctype, encoding[0][eq_idx+1:])
|
||||
else:
|
||||
return (ctype, sys.getdefaultencoding())
|
||||
|
||||
def is_pgp_inline(payload):
|
||||
"""Finds out if the payload (bytes) contains PGP/INLINE markers."""
|
||||
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
|
|
@ -30,7 +30,7 @@ certs: test/certs
|
|||
|
||||
[tests]
|
||||
# Number of "test-*" sections in this file, describing test cases.
|
||||
cases: 6
|
||||
cases: 7
|
||||
e2e_log: test/logs/e2e.log
|
||||
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
|
||||
e2e_log_datefmt: %Y-%m-%d %H:%M:%S
|
||||
|
@ -72,3 +72,9 @@ descr: Multipart encrypted message to a user with an Ed25519 key.
|
|||
to: bob@disposlab
|
||||
in: test/msgin/multipart2rsa.msg
|
||||
out: -----BEGIN PGP MESSAGE-----
|
||||
|
||||
[case-7]
|
||||
descr: Clear text message to a user with an RSA key and PGP/MIME enabled in configuration
|
||||
to: evan@disposlab
|
||||
in: test/msgin/clear2rsa2.msg
|
||||
out: -----BEGIN PGP MESSAGE-----
|
||||
|
|
|
@ -51,6 +51,12 @@ def build_config(config):
|
|||
cp.add_section("enc_keymap")
|
||||
cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7")
|
||||
cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67")
|
||||
cp.set("enc_keymap", "evan@disposlab", "530B1BB2D0CC7971648198BBA4774E507D3AF5BC")
|
||||
|
||||
cp.add_section("pgp_style")
|
||||
# Default style is PGP/Inline, so to cover more branches, one test identity
|
||||
# uses PGP/MIME.
|
||||
cp.set("pgp_style", "evan@disposlab", "mime")
|
||||
|
||||
logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}")
|
||||
return cp
|
||||
|
|
Binary file not shown.
6
test/msgin/clear2rsa2.msg
Normal file
6
test/msgin/clear2rsa2.msg
Normal file
|
@ -0,0 +1,6 @@
|
|||
From: Dave <dave@disposlab
|
||||
To: Evan <evan@disposlab>
|
||||
Subject: Test
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
|
||||
Body of the message.
|
3
test/sample.ini
Normal file
3
test/sample.ini
Normal file
|
@ -0,0 +1,3 @@
|
|||
[foo]
|
||||
bar: quux
|
||||
baz: 14
|
92
test/test_contracts.py
Normal file
92
test/test_contracts.py
Normal file
|
@ -0,0 +1,92 @@
|
|||
#
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
"""Unit-tests as contracts for external dependencies.
|
||||
|
||||
Unit tests defined here are our contracts for the dependencies used by Lacre.
|
||||
Since not all software is documented thoroughly, they are also a form of
|
||||
documentation.
|
||||
"""
|
||||
|
||||
import email
|
||||
import unittest
|
||||
from configparser import RawConfigParser
|
||||
|
||||
class EmailParsingTest(unittest.TestCase):
|
||||
"""This test serves as a package contract and documentation of its behaviour."""
|
||||
|
||||
def test_message_from_bytes_produces_message_with_str_headers(self):
|
||||
rawmsg = b"From: alice@lacre.io\r\n" \
|
||||
+ b"To: bob@lacre.io\r\n" \
|
||||
+ b"Subject: Test message\r\n" \
|
||||
+ b"\r\n" \
|
||||
+ b"Test message from Alice to Bob.\r\n"
|
||||
|
||||
parsed = email.message_from_bytes(rawmsg)
|
||||
|
||||
self.assertEqual(parsed["From"], "alice@lacre.io")
|
||||
self.assertEqual(parsed["To"], "bob@lacre.io")
|
||||
self.assertEqual(parsed["Subject"], "Test message")
|
||||
|
||||
def test_bytes_message_payload_decoded_produces_bytes(self):
|
||||
rawmsg = b"From: alice@lacre.io\r\n" \
|
||||
+ b"To: bob@lacre.io\r\n" \
|
||||
+ b"Subject: Test message\r\n" \
|
||||
+ b"\r\n" \
|
||||
+ b"Test message from Alice to Bob.\r\n"
|
||||
|
||||
parsed = email.message_from_bytes(rawmsg)
|
||||
|
||||
self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n")
|
||||
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n")
|
||||
|
||||
def test_message_from_string_produces_message_with_str_headers(self):
|
||||
rawmsg = "From: alice@lacre.io\r\n" \
|
||||
+ "To: bob@lacre.io\r\n" \
|
||||
+ "Subject: Test message\r\n" \
|
||||
+ "\r\n" \
|
||||
+ "Test message from Alice to Bob.\r\n"
|
||||
|
||||
parsed = email.message_from_string(rawmsg)
|
||||
|
||||
self.assertEqual(parsed["From"], "alice@lacre.io")
|
||||
self.assertEqual(parsed["To"], "bob@lacre.io")
|
||||
self.assertEqual(parsed["Subject"], "Test message")
|
||||
|
||||
def test_str_message_payload_decoded_produces_bytes(self):
|
||||
rawmsg = "From: alice@lacre.io\r\n" \
|
||||
+ "To: bob@lacre.io\r\n" \
|
||||
+ "Subject: Test message\r\n" \
|
||||
+ "\r\n" \
|
||||
+ "Test message from Alice to Bob.\r\n"
|
||||
|
||||
parsed = email.message_from_string(rawmsg)
|
||||
|
||||
self.assertEqual(parsed.get_payload(), "Test message from Alice to Bob.\r\n")
|
||||
self.assertEqual(parsed.get_payload(decode=True), b"Test message from Alice to Bob.\r\n")
|
||||
|
||||
class RawConfigParserTest(unittest.TestCase):
|
||||
def test_config_parser_returns_str(self):
|
||||
cp = RawConfigParser()
|
||||
cp.read("test/sample.ini")
|
||||
self.assertEqual(cp.get("foo", "bar"), "quux")
|
||||
self.assertEqual(cp.get("foo", "baz"), "14")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
25
test/test_lacre_text.py
Normal file
25
test/test_lacre_text.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
import lacre.text
|
||||
import sys
|
||||
|
||||
import unittest
|
||||
|
||||
class LacreTextTest(unittest.TestCase):
|
||||
def test_parse_content_type_without_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
||||
|
||||
def test_parse_content_type_with_other_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_several_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
Loading…
Reference in a new issue