Move recipient-processing code to a dedicated module

This commit is contained in:
Piotr F. Mieszkowski 2023-04-09 21:47:58 +02:00
parent 5f5b374f84
commit ff6e0bfbdd
4 changed files with 209 additions and 192 deletions

View File

@ -42,87 +42,19 @@ import logging
import lacre.text as text
import lacre.config as conf
import lacre.keyring as kcache
import lacre.recipients as recpt
from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt
LOG = logging.getLogger(__name__)
class GpgRecipient:
"""A tuple-like object that contains GPG recipient data."""
def __init__(self, left, right):
"""Initialise a tuple-like object that contains GPG recipient data."""
self._left = left
self._right = right
def __getitem__(self, index):
"""Pretend this object is a tuple by returning an indexed tuple element."""
if index == 0:
return self._left
elif index == 1:
return self._right
else:
raise IndexError()
def __repr__(self):
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def email(self):
"""Return this recipient's email address."""
return self._left
def key(self):
"""Return this recipient's key ID."""
return self._right
class RecipientList:
"""Encalsulates two lists of recipients.
First list contains addresses, the second - GPG identities.
"""
def __init__(self, recipients=[], keys=[]):
"""Initialise lists of recipients and identities."""
self._recipients = [GpgRecipient(email, key) for (email, key) in zip(recipients, keys)]
def emails(self):
"""Return list of recipients."""
LOG.debug('Recipient emails from: %s', self._recipients)
return [r.email() for r in self._recipients]
def keys(self):
"""Return list of GPG identities."""
LOG.debug('Recipient keys from: %s', self._recipients)
return [r.key() for r in self._recipients]
def __iadd__(self, recipient: GpgRecipient):
"""Append a recipient."""
LOG.debug('Adding %s to %s', recipient, self._recipients)
self._recipients.append(recipient)
LOG.debug('Added; got: %s', self._recipients)
return self
def __len__(self):
"""Provide len().
With this method, it is possible to write code like:
rl = RecipientList()
if rl:
# do something
"""
return len(self._recipients)
def _gpg_encrypt(raw_message, recipients):
if not conf.config_item_set('gpg', 'keyhome'):
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
return recipients
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, _load_keys())
gpg_recipients, cleartext_recipients = recpt.identify_gpg_recipients(recipients, _load_keys())
LOG.info(f"Got addresses: gpg_to={gpg_recipients!r}, ungpg_to={cleartext_recipients!r}")
@ -147,10 +79,7 @@ def _gpg_encrypt(raw_message, recipients):
return cleartext_recipients
def _sort_gpg_recipients(gpg_to) -> Tuple[RecipientList, RecipientList]:
mime = RecipientList()
inline = RecipientList()
def _sort_gpg_recipients(gpg_to) -> Tuple[recpt.RecipientList, recpt.RecipientList]:
recipients_mime = list()
keys_mime = list()
@ -164,11 +93,9 @@ def _sort_gpg_recipients(gpg_to) -> Tuple[RecipientList, RecipientList]:
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
else:
# Log message only if an unknown style is defined
if conf.config_item_set('pgp_style', rcpt.email()):
@ -179,14 +106,12 @@ def _sort_gpg_recipients(gpg_to) -> Tuple[RecipientList, RecipientList]:
if default_to_pgp_mime:
recipients_mime.append(rcpt.email())
keys_mime.extend(rcpt.key().split(','))
mime += rcpt
else:
recipients_inline.append(rcpt.email())
keys_inline.extend(rcpt.key().split(','))
inline += rcpt
mime = RecipientList(recipients_mime, keys_mime)
inline = RecipientList(recipients_inline, keys_inline)
mime = recpt.RecipientList(recipients_mime, keys_mime)
inline = recpt.RecipientList(recipients_inline, keys_inline)
LOG.debug('Loaded recipients: MIME %s; Inline %s', repr(mime), repr(inline))
@ -232,103 +157,6 @@ def _load_keys():
return asyncio.run(keyring.freeze_identities())
def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
gpg_recipients = list()
# This will be the list of recipients that haven't provided us with their
# public keys.
cleartext_recipients = list()
# In "strict mode", only keys included in configuration are used to encrypt
# email.
strict_mode = conf.strict_mode()
# GnuPG keys found in our keyring.
for to in recipients:
own_key = _try_configured_key(to, keys)
if own_key is not None:
gpg_recipients.append(GpgRecipient(own_key[0], own_key[1]))
continue
direct_key = _try_direct_key_lookup(to, keys, strict_mode)
if direct_key is not None:
gpg_recipients.append(GpgRecipient(direct_key[0], direct_key[1]))
continue
domain_key = _try_configured_domain_key(to, keys)
if domain_key is not None:
gpg_recipients.append(GpgRecipient(domain_key[0], domain_key[1]))
continue
cleartext_recipients.append(to)
LOG.debug('Collected recipients; GPG: %s; cleartext: %s', gpg_recipients, cleartext_recipients)
return gpg_recipients, cleartext_recipients
def _find_key(recipient, keys, strict_mode):
own_key = _try_configured_key(recipient, keys)
if own_key is not None:
return own_key
direct_key = _try_direct_key_lookup(recipient, keys, strict_mode)
if direct_key is not None:
return direct_key
domain_key = _try_configured_domain_key(recipient, keys)
if domain_key is not None:
return domain_key
return None
def _try_configured_key(recipient, keys):
if conf.config_item_set('enc_keymap', recipient):
key = conf.get_item('enc_keymap', recipient)
if key in keys:
LOG.debug(f"Found key {key} configured for {recipient}")
return (recipient, key)
LOG.debug(f"No configured key found for {recipient}")
return None
def _try_direct_key_lookup(recipient, keys, strict_mode):
if strict_mode:
return None
if keys.has_email(recipient):
LOG.info(f"Found key for {recipient}")
return recipient, recipient
(newto, topic) = text.parse_delimiter(recipient)
if keys.has_email(newto):
LOG.info(f"Found key for {newto}, stripped {recipient}")
return recipient, newto
return None
def _try_configured_domain_key(recipient, keys):
parts = recipient.split('@')
if len(parts) != 2:
return None
domain = parts[1]
if conf.config_item_set('enc_domain_keymap', domain):
domain_key = conf.get_item('enc_domain_keymap', domain)
if domain_key in keys:
LOG.debug(f"Found domain key {domain_key} for {recipient}")
return recipient, domain_key
LOG.debug(f"No domain key for {recipient}")
return None
def _encrypt_all_payloads_inline(message: EmailMessage, gpg_to_cmdline):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
@ -628,7 +456,7 @@ def delivery_plan(recipients, message: EmailMessage, key_cache: kcache.KeyCache)
LOG.debug('Message is already encrypted: %s', message)
return [KeepIntact(recipients)]
gpg_recipients, cleartext_recipients = _identify_gpg_recipients(recipients, key_cache)
gpg_recipients, cleartext_recipients = recpt.identify_gpg_recipients(recipients, key_cache)
mime, inline = _sort_gpg_recipients(gpg_recipients)

