diff --git a/.gitignore b/.gitignore index 7808c4b..140af95 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,18 @@ pip-log.txt .tox nosetests.xml +# GPG-Mailgate test files +test/logs +test/tmp +test/gpg-mailgate.conf +test/keyhome/random_seed + +# Emacs files +*~ +TAGS +TAGS-Python +TAGS-PHP + # Translations *.mo diff --git a/GnuPG/__init__.py b/GnuPG/__init__.py index c9bbee0..540622e 100644 --- a/GnuPG/__init__.py +++ b/GnuPG/__init__.py @@ -23,9 +23,20 @@ import subprocess import shutil import random import string +import sys + + +LINE_FINGERPRINT = 'fpr' +LINE_USER_ID = 'uid' + +POS_FINGERPRINT = 9 + +def build_command(key_home, *args, **kwargs): + cmd = ["gpg", '--homedir', key_home] + list(args) + return cmd def private_keys( keyhome ): - cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-secret-keys', '--with-colons'] + cmd = build_command(keyhome, '--list-secret-keys', '--with-colons') p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() keys = dict() @@ -39,17 +50,24 @@ def private_keys( keyhome ): return keys def public_keys( keyhome ): - cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-keys', '--with-colons'] + cmd = build_command(keyhome, '--list-keys', '--with-colons') p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() + keys = dict() + fingerprint = None + email = None for line in p.stdout.readlines(): - if line[0:3] == 'uid' or line[0:3] == 'pub': + if line[0:3] == LINE_FINGERPRINT: + fingerprint = line.split(':')[POS_FINGERPRINT] + if line[0:3] == LINE_USER_ID: if ('<' not in line or '>' not in line): continue email = line.split('<')[1].split('>')[0] - fingerprint = line.split(':')[4] + if not (fingerprint is None or email is None): keys[fingerprint] = email + fingerprint = None + email = None return keys # confirms a key has a given email address @@ -64,7 +82,7 @@ def confirm_key( content, email ): os.mkdir(tmpkeyhome) localized_env = os.environ.copy() localized_env["LANG"] = "C" - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', tmpkeyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) + p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) result = p.communicate(input=content)[1] confirmed = False @@ -83,7 +101,7 @@ def confirm_key( content, email ): # adds a key and ensures it has the given email address def add_key( keyhome, content ): - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) + p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.communicate(input=content) p.wait() @@ -93,7 +111,7 @@ def delete_key( keyhome, email ): if result[1]: # delete all keys matching this email address - p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--delete-key', '--batch', '--yes', result[1]], stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p.wait() return True @@ -117,7 +135,7 @@ class GPGEncryptor: return (encdata, p.returncode) def _command(self): - cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e"] + cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e") # add recipients for recipient in self._recipients: @@ -145,6 +163,4 @@ class GPGDecryptor: return (decdata, p.returncode) def _command(self): - cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--no-secmem-warning", "-a", "-d"] - - return cmd \ No newline at end of file + return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d") diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a14332a --- /dev/null +++ b/Makefile @@ -0,0 +1,32 @@ +PYTHON = python2.7 + +.PHONY: test pre-clean clean + +# +# Run a set of end-to-end tests. +# +# Test scenarios are described and configured by the test/e2e.ini +# file. Basically this is just a script that feeds GPG Mailgate with +# known input and checks whether output meets expectations. +# +test: test/tmp test/logs pre-clean + $(PYTHON) test/e2e_test.py + +# +# Run unit tests +# +unittest: + $(PYTHON) -m unittest discover -s test + +pre-clean: + rm -fv test/gpg-mailgate.conf + rm -f test/logs/*.log + +test/tmp: + mkdir test/tmp + +test/logs: + mkdir test/logs + +clean: pre-clean + rm -rfv test/tmp test/logs diff --git a/doc/testing.md b/doc/testing.md new file mode 100644 index 0000000..a462b0f --- /dev/null +++ b/doc/testing.md @@ -0,0 +1,31 @@ +# Testing + +First tests have been set up to cover GPG Mailgate with at least basic test +that would be easy to run. The tests are called "end-to-end", meaning that we +feed some input to GPG Mailgate and inspect the output. + +## Running tests + +To run tests, use command `make test` or `make unittest`. + +Tests produce some helpful logs, so inspect contents of `test/logs` directory +if something goes wrong. + +## Key building blocks + +- *Test Script* (`test/e2e_test.py`) that orchestrates the other components. + It performs test cases described in the *Test Configuration*. It spawns + *Test Mail Relay* and *GPG Mailgate* in appropriate order. +- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only + supports the happy path. It accepts a mail message and prints it to + stdandard output. +- *Test Configuration* (`test/e2e.ini`) specifies test cases: their input, + expected results and helpful documentation. It also specifies the port that + the *Test Mail Relay* should listen on. + +## Limitations + +Currently tests only check if the message has been encrypted, without +verifying that the correct key has been used. That's because we don't know +(yet) how to have a reproducible encrypted message. Option +`--faked-system-time` wasn't enough to produce identical output. diff --git a/gpg-mailgate.py b/gpg-mailgate.py index 9830262..b39c786 100755 --- a/gpg-mailgate.py +++ b/gpg-mailgate.py @@ -39,9 +39,14 @@ import traceback from M2Crypto import BIO, Rand, SMIME, X509 from email.mime.message import MIMEMessage +# Environment variable name we read to retrieve configuration path. This is to +# enable non-root users to set up and run GPG Mailgate and to make the software +# testable. +CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG" + # Read configuration from /etc/gpg-mailgate.conf _cfg = RawConfigParser() -_cfg.read('/etc/gpg-mailgate.conf') +_cfg.read(os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')) cfg = dict() for sect in _cfg.sections(): cfg[sect] = dict() diff --git a/test/certs/.keep b/test/certs/.keep new file mode 100644 index 0000000..e69de29 diff --git a/test/e2e.ini b/test/e2e.ini new file mode 100644 index 0000000..2d6fb0b --- /dev/null +++ b/test/e2e.ini @@ -0,0 +1,55 @@ +# +# gpg-mailgate +# +# This file is part of the gpg-mailgate source code. +# +# gpg-mailgate is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# gpg-mailgate source code is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gpg-mailgate source code. If not, see . +# + +# NOTE: We use : syntax, because some values contain +# colons and that is default ConfigParser key-value separator. + +[relay] +port: 2500 +script: test/relay.py + +[dirs] +keys: test/keyhome +certs: test/certs + +[tests] +# Number of "test-*" sections in this file, describing test cases. +cases: 3 +e2e_log: test/logs/e2e.log +e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s +e2e_log_datefmt: %Y-%m-%d %H:%M:%S +lacre_log: test/logs/gpg-mailgate.log + +[case-1] +descr: Clear text message to a user without a key +to: carlos@disposlab +in: test/msgin/clear2clear.msg +out: Body of the message. + +[case-2] +descr: Clear text message to a user with an RSA key +to: alice@disposlab +in: test/msgin/clear2rsa.msg +out: -----BEGIN PGP MESSAGE----- + +[case-3] +descr: Clear text message to a user with an Ed25519 key +to: bob@disposlab +in: test/msgin/clear2ed.msg +out: -----BEGIN PGP MESSAGE----- diff --git a/test/e2e_test.py b/test/e2e_test.py new file mode 100644 index 0000000..d53d6d9 --- /dev/null +++ b/test/e2e_test.py @@ -0,0 +1,149 @@ +#!/usr/local/bin/python2 + +# +# gpg-mailgate +# +# This file is part of the gpg-mailgate source code. +# +# gpg-mailgate is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# gpg-mailgate source code is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gpg-mailgate source code. If not, see . +# + +import os +import sys + +import difflib + +import ConfigParser +import logging + +from time import sleep + +RELAY_SCRIPT = "test/relay.py" +CONFIG_FILE = "test/gpg-mailgate.conf" + +PYTHON_BIN = "python2.7" + +def build_config(config): + cp = ConfigParser.ConfigParser() + + cp.add_section("logging") + cp.set("logging", "file", config["log_file"]) + cp.set("logging", "verbose", "yes") + + cp.add_section("gpg") + cp.set("gpg", "keyhome", config["gpg_keyhome"]) + + cp.add_section("smime") + cp.set("smime", "cert_path", config["smime_certpath"]) + + cp.add_section("relay") + cp.set("relay", "host", "localhost") + cp.set("relay", "port", config["port"]) + + cp.add_section("enc_keymap") + cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7") + cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67") + + logging.debug("Created config with keyhome=%s, cert_path=%s and relay at port %d" % + (config["gpg_keyhome"], config["smime_certpath"], config["port"])) + return cp + +def write_test_config(outfile, **config): + logging.debug("Generating configuration with %s" % repr(config)) + + out = open(outfile, "w+") + cp = build_config(config) + cp.write(out) + out.close() + + logging.debug("Wrote configuration to %s" % outfile) + +def load_file(name): + f = open(name, 'r') + contents = f.read() + f.close() + + return contents + +def report_result(message_file, expected, test_output): + status = None + if expected in test_output: + status = "Success" + else: + status = "Failure" + + print message_file.ljust(30), status + +def execute_e2e_test(case_name, config, config_path): + """Read test case configuration from config and run that test case. + + Parameter case_name should refer to a section in test + config file. Each of these sections should contain + following properties: 'descr', 'to', 'in' and 'out'. + """ + + test_command = "GPG_MAILGATE_CONFIG=%s %s gpg-mailgate.py %s < %s" % ( + config_path, + PYTHON_BIN, + config.get(case_name, "to"), + config.get(case_name, "in")) + result_command = "%s %s %d" % (PYTHON_BIN, config.get("relay", "script"), config.getint("relay", "port")) + + logging.debug("Spawning relay: '%s'" % (result_command)) + pipe = os.popen(result_command, 'r') + + logging.debug("Spawning GPG-Lacre: '%s'" % (test_command)) + msgin = os.popen(test_command, 'w') + msgin.write(load_file(config.get(case_name, "in"))) + msgin.close() + + testout = pipe.read() + pipe.close() + + logging.debug("Read %d characters of test output: '%s'" % (len(testout), testout)) + + report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout) + +def load_test_config(): + cp = ConfigParser.ConfigParser() + cp.read("test/e2e.ini") + + return cp + + +config = load_test_config() + +logging.basicConfig(filename = config.get("tests", "e2e_log"), + # Get raw values of log and date formats because they + # contain %-sequences and we don't want them to be expanded + # by the ConfigParser. + format = config.get("tests", "e2e_log_format", True), + datefmt = config.get("tests", "e2e_log_datefmt", True), + level = logging.DEBUG) + +config_path = os.getcwd() + "/" + CONFIG_FILE + +write_test_config(config_path, + port = config.getint("relay", "port"), + gpg_keyhome = config.get("dirs", "keys"), + smime_certpath = config.get("dirs", "certs"), + log_file = config.get("tests", "lacre_log")) + +for case_no in range(1, config.getint("tests", "cases")+1): + case_name = "case-%d" % (case_no) + logging.info("Executing %s: %s", case_name, config.get(case_name, "descr")) + + execute_e2e_test(case_name, config, config_path) + +print "See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")) diff --git a/test/keyhome/crls.d/DIR.txt b/test/keyhome/crls.d/DIR.txt new file mode 100644 index 0000000..2a29a47 --- /dev/null +++ b/test/keyhome/crls.d/DIR.txt @@ -0,0 +1 @@ +v:1: diff --git a/test/keyhome/pubring.kbx b/test/keyhome/pubring.kbx new file mode 100644 index 0000000..83e1be1 Binary files /dev/null and b/test/keyhome/pubring.kbx differ diff --git a/test/keyhome/tofu.db b/test/keyhome/tofu.db new file mode 100644 index 0000000..16104b5 Binary files /dev/null and b/test/keyhome/tofu.db differ diff --git a/test/keyhome/trustdb.gpg b/test/keyhome/trustdb.gpg new file mode 100644 index 0000000..7691303 Binary files /dev/null and b/test/keyhome/trustdb.gpg differ diff --git a/test/msgin/clear2clear.msg b/test/msgin/clear2clear.msg new file mode 100644 index 0000000..ae577d1 --- /dev/null +++ b/test/msgin/clear2clear.msg @@ -0,0 +1,5 @@ +From: Dave +To: Carlos +Subject: Test + +Body of the message. diff --git a/test/msgin/clear2ed.msg b/test/msgin/clear2ed.msg new file mode 100644 index 0000000..63b6c66 --- /dev/null +++ b/test/msgin/clear2ed.msg @@ -0,0 +1,5 @@ +From: Dave +To: Bob +Subject: Test + +Body of the message. diff --git a/test/msgin/clear2rsa.msg b/test/msgin/clear2rsa.msg new file mode 100644 index 0000000..9dfd134 --- /dev/null +++ b/test/msgin/clear2rsa.msg @@ -0,0 +1,5 @@ +From: Dave +To: Alice +Subject: Test + +Body of the message. diff --git a/test/relay.py b/test/relay.py new file mode 100644 index 0000000..01b93d4 --- /dev/null +++ b/test/relay.py @@ -0,0 +1,82 @@ +#!/usr/local/bin/python2 +# +# This quick-and-dirty script supports only the happy case of SMTP session, +# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email. +# +# It listens on the port given as the only command-line argument and consumes a +# message, then prints it to standard output. The goal is to be able to +# compare that output with expected clear-text or encrypted message body. +# + +import sys +import socket + + +EXIT_UNAVAILABLE = 1 + +BUFFER_SIZE = 4096 +EOM = "\r\n.\r\n" +LAST_LINE = -3 + + +def welcome(msg): + return "220 %s\r\n" % (msg) + +def ok(msg = "OK"): + return "250 %s\r\n" % (msg) + +def bye(): + return "251 Bye" + +def provide_message(): + return "354 Enter a message, ending it with a '.' on a line by itself\r\n" + +def receive_and_confirm(session): + session.recv(BUFFER_SIZE) + session.sendall(ok()) + +def serve(port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind(('', port)) + s.listen(1) + except socket.error, e: + print "Cannot connect", e + sys.exit(EXIT_UNAVAILABLE) + + (conn, addr) = s.accept() + conn.sendall(welcome("TEST SERVER")) + + receive_and_confirm(conn) # Ignore HELO/EHLO + receive_and_confirm(conn) # Ignore sender address + receive_and_confirm(conn) # Ignore recipient address + + data = conn.recv(BUFFER_SIZE) + conn.sendall(provide_message()) + + # Consume until we get ., the end-of-message marker. + message = '' + while not message.endswith(EOM): + message += conn.recv(BUFFER_SIZE) + conn.sendall(ok("OK, id=test")) + + conn.recv(BUFFER_SIZE) + conn.sendall(bye()) + + conn.close() + + # Trim EOM marker as we're only interested in the message body. + return message[:-len(EOM)] + +def error(msg): + print "ERROR: %s" % (msg) + sys.exit(1) + + +if len(sys.argv) < 2: + error("Usage: relay.py PORT_NUMBER") + +port = int(sys.argv[1]) +body = serve(port) + +print body diff --git a/test/test_gnupg.py b/test/test_gnupg.py new file mode 100644 index 0000000..5712acd --- /dev/null +++ b/test/test_gnupg.py @@ -0,0 +1,16 @@ +import GnuPG + +import unittest + +class GnuPGUtilitiesTest(unittest.TestCase): + def test_build_default_command(self): + cmd = GnuPG.build_command("test/keyhome") + self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"]) + + def test_build_command_extended_with_args(self): + cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar") + self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"]) + + +if __name__ == '__main__': + unittest.main()