Use logging module

- Replace custom logging code with calls to logging module.
- Use logging.config to provide configuration parameters.

To make Lacre's logging more flexible, use fileConfig from logging.config to
set up all parameters.  If the configuration file is missing, use dictConfig
with hardcoded reasonable defaults.
This commit is contained in:
Piotr F. Mieszkowski 2022-03-26 09:52:56 +01:00
parent baf7954270
commit 75ccfb0850
7 changed files with 98 additions and 69 deletions

View File

@ -66,9 +66,9 @@ notification_email = gpg-mailgate@yourdomain.tld
mail_templates = /var/gpgmailgate/cron_templates
[logging]
# For logging to syslog. 'file = syslog', otherwise use path to the file.
file = syslog
verbose = yes
# path to the logging configuration; see documentation for details:
# https://docs.python.org/3/library/logging.config.html#logging-config-fileformat
config = /etc/gpg-lacre-logging.ini
[relay]
# the relay settings to use for Postfix

View File

@ -43,19 +43,11 @@ import logging
import lacre
import lacre.config as conf
def log( msg ):
if conf.config_item_set("logging", "file"):
if conf.config_item_equals('logging', 'file', "syslog"):
syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg)
else:
logfile = open(conf.get_item('logging', 'file'), 'a')
logfile.write(msg + "\n")
logfile.close()
def gpg_encrypt( raw_message, recipients ):
global LOG
if not conf.config_item_set('gpg', 'keyhome'):
log("No valid entry for gpg keyhome. Encryption aborted.")
LOG.info("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
keys = GnuPG.public_keys( conf.get_item('gpg', 'keyhome') )
@ -69,13 +61,13 @@ def gpg_encrypt( raw_message, recipients ):
# Check if recipient is in keymap
if conf.config_item_set('enc_keymap', to):
log("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to) )
LOG.info("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to) )
# Check we've got a matching key!
if conf.get_item('enc_keymap', to) in keys:
gpg_to.append( (to, conf.get_item('enc_keymap', to)) )
continue
else:
log("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (conf.get_item('enc_keymap', to), to))
LOG.info("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (conf.get_item('enc_keymap', to), to))
# Check if key in keychain is present
if to in keys.values() and not conf.config_item_equals('default', 'enc_keymap_only', 'yes'):
@ -87,22 +79,21 @@ def gpg_encrypt( raw_message, recipients ):
if len(splitted_to) > 1:
domain = splitted_to[1]
if conf.config_item_set('enc_domain_keymap', domain):
log("Encrypt domain keymap has key '%s'" % conf.get_item('enc_dec_keymap', domain) )
LOG.info("Encrypt domain keymap has key '%s'" % conf.get_item('enc_dec_keymap', domain) )
# Check we've got a matching key!
if conf.get_item('enc_domain_keymap', domain) in keys:
log("Using default domain key for recipient '%s'" % to)
LOG.info("Using default domain key for recipient '%s'" % to)
gpg_to.append( (to, conf.get_item('enc_domain_keymap', domain)) )
continue
else:
log("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (conf.get_item('enc_domain_keymap', domain), to))
LOG.info("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (conf.get_item('enc_domain_keymap', domain), to))
# At this point no key has been found
if verbose:
log("Recipient (%s) not in PGP domain list for encrypting." % to)
LOG.debug("Recipient (%s) not in PGP domain list for encrypting." % to)
ungpg_to.append(to)
if gpg_to:
log("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
LOG.info("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
# Getting PGP style for recipient
gpg_to_smtp_mime = list()
@ -122,7 +113,7 @@ def gpg_encrypt( raw_message, recipients ):
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt[0]):
log("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
LOG.info("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
# If no style is in settings defined for recipient, use default from settings
if conf.config_item_equals('default', 'mime_conversion', 'yes'):
@ -229,21 +220,20 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
return [ submsg1, encrypt_payload(submsg2, gpg_to_cmdline, check_nested) ]
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
global LOG
raw_payload = payload.get_payload(decode=True)
if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload:
if verbose:
log("Message is already pgp encrypted. No nested encryption needed.")
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor( conf.get_item('gpg', 'keyhome'), gpg_to_cmdline, payload.get_content_charset() )
gpg.update( raw_payload )
encrypted_data, returncode = gpg.encrypt()
if verbose:
log("Return code from encryption=%d (0 indicates success)." % returncode)
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
if returncode != 0:
log("Encrytion failed with return code %d. Encryption aborted." % returncode)
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
return payload
payload.set_payload( encrypted_data )
@ -263,9 +253,10 @@ def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
return payload
def smime_encrypt( raw_message, recipients ):
global LOG
if not conf.config_item_set('smime', 'cert_path'):
log("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
LOG.info("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
return recipients
cert_path = conf.get_item('smime', 'cert_path')+"/"
@ -279,8 +270,7 @@ def smime_encrypt( raw_message, recipients ):
if not (cert_and_email is None):
(to_cert, normal_email) = cert_and_email
if verbose:
log("Found cert " + to_cert + " for " + addr + ": " + normal_email)
LOG.debug("Found cert " + to_cert + " for " + addr + ": " + normal_email)
smime_to.append(addr)
x509 = X509.load_cert(to_cert, format=X509.FORMAT_PEM)
sk.push(x509)
@ -307,17 +297,16 @@ def smime_encrypt( raw_message, recipients ):
s.write(out, p7)
if verbose:
log("Sending message from " + from_addr + " to " + str(smime_to))
LOG.debug("Sending message from " + from_addr + " to " + str(smime_to))
send_msg(out.read(), smime_to)
if unsmime_to:
if verbose:
log("Unable to find valid S/MIME certificates for " + str(unsmime_to))
LOG.debug("Unable to find valid S/MIME certificates for " + str(unsmime_to))
return unsmime_to
def get_cert_for_email( to_addr, cert_path ):
global LOG
files_in_directory = os.listdir(cert_path)
for filename in files_in_directory:
@ -335,8 +324,7 @@ def get_cert_for_email( to_addr, cert_path ):
multi_email = re.match('^([^\+]+)\+([^@]+)@(.*)$', to_addr)
if multi_email:
fixed_up_email = "%s@%s" % (multi_email.group(1), multi_email.group(3))
if verbose:
log("Multi-email %s converted to %s" % (to_addr, fixed_up_email))
LOG.debug("Multi-email %s converted to %s" % (to_addr, fixed_up_email))
return get_cert_for_email(fixed_up_email)
return None
@ -377,22 +365,24 @@ def get_first_payload( payloads ):
return payloads
def send_msg( message, recipients ):
global LOG
recipients = [_f for _f in recipients if _f]
if recipients:
if not (conf.config_item_set('relay', 'host') and conf.config_item_set('relay', 'port')):
log("Missing settings for relay. Sending email aborted.")
LOG.info("Missing settings for relay. Sending email aborted.")
return None
log("Sending email to: <%s>" % '> <'.join( recipients ))
LOG.info("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.config_item_equals('relay', 'starttls', 'yes'):
smtp.starttls()
smtp.sendmail( from_addr, recipients, message )
else:
log("No recipient found")
LOG.info("No recipient found")
def sort_recipients( raw_message, from_addr, to_addrs ):
global LOG
recipients_left = list()
for recipient in to_addrs:
@ -401,21 +391,18 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
# There is no need for nested encryption
first_payload = get_first_payload(raw_message)
if first_payload.get_content_type() == 'application/pkcs7-mime':
if verbose:
log("Message is already encrypted with S/MIME. Encryption aborted.")
LOG.debug("Message is already encrypted with S/MIME. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
return
first_payload = first_payload.get_payload(decode=True)
if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload:
if verbose:
log("Message is already encrypted as PGP/INLINE. Encryption aborted.")
LOG.debug("Message is already encrypted as PGP/INLINE. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
return
if raw_message.get_content_type() == 'multipart/encrypted':
if verbose:
log("Message is already encrypted. Encryption aborted.")
LOG.debug("Message is already encrypted. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
return
@ -434,9 +421,10 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
conf.load_config()
verbose = conf.verbose_logging_enabled()
lacre.init_logging(conf.get_item('logging', 'config'))
lacre.init_logging(conf.get_item('logging', 'file'))
LOG = logging.getLogger(__name__)
# Read e-mail from stdin
raw = sys.stdin.read()

View File

@ -2,15 +2,35 @@
"""
import logging
import logging.config
# Initialise logging infrastructure.
#
# This is going to be removed at some point of time, but is used at the moment
# because it's easy and keeps the rest of the code quite clean.
# Following structure configures logging iff a file-based configuration cannot
# be performed. It only sets up a syslog handler, so that the admin has at
# least some basic information.
FAIL_OVER_LOGGING_CONFIG = {
'version': 1,
'formatters': {
'sysfmt': {
'format': '%(asctime)s %(module)s %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
},
'handlers': {
'syslog': {
'class': 'logging.handlers.SysLogHandler',
'level': 'INFO',
'formatter': 'sysfmt'
}
},
'root': {
'level': 'INFO',
'handlers': ['syslog']
}
}
def init_logging(log_filename):
logging.basicConfig(filename = log_filename,
format = '%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
level = logging.DEBUG)
logging.info("Starting logs from lacre module...")
def init_logging(config_filename):
if config_filename is not None:
logging.config.fileConfig(config_filename)
else:
logging.config.dictConfig(FAIL_OVER_LOGGING_CONFIG)
logging.warning('Lacre logging configuration missing, using syslog as default')

View File

@ -7,7 +7,6 @@ from configparser import RawConfigParser
import os
import logging
# Environment variable name we read to retrieve configuration path. This is to
# enable non-root users to set up and run GPG Mailgate and to make the software
@ -29,7 +28,6 @@ def load_config() -> dict:
"""
configFile = os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')
logging.debug(f"Loading configuration file: {configFile}")
parser = read_config(configFile)
global cfg
@ -52,9 +50,12 @@ def copy_to_dict(confParser) -> dict:
return config
def get_item(section, key):
def get_item(section, key, empty_value = None):
global cfg
return cfg[section][key]
if config_item_set(section, key):
return cfg[section][key]
else:
return empty_value
def has_section(section) -> bool:
global cfg
@ -67,7 +68,3 @@ def config_item_set(section, key) -> bool:
def config_item_equals(section, key, value) -> bool:
global cfg
return section in cfg and key in cfg[section] and cfg[section][key] == value
def verbose_logging_enabled() -> bool:
global cfg
return config_item_equals("logging", "verbose", "yes")

View File

@ -35,6 +35,7 @@ e2e_log: test/logs/e2e.log
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
e2e_log_datefmt: %Y-%m-%d %H:%M:%S
lacre_log: test/logs/gpg-mailgate.log
log_config: test/gpg-lacre-log.ini
[case-1]
descr: Clear text message to a user without a key

View File

@ -33,11 +33,10 @@ RELAY_SCRIPT = "test/relay.py"
CONFIG_FILE = "test/gpg-mailgate.conf"
def build_config(config):
cp = configparser.ConfigParser()
cp = configparser.RawConfigParser()
cp.add_section("logging")
cp.set("logging", "file", config["log_file"])
cp.set("logging", "verbose", "yes")
cp.set("logging", "config", config["log_config"])
cp.add_section("gpg")
cp.set("gpg", "keyhome", config["gpg_keyhome"])
@ -147,7 +146,7 @@ write_test_config(config_path,
port = config.get("relay", "port"),
gpg_keyhome = config.get("dirs", "keys"),
smime_certpath = config.get("dirs", "certs"),
log_file = config.get("tests", "lacre_log"))
log_config = config.get("tests", "log_config"))
for case_no in range(1, config.getint("tests", "cases")+1):
case_name = f"case-{case_no}"

24
test/gpg-lacre-log.ini Normal file
View File

@ -0,0 +1,24 @@
[loggers]
keys=root
[logger_root]
level=NOTSET
handlers=lacrelog
[handlers]
keys=lacrelog
[formatters]
keys=postfixfmt
[handler_lacrelog]
class=FileHandler
level=DEBUG
formatter=postfixfmt
args=('test/logs/lacre.log', 'a+')
[formatter_postfixfmt]
format=%(asctime)s %(module)s[%(process)d]: %(message)s
datefmt=%b %e %H:%M:%S
style=%
validate=True