186
lacre/recipients.py Normal file
View File

@ -0,0 +1,186 @@
#
# gpg-mailgate
#
# This file is part of the gpg-mailgate source code.
#
# gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
"""Recipient processing package.
Defines:
- GpgRecipient, wrapper for user's email and identity.
- RecipientList, a wrapper for lists of GpgRecipient objects.
"""
import logging
import lacre.config as conf
import lacre.keyring as kcache
import lacre.text as text
LOG = logging.getLogger(__name__)
class GpgRecipient:
"""A tuple-like object that contains GPG recipient data."""
def __init__(self, left, right):
"""Initialise a tuple-like object that contains GPG recipient data."""
self._left = left
self._right = right
def __getitem__(self, index):
"""Pretend this object is a tuple by returning an indexed tuple element."""
if index == 0:
return self._left
elif index == 1:
return self._right
else:
raise IndexError()
def __repr__(self):
"""Return textual representation of this GPG Recipient."""
return f"GpgRecipient({self._left!r}, {self._right!r})"
def email(self):
"""Return this recipient's email address."""
return self._left
def key(self):
"""Return this recipient's key ID."""
return self._right
class RecipientList:
"""Encalsulates two lists of recipients.
First list contains addresses, the second - GPG identities.
"""
def __init__(self, recipients=[], keys=[]):
"""Initialise lists of recipients and identities."""
self._recipients = [GpgRecipient(email, key) for (email, key) in zip(recipients, keys)]
def emails(self):
"""Return list of recipients."""
LOG.debug('Recipient emails from: %s', self._recipients)
return [r.email() for r in self._recipients]
def keys(self):
"""Return list of GPG identities."""
LOG.debug('Recipient keys from: %s', self._recipients)
return [r.key() for r in self._recipients]
def __iadd__(self, recipient: GpgRecipient):
"""Append a recipient."""
LOG.debug('Adding %s to %s', recipient, self._recipients)
self._recipients.append(recipient)
LOG.debug('Added; got: %s', self._recipients)
return self
def __len__(self):
"""Provide len().
With this method, it is possible to write code like:
rl = RecipientList()
if rl:
# do something
"""
return len(self._recipients)
def identify_gpg_recipients(recipients, keys: kcache.KeyCache):
"""Split recipient list into GPG and non-GPG ones."""
# This list will be filled with pairs (M, N), where M is the destination
# address we're going to deliver the message to and N is the identity we're
# going to encrypt it for.
gpg_recipients = list()
# This will be the list of recipients that haven't provided us with their
# public keys.
cleartext_recipients = list()
# In "strict mode", only keys included in configuration are used to encrypt
# email.
strict_mode = conf.strict_mode()
for recipient in recipients:
gpg_recipient = _find_key(recipient, keys, strict_mode)
if gpg_recipient is not None:
gpg_recipients.append(gpg_recipient)
else:
cleartext_recipients.append(recipient)
LOG.debug('Collected recipients; GPG: %s; cleartext: %s', gpg_recipients, cleartext_recipients)
return gpg_recipients, cleartext_recipients
def _find_key(recipient, keys, strict_mode):
own_key = _try_configured_key(recipient, keys)
if own_key is not None:
return GpgRecipient(own_key[0], own_key[1])
direct_key = _try_direct_key_lookup(recipient, keys, strict_mode)
if direct_key is not None:
return GpgRecipient(direct_key[0], direct_key[1])
domain_key = _try_configured_domain_key(recipient, keys)
if domain_key is not None:
return GpgRecipient(domain_key[0], domain_key[1])
return None
def _try_configured_key(recipient, keys):
if conf.config_item_set('enc_keymap', recipient):
key = conf.get_item('enc_keymap', recipient)
if key in keys:
LOG.debug(f"Found key {key} configured for {recipient}")
return (recipient, key)
LOG.debug(f"No configured key found for {recipient}")
return None
def _try_direct_key_lookup(recipient, keys, strict_mode):
if strict_mode:
return None
if keys.has_email(recipient):
LOG.info(f"Found key for {recipient}")
return recipient, recipient
(newto, topic) = text.parse_delimiter(recipient)
if keys.has_email(newto):
LOG.info(f"Found key for {newto}, stripped {recipient}")
return recipient, newto
return None
def _try_configured_domain_key(recipient, keys):
parts = recipient.split('@')
if len(parts) != 2:
return None
domain = parts[1]
if conf.config_item_set('enc_domain_keymap', domain):
domain_key = conf.get_item('enc_domain_keymap', domain)
if domain_key in keys:
LOG.debug(f"Found domain key {domain_key} for {recipient}")
return recipient, domain_key
LOG.debug(f"No domain key for {recipient}")
return None

