Extract PGP/INLINE checks, remove unnecessary byte-check

This commit is contained in:
Piotr F. Mieszkowski 2022-06-01 23:00:05 +02:00
parent 4c6fdc52ec
commit d3b1717290
2 changed files with 14 additions and 17 deletions

View File

@ -179,7 +179,6 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
return encrypted_payloads
def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
pgp_ver_part = email.message.Message()
pgp_ver_part.set_payload("Version: 1\n")
@ -230,7 +229,7 @@ def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
global LOG
raw_payload = payload.get_payload(decode=True)
if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload:
if check_nested and text.is_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload
@ -337,22 +336,16 @@ def get_cert_for_email( to_addr, cert_path ):
return None
def sanitize_case_sense( address ):
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
address = address.lower()
else:
if isinstance(address, str):
sep = '@'
else:
sep = b'@'
splitted_address = address.split(sep)
splitted_address = address.split('@')
if len(splitted_address) > 1:
address = splitted_address[0] + sep + splitted_address[1].lower()
return address
def generate_message_from_payloads( payloads, message = None ):
if message == None:
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
@ -365,7 +358,6 @@ def generate_message_from_payloads( payloads, message = None ):
return message
def get_first_payload( payloads ):
if payloads.is_multipart():
return get_first_payload(payloads.get_payload(0))
else:
@ -376,7 +368,7 @@ def send_msg( message, recipients ):
recipients = [_f for _f in recipients if _f]
if recipients:
LOG.info("Sending email to: <%s>" % '> <'.join( recipients ))
LOG.info(f"Sending email to: {recipients!r}")
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
smtp = smtplib.SMTP(relay[0], relay[1])
if conf.config_item_equals('relay', 'starttls', 'yes'):
@ -388,9 +380,7 @@ def send_msg( message, recipients ):
def sort_recipients( raw_message, from_addr, to_addrs ):
global LOG
recipients_left = list()
for recipient in to_addrs:
recipients_left.append(sanitize_case_sense(recipient))
recipients_left = [sanitize_case_sense(recipient) for recipient in to_addrs]
# There is no need for nested encryption
first_payload = get_first_payload(raw_message)
@ -400,7 +390,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
return
first_payload = first_payload.get_payload(decode=True)
if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload:
if text.is_pgp_inline(first_payload):
LOG.debug("Message is already encrypted as PGP/INLINE. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left)
return
@ -431,8 +421,8 @@ LOG = logging.getLogger(__name__)
missing_params = conf.validate_config()
if missing_params:
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
sys.exit(EX_CONFIG)
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
sys.exit(EX_CONFIG)
# Read e-mail from stdin
raw = sys.stdin.read()

View File

@ -1,5 +1,8 @@
import sys
PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----"
PGP_INLINE_END = b"-----END PGP MESSAGE-----"
def parse_content_type(content_type):
split_at = content_type.find(';')
if split_at < 0:
@ -9,3 +12,7 @@ def parse_content_type(content_type):
return (content_type[0 : split_at], second_part[second_part.index('=') + 1 : ].strip())
else:
return (content_type[0 : split_at], sys.getdefaultencoding())
def is_pgp_inline(payload):
"""Finds out if the payload (bytes) contains PGP/INLINE markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload