Rework PGP/MIME flow

This commit is contained in:
Piotr F. Mieszkowski 2023-02-18 21:50:39 +01:00
parent 5e408259c0
commit 27b07e672d
2 changed files with 74 additions and 40 deletions

View File

@ -171,8 +171,7 @@ class GPGEncryptor:
def encrypt(self):
"""Feed GnuPG with the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
p = self._popen()
encdata, err = p.communicate(input=self._message)
if p.returncode != 0:
LOG.debug('Errors: %s', err)
@ -180,6 +179,16 @@ class GPGEncryptor:
raise EncryptionException(details['issue'], details['recipient'], details['cause'])
return (encdata, p.returncode)
def _popen(self):
if self._charset:
return subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding=self._charset)
else:
return subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def _command(self):
cmd = _build_command(self._keyhome, "--trust-model", "always", "--status-fd", "2", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")

View File

@ -115,7 +115,7 @@ def _sort_gpg_recipients(gpg_to):
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
def _gpg_encrypt_copy(message, cmdline, to, encrypt_f):
def _gpg_encrypt_copy(message: email.message.Message, cmdline, to, encrypt_f):
msg_copy = copy.deepcopy(message)
_customise_headers(msg_copy)
encrypted_payloads = encrypt_f(msg_copy, cmdline)
@ -123,22 +123,22 @@ def _gpg_encrypt_copy(message, cmdline, to, encrypt_f):
return msg_copy
def _gpg_encrypt_to_bytes(message, cmdline, to, encrypt_f) -> bytes:
def _gpg_encrypt_to_bytes(message: email.message.Message, cmdline, to, encrypt_f) -> bytes:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_bytes(policy=SMTPUTF8)
def _gpg_encrypt_to_str(message, cmdline, to, encrypt_f) -> str:
def _gpg_encrypt_to_str(message: email.message.Message, cmdline, to, encrypt_f) -> str:
msg_copy = _gpg_encrypt_copy(message, cmdline, to, encrypt_f)
return msg_copy.as_string()
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f):
def _gpg_encrypt_and_deliver(message: email.message.Message, cmdline, to, encrypt_f):
out = _gpg_encrypt_to_str(message, cmdline, to, encrypt_f)
send_msg(out, to)
def _customise_headers(msg_copy):
def _customise_headers(msg_copy: email.message.Message):
if conf.config_item_equals('default', 'add_header', 'yes'):
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
@ -281,7 +281,7 @@ def _try_configured_domain_key(recipient, keys):
return None
def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
def _encrypt_all_payloads_inline(message: email.message.Message, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list()
@ -311,55 +311,58 @@ def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline):
encrypted_part.set_param('inline', "", 'Content-Disposition')
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition')
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
boundary = _make_boundary()
if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps
# it's skipping an imaginary blank line someplace. (ie skipping a header)
# Workaround it here by prepending a blank line.
# This happens only on text only messages.
additionalSubHeader = ""
encoding = sys.getdefaultencoding()
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL
encoding = message.get_content_charset(sys.getdefaultencoding())
LOG.debug(f"Identified encoding as {encoding}")
encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding))
check_nested = True
msg_copy = copy.deepcopy(message)
copy_encrypted = _encrypt_payload(msg_copy, gpg_to_cmdline, True)
encrypted_part.set_payload(copy_encrypted.get_payload())
_set_content_type(message, boundary)
return [pgp_ver_part, encrypted_part]
else:
processed_payloads = _generate_message_from_payloads(message)
encrypted_part.set_payload(processed_payloads.as_string())
check_nested = False
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
_set_content_type(message, boundary)
# Use this just to generate a MIME boundary string.
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, False)]
def _make_boundary():
junk_msg = MIMEMultipart()
_ = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
boundary = junk_msg.get_boundary()
return junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed.
def _set_content_type(message: email.message.Message, boundary: str):
if 'Content-Type' in message:
message.replace_header('Content-Type', _multipart_encrypted_with_boundary(boundary)+text.EOL)
else:
message['Content-Type'] = _multipart_encrypted_with_boundary(boundary)+text.EOL
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)]
def _multipart_encrypted_with_boundary(boundary):
return f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
def _encrypt_payload(payload: email.message.Message, recipients, check_nested=True, **kwargs):
raw_payload = raw_data_manager.get_content(payload)
LOG.debug('Got raw payload: %s (%s); original was: %s', raw_payload, type(raw_payload), payload)
LOG.debug('Got raw_payload (%s; %s): %s. Message: %s',
type(raw_payload),
payload.get_content_type(),
raw_payload,
payload)
if check_nested and text.is_payload_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already
# done in method gpg_encrypt
gpg = GnuPG.GPGEncryptor(conf.get_item('gpg', 'keyhome'), gpg_to_cmdline,
payload.get_content_charset())
gpg = _make_encryptor(raw_payload, recipients)
gpg.update(raw_payload)
encrypted_data, returncode = gpg.encrypt()
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
@ -368,19 +371,41 @@ def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None
if isAttachment:
filename = payload.get_filename()
if filename:
pgpFilename = filename + ".pgp"
if not (payload.get('Content-Disposition') is None):
payload.set_param('filename', pgpFilename, 'Content-Disposition')
if not (payload.get('Content-Type') is None) and not (payload.get_param('name') is None):
payload.set_param('name', pgpFilename)
_append_gpg_extension(payload)
if not (payload.get('Content-Transfer-Encoding') is None):
payload.replace_header('Content-Transfer-Encoding', "7bit")
return payload
def _make_encryptor(raw_data, recipients):
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already
# done in method gpg_encrypt
keyhome = conf.get_item('gpg', 'keyhome')
if isinstance(raw_data, str):
return GnuPG.GPGEncryptor(keyhome, recipients, 'utf-8')
else:
return GnuPG.GPGEncryptor(keyhome, recipients)
def _append_gpg_extension(attachment):
filename = attachment.get_filename()
if not filename:
return
pgpFilename = filename + ".pgp"
# Attachment name can come from one of two places: Content-Disposition or
# Content-Type header, hence the two cases below.
if not (attachment.get('Content-Disposition') is None):
attachment.set_param('filename', pgpFilename, 'Content-Disposition')
if not (attachment.get('Content-Type') is None) and not (attachment.get_param('name') is None):
attachment.set_param('name', pgpFilename)
def _smime_encrypt(raw_message, recipients):
global LOG
global from_addr