diff --git a/.gitignore b/.gitignore index 7808c4b..140af95 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,18 @@ pip-log.txt .tox nosetests.xml +# GPG-Mailgate test files +test/logs +test/tmp +test/gpg-mailgate.conf +test/keyhome/random_seed + +# Emacs files +*~ +TAGS +TAGS-Python +TAGS-PHP + # Translations *.mo diff --git a/GnuPG/__init__.py b/GnuPG/__init__.py index c9bbee0..8bcfebd 100644 --- a/GnuPG/__init__.py +++ b/GnuPG/__init__.py @@ -23,9 +23,20 @@ import subprocess import shutil import random import string +import sys + + +LINE_FINGERPRINT = 'fpr' +LINE_USER_ID = 'uid' + +POS_FINGERPRINT = 9 + +def build_command(key_home, *args, **kwargs): + cmd = ["gpg", '--homedir', key_home] + list(args) + return cmd def private_keys( keyhome ): - cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-secret-keys', '--with-colons'] + cmd = build_command(keyhome, '--list-secret-keys', '--with-colons') p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() keys = dict() @@ -39,17 +50,25 @@ def private_keys( keyhome ): return keys def public_keys( keyhome ): - cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-keys', '--with-colons'] + cmd = build_command(keyhome, '--list-keys', '--with-colons') p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() + keys = dict() + fingerprint = None + email = None for line in p.stdout.readlines(): - if line[0:3] == 'uid' or line[0:3] == 'pub': + line = line.decode('utf-8') + if line[0:3] == LINE_FINGERPRINT: + fingerprint = line.split(':')[POS_FINGERPRINT] + if line[0:3] == LINE_USER_ID: if ('<' not in line or '>' not in line): continue email = line.split('<')[1].split('>')[0] - fingerprint = line.split(':')[4] + if not (fingerprint is None or email is None): keys[fingerprint] = email + fingerprint = None + email = None return keys # confirms a key has a given email address @@ -64,7 +83,7 @@ def confirm_key( content, email ): os.mkdir(tmpkeyhome) localized_env = os.environ.copy() localized_env["LANG"] = "C" - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', tmpkeyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) + p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) result = p.communicate(input=content)[1] confirmed = False @@ -83,7 +102,7 @@ def confirm_key( content, email ): # adds a key and ensures it has the given email address def add_key( keyhome, content ): - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) + p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.communicate(input=content) p.wait() @@ -93,7 +112,7 @@ def delete_key( keyhome, email ): if result[1]: # delete all keys matching this email address - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--delete-key', '--batch', '--yes', result[1]], stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() return True @@ -102,7 +121,7 @@ def delete_key( keyhome, email ): class GPGEncryptor: def __init__(self, keyhome, recipients = None, charset = None): self._keyhome = keyhome - self._message = '' + self._message = b'' self._recipients = list() self._charset = charset if recipients != None: @@ -112,12 +131,12 @@ class GPGEncryptor: self._message += message def encrypt(self): - p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) + p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) encdata = p.communicate(input=self._message)[0] return (encdata, p.returncode) def _command(self): - cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e"] + cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e") # add recipients for recipient in self._recipients: @@ -140,11 +159,9 @@ class GPGDecryptor: self._message += message def decrypt(self): - p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) + p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) decdata = p.communicate(input=self._message)[0] return (decdata, p.returncode) def _command(self): - cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--no-secmem-warning", "-a", "-d"] - - return cmd \ No newline at end of file + return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d") diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9c4ce7b --- /dev/null +++ b/Makefile @@ -0,0 +1,42 @@ +.POSIX: +.PHONY: test unittest pre-clean clean + +# +# On systems where Python 3.x binary has a different name, just +# overwrite the name/path on the command line, like: +# +# make test PYTHON=/usr/local/bin/python3.8 +# +# This marco is passed via environment to test/e2e_test.py, where it's +# used to compute further commands. +# +PYTHON = python3 + +# +# Run a set of end-to-end tests. +# +# Test scenarios are described and configured by the test/e2e.ini +# file. Basically this is just a script that feeds GPG Mailgate with +# known input and checks whether output meets expectations. +# +test: test/tmp test/logs pre-clean + $(PYTHON) test/e2e_test.py + +# +# Run unit tests +# +unittest: + $(PYTHON) -m unittest discover -s test + +pre-clean: + rm -fv test/gpg-mailgate.conf + rm -f test/logs/*.log + +test/tmp: + mkdir test/tmp + +test/logs: + mkdir test/logs + +clean: pre-clean + rm -rfv test/tmp test/logs diff --git a/doc/testing.md b/doc/testing.md new file mode 100644 index 0000000..d018770 --- /dev/null +++ b/doc/testing.md @@ -0,0 +1,40 @@ +# Testing + +First tests have been set up to cover GPG Mailgate with at least basic test +that would be easy to run. The tests are called "end-to-end", meaning that we +feed some input to GPG Mailgate and inspect the output. + +## Running tests + +To run tests, use command `make test` or `make unittest`. + +Tests produce some helpful logs, so inspect contents of `test/logs` directory +if something goes wrong. + +If your system's Python binary isn't found in your `$PATH` or you want to use +a specific binary, use make's macro overriding: `make test +PYTHON=/path/to/python`. + +## Key building blocks + +- *Test Script* (`test/e2e_test.py`) that orchestrates the other components. + It performs test cases described in the *Test Configuration*. It spawns + *Test Mail Relay* and *GPG Mailgate* in appropriate order. +- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only + supports the happy path. It accepts a mail message and prints it to + stdandard output. +- *Test Configuration* (`test/e2e.ini`) specifies test cases: their input, + expected results and helpful documentation. It also specifies the port that + the *Test Mail Relay* should listen on. + +## Limitations + +Currently tests only check if the message has been encrypted, without +verifying that the correct key has been used. That's because we don't know +(yet) how to have a reproducible encrypted message. Option +`--faked-system-time` wasn't enough to produce identical output. + +## Troubleshooting + +When things go wrong, be sure to study `test/logs/e2e.log` and +`test/logs/gpg-mailgate.log` files -- they contain some useful information. diff --git a/gpg-mailgate-web/cron.py b/gpg-mailgate-web/cron.py index 3f305ba..98cd8d1 100644 --- a/gpg-mailgate-web/cron.py +++ b/gpg-mailgate-web/cron.py @@ -19,7 +19,7 @@ # along with gpg-mailgate source code. If not, see . # -from ConfigParser import RawConfigParser +from configparser import RawConfigParser import GnuPG import MySQLdb import smtplib @@ -64,7 +64,7 @@ for sect in _cfg.sections(): for (name, value) in _cfg.items(sect): cfg[sect][name] = value -if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['database']['enabled'] == 'yes' and cfg['database'].has_key('name') and cfg['database'].has_key('host') and cfg['database'].has_key('username') and cfg['database'].has_key('password'): +if 'database' in cfg and 'enabled' in cfg['database'] and cfg['database']['enabled'] == 'yes' and 'name' in cfg['database'] and 'host' in cfg['database'] and 'username' in cfg['database'] and 'password' in cfg['database']: connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306) cursor = connection.cursor() @@ -83,17 +83,17 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted appendLog('Imported key from <' + row[2] + '>') - if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': + if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes': send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] ) else: cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key appendLog('Import confirmation failed for <' + row[2] + '>') - if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': + if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes': send_msg( "PGP key registration failed", "registrationError.md", row[2] ) else: # delete key so we don't continue processing it cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) - if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': + if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes': send_msg( "PGP key deleted", "keyDeleted.md", row[2]) connection.commit() @@ -108,4 +108,4 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab appendLog('Deleted key for <' + row[0] + '>') connection.commit() else: - print "Warning: doing nothing since database settings are not configured!" + print("Warning: doing nothing since database settings are not configured!") diff --git a/gpg-mailgate.py b/gpg-mailgate.py index 9830262..02e1c80 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -19,7 +19,7 @@ # along with gpg-mailgate source code. If not, see . # -from ConfigParser import RawConfigParser +from configparser import RawConfigParser from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart import copy @@ -39,9 +39,14 @@ import traceback from M2Crypto import BIO, Rand, SMIME, X509 from email.mime.message import MIMEMessage +# Environment variable name we read to retrieve configuration path. This is to +# enable non-root users to set up and run GPG Mailgate and to make the software +# testable. +CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG" + # Read configuration from /etc/gpg-mailgate.conf _cfg = RawConfigParser() -_cfg.read('/etc/gpg-mailgate.conf') +_cfg.read(os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')) cfg = dict() for sect in _cfg.sections(): cfg[sect] = dict() @@ -65,227 +70,6 @@ raw_message = email.message_from_string( raw ) from_addr = raw_message['From'] to_addrs = sys.argv[1:] -def gpg_decrypt( raw_message, recipients ): - - gpg_to = list() - ungpg_to = list() - - # This is needed to avoid encryption if decryption is set to keymap only, - # private key is present but not in keymap. - noenc_to = list() - - if not get_bool_from_cfg('gpg', 'keyhome'): - log("No valid entry for gpg keyhome. Decryption aborted.") - return recipients - - keys = GnuPG.private_keys( cfg['gpg']['keyhome'] ) - - if get_bool_from_cfg('default', 'dec_regex'): - dec_regex = cfg['default']['dec_regex'] - else: - dec_regex = None - - for fingerprint in keys: - keys[fingerprint] = sanitize_case_sense(keys[fingerprint]) - - for to in recipients: - if to in keys.values() and not get_bool_from_cfg('default', 'dec_keymap_only', 'yes'): - gpg_to.append(to) - # Is this recipient defined in regex for default decryption? - elif not (dec_regex is None) and not (re.match(dec_regex, to) is None): - log("Using default decrytion defined in dec_regex for recipient '%s'" % to) - gpg_to.append(to) - elif get_bool_from_cfg('dec_keymap', to): - log("Decrypt keymap has key '%s'" % cfg['dec_keymap'][to] ) - # Check we've got a matching key! If not, decline to attempt decryption. The key is checked for safty reasons. - if not cfg['dec_keymap'][to] in keys: - log("Key '%s' in decryption keymap not found in keyring for email address '%s'. Won't decrypt." % (cfg['dec_keymap'][to], to)) - # Avoid unwanted encryption if set - if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'): - noenc_to.append(to) - else: - ungpg_to.append(to) - else: - gpg_to.append(to) - else: - if verbose: - log("Recipient (%s) not in PGP domain list for decrypting." % to) - # Avoid unwanted encryption if set - if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'): - noenc_to.append(to) - else: - ungpg_to.append(to) - - if gpg_to != list(): - send_msg( gpg_decrypt_all_payloads( raw_message ).as_string(), gpg_to ) - - if noenc_to != list(): - log("Do not try to encrypt mails for: %s" % ', '.join( noenc_to )) - send_msg(raw_message.as_string(), noenc_to) - - return ungpg_to - -def gpg_decrypt_all_payloads( message ): - - # We don't want to modify the original message - decrypted_message = copy.deepcopy(message) - - # Check if message is PGP/MIME encrypted - if not (message.get_param('protocol') is None) and message.get_param('protocol') == 'application/pgp-encrypted' and message.is_multipart(): - decrypted_message = decrypt_mime(decrypted_message) - - # At this point the message could only be PGP/INLINE encrypted, unencrypted or - # encrypted with a mechanism not covered by GPG-Mailgate - elif get_bool_from_cfg('default', 'no_inline_dec', 'no'): - # Check if message is PGP/INLINE encrypted and has attachments (or unencrypted with attachments) - if message.is_multipart(): - - # Set message's payload to list so payloads can be attached later on - decrypted_message.set_payload(list()) - - # We only need to hand over the original message here. Not needed for other decrypt implementations. - decrypted_message, success = decrypt_inline_with_attachments(message, False, decrypted_message) - - # Add header here to avoid it being appended several times - if get_bool_from_cfg('default', 'add_header', 'yes') and success: - decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate' - - # Check if message is PGP/INLINE encrypted without attachments (or unencrypted without attachments) - else: - decrypted_message = decrypt_inline_without_attachments(decrypted_message) - - return decrypted_message - -def decrypt_mime( decrypted_message ): - # Please note: Signatures will disappear while decrypting and will not be checked - - # Getting the part which should be PGP encrypted (according to RFC) - msg_content = decrypted_message.get_payload(1).get_payload() - - if "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content: - start = msg_content.find("-----BEGIN PGP MESSAGE-----") - end = msg_content.find("-----END PGP MESSAGE-----") - decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25]) - - if decrypt_success: - # Making decrypted_message a "normal" unencrypted message - decrypted_message.del_param('protocol') - decrypted_message.set_type(decrypted_payload.get_content_type()) - - # Restore Content-Disposition header from original message - if not (decrypted_payload.get('Content-Disposition') is None): - if not (decrypted_message.get('Content-Disposition') is None): - decrypted_message.replace_header('Content-Disposition', decrypted_payload.get('Content-Disposition')) - else: - decrypted_message.set_param(decrypted_payload.get('Content-Disposition'), "", 'Content-Disposition') - - if decrypted_payload.is_multipart(): - # Clear message's original payload and insert the decrypted payloads - decrypted_message.set_payload(list()) - decrypted_message = generate_message_from_payloads( decrypted_payload, decrypted_message ) - decrypted_message.preamble = "This is a multi-part message in MIME format" - else: - decrypted_message.set_payload(decrypted_payload.get_payload()) - decrypted_message.preamble = None - - - if get_bool_from_cfg('default', 'add_header', 'yes'): - decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate' - - # If decryption fails, decrypted_message is equal to the original message - return decrypted_message - -def decrypt_inline_with_attachments( payloads, success, message = None ): - - if message is None: - message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) - - for payload in payloads.get_payload(): - if( type( payload.get_payload() ) == list ): - # Take care of cascaded MIME messages - submessage, subsuccess = decrypt_inline_with_attachments( payload, success ) - message.attach(submessage) - success = success or subsuccess - else: - msg_content = payload.get_payload() - - # Getting values for different implementations as PGP/INLINE is not implemented - # the same on different clients - pgp_inline_tags = "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content - attachment_filename = payload.get_filename() - - if pgp_inline_tags or not (attachment_filename is None) and not (re.search('.\.pgp$', attachment_filename) is None): - if pgp_inline_tags: - start = msg_content.find("-----BEGIN PGP MESSAGE-----") - end = msg_content.find("-----END PGP MESSAGE-----") - decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25]) - # Some implementations like Enigmail have strange interpretations of PGP/INLINE - # This tries to cope with it as good as possible. - else: - build_message = """ ------BEGIN PGP MESSAGE----- - -%s ------END PGP MESSAGE-----""" % msg_content - - decrypted_payload, decrypt_success = decrypt_payload(build_message) - - # Was at least one decryption successful? - success = success or decrypt_success - - if decrypt_success: - - if not (attachment_filename is None): - attachment_filename = re.sub('\.pgp$', '', attachment_filename) - payload.set_param('filename', attachment_filename, 'Content-Disposition') - payload.set_param('name', attachment_filename, 'Content-Type') - # Need this nasty hack to avoid double blank lines at beginning of message - payload.set_payload(decrypted_payload.as_string()[1:]) - - message.attach(payload) - else: - # Message could not be decrypted, so non-decrypted message is attached - message.attach(payload) - else: - # There was no encrypted payload found, so the original payload is attached - message.attach(payload) - - return message, success - -def decrypt_inline_without_attachments( decrypted_message ): - - msg_content = decrypted_message.get_payload() - if "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content: - start = msg_content.find("-----BEGIN PGP MESSAGE-----") - end = msg_content.find("-----END PGP MESSAGE-----") - decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25]) - - if decrypt_success: - # Need this nasty hack to avoid double blank lines at beginning of message - decrypted_message.set_payload(decrypted_payload.as_string()[1:]) - - if get_bool_from_cfg('default', 'add_header', 'yes'): - decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate' - - # If message was not encrypted, this will just return the original message - return decrypted_message - -def decrypt_payload( payload ): - - gpg = GnuPG.GPGDecryptor( cfg['gpg']['keyhome'] ) - gpg.update( payload ) - decrypted_data, returncode = gpg.decrypt() - if verbose: - log("Return code from decryption=%d (0 indicates success)." % returncode) - if returncode != 0: - log("Decrytion failed with return code %d. Decryption aborted." % returncode) - return payload, False - - # Decryption always generate a new message - decrypted_msg = email.message_from_string(decrypted_data) - - return decrypted_msg, True - def gpg_encrypt( raw_message, recipients ): if not get_bool_from_cfg('gpg', 'keyhome'): @@ -336,7 +120,7 @@ def gpg_encrypt( raw_message, recipients ): ungpg_to.append(to) if gpg_to != list(): - log("Encrypting email to: %s" % ' '.join( map(lambda x: x[0], gpg_to) )) + log("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to )) # Getting PGP style for recipient gpg_to_smtp_mime = list() @@ -373,10 +157,10 @@ def gpg_encrypt( raw_message, recipients ): if get_bool_from_cfg('default', 'add_header', 'yes'): raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' - if raw_message_mime.has_key('Content-Transfer-Encoding'): - raw_message_mime.replace_header('Content-Transfer-Encoding','8BIT') - else: - raw_message_mime['Content-Transfer-Encoding'] = '8BIT' + if 'Content-Transfer-Encoding' in raw_message_mime: + raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT') + else: + raw_message_mime['Content-Transfer-Encoding'] = '8BIT' encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime ) raw_message_mime.set_payload( encrypted_payloads ) @@ -390,10 +174,10 @@ def gpg_encrypt( raw_message, recipients ): if get_bool_from_cfg('default', 'add_header', 'yes'): raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' - if raw_message_inline.has_key('Content-Transfer-Encoding'): - raw_message_inline.replace_header('Content-Transfer-Encoding','8BIT') - else: - raw_message_inline['Content-Transfer-Encoding'] = '8BIT' + if 'Content-Transfer-Encoding' in raw_message_inline: + raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT') + else: + raw_message_inline['Content-Transfer-Encoding'] = '8BIT' encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline ) raw_message_inline.set_payload( encrypted_payloads ) @@ -406,11 +190,11 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ): # This breaks cascaded MIME messages. Blame PGP/INLINE. encrypted_payloads = list() - if type( message.get_payload() ) == str: + if isinstance(message.get_payload(), str): return encrypt_payload( message, gpg_to_cmdline ).get_payload() for payload in message.get_payload(): - if( type( payload.get_payload() ) == list ): + if( isinstance(payload.get_payload(), list) ): encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) ) else: encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) ) @@ -432,13 +216,13 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): submsg2.set_param('inline', "", 'Content-Disposition' ) submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' ) - if type ( message.get_payload() ) == str: + if isinstance(message.get_payload(), str): # WTF! It seems to swallow the first line. Not sure why. Perhaps # it's skipping an imaginary blank line someplace. (ie skipping a header) # Workaround it here by prepending a blank line. # This happens only on text only messages. additionalSubHeader="" - if message.has_key('Content-Type') and not message['Content-Type'].startswith('multipart'): + if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'): additionalSubHeader="Content-Type: "+message['Content-Type']+"\n" submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True)) check_nested = True @@ -454,8 +238,8 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'! boundary = junk_msg.get_boundary() - # This also modifies the boundary in the body of the message, ie it gets parsed. - if message.has_key('Content-Type'): + # This also modifies the boundary in the body of the message, ie it gets parsed. + if 'Content-Type' in message: message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary) else: message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary @@ -465,7 +249,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ): def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): raw_payload = payload.get_payload(decode=True) - if check_nested and "-----BEGIN PGP MESSAGE-----" in raw_payload and "-----END PGP MESSAGE-----" in raw_payload: + if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload: if verbose: log("Message is already pgp encrypted. No nested encryption needed.") return payload @@ -591,9 +375,13 @@ def sanitize_case_sense( address ): if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'): address = address.lower() else: - splitted_address = address.split('@') + if isinstance(address, str): + sep = '@' + else: + sep = b'@' + splitted_address = address.split(sep) if len(splitted_address) > 1: - address = splitted_address[0] + '@' + splitted_address[1].lower() + address = splitted_address[0] + sep + splitted_address[1].lower() return address @@ -603,7 +391,7 @@ def generate_message_from_payloads( payloads, message = None ): message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) for payload in payloads.get_payload(): - if( type( payload.get_payload() ) == list ): + if( isinstance(payload.get_payload(), list) ): message.attach(generate_message_from_payloads(payload)) else: message.attach(payload) @@ -619,7 +407,7 @@ def get_first_payload( payloads ): def send_msg( message, recipients ): - recipients = filter(None, recipients) + recipients = [_f for _f in recipients if _f] if recipients: if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')): log("Missing settings for relay. Sending email aborted.") @@ -627,8 +415,8 @@ def send_msg( message, recipients ): log("Sending email to: <%s>" % '> <'.join( recipients )) relay = (cfg['relay']['host'], int(cfg['relay']['port'])) smtp = smtplib.SMTP(relay[0], relay[1]) - if cfg.has_key('relay') and cfg['relay'].has_key('starttls') and cfg['relay']['starttls'] == 'yes': - smtp.starttls() + if 'relay' in cfg and 'starttls' in cfg['relay'] and cfg['relay']['starttls'] == 'yes': + smtp.starttls() smtp.sendmail( from_addr, recipients, message ) else: log("No recipient found") @@ -639,11 +427,6 @@ def sort_recipients( raw_message, from_addr, to_addrs ): for recipient in to_addrs: recipients_left.append(sanitize_case_sense(recipient)) - # Decrypt mails for recipients with known private PGP keys - recipients_left = gpg_decrypt(raw_message, recipients_left) - if recipients_left == list(): - return - # There is no need for nested encryption first_payload = get_first_payload(raw_message) if first_payload.get_content_type() == 'application/pkcs7-mime': @@ -653,7 +436,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ): return first_payload = first_payload.get_payload(decode=True) - if "-----BEGIN PGP MESSAGE-----" in first_payload and "-----END PGP MESSAGE-----" in first_payload: + if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload: if verbose: log("Message is already encrypted as PGP/INLINE. Encryption aborted.") send_msg(raw_message.as_string(), recipients_left) diff --git a/register-handler.py b/register-handler.py index 5b1cf9a..f4c1e57 100644 --- a/register-handler.py +++ b/register-handler.py @@ -1,6 +1,6 @@ #!/usr/bin/python -from ConfigParser import RawConfigParser +from configparser import RawConfigParser import email, os, smtplib, sys, traceback, markdown, syslog, requests from M2Crypto import BIO, Rand, SMIME, X509 @@ -17,7 +17,7 @@ for sect in _cfg.sections(): cfg[sect][name] = value def log(msg): - if cfg.has_key('logging') and cfg['logging'].has_key('file'): + if 'logging' in cfg and 'file' in cfg['logging']: if cfg['logging']['file'] == "syslog": syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg) else: @@ -78,9 +78,9 @@ if __name__ == "__main__": sys.exit(0) if sign_type == 'smime': - raw_sig = sign_part.get_payload().replace("\n","") + raw_sig = sign_part.get_payload().replace("\n", "") # re-wrap signature so that it fits base64 standards - cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in xrange(0, len(raw_sig), 76)) + cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in range(0, len(raw_sig), 76)) # now, wrap the signature in a PKCS7 block sig = """ @@ -106,7 +106,7 @@ if __name__ == "__main__": # format in user-specific data # sending success mail only for S/MIME as GPGMW handles this on its own success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read() - success_msg = success_msg.replace("[:FROMADDRESS:]",from_addr) + success_msg = success_msg.replace("[:FROMADDRESS:]", from_addr) msg = MIMEMultipart("alternative") msg["From"] = cfg['mailregister']['register_email'] @@ -128,7 +128,7 @@ if __name__ == "__main__": if r.status_code != 200: log("Could not hand registration over to GPGMW. Error: %s" % r.status_code) error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read() - error_msg = error_msg.replace("[:FROMADDRESS:]",from_addr) + error_msg = error_msg.replace("[:FROMADDRESS:]", from_addr) msg = MIMEMultipart("alternative") msg["From"] = cfg['mailregister']['register_email'] diff --git a/test/certs/.keep b/test/certs/.keep new file mode 100644 index 0000000..e69de29 diff --git a/test/e2e.ini b/test/e2e.ini new file mode 100644 index 0000000..134a005 --- /dev/null +++ b/test/e2e.ini @@ -0,0 +1,73 @@ +# +# gpg-mailgate +# +# This file is part of the gpg-mailgate source code. +# +# gpg-mailgate is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# gpg-mailgate source code is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gpg-mailgate source code. If not, see . +# + +# NOTE: We use : syntax, because some values contain +# colons and that is default ConfigParser key-value separator. + +[relay] +port: 2500 +script: test/relay.py + +[dirs] +keys: test/keyhome +certs: test/certs + +[tests] +# Number of "test-*" sections in this file, describing test cases. +cases: 6 +e2e_log: test/logs/e2e.log +e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s +e2e_log_datefmt: %Y-%m-%d %H:%M:%S +lacre_log: test/logs/gpg-mailgate.log + +[case-1] +descr: Clear text message to a user without a key +to: carlos@disposlab +in: test/msgin/clear2clear.msg +out: Body of the message. + +[case-2] +descr: Clear text message to a user with an RSA key +to: alice@disposlab +in: test/msgin/clear2rsa.msg +out: -----BEGIN PGP MESSAGE----- + +[case-3] +descr: Clear text message to a user with an Ed25519 key +to: bob@disposlab +in: test/msgin/clear2ed.msg +out: -----BEGIN PGP MESSAGE----- + +[case-4] +descr: Encrypted message to a user with an Ed25519 key +to: bob@disposlab +in: test/msgin/ed2ed.msg +out: -----BEGIN PGP MESSAGE----- + +[case-5] +descr: Signed message to a user with an Ed25519 key +to: bob@disposlab +in: test/msgin/signed.msg +out: -----BEGIN PGP MESSAGE----- + +[case-6] +descr: Multipart encrypted message to a user with an Ed25519 key. +to: bob@disposlab +in: test/msgin/multipart2rsa.msg +out: -----BEGIN PGP MESSAGE----- diff --git a/test/e2e_test.py b/test/e2e_test.py new file mode 100644 index 0000000..f937fa9 --- /dev/null +++ b/test/e2e_test.py @@ -0,0 +1,158 @@ +# +# gpg-mailgate +# +# This file is part of the gpg-mailgate source code. +# +# gpg-mailgate is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# gpg-mailgate source code is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gpg-mailgate source code. If not, see . +# + +import os +import sys + +import subprocess + +import difflib + +import configparser +import logging + +from time import sleep + +RELAY_SCRIPT = "test/relay.py" +CONFIG_FILE = "test/gpg-mailgate.conf" + +def build_config(config): + cp = configparser.ConfigParser() + + cp.add_section("logging") + cp.set("logging", "file", config["log_file"]) + cp.set("logging", "verbose", "yes") + + cp.add_section("gpg") + cp.set("gpg", "keyhome", config["gpg_keyhome"]) + + cp.add_section("smime") + cp.set("smime", "cert_path", config["smime_certpath"]) + + cp.add_section("relay") + cp.set("relay", "host", "localhost") + cp.set("relay", "port", config["port"]) + + cp.add_section("enc_keymap") + cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7") + cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67") + + logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}") + return cp + +def write_test_config(outfile, **config): + logging.debug(f"Generating configuration with {config!r}") + + out = open(outfile, "w+") + cp = build_config(config) + cp.write(out) + out.close() + + logging.debug(f"Wrote configuration to {outfile}") + +def load_file(name): + f = open(name, 'r') + contents = f.read() + f.close() + + return bytes(contents, 'utf-8') + +def report_result(message_file, expected, test_output): + status = None + if expected in test_output: + status = "Success" + else: + status = "Failure" + + print(message_file.ljust(30), status) + +def execute_e2e_test(case_name, config, config_path): + """Read test case configuration from config and run that test case. + + Parameter case_name should refer to a section in test + config file. Each of these sections should contain + following properties: 'descr', 'to', 'in' and 'out'. + """ + # This environment variable is set in Makefile. + python_path = os.getenv('PYTHON', 'python3') + + gpglacre_cmd = [python_path, + "gpg-mailgate.py", + config.get(case_name, "to")] + + relay_cmd = [python_path, + config.get("relay", "script"), + config.get("relay", "port")] + + logging.debug(f"Spawning relay: {relay_cmd}") + relay_proc = subprocess.Popen(relay_cmd, + stdin = None, + stdout = subprocess.PIPE) + + logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {config.get(case_name, 'in')}") + + # pass PATH because otherwise it would be dropped + gpglacre_proc = subprocess.run(gpglacre_cmd, + input = load_file(config.get(case_name, "in")), + capture_output = True, + env = {"GPG_MAILGATE_CONFIG": config_path, + "PATH": os.getenv("PATH")}) + + # Let the relay process the data. + relay_proc.wait() + + (testout, _) = relay_proc.communicate() + testout = testout.decode('utf-8') + + logging.debug(f"Read {len(testout)} characters of test output: '{testout}'") + + report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout) + +def load_test_config(): + cp = configparser.ConfigParser() + cp.read("test/e2e.ini") + + return cp + + +config = load_test_config() + +logging.basicConfig(filename = config.get("tests", "e2e_log"), + # Get raw values of log and date formats because they + # contain %-sequences and we don't want them to be expanded + # by the ConfigParser. + format = config.get("tests", "e2e_log_format", raw=True), + datefmt = config.get("tests", "e2e_log_datefmt", raw=True), + level = logging.DEBUG) + +config_path = os.getcwd() + "/" + CONFIG_FILE + +write_test_config(config_path, + port = config.get("relay", "port"), + gpg_keyhome = config.get("dirs", "keys"), + smime_certpath = config.get("dirs", "certs"), + log_file = config.get("tests", "lacre_log")) + +for case_no in range(1, config.getint("tests", "cases")+1): + case_name = f"case-{case_no}" + logging.info(f"Executing {case_name}: {config.get(case_name, 'descr')}") + + execute_e2e_test(case_name, config, config_path) + +print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log"))) diff --git a/test/keyhome/crls.d/DIR.txt b/test/keyhome/crls.d/DIR.txt new file mode 100644 index 0000000..2a29a47 --- /dev/null +++ b/test/keyhome/crls.d/DIR.txt @@ -0,0 +1 @@ +v:1: diff --git a/test/keyhome/pubring.kbx b/test/keyhome/pubring.kbx new file mode 100644 index 0000000..83e1be1 Binary files /dev/null and b/test/keyhome/pubring.kbx differ diff --git a/test/keyhome/tofu.db b/test/keyhome/tofu.db new file mode 100644 index 0000000..16104b5 Binary files /dev/null and b/test/keyhome/tofu.db differ diff --git a/test/keyhome/trustdb.gpg b/test/keyhome/trustdb.gpg new file mode 100644 index 0000000..7691303 Binary files /dev/null and b/test/keyhome/trustdb.gpg differ diff --git a/test/msgin/clear2clear.msg b/test/msgin/clear2clear.msg new file mode 100644 index 0000000..ae577d1 --- /dev/null +++ b/test/msgin/clear2clear.msg @@ -0,0 +1,5 @@ +From: Dave +To: Carlos +Subject: Test + +Body of the message. diff --git a/test/msgin/clear2ed.msg b/test/msgin/clear2ed.msg new file mode 100644 index 0000000..63b6c66 --- /dev/null +++ b/test/msgin/clear2ed.msg @@ -0,0 +1,5 @@ +From: Dave +To: Bob +Subject: Test + +Body of the message. diff --git a/test/msgin/clear2rsa.msg b/test/msgin/clear2rsa.msg new file mode 100644 index 0000000..9dfd134 --- /dev/null +++ b/test/msgin/clear2rsa.msg @@ -0,0 +1,5 @@ +From: Dave +To: Alice +Subject: Test + +Body of the message. diff --git a/test/msgin/ed2ed.msg b/test/msgin/ed2ed.msg new file mode 100644 index 0000000..90b73a8 --- /dev/null +++ b/test/msgin/ed2ed.msg @@ -0,0 +1,13 @@ +From: Dave +To: Bob +Subject: Test +Content-Transfer-Encoding: 7bit + +-----BEGIN PGP MESSAGE----- + +hF4DujWCoRS24dYSAQdAyGDF9Us11JDr8+XPmvlJHsMS7A4UBIcCiresJyZpSxYw +Cqcugy5AX5fgSAiL1Cd2b1zpQ/rYdTWkFYMVbH4jBEoPC3z/aSd+hTnneJFDUdXl +0koBDIw7NQylu6SrW+Y/DmXgalIHtwACuKivJTq/z9jdwFScV7adRR/VO53Inah3 +L1+Ho7Zta95AYW3UPu71Gw3rrkfjY4uGDiFAFg== +=yTzD +-----END PGP MESSAGE----- diff --git a/test/msgin/multipart2rsa.msg b/test/msgin/multipart2rsa.msg new file mode 100644 index 0000000..3b8f623 --- /dev/null +++ b/test/msgin/multipart2rsa.msg @@ -0,0 +1,43 @@ +Date: Sun, 18 Jul 2021 16:53:45 +0200 +From: User Alice +To: User Bob +Subject: encrypted +Message-ID: +MIME-Version: 1.0 +Content-Type: multipart/encrypted; protocol="application/pgp-encrypted"; + boundary="95hZs/zeBetwhuEy" +Content-Disposition: inline +Status: RO +Content-Length: 1140 +Lines: 30 + + +--95hZs/zeBetwhuEy +Content-Type: application/pgp-encrypted +Content-Disposition: attachment + +Version: 1 + +--95hZs/zeBetwhuEy +Content-Type: application/octet-stream +Content-Disposition: attachment; filename="msg.asc" + +-----BEGIN PGP MESSAGE----- + +hQGMA/vsqjpkmZurAQwAnb+2kDPgVFWVLkafuzVJGqFWKNtdVsvk7I1zhzFw5Hsr +h4irSHcH0X0QjaHprNiMBDfIZaCx5VVsvGYLiu/iQkdVPXItugTpln8aAvDt8/Bp +Hse69tgG5S9o4fPK4K2bMjNdomclDdz51cu9NXYjk/6OtzVwcSypyEmxgw24Oo1+ +Q8KfZN9n6VTXGNlrV9KnAZYs/5aaSABTeC+cDvOcjDbPAmwDHYS3qsbITYoGHnEz +QfPIakYWPtPWkajhm4Z/iyEUSTeqew1/gAJ8sZnJpV0eg1Cr/44XgklZKFr8aJgk +SG8PkQxsyzAZklpwMSWdbb+t9a5nEKvky3zMpdmS1GE7ubTO7nQ1geUdBiv1UUNh +BY9d4nlGirqxX1MZUTGZidJgCy0365xbJSKkU0yFFW2uWtCKzJTEQBk3YZkNmnGH +h8BiVvMhQ8SxKBRPeH6Zb6HHlbcgkPvJAAI4VLqkZPCBvp9irmcdFGmrgCWLxzgk +sIjYGLA+ZuSXOKuAssXE0sAbASPAkUJRTIjzXFrCnr/MB3ZonESH01fsbsX+E/Qi ++2oLrgjjPHcPq76fvdO6fJP6c1pM8TlOoZKn/RkPm1llULtOn4n5JZJjeUA0F2ID +Te/U9i4YtcFZbuvw2bjeu8sAf77U6O3iTTBWkPWQT3H4YMskQc7lS1Mug6A9HL/n +TQvAwh2MIveYyEy/y/dKeFUbpSKxyOInhTg1XtYFiT8bzEF7OEJLU9GyF5oMs67d +o12uYlEnPhWz9oZp11aSdnyeADpVu6BQsPbwfTifcpajQSarH5sG8+rDSPju +=7CnH +-----END PGP MESSAGE----- + +--95hZs/zeBetwhuEy-- diff --git a/test/msgin/signed.msg b/test/msgin/signed.msg new file mode 100644 index 0000000..917291e --- /dev/null +++ b/test/msgin/signed.msg @@ -0,0 +1,39 @@ +Date: Sun, 18 Jul 2021 12:08:41 +0200 +From: User Alice +To: User Bob +Subject: signed +Message-ID: +MIME-Version: 1.0 +Content-Type: multipart/signed; micalg=pgp-sha256; + protocol="application/pgp-signature"; boundary="U/XjR71RAixRcb28" +Content-Disposition: inline +Status: RO +Content-Length: 870 +Lines: 26 + + +--U/XjR71RAixRcb28 +Content-Type: text/plain; charset=us-ascii +Content-Disposition: inline + +A signed msg. + +--U/XjR71RAixRcb28 +Content-Type: application/pgp-signature; name="signature.asc" + +-----BEGIN PGP SIGNATURE----- + +iQGzBAEBCAAdFiEEHNJFMI8JY9A46INXlzz02Th8RNcFAmDz/aUACgkQlzz02Th8 +RNdtOQv/ca8c51KoVq7CyPJUr54n4DEk/LlYniR0W51tL2a4rQxyF2AxqjdI8T4u +bT1+bqPNYgegesyCLokeZKqhLVtCH+UVOTdtUq5bB1J7ALuuVTOIdR5woMBBsazV +ETYEMzL6y2sGPW92ynriEw6B9pPnFKFPhOOZLrnMzM8CpkTfNmGoej+EdV74s0z4 +RayKu/WaZ1Dtx2Vy2YDtG36p/Y3n62bnzQJCRyPYfrmCxH5X5i5oibQwxLROCFNE +4X3iVZLPHFg/DS9m4L7mBe0MJewGa1oPFr7t3ZfJ+24aJ/AvUv5uQIO+s6a7AcjD +Pgw/IjeM/uZdPrzniZI2zsWEgsjRCL1fj49XWVNkTHrWCqLvkBg+suucNO2SR0/d +ps+RP5mkJJHaSZyPpxwo9/PHKX67Mkpn/uEXlE8nV6IqKoXRzr1N0qwyhvbZQZLD +FMumxx/eOSiOpaiRhGhoZiUpf+VdnV/1ClpAcdbthy/psx/CMYVblAM8xg74NR9+ +Q/WlFbRl +=uMdE +-----END PGP SIGNATURE----- + +--U/XjR71RAixRcb28-- diff --git a/test/relay.py b/test/relay.py new file mode 100644 index 0000000..a8e3db7 --- /dev/null +++ b/test/relay.py @@ -0,0 +1,87 @@ +#!/usr/local/bin/python2 +# +# This quick-and-dirty script supports only the happy case of SMTP session, +# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email. +# +# It listens on the port given as the only command-line argument and consumes a +# message, then prints it to standard output. The goal is to be able to +# compare that output with expected clear-text or encrypted message body. +# + +import sys +import socket + + +EXIT_UNAVAILABLE = 1 + +ENCODING = 'utf-8' + +BUFFER_SIZE = 4096 +EOM = "\r\n.\r\n" +LAST_LINE = -3 + + +def welcome(msg): + return b"220 %b\r\n" % (msg) + +def ok(msg = b"OK"): + return b"250 %b\r\n" % (msg) + +def bye(): + return b"251 Bye" + +def provide_message(): + return b"354 Enter a message, ending it with a '.' on a line by itself\r\n" + +def receive_and_confirm(session): + session.recv(BUFFER_SIZE) + session.sendall(ok()) + +def localhost_at(port): + return ('127.0.0.1', port) + +def serve(port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind(localhost_at(port)) + s.listen(1) + except socket.error as e: + print("Cannot connect", e) + sys.exit(EXIT_UNAVAILABLE) + + (conn, addr) = s.accept() + conn.sendall(welcome(b"TEST SERVER")) + + receive_and_confirm(conn) # Ignore HELO/EHLO + receive_and_confirm(conn) # Ignore sender address + receive_and_confirm(conn) # Ignore recipient address + + data = conn.recv(BUFFER_SIZE) + conn.sendall(provide_message()) + + # Consume until we get ., the end-of-message marker. + message = '' + while not message.endswith(EOM): + message += conn.recv(BUFFER_SIZE).decode(ENCODING) + conn.sendall(ok(b"OK, id=test")) + + conn.recv(BUFFER_SIZE) + conn.sendall(bye()) + + conn.close() + + # Trim EOM marker as we're only interested in the message body. + return message[:-len(EOM)] + +def error(msg, exit_code): + print("ERROR: %s" % (msg)) + sys.exit(exit_code) + + +if len(sys.argv) < 2: + error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE) + +port = int(sys.argv[1]) +body = serve(port) + +print(body) diff --git a/test/test_gnupg.py b/test/test_gnupg.py new file mode 100644 index 0000000..5712acd --- /dev/null +++ b/test/test_gnupg.py @@ -0,0 +1,16 @@ +import GnuPG + +import unittest + +class GnuPGUtilitiesTest(unittest.TestCase): + def test_build_default_command(self): + cmd = GnuPG.build_command("test/keyhome") + self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"]) + + def test_build_command_extended_with_args(self): + cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar") + self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"]) + + +if __name__ == '__main__': + unittest.main()