Extract config, separate logging, split into smaller functions

- Move configuration-processing code to a separate module (lacre.config) and
  provide a simple API to access configuration parameters.
- Prepare to use builtin logging module to log diagnostic data.
- Rework the configuration-processing file to make it cleaner.
- Log additional information while processing configuration.
- Reorder functions.
This commit is contained in:
Piotr F. Mieszkowski 2022-02-25 23:38:08 +01:00
parent 968677f1ec
commit e2cb188bb0
4 changed files with 162 additions and 74 deletions

View File

@ -39,30 +39,20 @@ import traceback
from M2Crypto import BIO, Rand, SMIME, X509
from email.mime.message import MIMEMessage
# Environment variable name we read to retrieve configuration path. This is to
# enable non-root users to set up and run GPG Mailgate and to make the software
# testable.
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
# Read configuration from /etc/gpg-mailgate.conf
_cfg = RawConfigParser()
_cfg.read(os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf'))
cfg = dict()
for sect in _cfg.sections():
cfg[sect] = dict()
for (name, value) in _cfg.items(sect):
cfg[sect][name] = value
import logging
import lacre
import lacre.config as conf
def log( msg ):
if 'logging' in cfg and 'file' in cfg['logging']:
if cfg['logging'].get('file') == "syslog":
if conf.config_item_set("logging", "file"):
if conf.config_item_equals('logging', 'file', "syslog"):
syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg)
else:
logfile = open(cfg['logging']['file'], 'a')
logfile = open(conf.get_item('logging', 'file'), 'a')
logfile.write(msg + "\n")
logfile.close()
verbose = 'logging' in cfg and 'verbose' in cfg['logging'] and cfg['logging'].get('verbose') == 'yes'
verbose = conf.verbose_logging_enabled()
# Read e-mail from stdin
raw = sys.stdin.read()
@ -79,14 +69,14 @@ def gpg_decrypt( raw_message, recipients ):
# private key is present but not in keymap.
noenc_to = list()
if not get_bool_from_cfg('gpg', 'keyhome'):
if not conf.config_item_set('gpg', 'keyhome'):
log("No valid entry for gpg keyhome. Decryption aborted.")
return recipients
keys = GnuPG.private_keys( cfg['gpg']['keyhome'] )
keys = GnuPG.private_keys( conf.get_item('gpg', 'keyhome') )
if get_bool_from_cfg('default', 'dec_regex'):
dec_regex = cfg['default']['dec_regex']
if conf.config_item_set('default', 'dec_regex'):
dec_regex = conf.get_item('default', 'dec_regex')
else:
dec_regex = None
@ -94,19 +84,19 @@ def gpg_decrypt( raw_message, recipients ):
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
for to in recipients:
if to in keys.values() and not get_bool_from_cfg('default', 'dec_keymap_only', 'yes'):
if to in keys.values() and not conf.config_item_equals('default', 'dec_keymap_only', 'yes'):
gpg_to.append(to)
# Is this recipient defined in regex for default decryption?
elif not (dec_regex is None) and not (re.match(dec_regex, to) is None):
log("Using default decrytion defined in dec_regex for recipient '%s'" % to)
gpg_to.append(to)
elif get_bool_from_cfg('dec_keymap', to):
log("Decrypt keymap has key '%s'" % cfg['dec_keymap'][to] )
elif conf.config_item_set('dec_keymap', to):
log("Decrypt keymap has key '%s'" % conf.get_item('dec_keymap', to) )
# Check we've got a matching key! If not, decline to attempt decryption. The key is checked for safty reasons.
if not cfg['dec_keymap'][to] in keys:
log("Key '%s' in decryption keymap not found in keyring for email address '%s'. Won't decrypt." % (cfg['dec_keymap'][to], to))
if not conf.get_item('dec_keymap', to) in keys:
log("Key '%s' in decryption keymap not found in keyring for email address '%s'. Won't decrypt." % (conf.get_item('dec_keymap', to), to))
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
if to in keys.values() and conf.config_item_equals('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
@ -116,7 +106,7 @@ def gpg_decrypt( raw_message, recipients ):
if verbose:
log("Recipient (%s) not in PGP domain list for decrypting." % to)
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
if to in keys.values() and conf.config_item_equals('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
@ -141,7 +131,7 @@ def gpg_decrypt_all_payloads( message ):
# At this point the message could only be PGP/INLINE encrypted, unencrypted or
# encrypted with a mechanism not covered by GPG-Mailgate
elif get_bool_from_cfg('default', 'no_inline_dec', 'no'):
elif conf.config_item_equals('default', 'no_inline_dec', 'no'):
# Check if message is PGP/INLINE encrypted and has attachments (or unencrypted with attachments)
if message.is_multipart():
@ -152,7 +142,7 @@ def gpg_decrypt_all_payloads( message ):
decrypted_message, success = decrypt_inline_with_attachments(message, False, decrypted_message)
# Add header here to avoid it being appended several times
if get_bool_from_cfg('default', 'add_header', 'yes') and success:
if conf.config_item_equals('default', 'add_header', 'yes') and success:
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# Check if message is PGP/INLINE encrypted without attachments (or unencrypted without attachments)
@ -194,7 +184,7 @@ def decrypt_mime( decrypted_message ):
decrypted_message.preamble = None
if get_bool_from_cfg('default', 'add_header', 'yes'):
if conf.config_item_equals('default', 'add_header', 'yes'):
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# If decryption fails, decrypted_message is equal to the original message
@ -269,7 +259,7 @@ def decrypt_inline_without_attachments( decrypted_message ):
# Need this nasty hack to avoid double blank lines at beginning of message
decrypted_message.set_payload(decrypted_payload.as_string()[1:])
if get_bool_from_cfg('default', 'add_header', 'yes'):
if conf.config_item_equals('default', 'add_header', 'yes'):
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# If message was not encrypted, this will just return the original message
@ -277,7 +267,7 @@ def decrypt_inline_without_attachments( decrypted_message ):
def decrypt_payload( payload ):
gpg = GnuPG.GPGDecryptor( cfg['gpg']['keyhome'] )
gpg = GnuPG.GPGDecryptor( conf.get_item('gpg', 'keyhome') )
gpg.update( payload )
decrypted_data, returncode = gpg.decrypt()
if verbose:
@ -293,11 +283,11 @@ def decrypt_payload( payload ):
def gpg_encrypt( raw_message, recipients ):
if not get_bool_from_cfg('gpg', 'keyhome'):
if not conf.config_item_set('gpg', 'keyhome'):
log("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
keys = GnuPG.public_keys( cfg['gpg']['keyhome'] )
keys = GnuPG.public_keys( conf.get_item('gpg', 'keyhome') )
for fingerprint in keys:
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
@ -307,17 +297,17 @@ def gpg_encrypt( raw_message, recipients ):
for to in recipients:
# Check if recipient is in keymap
if get_bool_from_cfg('enc_keymap', to):
log("Encrypt keymap has key '%s'" % cfg['enc_keymap'][to] )
if conf.config_item_set('enc_keymap', to):
log("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to) )
# Check we've got a matching key!
if cfg['enc_keymap'][to] in keys:
gpg_to.append( (to, cfg['enc_keymap'][to]) )
if conf.get_item('enc_keymap', to) in keys:
gpg_to.append( (to, conf.get_item('enc_keymap', to)) )
continue
else:
log("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (cfg['enc_keymap'][to], to))
log("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (conf.get_item('enc_keymap', to), to))
# Check if key in keychain is present
if to in keys.values() and not get_bool_from_cfg('default', 'enc_keymap_only', 'yes'):
if to in keys.values() and not conf.config_item_equals('default', 'enc_keymap_only', 'yes'):
gpg_to.append( (to, to) )
continue
@ -325,15 +315,15 @@ def gpg_encrypt( raw_message, recipients ):
splitted_to = to.split('@')
if len(splitted_to) > 1:
domain = splitted_to[1]
if get_bool_from_cfg('enc_domain_keymap', domain):
log("Encrypt domain keymap has key '%s'" % cfg['enc_dec_keymap'][domain] )
if conf.config_item_set('enc_domain_keymap', domain):
log("Encrypt domain keymap has key '%s'" % conf.get_item('enc_dec_keymap', domain) )
# Check we've got a matching key!
if cfg['enc_domain_keymap'][domain] in keys:
if conf.get_item('enc_domain_keymap', domain) in keys:
log("Using default domain key for recipient '%s'" % to)
gpg_to.append( (to, cfg['enc_domain_keymap'][domain]) )
gpg_to.append( (to, conf.get_item('enc_domain_keymap', domain)) )
continue
else:
log("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (cfg['enc_domain_keymap'][domain], to))
log("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (conf.get_item('enc_domain_keymap', domain), to))
# At this point no key has been found
if verbose:
@ -352,19 +342,19 @@ def gpg_encrypt( raw_message, recipients ):
for rcpt in gpg_to:
# Checking pre defined styles in settings first
if get_bool_from_cfg('pgp_style', rcpt[0], 'mime'):
if conf.config_item_equals('pgp_style', rcpt[0], 'mime'):
gpg_to_smtp_mime.append(rcpt[0])
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
elif get_bool_from_cfg('pgp_style', rcpt[0], 'inline'):
elif conf.config_item_equals('pgp_style', rcpt[0], 'inline'):
gpg_to_smtp_inline.append(rcpt[0])
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
else:
# Log message only if an unknown style is defined
if get_bool_from_cfg('pgp_style', rcpt[0]):
log("Style %s for recipient %s is not known. Use default as fallback." % (cfg['pgp_style'][rcpt[0]], rcpt[0]))
if conf.config_item_set('pgp_style', rcpt[0]):
log("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
# If no style is in settings defined for recipient, use default from settings
if get_bool_from_cfg('default', 'mime_conversion', 'yes'):
if conf.config_item_equals('default', 'mime_conversion', 'yes'):
gpg_to_smtp_mime.append(rcpt[0])
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
else:
@ -375,7 +365,7 @@ def gpg_encrypt( raw_message, recipients ):
# Encrypt mail with PGP/MIME
raw_message_mime = copy.deepcopy(raw_message)
if get_bool_from_cfg('default', 'add_header', 'yes'):
if conf.config_item_equals('default', 'add_header', 'yes'):
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in raw_message_mime:
@ -392,7 +382,7 @@ def gpg_encrypt( raw_message, recipients ):
# Encrypt mail with PGP/INLINE
raw_message_inline = copy.deepcopy(raw_message)
if get_bool_from_cfg('default', 'add_header', 'yes'):
if conf.config_item_equals('default', 'add_header', 'yes'):
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if 'Content-Transfer-Encoding' in raw_message_inline:
@ -475,8 +465,8 @@ def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
log("Message is already pgp encrypted. No nested encryption needed.")
return payload
# No check is needed for cfg['gpg']['keyhome'] as this is already done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor( cfg['gpg']['keyhome'], gpg_to_cmdline, payload.get_content_charset() )
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor( conf.get_item('gpg', 'keyhome'), gpg_to_cmdline, payload.get_content_charset() )
gpg.update( raw_payload )
encrypted_data, returncode = gpg.encrypt()
if verbose:
@ -503,11 +493,11 @@ def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
def smime_encrypt( raw_message, recipients ):
if not get_bool_from_cfg('smime', 'cert_path'):
if not conf.config_item_set('smime', 'cert_path'):
log("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
return recipients
cert_path = cfg['smime']['cert_path']+"/"
cert_path = conf.get_item('smime', 'cert_path')+"/"
s = SMIME.SMIME()
sk = X509.X509_Stack()
smime_to = list()
@ -541,7 +531,7 @@ def smime_encrypt( raw_message, recipients ):
if raw_message['Subject']:
out.write('Subject: '+ raw_message['Subject'] + '\n')
if get_bool_from_cfg('default', 'add_header', 'yes'):
if conf.config_item_equals('default', 'add_header', 'yes'):
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate\n')
s.write(out, p7)
@ -564,7 +554,7 @@ def get_cert_for_email( to_addr, cert_path ):
if not os.path.isfile(file_path):
continue
if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'):
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
if filename.lower() == to_addr:
return (file_path, to_addr)
else:
@ -580,20 +570,9 @@ def get_cert_for_email( to_addr, cert_path ):
return None
def get_bool_from_cfg( section, key = None, evaluation = None ):
if not (key is None) and not (evaluation is None):
return section in cfg and cfg[section].get(key) == evaluation
elif not (key is None) and (evaluation is None):
return section in cfg and not (cfg[section].get(key) is None)
else:
return section in cfg
def sanitize_case_sense( address ):
if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'):
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
address = address.lower()
else:
if isinstance(address, str):
@ -630,13 +609,13 @@ def send_msg( message, recipients ):
recipients = [_f for _f in recipients if _f]
if recipients:
if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')):
if not (conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'port')):
log("Missing settings for relay. Sending email aborted.")
return None
log("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (cfg['relay']['host'], int(cfg['relay']['port']))
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
smtp = smtplib.SMTP(relay[0], relay[1])
if 'relay' in cfg and 'starttls' in cfg['relay'] and cfg['relay']['starttls'] == 'yes':
if conf.config_item_equals('relay', 'starttls', 'yes'):
smtp.starttls()
smtp.sendmail( from_addr, recipients, message )
else:
@ -688,5 +667,8 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
send_msg(raw_message.as_string(), recipients_left)
conf.load_config()
lacre.init_logging(conf.get_item('logging', 'file'))
# Let's start
sort_recipients(raw_message, from_addr, to_addrs)

16
lacre/__init__.py Normal file
View File

@ -0,0 +1,16 @@
"""Lacre --- the Postfix mail filter encrypting incoming email
"""
import logging
# Initialise logging infrastructure.
#
# This is going to be removed at some point of time, but is used at the moment
# because it's easy and keeps the rest of the code quite clean.
def init_logging(log_filename):
logging.basicConfig(filename = log_filename,
format = '%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
level = logging.DEBUG)
logging.info("Starting logs from lacre module...")

73
lacre/config.py Normal file
View File

@ -0,0 +1,73 @@
"""Lacre configuration
Routines defined here are responsible for processing configuration.
"""
from configparser import RawConfigParser
import os
import logging
# Environment variable name we read to retrieve configuration path. This is to
# enable non-root users to set up and run GPG Mailgate and to make the software
# testable.
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
# Global dict to keep configuration parameters. It's hidden behind several
# utility functions to make it easy to replace it with ConfigParser object in
# the future.
cfg = dict()
def load_config() -> dict:
"""Parses configuration file.
If environment variable identified by CONFIG_PATH_ENV
variable is set, its value is taken as a configuration file
path. Otherwise, the default is taken
('/etc/gpg-mailgate.conf').
"""
configFile = os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')
logging.debug(f"Loading configuration file: {configFile}")
parser = read_config(configFile)
global cfg
cfg = copy_to_dict(parser)
return cfg
def read_config(fileName) -> RawConfigParser:
cp = RawConfigParser()
cp.read(fileName)
return cp
def copy_to_dict(confParser) -> dict:
config = dict()
for sect in confParser.sections():
config[sect] = dict()
for (name, value) in confParser.items(sect):
config[sect][name] = value
return config
def get_item(section, key):
global cfg
return cfg[section][key]
def has_section(section) -> bool:
global cfg
return section in cfg
def config_item_set(section, key) -> bool:
global cfg
return section in cfg and (key in cfg[section]) and not (cfg[section][key] is None)
def config_item_equals(section, key, value) -> bool:
global cfg
return section in cfg and key in cfg[section] and cfg[section][key] == value
def verbose_logging_enabled() -> bool:
global cfg
return config_item_equals("logging", "verbose", "yes")

View File

@ -11,6 +11,8 @@
import sys
import socket
import logging
EXIT_UNAVAILABLE = 1
@ -44,12 +46,17 @@ def serve(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(localhost_at(port))
logging.info(f"Listening on localhost, port {port}")
s.listen(1)
logging.info("Listening...")
except socket.error as e:
print("Cannot connect", e)
logging.error(f"Cannot connect {e}")
sys.exit(EXIT_UNAVAILABLE)
logging.debug("About to accept a connection...")
(conn, addr) = s.accept()
logging.debug(f"Accepting connection from {conn}")
conn.sendall(welcome(b"TEST SERVER"))
receive_and_confirm(conn) # Ignore HELO/EHLO
@ -70,14 +77,24 @@ def serve(port):
conn.close()
logging.debug(f"Received {len(message)} characters of data")
# Trim EOM marker as we're only interested in the message body.
return message[:-len(EOM)]
def error(msg, exit_code):
logging.error(msg)
print("ERROR: %s" % (msg))
sys.exit(exit_code)
# filename is relative to where we run the tests from, i.e. the project root
# directory
logging.basicConfig(filename = 'test/logs/relay.log',
format = '%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
level = logging.DEBUG)
if len(sys.argv) < 2:
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)