Reformat GnuPG module

This commit is contained in:
Piotr F. Mieszkowski 2022-09-22 22:05:31 +02:00 committed by Gitea
parent a5bcf2d9b2
commit 8963eee47f
2 changed files with 145 additions and 117 deletions

View File

@ -1,22 +1,24 @@
# #
# gpg-mailgate # gpg-mailgate
# #
# This file is part of the gpg-mailgate source code. # This file is part of the gpg-mailgate source code.
# #
# gpg-mailgate is free software: you can redistribute it and/or modify # gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. # (at your option) any later version.
# #
# gpg-mailgate source code is distributed in the hope that it will be useful, # gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. # GNU General Public License for more details.
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
"""GnuPG wrapper module."""
import os import os
import os.path import os.path
import subprocess import subprocess
@ -24,6 +26,7 @@ import shutil
import random import random
import string import string
import sys import sys
import logging
LINE_FINGERPRINT = 'fpr' LINE_FINGERPRINT = 'fpr'
@ -31,136 +34,161 @@ LINE_USER_ID = 'uid'
POS_FINGERPRINT = 9 POS_FINGERPRINT = 9
def build_command(key_home, *args, **kwargs): LOG = logging.getLogger(__name__)
cmd = ["gpg", '--homedir', key_home] + list(args)
return cmd
def public_keys( keyhome ):
cmd = build_command(keyhome, '--list-keys', '--with-colons')
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.wait()
keys = dict() def _build_command(key_home, *args, **kwargs):
fingerprint = None cmd = ["gpg", '--homedir', key_home] + list(args)
email = None return cmd
for line in p.stdout.readlines():
line = line.decode(sys.getdefaultencoding())
if line[0:3] == LINE_FINGERPRINT: def public_keys(keyhome):
fingerprint = line.split(':')[POS_FINGERPRINT] """List public keys from keyring KEYHOME."""
if line[0:3] == LINE_USER_ID: cmd = _build_command(keyhome, '--list-keys', '--with-colons')
if ('<' not in line or '>' not in line): p = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
continue p.wait()
email = line.split('<')[1].split('>')[0]
if not (fingerprint is None or email is None): keys = dict()
keys[fingerprint] = email fingerprint = None
fingerprint = None email = None
email = None for line in p.stdout.readlines():
return keys line = line.decode(sys.getdefaultencoding())
if line[0:3] == LINE_FINGERPRINT:
fingerprint = line.split(':')[POS_FINGERPRINT]
if line[0:3] == LINE_USER_ID:
if ('<' not in line or '>' not in line):
continue
email = line.split('<')[1].split('>')[0]
if not (fingerprint is None or email is None):
keys[fingerprint] = email
fingerprint = None
email = None
return keys
def _to_bytes(s) -> bytes:
if isinstance(s, str):
return bytes(s, sys.getdefaultencoding())
else:
return s
def to_bytes(s) -> bytes:
if isinstance(s, str):
return bytes(s, sys.getdefaultencoding())
else:
return s
# Confirms a key has a given email address by importing it into a temporary # Confirms a key has a given email address by importing it into a temporary
# keyring. If this operation succeeds and produces a message mentioning the # keyring. If this operation succeeds and produces a message mentioning the
# expected email, a key is confirmed. # expected email, a key is confirmed.
def confirm_key( content, email ): def confirm_key(content, email):
tmpkeyhome = '' """Verify that the key CONTENT is assigned to identity EMAIL."""
content = to_bytes(content) tmpkeyhome = ''
expected_email = to_bytes(email.lower()) content = _to_bytes(content)
expected_email = _to_bytes(email.lower())
while True: while True:
tmpkeyhome = '/tmp/' + ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(12)) tmpkeyhome = '/tmp/' + ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(12))
if not os.path.exists(tmpkeyhome): if not os.path.exists(tmpkeyhome):
break break
# let only the owner access the directory, otherwise gpg would complain # let only the owner access the directory, otherwise gpg would complain
os.mkdir(tmpkeyhome, mode=0o700) os.mkdir(tmpkeyhome, mode=0o700)
localized_env = os.environ.copy() localized_env = os.environ.copy()
localized_env["LANG"] = "C" localized_env["LANG"] = "C"
p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) p = subprocess.Popen(_build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env)
result = p.communicate(input=content)[1] result = p.communicate(input=content)[1]
confirmed = False confirmed = False
for line in result.split(b"\n"): for line in result.split(b"\n"):
if b'imported' in line and b'<' in line and b'>' in line: if b'imported' in line and b'<' in line and b'>' in line:
if line.split(b'<')[1].split(b'>')[0].lower() == expected_email: if line.split(b'<')[1].split(b'>')[0].lower() == expected_email:
confirmed = True confirmed = True
break break
else: else:
break # confirmation failed break # confirmation failed
# cleanup # cleanup
shutil.rmtree(tmpkeyhome) shutil.rmtree(tmpkeyhome)
return confirmed
return confirmed
# adds a key and ensures it has the given email address # adds a key and ensures it has the given email address
def add_key( keyhome, content ): def add_key(keyhome, content):
if isinstance(content, str): """Register new key CONTENT in the keyring KEYHOME."""
content = bytes(content, sys.getdefaultencoding()) if isinstance(content, str):
p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) content = bytes(content, sys.getdefaultencoding())
p.communicate(input=content) p = subprocess.Popen(_build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait() p.communicate(input=content)
p.wait()
def delete_key( keyhome, email ):
from email.utils import parseaddr
result = parseaddr(email)
if result[1]: def delete_key(keyhome, email):
# delete all keys matching this email address """Remove key assigned to identity EMAIL from keyring KEYHOME."""
p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) from email.utils import parseaddr
p.wait() result = parseaddr(email)
return True
if result[1]:
# delete all keys matching this email address
p = subprocess.Popen(_build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
return True
return False
return False
class GPGEncryptor: class GPGEncryptor:
def __init__(self, keyhome, recipients = None, charset = None): """A wrapper for 'gpg -e' command."""
self._keyhome = keyhome
self._message = b''
self._recipients = list()
self._charset = charset
if recipients != None:
self._recipients.extend(recipients)
def update(self, message): def __init__(self, keyhome, recipients=None, charset=None):
self._message += message """Initialise the wrapper."""
self._keyhome = keyhome
self._message = b''
self._recipients = list()
self._charset = charset
if recipients is not None:
self._recipients.extend(recipients)
def encrypt(self): def update(self, message):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) """Append MESSAGE to buffer about to be encrypted."""
encdata = p.communicate(input=self._message)[0] self._message += message
return (encdata, p.returncode)
def _command(self): def encrypt(self):
cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e") """Feed GnuPG with the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
encdata = p.communicate(input=self._message)[0]
return (encdata, p.returncode)
# add recipients def _command(self):
for recipient in self._recipients: cmd = _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
cmd.append("-r")
cmd.append(recipient)
# add on the charset, if set # add recipients
if self._charset: for recipient in self._recipients:
cmd.append("--comment") cmd.append("-r")
cmd.append('Charset: ' + self._charset) cmd.append(recipient)
# add on the charset, if set
if self._charset:
cmd.append("--comment")
cmd.append('Charset: ' + self._charset)
LOG.debug(f'Built command: {cmd!r}')
return cmd
return cmd
class GPGDecryptor: class GPGDecryptor:
def __init__(self, keyhome): """A wrapper for 'gpg -d' command."""
self._keyhome = keyhome
self._message = ''
def update(self, message): def __init__(self, keyhome):
self._message += message """Initialise the wrapper."""
self._keyhome = keyhome
self._message = ''
def decrypt(self): def update(self, message):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) """Append encrypted content to be decrypted."""
decdata = p.communicate(input=self._message)[0] self._message += message
return (decdata, p.returncode)
def _command(self): def decrypt(self):
return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d") """Decrypt the message."""
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
decdata = p.communicate(input=self._message)[0]
return (decdata, p.returncode)
def _command(self):
return _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")

View File

@ -4,11 +4,11 @@ import unittest
class GnuPGUtilitiesTest(unittest.TestCase): class GnuPGUtilitiesTest(unittest.TestCase):
def test_build_default_command(self): def test_build_default_command(self):
cmd = GnuPG.build_command("test/keyhome") cmd = GnuPG._build_command("test/keyhome")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"]) self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
def test_build_command_extended_with_args(self): def test_build_command_extended_with_args(self):
cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar") cmd = GnuPG._build_command("test/keyhome", "--foo", "--bar")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"]) self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])