Add tests for Python code #51
|
@ -26,6 +26,18 @@ pip-log.txt
|
||||||
.tox
|
.tox
|
||||||
nosetests.xml
|
nosetests.xml
|
||||||
|
|
||||||
|
# GPG-Mailgate test files
|
||||||
|
test/logs
|
||||||
|
test/tmp
|
||||||
|
test/gpg-mailgate.conf
|
||||||
|
|||||||
|
test/keyhome/random_seed
|
||||||
|
|
||||||
|
# Emacs files
|
||||||
|
*~
|
||||||
|
TAGS
|
||||||
|
TAGS-Python
|
||||||
|
TAGS-PHP
|
||||||
|
|
||||||
# Translations
|
# Translations
|
||||||
*.mo
|
*.mo
|
||||||
|
|
||||||
|
|
|
@ -23,9 +23,20 @@ import subprocess
|
||||||
import shutil
|
import shutil
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
LINE_FINGERPRINT = 'fpr'
|
||||||
|
LINE_USER_ID = 'uid'
|
||||||
|
|
||||||
|
POS_FINGERPRINT = 9
|
||||||
|
|
||||||
|
def build_command(key_home, *args, **kwargs):
|
||||||
|
cmd = ["gpg", '--homedir', key_home] + list(args)
|
||||||
|
return cmd
|
||||||
|
|
||||||
def private_keys( keyhome ):
|
def private_keys( keyhome ):
|
||||||
cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-secret-keys', '--with-colons']
|
cmd = build_command(keyhome, '--list-secret-keys', '--with-colons')
|
||||||
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||||
p.wait()
|
p.wait()
|
||||||
keys = dict()
|
keys = dict()
|
||||||
|
@ -39,17 +50,24 @@ def private_keys( keyhome ):
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
def public_keys( keyhome ):
|
def public_keys( keyhome ):
|
||||||
cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-keys', '--with-colons']
|
cmd = build_command(keyhome, '--list-keys', '--with-colons')
|
||||||
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
keys = dict()
|
keys = dict()
|
||||||
|
fingerprint = None
|
||||||
|
email = None
|
||||||
for line in p.stdout.readlines():
|
for line in p.stdout.readlines():
|
||||||
if line[0:3] == 'uid' or line[0:3] == 'pub':
|
if line[0:3] == LINE_FINGERPRINT:
|
||||||
|
fingerprint = line.split(':')[POS_FINGERPRINT]
|
||||||
|
if line[0:3] == LINE_USER_ID:
|
||||||
if ('<' not in line or '>' not in line):
|
if ('<' not in line or '>' not in line):
|
||||||
continue
|
continue
|
||||||
email = line.split('<')[1].split('>')[0]
|
email = line.split('<')[1].split('>')[0]
|
||||||
fingerprint = line.split(':')[4]
|
if not (fingerprint is None or email is None):
|
||||||
keys[fingerprint] = email
|
keys[fingerprint] = email
|
||||||
|
fingerprint = None
|
||||||
|
email = None
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
# confirms a key has a given email address
|
# confirms a key has a given email address
|
||||||
|
@ -64,7 +82,7 @@ def confirm_key( content, email ):
|
||||||
os.mkdir(tmpkeyhome)
|
os.mkdir(tmpkeyhome)
|
||||||
localized_env = os.environ.copy()
|
localized_env = os.environ.copy()
|
||||||
localized_env["LANG"] = "C"
|
localized_env["LANG"] = "C"
|
||||||
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', tmpkeyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env )
|
p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env )
|
||||||
result = p.communicate(input=content)[1]
|
result = p.communicate(input=content)[1]
|
||||||
confirmed = False
|
confirmed = False
|
||||||
|
|
||||||
|
@ -83,7 +101,7 @@ def confirm_key( content, email ):
|
||||||
|
|
||||||
# adds a key and ensures it has the given email address
|
# adds a key and ensures it has the given email address
|
||||||
def add_key( keyhome, content ):
|
def add_key( keyhome, content ):
|
||||||
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE )
|
p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||||
p.communicate(input=content)
|
p.communicate(input=content)
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
|
@ -93,7 +111,7 @@ def delete_key( keyhome, email ):
|
||||||
|
|
||||||
if result[1]:
|
if result[1]:
|
||||||
# delete all keys matching this email address
|
# delete all keys matching this email address
|
||||||
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--delete-key', '--batch', '--yes', result[1]], stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||||
p.wait()
|
p.wait()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -117,7 +135,7 @@ class GPGEncryptor:
|
||||||
return (encdata, p.returncode)
|
return (encdata, p.returncode)
|
||||||
|
|
||||||
def _command(self):
|
def _command(self):
|
||||||
cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e"]
|
cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
|
||||||
|
|
||||||
# add recipients
|
# add recipients
|
||||||
for recipient in self._recipients:
|
for recipient in self._recipients:
|
||||||
|
@ -145,6 +163,4 @@ class GPGDecryptor:
|
||||||
return (decdata, p.returncode)
|
return (decdata, p.returncode)
|
||||||
|
|
||||||
def _command(self):
|
def _command(self):
|
||||||
cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--no-secmem-warning", "-a", "-d"]
|
return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
|
||||||
|
|
||||||
return cmd
|
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
PYTHON = python2.7
|
||||||
|
|
||||||
|
.PHONY: test pre-clean clean
|
||||||
|
|
||||||
|
#
|
||||||
|
# Run a set of end-to-end tests.
|
||||||
|
#
|
||||||
|
# Test scenarios are described and configured by the test/e2e.ini
|
||||||
|
# file. Basically this is just a script that feeds GPG Mailgate with
|
||||||
|
# known input and checks whether output meets expectations.
|
||||||
|
#
|
||||||
|
test: test/tmp test/logs pre-clean
|
||||||
|
$(PYTHON) test/e2e_test.py
|
||||||
|
|
||||||
|
#
|
||||||
|
# Run unit tests
|
||||||
|
#
|
||||||
|
unittest:
|
||||||
|
$(PYTHON) -m unittest discover -s test
|
||||||
|
|
||||||
|
pre-clean:
|
||||||
|
rm -fv test/gpg-mailgate.conf
|
||||||
|
rm -f test/logs/*.log
|
||||||
|
|
||||||
|
test/tmp:
|
||||||
|
mkdir test/tmp
|
||||||
|
|
||||||
|
test/logs:
|
||||||
|
mkdir test/logs
|
||||||
|
|
||||||
|
clean: pre-clean
|
||||||
|
rm -rfv test/tmp test/logs
|
|
@ -0,0 +1,31 @@
|
||||||
|
# Testing
|
||||||
|
|
||||||
|
First tests have been set up to cover GPG Mailgate with at least basic test
|
||||||
|
that would be easy to run. The tests are called "end-to-end", meaning that we
|
||||||
|
feed some input to GPG Mailgate and inspect the output.
|
||||||
|
|
||||||
|
## Running tests
|
||||||
|
|
||||||
|
To run tests, use command `make test` or `make unittest`.
|
||||||
|
|
||||||
|
Tests produce some helpful logs, so inspect contents of `test/logs` directory
|
||||||
|
if something goes wrong.
|
||||||
|
|
||||||
|
## Key building blocks
|
||||||
|
|
||||||
|
- *Test Script* (`test/e2e_test.py`) that orchestrates the other components.
|
||||||
|
It performs test cases described in the *Test Configuration*. It spawns
|
||||||
|
*Test Mail Relay* and *GPG Mailgate* in appropriate order.
|
||||||
|
- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only
|
||||||
|
supports the happy path. It accepts a mail message and prints it to
|
||||||
|
stdandard output.
|
||||||
|
- *Test Configuration* (`test/e2e.ini`) specifies test cases: their input,
|
||||||
|
expected results and helpful documentation. It also specifies the port that
|
||||||
|
the *Test Mail Relay* should listen on.
|
||||||
|
|
||||||
|
## Limitations
|
||||||
|
|
||||||
|
Currently tests only check if the message has been encrypted, without
|
||||||
|
verifying that the correct key has been used. That's because we don't know
|
||||||
|
(yet) how to have a reproducible encrypted message. Option
|
||||||
|
`--faked-system-time` wasn't enough to produce identical output.
|
|
@ -39,9 +39,14 @@ import traceback
|
||||||
from M2Crypto import BIO, Rand, SMIME, X509
|
from M2Crypto import BIO, Rand, SMIME, X509
|
||||||
from email.mime.message import MIMEMessage
|
from email.mime.message import MIMEMessage
|
||||||
|
|
||||||
|
# Environment variable name we read to retrieve configuration path. This is to
|
||||||
|
# enable non-root users to set up and run GPG Mailgate and to make the software
|
||||||
|
# testable.
|
||||||
|
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
|
||||||
|
|
||||||
# Read configuration from /etc/gpg-mailgate.conf
|
# Read configuration from /etc/gpg-mailgate.conf
|
||||||
_cfg = RawConfigParser()
|
_cfg = RawConfigParser()
|
||||||
_cfg.read('/etc/gpg-mailgate.conf')
|
_cfg.read(os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf'))
|
||||||
cfg = dict()
|
cfg = dict()
|
||||||
for sect in _cfg.sections():
|
for sect in _cfg.sections():
|
||||||
cfg[sect] = dict()
|
cfg[sect] = dict()
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
#
|
||||||
|
# gpg-mailgate
|
||||||
|
#
|
||||||
|
# This file is part of the gpg-mailgate source code.
|
||||||
|
#
|
||||||
|
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
|
||||||
|
# NOTE: We use <key>:<value> syntax, because some values contain
|
||||||
|
# colons and that is default ConfigParser key-value separator.
|
||||||
|
|
||||||
|
[relay]
|
||||||
|
port: 2500
|
||||||
|
script: test/relay.py
|
||||||
|
|
||||||
|
[dirs]
|
||||||
|
keys: test/keyhome
|
||||||
|
certs: test/certs
|
||||||
|
|
||||||
|
[tests]
|
||||||
|
# Number of "test-*" sections in this file, describing test cases.
|
||||||
|
cases: 3
|
||||||
|
e2e_log: test/logs/e2e.log
|
||||||
|
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
|
||||||
|
e2e_log_datefmt: %Y-%m-%d %H:%M:%S
|
||||||
|
lacre_log: test/logs/gpg-mailgate.log
|
||||||
|
|
||||||
|
[case-1]
|
||||||
|
descr: Clear text message to a user without a key
|
||||||
|
to: carlos@disposlab
|
||||||
|
in: test/msgin/clear2clear.msg
|
||||||
|
out: Body of the message.
|
||||||
|
|
||||||
|
[case-2]
|
||||||
|
descr: Clear text message to a user with an RSA key
|
||||||
|
to: alice@disposlab
|
||||||
|
in: test/msgin/clear2rsa.msg
|
||||||
|
out: -----BEGIN PGP MESSAGE-----
|
||||||
|
|
||||||
|
[case-3]
|
||||||
|
descr: Clear text message to a user with an Ed25519 key
|
||||||
|
to: bob@disposlab
|
||||||
|
in: test/msgin/clear2ed.msg
|
||||||
|
out: -----BEGIN PGP MESSAGE-----
|
|
@ -0,0 +1,149 @@
|
||||||
|
#!/usr/local/bin/python2
|
||||||
|
|
||||||
|
#
|
||||||
|
# gpg-mailgate
|
||||||
|
#
|
||||||
|
# This file is part of the gpg-mailgate source code.
|
||||||
|
#
|
||||||
|
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import difflib
|
||||||
|
|
||||||
|
import ConfigParser
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
RELAY_SCRIPT = "test/relay.py"
|
||||||
|
CONFIG_FILE = "test/gpg-mailgate.conf"
|
||||||
|
|
||||||
|
PYTHON_BIN = "python2.7"
|
||||||
|
|
||||||
|
def build_config(config):
|
||||||
|
cp = ConfigParser.ConfigParser()
|
||||||
|
|
||||||
|
cp.add_section("logging")
|
||||||
|
cp.set("logging", "file", config["log_file"])
|
||||||
|
cp.set("logging", "verbose", "yes")
|
||||||
|
|
||||||
|
cp.add_section("gpg")
|
||||||
|
cp.set("gpg", "keyhome", config["gpg_keyhome"])
|
||||||
|
|
||||||
|
cp.add_section("smime")
|
||||||
|
cp.set("smime", "cert_path", config["smime_certpath"])
|
||||||
|
|
||||||
|
cp.add_section("relay")
|
||||||
|
cp.set("relay", "host", "localhost")
|
||||||
|
cp.set("relay", "port", config["port"])
|
||||||
|
|
||||||
|
cp.add_section("enc_keymap")
|
||||||
|
cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7")
|
||||||
|
cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67")
|
||||||
|
|
||||||
|
logging.debug("Created config with keyhome=%s, cert_path=%s and relay at port %d" %
|
||||||
|
(config["gpg_keyhome"], config["smime_certpath"], config["port"]))
|
||||||
|
return cp
|
||||||
|
|
||||||
|
def write_test_config(outfile, **config):
|
||||||
|
logging.debug("Generating configuration with %s" % repr(config))
|
||||||
|
|
||||||
|
out = open(outfile, "w+")
|
||||||
|
cp = build_config(config)
|
||||||
|
cp.write(out)
|
||||||
|
out.close()
|
||||||
|
|
||||||
|
logging.debug("Wrote configuration to %s" % outfile)
|
||||||
|
|
||||||
|
def load_file(name):
|
||||||
|
f = open(name, 'r')
|
||||||
|
contents = f.read()
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
return contents
|
||||||
|
|
||||||
|
def report_result(message_file, expected, test_output):
|
||||||
|
status = None
|
||||||
|
if expected in test_output:
|
||||||
|
status = "Success"
|
||||||
|
else:
|
||||||
|
status = "Failure"
|
||||||
|
|
||||||
|
print message_file.ljust(30), status
|
||||||
|
|
||||||
|
def execute_e2e_test(case_name, config, config_path):
|
||||||
|
"""Read test case configuration from config and run that test case.
|
||||||
|
|
||||||
|
Parameter case_name should refer to a section in test
|
||||||
|
config file. Each of these sections should contain
|
||||||
|
following properties: 'descr', 'to', 'in' and 'out'.
|
||||||
|
"""
|
||||||
|
|
||||||
|
test_command = "GPG_MAILGATE_CONFIG=%s %s gpg-mailgate.py %s < %s" % (
|
||||||
|
config_path,
|
||||||
|
PYTHON_BIN,
|
||||||
|
config.get(case_name, "to"),
|
||||||
|
config.get(case_name, "in"))
|
||||||
|
result_command = "%s %s %d" % (PYTHON_BIN, config.get("relay", "script"), config.getint("relay", "port"))
|
||||||
|
|
||||||
|
logging.debug("Spawning relay: '%s'" % (result_command))
|
||||||
|
pipe = os.popen(result_command, 'r')
|
||||||
|
|
||||||
|
logging.debug("Spawning GPG-Lacre: '%s'" % (test_command))
|
||||||
|
msgin = os.popen(test_command, 'w')
|
||||||
|
msgin.write(load_file(config.get(case_name, "in")))
|
||||||
|
msgin.close()
|
||||||
|
|
||||||
|
testout = pipe.read()
|
||||||
|
pipe.close()
|
||||||
|
|
||||||
|
logging.debug("Read %d characters of test output: '%s'" % (len(testout), testout))
|
||||||
|
|
||||||
|
report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
|
||||||
|
|
||||||
|
def load_test_config():
|
||||||
|
cp = ConfigParser.ConfigParser()
|
||||||
|
cp.read("test/e2e.ini")
|
||||||
|
|
||||||
|
return cp
|
||||||
|
|
||||||
|
|
||||||
|
config = load_test_config()
|
||||||
|
|
||||||
|
logging.basicConfig(filename = config.get("tests", "e2e_log"),
|
||||||
|
# Get raw values of log and date formats because they
|
||||||
|
# contain %-sequences and we don't want them to be expanded
|
||||||
|
# by the ConfigParser.
|
||||||
|
format = config.get("tests", "e2e_log_format", True),
|
||||||
|
datefmt = config.get("tests", "e2e_log_datefmt", True),
|
||||||
|
level = logging.DEBUG)
|
||||||
|
|
||||||
|
config_path = os.getcwd() + "/" + CONFIG_FILE
|
||||||
|
|
||||||
|
write_test_config(config_path,
|
||||||
|
port = config.getint("relay", "port"),
|
||||||
|
gpg_keyhome = config.get("dirs", "keys"),
|
||||||
|
smime_certpath = config.get("dirs", "certs"),
|
||||||
|
log_file = config.get("tests", "lacre_log"))
|
||||||
|
|
||||||
|
for case_no in range(1, config.getint("tests", "cases")+1):
|
||||||
|
case_name = "case-%d" % (case_no)
|
||||||
|
logging.info("Executing %s: %s", case_name, config.get(case_name, "descr"))
|
||||||
|
|
||||||
|
execute_e2e_test(case_name, config, config_path)
|
||||||
|
|
||||||
|
print "See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log"))
|
|
@ -0,0 +1 @@
|
||||||
|
v:1:
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,5 @@
|
||||||
|
From: Dave <dave@localhost>
|
||||||
|
To: Carlos <carlos@localhost>
|
||||||
|
Subject: Test
|
||||||
|
|
||||||
|
Body of the message.
|
|
@ -0,0 +1,5 @@
|
||||||
|
From: Dave <dave@localhost>
|
||||||
|
To: Bob <bob@localhost>
|
||||||
|
Subject: Test
|
||||||
|
|
||||||
|
Body of the message.
|
|
@ -0,0 +1,5 @@
|
||||||
|
From: Dave <dave@localhost>
|
||||||
|
To: Alice <alice@localhost>
|
||||||
|
Subject: Test
|
||||||
|
|
||||||
|
Body of the message.
|
|
@ -0,0 +1,82 @@
|
||||||
|
#!/usr/local/bin/python2
|
||||||
|
#
|
||||||
|
# This quick-and-dirty script supports only the happy case of SMTP session,
|
||||||
|
# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email.
|
||||||
|
#
|
||||||
|
# It listens on the port given as the only command-line argument and consumes a
|
||||||
|
# message, then prints it to standard output. The goal is to be able to
|
||||||
|
# compare that output with expected clear-text or encrypted message body.
|
||||||
|
#
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
|
||||||
|
|
||||||
|
EXIT_UNAVAILABLE = 1
|
||||||
|
|
||||||
|
BUFFER_SIZE = 4096
|
||||||
|
EOM = "\r\n.\r\n"
|
||||||
|
LAST_LINE = -3
|
||||||
|
|
||||||
|
|
||||||
|
def welcome(msg):
|
||||||
|
return "220 %s\r\n" % (msg)
|
||||||
|
|
||||||
|
def ok(msg = "OK"):
|
||||||
|
return "250 %s\r\n" % (msg)
|
||||||
|
|
||||||
|
def bye():
|
||||||
|
return "251 Bye"
|
||||||
|
|
||||||
|
def provide_message():
|
||||||
|
return "354 Enter a message, ending it with a '.' on a line by itself\r\n"
|
||||||
|
|
||||||
|
def receive_and_confirm(session):
|
||||||
|
session.recv(BUFFER_SIZE)
|
||||||
|
session.sendall(ok())
|
||||||
|
|
||||||
|
def serve(port):
|
||||||
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
|
s.bind(('', port))
|
||||||
|
s.listen(1)
|
||||||
|
except socket.error, e:
|
||||||
|
print "Cannot connect", e
|
||||||
|
sys.exit(EXIT_UNAVAILABLE)
|
||||||
|
|
||||||
|
(conn, addr) = s.accept()
|
||||||
|
conn.sendall(welcome("TEST SERVER"))
|
||||||
|
|
||||||
|
receive_and_confirm(conn) # Ignore HELO/EHLO
|
||||||
|
receive_and_confirm(conn) # Ignore sender address
|
||||||
|
receive_and_confirm(conn) # Ignore recipient address
|
||||||
|
|
||||||
|
data = conn.recv(BUFFER_SIZE)
|
||||||
|
conn.sendall(provide_message())
|
||||||
|
|
||||||
|
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
|
||||||
|
message = ''
|
||||||
|
while not message.endswith(EOM):
|
||||||
|
message += conn.recv(BUFFER_SIZE)
|
||||||
|
conn.sendall(ok("OK, id=test"))
|
||||||
|
|
||||||
|
conn.recv(BUFFER_SIZE)
|
||||||
|
conn.sendall(bye())
|
||||||
|
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Trim EOM marker as we're only interested in the message body.
|
||||||
|
return message[:-len(EOM)]
|
||||||
|
|
||||||
|
def error(msg):
|
||||||
|
print "ERROR: %s" % (msg)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
error("Usage: relay.py PORT_NUMBER")
|
||||||
|
|
||||||
|
port = int(sys.argv[1])
|
||||||
|
body = serve(port)
|
||||||
|
|
||||||
|
print body
|
|
@ -0,0 +1,16 @@
|
||||||
|
import GnuPG
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
class GnuPGUtilitiesTest(unittest.TestCase):
|
||||||
|
def test_build_default_command(self):
|
||||||
|
cmd = GnuPG.build_command("test/keyhome")
|
||||||
|
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
|
||||||
|
|
||||||
|
def test_build_command_extended_with_args(self):
|
||||||
|
cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar")
|
||||||
|
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
shouldn't we also have
test/keyhome/random_seed
?You're right - I've fixed that in the newest commit.