Rework how E2E tests are executed

Tests kept breaking (not failing) randomly with "Broken pipe" errors.  Rework
how processes are spawned to make sure that it doesn't break them again.
This commit is contained in:
Piotr F. Mieszkowski 2022-01-19 21:57:46 +01:00
parent c81c6e6e0d
commit a201265f87

View file

@ -20,6 +20,8 @@
import os
import sys
import subprocess
import difflib
import configparser
@ -70,7 +72,7 @@ def load_file(name):
contents = f.read()
f.close()
return contents
return bytes(contents, 'utf-8')
def report_result(message_file, expected, test_output):
status = None
@ -91,23 +93,35 @@ def execute_e2e_test(case_name, config, config_path):
# This environment variable is set in Makefile.
python_path = os.getenv('PYTHON', 'python3')
test_command = "GPG_MAILGATE_CONFIG=%s %s gpg-mailgate.py %s < %s" % (
config_path,
python_path,
config.get(case_name, "to"),
config.get(case_name, "in"))
result_command = "%s %s %d" % (python_path, config.get("relay", "script"), config.getint("relay", "port"))
gpglacre_cmd = [python_path,
"gpg-mailgate.py",
config.get(case_name, "to")]
logging.debug("Spawning relay: '%s'" % (result_command))
pipe = os.popen(result_command, 'r')
relay_cmd = [python_path,
config.get("relay", "script"),
config.get("relay", "port")]
logging.debug("Spawning GPG-Lacre: '%s'" % (test_command))
msgin = os.popen(test_command, 'w')
msgin.write(load_file(config.get(case_name, "in")))
msgin.close()
logging.debug("Spawning relay: '%s'" % (relay_cmd))
relay_proc = subprocess.Popen(relay_cmd,
stdin = None,
stdout = subprocess.PIPE)
testout = pipe.read()
pipe.close()
logging.debug("Spawning GPG-Lacre: '%s', stdin = %s"
% (gpglacre_cmd,
config.get(case_name, "in")))
# pass PATH because otherwise it would be dropped
gpglacre_proc = subprocess.run(gpglacre_cmd,
input = load_file(config.get(case_name, "in")),
capture_output = True,
env = {"GPG_MAILGATE_CONFIG": config_path,
"PATH": os.getenv("PATH")})
# Let the relay process the data.
relay_proc.wait()
(testout, _) = relay_proc.communicate()
testout = testout.decode('utf-8')
logging.debug("Read %d characters of test output: '%s'" % (len(testout), testout))