View File

@ -38,17 +38,3 @@ class LacreCoreTest(unittest.TestCase):
self.assertFalse('Subject' in rewrapped)
self.assertEqual(rewrapped.get_content_type(), m.get_content_type())
class RecipientListTest(unittest.TestCase):
def test_addition(self):
a_list = lacre.core.RecipientList()
a_list += lacre.core.GpgRecipient(
'alice@disposlab',
'1CD245308F0963D038E88357973CF4D9387C44D7')
emails = [x for x in a_list.emails()]
keys = [x for x in a_list.keys()]
self.assertSequenceEqual(emails, ['alice@disposlab'])
self.assertSequenceEqual(keys, ['1CD245308F0963D038E88357973CF4D9387C44D7'])

View File

@ -0,0 +1,17 @@
import lacre.recipients
import unittest
class RecipientListTest(unittest.TestCase):
def test_addition(self):
a_list = lacre.recipients.RecipientList()
a_list += lacre.recipients.GpgRecipient(
'alice@disposlab',
'1CD245308F0963D038E88357973CF4D9387C44D7')
emails = [x for x in a_list.emails()]
keys = [x for x in a_list.keys()]
self.assertSequenceEqual(emails, ['alice@disposlab'])
self.assertSequenceEqual(keys, ['1CD245308F0963D038E88357973CF4D9387C44D7'])