GnuPG: clean up and collect more diagnostic info
- Use regular expressions instead of finding particular characters in gnupg output to decide whether confirmation line was found. - Use tempfile.mkdtemp to create secure temporary directories. - Record information about the key considered by GnuPG. When missing in exception, it means no key was found.
This commit is contained in:
parent
6c114b6dcd
commit
624a335a41
|
@ -27,6 +27,9 @@ import random
|
|||
import string
|
||||
import sys
|
||||
import logging
|
||||
import re
|
||||
import tempfile
|
||||
from email.utils import parseaddr
|
||||
|
||||
|
||||
LINE_FINGERPRINT = 'fpr'
|
||||
|
@ -36,6 +39,8 @@ POS_FINGERPRINT = 9
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
RX_CONFIRM = re.compile(br'key "([^"]+)" imported')
|
||||
|
||||
|
||||
class EncryptionException(Exception):
|
||||
"""Represents a failure to encrypt a payload."""
|
||||
|
@ -94,32 +99,24 @@ def _to_bytes(s) -> bytes:
|
|||
# Confirms a key has a given email address by importing it into a temporary
|
||||
# keyring. If this operation succeeds and produces a message mentioning the
|
||||
# expected email, a key is confirmed.
|
||||
def confirm_key(content, email):
|
||||
def confirm_key(content, email: str):
|
||||
"""Verify that the key CONTENT is assigned to identity EMAIL."""
|
||||
tmpkeyhome = ''
|
||||
content = _to_bytes(content)
|
||||
expected_email = _to_bytes(email.lower())
|
||||
expected_email = email.lower()
|
||||
|
||||
while True:
|
||||
tmpkeyhome = '/tmp/' + ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(12))
|
||||
if not os.path.exists(tmpkeyhome):
|
||||
break
|
||||
tmpkeyhome = tempfile.mkdtemp()
|
||||
|
||||
# let only the owner access the directory, otherwise gpg would complain
|
||||
os.mkdir(tmpkeyhome, mode=0o700)
|
||||
localized_env = os.environ.copy()
|
||||
localized_env["LANG"] = "C"
|
||||
p = subprocess.Popen(_build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env)
|
||||
result = p.communicate(input=content)[1]
|
||||
confirmed = False
|
||||
|
||||
for line in result.split(b"\n"):
|
||||
if b'imported' in line and b'<' in line and b'>' in line:
|
||||
if line.split(b'<')[1].split(b'>')[0].lower() == expected_email:
|
||||
confirmed = True
|
||||
break
|
||||
else:
|
||||
break # confirmation failed
|
||||
for line in result.split(b'\n'):
|
||||
found = RX_CONFIRM.search(line)
|
||||
if found:
|
||||
(_, extracted_email) = parseaddr(found.group(1).decode())
|
||||
confirmed = (extracted_email == expected_email)
|
||||
|
||||
# cleanup
|
||||
shutil.rmtree(tmpkeyhome)
|
||||
|
@ -139,7 +136,6 @@ def add_key(keyhome, content):
|
|||
|
||||
def delete_key(keyhome, email):
|
||||
"""Remove key assigned to identity EMAIL from keyring KEYHOME."""
|
||||
from email.utils import parseaddr
|
||||
result = parseaddr(email)
|
||||
|
||||
if result[1]:
|
||||
|
@ -149,6 +145,7 @@ def delete_key(keyhome, email):
|
|||
p.wait()
|
||||
return True
|
||||
|
||||
LOG.warn('Failed to parse email before deleting key: %s', email)
|
||||
return False
|
||||
|
||||
|
||||
|
@ -243,6 +240,8 @@ KEY_EXPIRED = b'KEYEXPIRED'
|
|||
KEY_REVOKED = b'KEYREVOKED'
|
||||
NO_RECIPIENTS = b'NO_RECP'
|
||||
INVALID_RECIPIENT = b'INV_RECP'
|
||||
KEY_CONSIDERED = b'KEY_CONSIDERED'
|
||||
NOAVAIL = b'n/a'
|
||||
|
||||
# INV_RECP reason code descriptions.
|
||||
INVALID_RECIPIENT_CAUSES = [
|
||||
|
@ -271,7 +270,7 @@ def parse_status(status_buffer: str) -> dict:
|
|||
|
||||
def parse_status_lines(lines: list) -> dict:
|
||||
"""Parse --status-fd output and return important information."""
|
||||
result = {'issue': 'n/a', 'recipient': 'n/a', 'cause': 'Unknown'}
|
||||
result = {'issue': NOAVAIL, 'recipient': NOAVAIL, 'cause': 'Unknown', 'key': NOAVAIL}
|
||||
|
||||
LOG.debug('Processing stderr lines %s', lines)
|
||||
|
||||
|
@ -286,6 +285,8 @@ def parse_status_lines(lines: list) -> dict:
|
|||
result['issue'] = KEY_REVOKED
|
||||
elif line.startswith(NO_RECIPIENTS, STATUS_FD_PREFIX_LEN):
|
||||
result['issue'] = NO_RECIPIENTS
|
||||
elif line.startswith(KEY_CONSIDERED, STATUS_FD_PREFIX_LEN):
|
||||
result['key'] = line.split(b' ')[2]
|
||||
elif line.startswith(INVALID_RECIPIENT, STATUS_FD_PREFIX_LEN):
|
||||
words = line.split(b' ')
|
||||
reason_code = int(words[2])
|
||||
|
|
|
@ -56,10 +56,24 @@ class GnuPGUtilitiesTest(unittest.TestCase):
|
|||
[GNUPG:] INV_RECP 0 name@domain
|
||||
[GNUPG:] FAILURE encrypt 1
|
||||
"""
|
||||
|
||||
result = GnuPG.parse_status(key_expired)
|
||||
self.assertEqual(result['issue'], b'KEYEXPIRED')
|
||||
self.assertEqual(result['recipient'], b'name@domain')
|
||||
self.assertEqual(result['cause'], 'No specific reason given')
|
||||
self.assertEqual(result['key'], b'XXXXXXXXXXXXX')
|
||||
|
||||
def test_parse_statusfd_key_absent(self):
|
||||
non_specific_errors = b"""
|
||||
[GNUPG:] INV_RECP 0 name@domain
|
||||
[GNUPG:] FAILURE encrypt 1
|
||||
"""
|
||||
|
||||
result = GnuPG.parse_status(non_specific_errors)
|
||||
self.assertEqual(result['issue'], b'n/a')
|
||||
self.assertEqual(result['recipient'], b'name@domain')
|
||||
self.assertEqual(result['cause'], 'No specific reason given')
|
||||
self.assertEqual(result['key'], b'n/a')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue