Check mandatory config early, add tests

Also: extend failover logging configuration with file-based handler to make
sure that the user gets _some_ logs even if they do not configure Lacre at
all.
This commit is contained in:
Piotr F. Mieszkowski 2022-05-31 22:09:10 +02:00
parent 3bcc1151e5
commit 4c6fdc52ec
5 changed files with 63 additions and 14 deletions

View File

@ -41,8 +41,12 @@ from email.mime.message import MIMEMessage
import logging
import lacre
import lacre.text as text
import lacre.config as conf
# Exit code taken from <sysexits.h>:
EX_CONFIG = 78
def gpg_encrypt( raw_message, recipients ):
global LOG
@ -198,7 +202,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
encoding = sys.getdefaultencoding()
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader="Content-Type: "+message['Content-Type']+"\n"
(base, encoding) = parse_content_type(message['Content-Type'])
(base, encoding) = text.parse_content_type(message['Content-Type'])
LOG.debug(f"Identified encoding as {encoding}")
encrypted_part.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True).decode(encoding))
check_nested = True
@ -222,14 +226,6 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ]
def parse_content_type(content_type):
split_at = content_type.index(';')
second_part = content_type[split_at+1 : ].strip()
if second_part.startswith('charset'):
return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip())
else:
return (content_type[0 : split_at], sys.getdefaultencoding())
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
global LOG
@ -380,9 +376,6 @@ def send_msg( message, recipients ):
recipients = [_f for _f in recipients if _f]
if recipients:
if not (conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'port')):
LOG.warning("Missing settings for relay. Sending email aborted.")
return None
LOG.info("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
smtp = smtplib.SMTP(relay[0], relay[1])
@ -434,9 +427,13 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
conf.load_config()
lacre.init_logging(conf.get_item('logging', 'config'))
LOG = logging.getLogger(__name__)
missing_params = conf.validate_config()
if missing_params:
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
sys.exit(EX_CONFIG)
# Read e-mail from stdin
raw = sys.stdin.read()
raw_message = email.message_from_string( raw )

View File

@ -20,11 +20,17 @@ FAIL_OVER_LOGGING_CONFIG = {
'class': 'logging.handlers.SysLogHandler',
'level': 'INFO',
'formatter': 'sysfmt'
},
'lacrelog': {
'class': 'logging.FileHandler',
'level': 'INFO',
'formatter': 'sysfmt',
'filename': 'lacre.log'
}
},
'root': {
'level': 'INFO',
'handlers': ['syslog']
'handlers': ['syslog', 'lacrelog']
}
}

View File

@ -13,6 +13,11 @@ import os
# testable.
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
# List of mandatory configuration parameters. Each item on this list should be
# a pair: a section name and a parameter name.
MANDATORY_CONFIG_ITEMS = [("relay", "host"),
("relay", "port")]
# Global dict to keep configuration parameters. It's hidden behind several
# utility functions to make it easy to replace it with ConfigParser object in
# the future.
@ -68,3 +73,15 @@ def config_item_set(section, key) -> bool:
def config_item_equals(section, key, value) -> bool:
global cfg
return section in cfg and key in cfg[section] and cfg[section][key] == value
def validate_config():
"""Checks whether the configuration is complete.
Returns a list of missing parameters, so an empty list means
configuration is complete.
"""
missing = []
for (section, param) in MANDATORY_CONFIG_ITEMS:
if not config_item_set(section, param):
missing.append((section, param))
return missing

11
lacre/text.py Normal file
View File

@ -0,0 +1,11 @@
import sys
def parse_content_type(content_type):
split_at = content_type.find(';')
if split_at < 0:
return (content_type, sys.getdefaultencoding())
second_part = content_type[split_at+1 : ].strip()
if second_part.startswith('charset'):
return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip())
else:
return (content_type[0 : split_at], sys.getdefaultencoding())

18
test/test_lacre_text.py Normal file
View File

@ -0,0 +1,18 @@
import lacre.text
import sys
import unittest
class LacreTextTest(unittest.TestCase):
def test_parse_content_type(self):
(mtype, mcharset) = lacre.text.parse_content_type('text/plain')
self.assertEqual(mtype, 'text/plain')
self.assertEqual(mcharset, sys.getdefaultencoding())
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"')
self.assertEqual(mtype, 'text/plain')
self.assertEqual(mcharset, '"UTF-8"')
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"')
self.assertEqual(mtype, 'text/plain')
self.assertEqual(mcharset, sys.getdefaultencoding())