Start using Content Manager

Also:
- Pass text to Popen in GnuPG (used to be bytes).
- Make is_payload_pgp_inline type-agnostic (str / bytes).
This commit is contained in:
Piotr F. Mieszkowski 2023-02-16 20:16:05 +01:00
parent d2ed4a9cee
commit 5e408259c0
3 changed files with 27 additions and 9 deletions

View File

@ -156,7 +156,7 @@ class GPGEncryptor:
def __init__(self, keyhome, recipients=None, charset=None):
"""Initialise the wrapper."""
self._keyhome = keyhome
self._message = b''
self._message = None
self._recipients = list()
self._charset = charset
if recipients is not None:
@ -164,11 +164,15 @@ class GPGEncryptor:
def update(self, message):
"""Append MESSAGE to buffer about to be encrypted."""
self._message += message
if self._message is None:
self._message = message
else:
self._message += message
def encrypt(self):
"""Feed GnuPG with the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
encdata, err = p.communicate(input=self._message)
if p.returncode != 0:
LOG.debug('Errors: %s', err)

View File

@ -28,6 +28,7 @@ import copy
import email
import email.message
import email.utils
from email.contentmanager import raw_data_manager
from email.policy import SMTPUTF8
import GnuPG
import os
@ -332,20 +333,25 @@ def _encrypt_all_payloads_mime(message: email.message.Message, gpg_to_cmdline):
# Use this just to generate a MIME boundary string.
junk_msg = MIMEMultipart()
junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
_ = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
boundary = junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed.
if 'Content-Type' in message:
message.replace_header('Content-Type', f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL)
message.replace_header('Content-Type', _multipart_encrypted_with_boundary(boundary)+text.EOL)
else:
message['Content-Type'] = f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL
message['Content-Type'] = _multipart_encrypted_with_boundary(boundary)+text.EOL
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)]
def _multipart_encrypted_with_boundary(boundary):
return f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
raw_payload = payload.get_payload(decode=True)
raw_payload = raw_data_manager.get_content(payload)
LOG.debug('Got raw payload: %s (%s); original was: %s', raw_payload, type(raw_payload), payload)
if check_nested and text.is_payload_pgp_inline(raw_payload):
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
return payload

View File

@ -14,6 +14,9 @@ DOUBLE_EOL_BYTES = EOL_BYTES*2
PGP_INLINE_BEGIN = EOL_BYTES + b"-----BEGIN PGP MESSAGE-----" + EOL_BYTES
PGP_INLINE_END = EOL_BYTES + b"-----END PGP MESSAGE-----" + EOL_BYTES
PGP_INLINE_BEGIN_S = EOL + "-----BEGIN PGP MESSAGE-----" + EOL
PGP_INLINE_END_S = EOL + "-----END PGP MESSAGE-----" + EOL
LOG = logging.getLogger(__name__)
@ -73,9 +76,14 @@ def choose_sanitizer(mail_case_insensitive: bool):
return _lowercase_domain_only
def is_payload_pgp_inline(payload: bytes) -> bool:
def is_payload_pgp_inline(payload) -> bool:
"""Find out if the payload (bytes) contains PGP/inline markers."""
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
if isinstance(payload, bytes):
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
elif isinstance(payload, str):
return PGP_INLINE_BEGIN_S in payload and PGP_INLINE_END_S in payload
else:
raise TypeError('Expected str or bytes')
def is_message_pgp_inline(message: Message) -> bool: