Mailgate: replace tabs with spaces

This commit is contained in:
Piotr F. Mieszkowski 2022-06-30 22:16:22 +02:00 committed by Gitea
parent 3f2760ba2d
commit 29b5b50901
1 changed files with 409 additions and 401 deletions

View File

@ -17,7 +17,6 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
import copy
import email
@ -25,34 +24,29 @@ import email.message
import email.utils
import GnuPG
import os
import re
import smtplib
import sys
import syslog
import traceback
import time
# imports for S/MIME
from M2Crypto import BIO, Rand, SMIME, X509
from email.mime.message import MIMEMessage
from M2Crypto import BIO, SMIME, X509
import logging
import lacre
import lacre.text as text
import lacre.config as conf
LOG = logging.getLogger(__name__)
def gpg_encrypt( raw_message, recipients ):
def _gpg_encrypt(raw_message, recipients):
if not conf.config_item_set('gpg', 'keyhome'):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
keys = GnuPG.public_keys( conf.get_item('gpg', 'keyhome') )
keys = GnuPG.public_keys(conf.get_item('gpg', 'keyhome'))
for fingerprint in keys:
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
keys[fingerprint] = _sanitize_case_sense(keys[fingerprint])
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
@ -67,10 +61,10 @@ def gpg_encrypt( raw_message, recipients ):
# Check if recipient is in keymap
if conf.config_item_set('enc_keymap', to):
LOG.info("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to) )
LOG.info("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to))
# Check we've got a matching key!
if conf.get_item('enc_keymap', to) in keys:
gpg_to.append( (to, conf.get_item('enc_keymap', to)) )
gpg_to.append((to, conf.get_item('enc_keymap', to)))
continue
else:
LOG.info("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (conf.get_item('enc_keymap', to), to))
@ -78,7 +72,7 @@ def gpg_encrypt( raw_message, recipients ):
# Check if key in keychain is present
if not enc_keymap_only:
if to in keys.values():
gpg_to.append( (to, to) )
gpg_to.append((to, to))
continue
# If this is an address with a delimiter (i.e. "foo+bar@example.com"),
@ -92,11 +86,11 @@ def gpg_encrypt( raw_message, recipients ):
if len(splitted_to) > 1:
domain = splitted_to[1]
if conf.config_item_set('enc_domain_keymap', domain):
LOG.info("Encrypt domain keymap has key '%s'" % conf.get_item('enc_domain_keymap', domain) )
LOG.info("Encrypt domain keymap has key '%s'" % conf.get_item('enc_domain_keymap', domain))
# Check we've got a matching key!
if conf.get_item('enc_domain_keymap', domain) in keys:
LOG.info("Using default domain key for recipient '%s'" % to)
gpg_to.append( (to, conf.get_item('enc_domain_keymap', domain)) )
gpg_to.append((to, conf.get_item('enc_domain_keymap', domain)))
continue
else:
LOG.info("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (conf.get_item('enc_domain_keymap', domain), to))
@ -106,7 +100,7 @@ def gpg_encrypt( raw_message, recipients ):
ungpg_to.append(to)
if gpg_to:
LOG.info("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
LOG.info("Encrypting email to: %s" % ' '.join(x[0] for x in gpg_to))
# Getting PGP style for recipient
gpg_to_smtp_mime = list()
@ -148,10 +142,10 @@ def gpg_encrypt( raw_message, recipients ):
else:
raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime )
raw_message_mime.set_payload( encrypted_payloads )
encrypted_payloads = _encrypt_all_payloads_mime(raw_message_mime, gpg_to_cmdline_mime)
raw_message_mime.set_payload(encrypted_payloads)
send_msg( raw_message_mime.as_string(), gpg_to_smtp_mime )
_send_msg(raw_message_mime.as_string(), gpg_to_smtp_mime)
if gpg_to_smtp_inline:
# Encrypt mail with PGP/INLINE
@ -165,57 +159,59 @@ def gpg_encrypt( raw_message, recipients ):
else:
raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline )
raw_message_inline.set_payload( encrypted_payloads )
encrypted_payloads = _encrypt_all_payloads_inline(raw_message_inline, gpg_to_cmdline_inline)
raw_message_inline.set_payload(encrypted_payloads)
send_msg( raw_message_inline.as_string(), gpg_to_smtp_inline )
_send_msg(raw_message_inline.as_string(), gpg_to_smtp_inline)
return ungpg_to
def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list()
if isinstance(message.get_payload(), str):
return encrypt_payload( message, gpg_to_cmdline ).get_payload()
return _encrypt_payload(message, gpg_to_cmdline).get_payload()
for payload in message.get_payload():
if( isinstance(payload.get_payload(), list) ):
encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) )
if(isinstance(payload.get_payload(), list)):
encrypted_payloads.extend(_encrypt_all_payloads_inline(payload, gpg_to_cmdline))
else:
encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) )
encrypted_payloads.append(_encrypt_payload(payload, gpg_to_cmdline))
return encrypted_payloads
def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
def _encrypt_all_payloads_mime(message, gpg_to_cmdline):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.Message()
pgp_ver_part.set_payload("Version: 1"+text.EOL)
pgp_ver_part.set_type("application/pgp-encrypted")
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description' )
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description')
encrypted_part = email.message.Message()
encrypted_part.set_type("application/octet-stream")
encrypted_part.set_param('name', "encrypted.asc")
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description' )
encrypted_part.set_param('inline', "", 'Content-Disposition' )
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition' )
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description')
encrypted_part.set_param('inline', "", 'Content-Disposition')
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition')
if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps
# it's skipping an imaginary blank line someplace. (ie skipping a header)
# Workaround it here by prepending a blank line.
# This happens only on text only messages.
additionalSubHeader=""
additionalSubHeader = ""
encoding = sys.getdefaultencoding()
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader="Content-Type: "+message['Content-Type']+text.EOL
additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL
(base, encoding) = text.parse_content_type(message['Content-Type'])
LOG.debug(f"Identified encoding as {encoding}")
encrypted_part.set_payload(additionalSubHeader+text.EOL +message.get_payload(decode=True).decode(encoding))
encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding))
check_nested = True
else:
processed_payloads = generate_message_from_payloads(message)
processed_payloads = _generate_message_from_payloads(message)
encrypted_part.set_payload(processed_payloads.as_string())
check_nested = False
@ -232,40 +228,42 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
else:
message['Content-Type'] = f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL
return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ]
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)]
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
raw_payload = payload.get_payload(decode=True)
if check_nested and text.is_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor( conf.get_item('gpg', 'keyhome'), gpg_to_cmdline, payload.get_content_charset() )
gpg.update( raw_payload )
gpg = GnuPG.GPGEncryptor(conf.get_item('gpg', 'keyhome'), gpg_to_cmdline, payload.get_content_charset())
gpg.update(raw_payload)
encrypted_data, returncode = gpg.encrypt()
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
if returncode != 0:
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
return payload
payload.set_payload( encrypted_data )
isAttachment = payload.get_param( 'attachment', None, 'Content-Disposition' ) is not None
payload.set_payload(encrypted_data)
isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None
if isAttachment:
filename = payload.get_filename()
if filename:
pgpFilename = filename + ".pgp"
if not (payload.get('Content-Disposition') is None):
payload.set_param( 'filename', pgpFilename, 'Content-Disposition' )
if not (payload.get('Content-Type') is None) and not (payload.get_param( 'name' ) is None):
payload.set_param( 'name', pgpFilename )
payload.set_param('filename', pgpFilename, 'Content-Disposition')
if not (payload.get('Content-Type') is None) and not (payload.get_param('name') is None):
payload.set_param('name', pgpFilename)
if not (payload.get('Content-Transfer-Encoding') is None):
payload.replace_header( 'Content-Transfer-Encoding', "7bit" )
payload.replace_header('Content-Transfer-Encoding', "7bit")
return payload
def smime_encrypt( raw_message, recipients ):
def _smime_encrypt(raw_message, recipients):
global LOG
global from_addr
@ -280,7 +278,7 @@ def smime_encrypt( raw_message, recipients ):
unsmime_to = list()
for addr in recipients:
cert_and_email = get_cert_for_email(addr, cert_path)
cert_and_email = _get_cert_for_email(addr, cert_path)
if not (cert_and_email is None):
(to_cert, normal_email) = cert_and_email
@ -294,7 +292,7 @@ def smime_encrypt( raw_message, recipients ):
if smime_to:
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('aes_192_cbc'))
p7 = s.encrypt( BIO.MemoryBuffer( raw_message.as_string() ) )
p7 = s.encrypt(BIO.MemoryBuffer(raw_message.as_string()))
# Output p7 in mail-friendly format.
out = BIO.MemoryBuffer()
out.write('From: ' + from_addr + text.EOL)
@ -304,7 +302,7 @@ def smime_encrypt( raw_message, recipients ):
if raw_message['Bcc']:
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL)
if raw_message['Subject']:
out.write('Subject: '+ raw_message['Subject'] + text.EOL)
out.write('Subject: ' + raw_message['Subject'] + text.EOL)
if conf.config_item_equals('default', 'add_header', 'yes'):
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL)
@ -313,13 +311,14 @@ def smime_encrypt( raw_message, recipients ):
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
send_msg(out.read(), smime_to)
_send_msg(out.read(), smime_to)
if unsmime_to:
LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}")
return unsmime_to
def get_cert_for_email( to_addr, cert_path ):
def _get_cert_for_email(to_addr, cert_path):
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
files_in_directory = os.listdir(cert_path)
@ -342,9 +341,10 @@ def get_cert_for_email( to_addr, cert_path ):
return None
else:
LOG.debug(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
return get_cert_for_email(fixed_up_email, cert_path)
return _get_cert_for_email(fixed_up_email, cert_path)
def sanitize_case_sense( address ):
def _sanitize_case_sense(address):
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
address = address.lower()
else:
@ -354,25 +354,28 @@ def sanitize_case_sense( address ):
return address
def generate_message_from_payloads( payloads, message = None ):
if message == None:
def _generate_message_from_payloads(payloads, message=None):
if message is None:
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload():
if( isinstance(payload.get_payload(), list) ):
message.attach(generate_message_from_payloads(payload))
if(isinstance(payload.get_payload(), list)):
message.attach(_generate_message_from_payloads(payload))
else:
message.attach(payload)
return message
def get_first_payload( payloads ):
def _get_first_payload(payloads):
if payloads.is_multipart():
return get_first_payload(payloads.get_payload(0))
return _get_first_payload(payloads.get_payload(0))
else:
return payloads
def send_msg( message, recipients ):
def _send_msg(message, recipients):
global from_addr
recipients = [_f for _f in recipients if _f]
@ -382,49 +385,54 @@ def send_msg( message, recipients ):
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.config_item_equals('relay', 'starttls', 'yes'):
smtp.starttls()
smtp.sendmail( from_addr, recipients, message )
smtp.sendmail(from_addr, recipients, message)
else:
LOG.info("No recipient found")
def is_encrypted(raw_message):
def _is_encrypted(raw_message):
if raw_message.get_content_type() == 'multipart/encrypted':
return True
first_part = get_first_payload(raw_message)
first_part = _get_first_payload(raw_message)
if first_part.get_content_type() == 'application/pkcs7-mime':
return True
first_payload = first_part.get_payload(decode=True)
return text.is_pgp_inline(first_payload)
def deliver_message( raw_message, from_address, to_addrs ):
def deliver_message(raw_message, from_address, to_addrs):
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
global from_addr
# Ugly workaround to keep the code working without too many changes.
from_addr = from_address
recipients_left = [sanitize_case_sense(recipient) for recipient in to_addrs]
recipients_left = [_sanitize_case_sense(recipient) for recipient in to_addrs]
# There is no need for nested encryption
if is_encrypted(raw_message):
if _is_encrypted(raw_message):
LOG.debug("Message is already encrypted. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
_send_msg(raw_message.as_string(), recipients_left)
return
# Encrypt mails for recipients with known public PGP keys
recipients_left = gpg_encrypt(raw_message, recipients_left)
recipients_left = _gpg_encrypt(raw_message, recipients_left)
if not recipients_left:
return
# Encrypt mails for recipients with known S/MIME certificate
recipients_left = smime_encrypt(raw_message, recipients_left)
recipients_left = _smime_encrypt(raw_message, recipients_left)
if not recipients_left:
return
# Send out mail to recipients which are left
send_msg(raw_message.as_string(), recipients_left)
_send_msg(raw_message.as_string(), recipients_left)
def exec_time_info(start_timestamp):
"""Calculate time since the given timestamp."""
elapsed_s = time.time() - start_timestamp
process_t = time.process_time()
return (elapsed_s, process_t)