2022-05-31 22:09:10 +02:00
|
|
|
import sys
|
2022-06-07 22:14:32 +02:00
|
|
|
import re
|
2022-09-12 22:32:54 +02:00
|
|
|
import logging
|
2022-05-31 22:09:10 +02:00
|
|
|
|
2022-09-30 22:40:42 +02:00
|
|
|
import lacre.config as conf
|
|
|
|
|
2022-06-02 19:56:32 +02:00
|
|
|
# The standard way to encode line-ending in email:
|
|
|
|
EOL = "\r\n"
|
|
|
|
|
2022-06-01 23:00:05 +02:00
|
|
|
PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----"
|
|
|
|
PGP_INLINE_END = b"-----END PGP MESSAGE-----"
|
|
|
|
|
2022-09-12 22:32:54 +02:00
|
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2022-05-31 22:09:10 +02:00
|
|
|
def parse_content_type(content_type):
|
2022-09-12 22:32:54 +02:00
|
|
|
"""Analyse Content-Type email header.
|
|
|
|
|
|
|
|
Return a pair: type and sub-type.
|
|
|
|
"""
|
2022-07-15 17:40:57 +02:00
|
|
|
parts = [p.strip() for p in content_type.split(';')]
|
|
|
|
if len(parts) == 1:
|
|
|
|
# No additional attributes provided. Use default encoding.
|
|
|
|
return (content_type, sys.getdefaultencoding())
|
2022-06-01 23:44:41 +02:00
|
|
|
|
2022-07-15 17:40:57 +02:00
|
|
|
# At least one attribute provided. Find out if any of them is named
|
|
|
|
# 'charset' and if so, use it.
|
|
|
|
ctype = parts[0]
|
2022-09-12 22:32:54 +02:00
|
|
|
encoding = [p for p in parts[1:] if p.startswith('charset=')]
|
2022-07-15 17:40:57 +02:00
|
|
|
if encoding:
|
|
|
|
eq_idx = encoding[0].index('=')
|
|
|
|
return (ctype, encoding[0][eq_idx+1:])
|
|
|
|
else:
|
|
|
|
return (ctype, sys.getdefaultencoding())
|
2022-06-01 23:00:05 +02:00
|
|
|
|
2022-09-12 22:32:54 +02:00
|
|
|
|
|
|
|
def parse_delimiter(address: str):
|
|
|
|
"""Parse an email with delimiter and topic.
|
|
|
|
|
|
|
|
Return destination emaili and topic as a tuple.
|
|
|
|
"""
|
|
|
|
withdelim = re.match('^([^\\+]+)\\+([^@]+)@(.*)$', address)
|
|
|
|
LOG.debug(f'Parsed email: {withdelim!r}')
|
|
|
|
|
2022-07-15 17:40:57 +02:00
|
|
|
if withdelim:
|
|
|
|
return (withdelim.group(1) + '@' + withdelim.group(3), withdelim.group(2))
|
|
|
|
else:
|
|
|
|
return (address, None)
|
2022-06-07 22:14:32 +02:00
|
|
|
|
2022-09-12 22:32:54 +02:00
|
|
|
|
2022-09-30 22:40:42 +02:00
|
|
|
def sanitize_case_sense(address):
|
|
|
|
"""Sanitize email case."""
|
|
|
|
# TODO: find a way to make it more unit-testable
|
|
|
|
if conf.flag_enabled('default', 'mail_case_insensitive'):
|
|
|
|
address = address.lower()
|
|
|
|
else:
|
|
|
|
splitted_address = address.split('@')
|
|
|
|
if len(splitted_address) > 1:
|
|
|
|
address = splitted_address[0] + '@' + splitted_address[1].lower()
|
|
|
|
|
|
|
|
return address
|
|
|
|
|
|
|
|
|
2022-09-12 22:32:54 +02:00
|
|
|
def is_pgp_inline(payload) -> bool:
|
|
|
|
"""Find out if the payload (bytes) contains PGP/INLINE markers."""
|
2022-07-15 17:40:57 +02:00
|
|
|
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
|