Merge branch 'master' into php_update

This commit is contained in:
muppeth 2022-03-15 10:37:25 +00:00
commit e9ad17e7bc
22 changed files with 616 additions and 277 deletions

12
.gitignore vendored
View File

@ -26,6 +26,18 @@ pip-log.txt
.tox .tox
nosetests.xml nosetests.xml
# GPG-Mailgate test files
test/logs
test/tmp
test/gpg-mailgate.conf
test/keyhome/random_seed
# Emacs files
*~
TAGS
TAGS-Python
TAGS-PHP
# Translations # Translations
*.mo *.mo

View File

@ -23,9 +23,20 @@ import subprocess
import shutil import shutil
import random import random
import string import string
import sys
LINE_FINGERPRINT = 'fpr'
LINE_USER_ID = 'uid'
POS_FINGERPRINT = 9
def build_command(key_home, *args, **kwargs):
cmd = ["gpg", '--homedir', key_home] + list(args)
return cmd
def private_keys( keyhome ): def private_keys( keyhome ):
cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-secret-keys', '--with-colons'] cmd = build_command(keyhome, '--list-secret-keys', '--with-colons')
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.wait() p.wait()
keys = dict() keys = dict()
@ -39,17 +50,25 @@ def private_keys( keyhome ):
return keys return keys
def public_keys( keyhome ): def public_keys( keyhome ):
cmd = ['/usr/bin/gpg', '--homedir', keyhome, '--list-keys', '--with-colons'] cmd = build_command(keyhome, '--list-keys', '--with-colons')
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.wait() p.wait()
keys = dict() keys = dict()
fingerprint = None
email = None
for line in p.stdout.readlines(): for line in p.stdout.readlines():
if line[0:3] == 'uid' or line[0:3] == 'pub': line = line.decode('utf-8')
if line[0:3] == LINE_FINGERPRINT:
fingerprint = line.split(':')[POS_FINGERPRINT]
if line[0:3] == LINE_USER_ID:
if ('<' not in line or '>' not in line): if ('<' not in line or '>' not in line):
continue continue
email = line.split('<')[1].split('>')[0] email = line.split('<')[1].split('>')[0]
fingerprint = line.split(':')[4] if not (fingerprint is None or email is None):
keys[fingerprint] = email keys[fingerprint] = email
fingerprint = None
email = None
return keys return keys
# confirms a key has a given email address # confirms a key has a given email address
@ -64,7 +83,7 @@ def confirm_key( content, email ):
os.mkdir(tmpkeyhome) os.mkdir(tmpkeyhome)
localized_env = os.environ.copy() localized_env = os.environ.copy()
localized_env["LANG"] = "C" localized_env["LANG"] = "C"
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', tmpkeyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env ) p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env )
result = p.communicate(input=content)[1] result = p.communicate(input=content)[1]
confirmed = False confirmed = False
@ -83,7 +102,7 @@ def confirm_key( content, email ):
# adds a key and ensures it has the given email address # adds a key and ensures it has the given email address
def add_key( keyhome, content ): def add_key( keyhome, content ):
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.communicate(input=content) p.communicate(input=content)
p.wait() p.wait()
@ -93,7 +112,7 @@ def delete_key( keyhome, email ):
if result[1]: if result[1]:
# delete all keys matching this email address # delete all keys matching this email address
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--delete-key', '--batch', '--yes', result[1]], stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.wait() p.wait()
return True return True
@ -102,7 +121,7 @@ def delete_key( keyhome, email ):
class GPGEncryptor: class GPGEncryptor:
def __init__(self, keyhome, recipients = None, charset = None): def __init__(self, keyhome, recipients = None, charset = None):
self._keyhome = keyhome self._keyhome = keyhome
self._message = '' self._message = b''
self._recipients = list() self._recipients = list()
self._charset = charset self._charset = charset
if recipients != None: if recipients != None:
@ -112,12 +131,12 @@ class GPGEncryptor:
self._message += message self._message += message
def encrypt(self): def encrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
encdata = p.communicate(input=self._message)[0] encdata = p.communicate(input=self._message)[0]
return (encdata, p.returncode) return (encdata, p.returncode)
def _command(self): def _command(self):
cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e"] cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
# add recipients # add recipients
for recipient in self._recipients: for recipient in self._recipients:
@ -140,11 +159,9 @@ class GPGDecryptor:
self._message += message self._message += message
def decrypt(self): def decrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
decdata = p.communicate(input=self._message)[0] decdata = p.communicate(input=self._message)[0]
return (decdata, p.returncode) return (decdata, p.returncode)
def _command(self): def _command(self):
cmd = ["/usr/bin/gpg", "--trust-model", "always", "--homedir", self._keyhome, "--batch", "--yes", "--no-secmem-warning", "-a", "-d"] return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
return cmd

42
Makefile Normal file
View File

@ -0,0 +1,42 @@
.POSIX:
.PHONY: test unittest pre-clean clean
#
# On systems where Python 3.x binary has a different name, just
# overwrite the name/path on the command line, like:
#
# make test PYTHON=/usr/local/bin/python3.8
#
# This marco is passed via environment to test/e2e_test.py, where it's
# used to compute further commands.
#
PYTHON = python3
#
# Run a set of end-to-end tests.
#
# Test scenarios are described and configured by the test/e2e.ini
# file. Basically this is just a script that feeds GPG Mailgate with
# known input and checks whether output meets expectations.
#
test: test/tmp test/logs pre-clean
$(PYTHON) test/e2e_test.py
#
# Run unit tests
#
unittest:
$(PYTHON) -m unittest discover -s test
pre-clean:
rm -fv test/gpg-mailgate.conf
rm -f test/logs/*.log
test/tmp:
mkdir test/tmp
test/logs:
mkdir test/logs
clean: pre-clean
rm -rfv test/tmp test/logs

40
doc/testing.md Normal file
View File

@ -0,0 +1,40 @@
# Testing
First tests have been set up to cover GPG Mailgate with at least basic test
that would be easy to run. The tests are called "end-to-end", meaning that we
feed some input to GPG Mailgate and inspect the output.
## Running tests
To run tests, use command `make test` or `make unittest`.
Tests produce some helpful logs, so inspect contents of `test/logs` directory
if something goes wrong.
If your system's Python binary isn't found in your `$PATH` or you want to use
a specific binary, use make's macro overriding: `make test
PYTHON=/path/to/python`.
## Key building blocks
- *Test Script* (`test/e2e_test.py`) that orchestrates the other components.
It performs test cases described in the *Test Configuration*. It spawns
*Test Mail Relay* and *GPG Mailgate* in appropriate order.
- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only
supports the happy path. It accepts a mail message and prints it to
stdandard output.
- *Test Configuration* (`test/e2e.ini`) specifies test cases: their input,
expected results and helpful documentation. It also specifies the port that
the *Test Mail Relay* should listen on.
## Limitations
Currently tests only check if the message has been encrypted, without
verifying that the correct key has been used. That's because we don't know
(yet) how to have a reproducible encrypted message. Option
`--faked-system-time` wasn't enough to produce identical output.
## Troubleshooting
When things go wrong, be sure to study `test/logs/e2e.log` and
`test/logs/gpg-mailgate.log` files -- they contain some useful information.

View File

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
from ConfigParser import RawConfigParser from configparser import RawConfigParser
import GnuPG import GnuPG
import MySQLdb import MySQLdb
import smtplib import smtplib
@ -64,7 +64,7 @@ for sect in _cfg.sections():
for (name, value) in _cfg.items(sect): for (name, value) in _cfg.items(sect):
cfg[sect][name] = value cfg[sect][name] = value
if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['database']['enabled'] == 'yes' and cfg['database'].has_key('name') and cfg['database'].has_key('host') and cfg['database'].has_key('username') and cfg['database'].has_key('password'): if 'database' in cfg and 'enabled' in cfg['database'] and cfg['database']['enabled'] == 'yes' and 'name' in cfg['database'] and 'host' in cfg['database'] and 'username' in cfg['database'] and 'password' in cfg['database']:
connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306) connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306)
cursor = connection.cursor() cursor = connection.cursor()
@ -83,17 +83,17 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg
cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted
appendLog('Imported key from <' + row[2] + '>') appendLog('Imported key from <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] ) send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] )
else: else:
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key
appendLog('Import confirmation failed for <' + row[2] + '>') appendLog('Import confirmation failed for <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration failed", "registrationError.md", row[2] ) send_msg( "PGP key registration failed", "registrationError.md", row[2] )
else: else:
# delete key so we don't continue processing it # delete key so we don't continue processing it
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],))
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key deleted", "keyDeleted.md", row[2]) send_msg( "PGP key deleted", "keyDeleted.md", row[2])
connection.commit() connection.commit()
@ -108,4 +108,4 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
appendLog('Deleted key for <' + row[0] + '>') appendLog('Deleted key for <' + row[0] + '>')
connection.commit() connection.commit()
else: else:
print "Warning: doing nothing since database settings are not configured!" print("Warning: doing nothing since database settings are not configured!")

View File

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
from ConfigParser import RawConfigParser from configparser import RawConfigParser
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
import copy import copy
@ -39,9 +39,14 @@ import traceback
from M2Crypto import BIO, Rand, SMIME, X509 from M2Crypto import BIO, Rand, SMIME, X509
from email.mime.message import MIMEMessage from email.mime.message import MIMEMessage
# Environment variable name we read to retrieve configuration path. This is to
# enable non-root users to set up and run GPG Mailgate and to make the software
# testable.
CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
# Read configuration from /etc/gpg-mailgate.conf # Read configuration from /etc/gpg-mailgate.conf
_cfg = RawConfigParser() _cfg = RawConfigParser()
_cfg.read('/etc/gpg-mailgate.conf') _cfg.read(os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf'))
cfg = dict() cfg = dict()
for sect in _cfg.sections(): for sect in _cfg.sections():
cfg[sect] = dict() cfg[sect] = dict()
@ -65,227 +70,6 @@ raw_message = email.message_from_string( raw )
from_addr = raw_message['From'] from_addr = raw_message['From']
to_addrs = sys.argv[1:] to_addrs = sys.argv[1:]
def gpg_decrypt( raw_message, recipients ):
gpg_to = list()
ungpg_to = list()
# This is needed to avoid encryption if decryption is set to keymap only,
# private key is present but not in keymap.
noenc_to = list()
if not get_bool_from_cfg('gpg', 'keyhome'):
log("No valid entry for gpg keyhome. Decryption aborted.")
return recipients
keys = GnuPG.private_keys( cfg['gpg']['keyhome'] )
if get_bool_from_cfg('default', 'dec_regex'):
dec_regex = cfg['default']['dec_regex']
else:
dec_regex = None
for fingerprint in keys:
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
for to in recipients:
if to in keys.values() and not get_bool_from_cfg('default', 'dec_keymap_only', 'yes'):
gpg_to.append(to)
# Is this recipient defined in regex for default decryption?
elif not (dec_regex is None) and not (re.match(dec_regex, to) is None):
log("Using default decrytion defined in dec_regex for recipient '%s'" % to)
gpg_to.append(to)
elif get_bool_from_cfg('dec_keymap', to):
log("Decrypt keymap has key '%s'" % cfg['dec_keymap'][to] )
# Check we've got a matching key! If not, decline to attempt decryption. The key is checked for safty reasons.
if not cfg['dec_keymap'][to] in keys:
log("Key '%s' in decryption keymap not found in keyring for email address '%s'. Won't decrypt." % (cfg['dec_keymap'][to], to))
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
else:
gpg_to.append(to)
else:
if verbose:
log("Recipient (%s) not in PGP domain list for decrypting." % to)
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
if gpg_to != list():
send_msg( gpg_decrypt_all_payloads( raw_message ).as_string(), gpg_to )
if noenc_to != list():
log("Do not try to encrypt mails for: %s" % ', '.join( noenc_to ))
send_msg(raw_message.as_string(), noenc_to)
return ungpg_to
def gpg_decrypt_all_payloads( message ):
# We don't want to modify the original message
decrypted_message = copy.deepcopy(message)
# Check if message is PGP/MIME encrypted
if not (message.get_param('protocol') is None) and message.get_param('protocol') == 'application/pgp-encrypted' and message.is_multipart():
decrypted_message = decrypt_mime(decrypted_message)
# At this point the message could only be PGP/INLINE encrypted, unencrypted or
# encrypted with a mechanism not covered by GPG-Mailgate
elif get_bool_from_cfg('default', 'no_inline_dec', 'no'):
# Check if message is PGP/INLINE encrypted and has attachments (or unencrypted with attachments)
if message.is_multipart():
# Set message's payload to list so payloads can be attached later on
decrypted_message.set_payload(list())
# We only need to hand over the original message here. Not needed for other decrypt implementations.
decrypted_message, success = decrypt_inline_with_attachments(message, False, decrypted_message)
# Add header here to avoid it being appended several times
if get_bool_from_cfg('default', 'add_header', 'yes') and success:
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# Check if message is PGP/INLINE encrypted without attachments (or unencrypted without attachments)
else:
decrypted_message = decrypt_inline_without_attachments(decrypted_message)
return decrypted_message
def decrypt_mime( decrypted_message ):
# Please note: Signatures will disappear while decrypting and will not be checked
# Getting the part which should be PGP encrypted (according to RFC)
msg_content = decrypted_message.get_payload(1).get_payload()
if "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content:
start = msg_content.find("-----BEGIN PGP MESSAGE-----")
end = msg_content.find("-----END PGP MESSAGE-----")
decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25])
if decrypt_success:
# Making decrypted_message a "normal" unencrypted message
decrypted_message.del_param('protocol')
decrypted_message.set_type(decrypted_payload.get_content_type())
# Restore Content-Disposition header from original message
if not (decrypted_payload.get('Content-Disposition') is None):
if not (decrypted_message.get('Content-Disposition') is None):
decrypted_message.replace_header('Content-Disposition', decrypted_payload.get('Content-Disposition'))
else:
decrypted_message.set_param(decrypted_payload.get('Content-Disposition'), "", 'Content-Disposition')
if decrypted_payload.is_multipart():
# Clear message's original payload and insert the decrypted payloads
decrypted_message.set_payload(list())
decrypted_message = generate_message_from_payloads( decrypted_payload, decrypted_message )
decrypted_message.preamble = "This is a multi-part message in MIME format"
else:
decrypted_message.set_payload(decrypted_payload.get_payload())
decrypted_message.preamble = None
if get_bool_from_cfg('default', 'add_header', 'yes'):
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# If decryption fails, decrypted_message is equal to the original message
return decrypted_message
def decrypt_inline_with_attachments( payloads, success, message = None ):
if message is None:
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ):
# Take care of cascaded MIME messages
submessage, subsuccess = decrypt_inline_with_attachments( payload, success )
message.attach(submessage)
success = success or subsuccess
else:
msg_content = payload.get_payload()
# Getting values for different implementations as PGP/INLINE is not implemented
# the same on different clients
pgp_inline_tags = "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content
attachment_filename = payload.get_filename()
if pgp_inline_tags or not (attachment_filename is None) and not (re.search('.\.pgp$', attachment_filename) is None):
if pgp_inline_tags:
start = msg_content.find("-----BEGIN PGP MESSAGE-----")
end = msg_content.find("-----END PGP MESSAGE-----")
decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25])
# Some implementations like Enigmail have strange interpretations of PGP/INLINE
# This tries to cope with it as good as possible.
else:
build_message = """
-----BEGIN PGP MESSAGE-----
%s
-----END PGP MESSAGE-----""" % msg_content
decrypted_payload, decrypt_success = decrypt_payload(build_message)
# Was at least one decryption successful?
success = success or decrypt_success
if decrypt_success:
if not (attachment_filename is None):
attachment_filename = re.sub('\.pgp$', '', attachment_filename)
payload.set_param('filename', attachment_filename, 'Content-Disposition')
payload.set_param('name', attachment_filename, 'Content-Type')
# Need this nasty hack to avoid double blank lines at beginning of message
payload.set_payload(decrypted_payload.as_string()[1:])
message.attach(payload)
else:
# Message could not be decrypted, so non-decrypted message is attached
message.attach(payload)
else:
# There was no encrypted payload found, so the original payload is attached
message.attach(payload)
return message, success
def decrypt_inline_without_attachments( decrypted_message ):
msg_content = decrypted_message.get_payload()
if "-----BEGIN PGP MESSAGE-----" in msg_content and "-----END PGP MESSAGE-----" in msg_content:
start = msg_content.find("-----BEGIN PGP MESSAGE-----")
end = msg_content.find("-----END PGP MESSAGE-----")
decrypted_payload, decrypt_success = decrypt_payload(msg_content[start:end + 25])
if decrypt_success:
# Need this nasty hack to avoid double blank lines at beginning of message
decrypted_message.set_payload(decrypted_payload.as_string()[1:])
if get_bool_from_cfg('default', 'add_header', 'yes'):
decrypted_message['X-GPG-Mailgate'] = 'Decrypted by GPG Mailgate'
# If message was not encrypted, this will just return the original message
return decrypted_message
def decrypt_payload( payload ):
gpg = GnuPG.GPGDecryptor( cfg['gpg']['keyhome'] )
gpg.update( payload )
decrypted_data, returncode = gpg.decrypt()
if verbose:
log("Return code from decryption=%d (0 indicates success)." % returncode)
if returncode != 0:
log("Decrytion failed with return code %d. Decryption aborted." % returncode)
return payload, False
# Decryption always generate a new message
decrypted_msg = email.message_from_string(decrypted_data)
return decrypted_msg, True
def gpg_encrypt( raw_message, recipients ): def gpg_encrypt( raw_message, recipients ):
if not get_bool_from_cfg('gpg', 'keyhome'): if not get_bool_from_cfg('gpg', 'keyhome'):
@ -336,7 +120,7 @@ def gpg_encrypt( raw_message, recipients ):
ungpg_to.append(to) ungpg_to.append(to)
if gpg_to != list(): if gpg_to != list():
log("Encrypting email to: %s" % ' '.join( map(lambda x: x[0], gpg_to) )) log("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
# Getting PGP style for recipient # Getting PGP style for recipient
gpg_to_smtp_mime = list() gpg_to_smtp_mime = list()
@ -373,10 +157,10 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'): if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_mime.has_key('Content-Transfer-Encoding'): if 'Content-Transfer-Encoding' in raw_message_mime:
raw_message_mime.replace_header('Content-Transfer-Encoding','8BIT') raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT')
else: else:
raw_message_mime['Content-Transfer-Encoding'] = '8BIT' raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime ) encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime )
raw_message_mime.set_payload( encrypted_payloads ) raw_message_mime.set_payload( encrypted_payloads )
@ -390,10 +174,10 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'): if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_inline.has_key('Content-Transfer-Encoding'): if 'Content-Transfer-Encoding' in raw_message_inline:
raw_message_inline.replace_header('Content-Transfer-Encoding','8BIT') raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT')
else: else:
raw_message_inline['Content-Transfer-Encoding'] = '8BIT' raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline ) encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline )
raw_message_inline.set_payload( encrypted_payloads ) raw_message_inline.set_payload( encrypted_payloads )
@ -406,11 +190,11 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
# This breaks cascaded MIME messages. Blame PGP/INLINE. # This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list() encrypted_payloads = list()
if type( message.get_payload() ) == str: if isinstance(message.get_payload(), str):
return encrypt_payload( message, gpg_to_cmdline ).get_payload() return encrypt_payload( message, gpg_to_cmdline ).get_payload()
for payload in message.get_payload(): for payload in message.get_payload():
if( type( payload.get_payload() ) == list ): if( isinstance(payload.get_payload(), list) ):
encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) ) encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) )
else: else:
encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) ) encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) )
@ -432,13 +216,13 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
submsg2.set_param('inline', "", 'Content-Disposition' ) submsg2.set_param('inline', "", 'Content-Disposition' )
submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' ) submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' )
if type ( message.get_payload() ) == str: if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps # WTF! It seems to swallow the first line. Not sure why. Perhaps
# it's skipping an imaginary blank line someplace. (ie skipping a header) # it's skipping an imaginary blank line someplace. (ie skipping a header)
# Workaround it here by prepending a blank line. # Workaround it here by prepending a blank line.
# This happens only on text only messages. # This happens only on text only messages.
additionalSubHeader="" additionalSubHeader=""
if message.has_key('Content-Type') and not message['Content-Type'].startswith('multipart'): if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader="Content-Type: "+message['Content-Type']+"\n" additionalSubHeader="Content-Type: "+message['Content-Type']+"\n"
submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True)) submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True))
check_nested = True check_nested = True
@ -454,8 +238,8 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'! junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
boundary = junk_msg.get_boundary() boundary = junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed. # This also modifies the boundary in the body of the message, ie it gets parsed.
if message.has_key('Content-Type'): if 'Content-Type' in message:
message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary) message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary)
else: else:
message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary
@ -465,7 +249,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
raw_payload = payload.get_payload(decode=True) raw_payload = payload.get_payload(decode=True)
if check_nested and "-----BEGIN PGP MESSAGE-----" in raw_payload and "-----END PGP MESSAGE-----" in raw_payload: if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload:
if verbose: if verbose:
log("Message is already pgp encrypted. No nested encryption needed.") log("Message is already pgp encrypted. No nested encryption needed.")
return payload return payload
@ -591,9 +375,13 @@ def sanitize_case_sense( address ):
if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'): if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'):
address = address.lower() address = address.lower()
else: else:
splitted_address = address.split('@') if isinstance(address, str):
sep = '@'
else:
sep = b'@'
splitted_address = address.split(sep)
if len(splitted_address) > 1: if len(splitted_address) > 1:
address = splitted_address[0] + '@' + splitted_address[1].lower() address = splitted_address[0] + sep + splitted_address[1].lower()
return address return address
@ -603,7 +391,7 @@ def generate_message_from_payloads( payloads, message = None ):
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload(): for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ): if( isinstance(payload.get_payload(), list) ):
message.attach(generate_message_from_payloads(payload)) message.attach(generate_message_from_payloads(payload))
else: else:
message.attach(payload) message.attach(payload)
@ -619,7 +407,7 @@ def get_first_payload( payloads ):
def send_msg( message, recipients ): def send_msg( message, recipients ):
recipients = filter(None, recipients) recipients = [_f for _f in recipients if _f]
if recipients: if recipients:
if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')): if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')):
log("Missing settings for relay. Sending email aborted.") log("Missing settings for relay. Sending email aborted.")
@ -627,8 +415,8 @@ def send_msg( message, recipients ):
log("Sending email to: <%s>" % '> <'.join( recipients )) log("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (cfg['relay']['host'], int(cfg['relay']['port'])) relay = (cfg['relay']['host'], int(cfg['relay']['port']))
smtp = smtplib.SMTP(relay[0], relay[1]) smtp = smtplib.SMTP(relay[0], relay[1])
if cfg.has_key('relay') and cfg['relay'].has_key('starttls') and cfg['relay']['starttls'] == 'yes': if 'relay' in cfg and 'starttls' in cfg['relay'] and cfg['relay']['starttls'] == 'yes':
smtp.starttls() smtp.starttls()
smtp.sendmail( from_addr, recipients, message ) smtp.sendmail( from_addr, recipients, message )
else: else:
log("No recipient found") log("No recipient found")
@ -639,11 +427,6 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
for recipient in to_addrs: for recipient in to_addrs:
recipients_left.append(sanitize_case_sense(recipient)) recipients_left.append(sanitize_case_sense(recipient))
# Decrypt mails for recipients with known private PGP keys
recipients_left = gpg_decrypt(raw_message, recipients_left)
if recipients_left == list():
return
# There is no need for nested encryption # There is no need for nested encryption
first_payload = get_first_payload(raw_message) first_payload = get_first_payload(raw_message)
if first_payload.get_content_type() == 'application/pkcs7-mime': if first_payload.get_content_type() == 'application/pkcs7-mime':
@ -653,7 +436,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
return return
first_payload = first_payload.get_payload(decode=True) first_payload = first_payload.get_payload(decode=True)
if "-----BEGIN PGP MESSAGE-----" in first_payload and "-----END PGP MESSAGE-----" in first_payload: if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload:
if verbose: if verbose:
log("Message is already encrypted as PGP/INLINE. Encryption aborted.") log("Message is already encrypted as PGP/INLINE. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left) send_msg(raw_message.as_string(), recipients_left)

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
from ConfigParser import RawConfigParser from configparser import RawConfigParser
import email, os, smtplib, sys, traceback, markdown, syslog, requests import email, os, smtplib, sys, traceback, markdown, syslog, requests
from M2Crypto import BIO, Rand, SMIME, X509 from M2Crypto import BIO, Rand, SMIME, X509
@ -17,7 +17,7 @@ for sect in _cfg.sections():
cfg[sect][name] = value cfg[sect][name] = value
def log(msg): def log(msg):
if cfg.has_key('logging') and cfg['logging'].has_key('file'): if 'logging' in cfg and 'file' in cfg['logging']:
if cfg['logging']['file'] == "syslog": if cfg['logging']['file'] == "syslog":
syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg) syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg)
else: else:
@ -78,9 +78,9 @@ if __name__ == "__main__":
sys.exit(0) sys.exit(0)
if sign_type == 'smime': if sign_type == 'smime':
raw_sig = sign_part.get_payload().replace("\n","") raw_sig = sign_part.get_payload().replace("\n", "")
# re-wrap signature so that it fits base64 standards # re-wrap signature so that it fits base64 standards
cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in xrange(0, len(raw_sig), 76)) cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in range(0, len(raw_sig), 76))
# now, wrap the signature in a PKCS7 block # now, wrap the signature in a PKCS7 block
sig = """ sig = """
@ -106,7 +106,7 @@ if __name__ == "__main__":
# format in user-specific data # format in user-specific data
# sending success mail only for S/MIME as GPGMW handles this on its own # sending success mail only for S/MIME as GPGMW handles this on its own
success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read() success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read()
success_msg = success_msg.replace("[:FROMADDRESS:]",from_addr) success_msg = success_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email'] msg["From"] = cfg['mailregister']['register_email']
@ -128,7 +128,7 @@ if __name__ == "__main__":
if r.status_code != 200: if r.status_code != 200:
log("Could not hand registration over to GPGMW. Error: %s" % r.status_code) log("Could not hand registration over to GPGMW. Error: %s" % r.status_code)
error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read() error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read()
error_msg = error_msg.replace("[:FROMADDRESS:]",from_addr) error_msg = error_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email'] msg["From"] = cfg['mailregister']['register_email']

0
test/certs/.keep Normal file
View File

73
test/e2e.ini Normal file
View File

@ -0,0 +1,73 @@
#
# gpg-mailgate
#
# This file is part of the gpg-mailgate source code.
#
# gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
# NOTE: We use <key>:<value> syntax, because some values contain
# colons and that is default ConfigParser key-value separator.
[relay]
port: 2500
script: test/relay.py
[dirs]
keys: test/keyhome
certs: test/certs
[tests]
# Number of "test-*" sections in this file, describing test cases.
cases: 6
e2e_log: test/logs/e2e.log
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
e2e_log_datefmt: %Y-%m-%d %H:%M:%S
lacre_log: test/logs/gpg-mailgate.log
[case-1]
descr: Clear text message to a user without a key
to: carlos@disposlab
in: test/msgin/clear2clear.msg
out: Body of the message.
[case-2]
descr: Clear text message to a user with an RSA key
to: alice@disposlab
in: test/msgin/clear2rsa.msg
out: -----BEGIN PGP MESSAGE-----
[case-3]
descr: Clear text message to a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/clear2ed.msg
out: -----BEGIN PGP MESSAGE-----
[case-4]
descr: Encrypted message to a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/ed2ed.msg
out: -----BEGIN PGP MESSAGE-----
[case-5]
descr: Signed message to a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/signed.msg
out: -----BEGIN PGP MESSAGE-----
[case-6]
descr: Multipart encrypted message to a user with an Ed25519 key.
to: bob@disposlab
in: test/msgin/multipart2rsa.msg
out: -----BEGIN PGP MESSAGE-----

158
test/e2e_test.py Normal file
View File

@ -0,0 +1,158 @@
#
# gpg-mailgate
#
# This file is part of the gpg-mailgate source code.
#
# gpg-mailgate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gpg-mailgate source code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
import os
import sys
import subprocess
import difflib
import configparser
import logging
from time import sleep
RELAY_SCRIPT = "test/relay.py"
CONFIG_FILE = "test/gpg-mailgate.conf"
def build_config(config):
cp = configparser.ConfigParser()
cp.add_section("logging")
cp.set("logging", "file", config["log_file"])
cp.set("logging", "verbose", "yes")
cp.add_section("gpg")
cp.set("gpg", "keyhome", config["gpg_keyhome"])
cp.add_section("smime")
cp.set("smime", "cert_path", config["smime_certpath"])
cp.add_section("relay")
cp.set("relay", "host", "localhost")
cp.set("relay", "port", config["port"])
cp.add_section("enc_keymap")
cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7")
cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67")
logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}")
return cp
def write_test_config(outfile, **config):
logging.debug(f"Generating configuration with {config!r}")
out = open(outfile, "w+")
cp = build_config(config)
cp.write(out)
out.close()
logging.debug(f"Wrote configuration to {outfile}")
def load_file(name):
f = open(name, 'r')
contents = f.read()
f.close()
return bytes(contents, 'utf-8')
def report_result(message_file, expected, test_output):
status = None
if expected in test_output:
status = "Success"
else:
status = "Failure"
print(message_file.ljust(30), status)
def execute_e2e_test(case_name, config, config_path):
"""Read test case configuration from config and run that test case.
Parameter case_name should refer to a section in test
config file. Each of these sections should contain
following properties: 'descr', 'to', 'in' and 'out'.
"""
# This environment variable is set in Makefile.
python_path = os.getenv('PYTHON', 'python3')
gpglacre_cmd = [python_path,
"gpg-mailgate.py",
config.get(case_name, "to")]
relay_cmd = [python_path,
config.get("relay", "script"),
config.get("relay", "port")]
logging.debug(f"Spawning relay: {relay_cmd}")
relay_proc = subprocess.Popen(relay_cmd,
stdin = None,
stdout = subprocess.PIPE)
logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {config.get(case_name, 'in')}")
# pass PATH because otherwise it would be dropped
gpglacre_proc = subprocess.run(gpglacre_cmd,
input = load_file(config.get(case_name, "in")),
capture_output = True,
env = {"GPG_MAILGATE_CONFIG": config_path,
"PATH": os.getenv("PATH")})
# Let the relay process the data.
relay_proc.wait()
(testout, _) = relay_proc.communicate()
testout = testout.decode('utf-8')
logging.debug(f"Read {len(testout)} characters of test output: '{testout}'")
report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
def load_test_config():
cp = configparser.ConfigParser()
cp.read("test/e2e.ini")
return cp
config = load_test_config()
logging.basicConfig(filename = config.get("tests", "e2e_log"),
# Get raw values of log and date formats because they
# contain %-sequences and we don't want them to be expanded
# by the ConfigParser.
format = config.get("tests", "e2e_log_format", raw=True),
datefmt = config.get("tests", "e2e_log_datefmt", raw=True),
level = logging.DEBUG)
config_path = os.getcwd() + "/" + CONFIG_FILE
write_test_config(config_path,
port = config.get("relay", "port"),
gpg_keyhome = config.get("dirs", "keys"),
smime_certpath = config.get("dirs", "certs"),
log_file = config.get("tests", "lacre_log"))
for case_no in range(1, config.getint("tests", "cases")+1):
case_name = f"case-{case_no}"
logging.info(f"Executing {case_name}: {config.get(case_name, 'descr')}")
execute_e2e_test(case_name, config, config_path)
print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")))

View File

@ -0,0 +1 @@
v:1:

BIN
test/keyhome/pubring.kbx Normal file

Binary file not shown.

BIN
test/keyhome/tofu.db Normal file

Binary file not shown.

BIN
test/keyhome/trustdb.gpg Normal file

Binary file not shown.

View File

@ -0,0 +1,5 @@
From: Dave <dave@localhost>
To: Carlos <carlos@localhost>
Subject: Test
Body of the message.

5
test/msgin/clear2ed.msg Normal file
View File

@ -0,0 +1,5 @@
From: Dave <dave@localhost>
To: Bob <bob@localhost>
Subject: Test
Body of the message.

5
test/msgin/clear2rsa.msg Normal file
View File

@ -0,0 +1,5 @@
From: Dave <dave@localhost>
To: Alice <alice@localhost>
Subject: Test
Body of the message.

13
test/msgin/ed2ed.msg Normal file
View File

@ -0,0 +1,13 @@
From: Dave <dave@localhost>
To: Bob <bob@localhost>
Subject: Test
Content-Transfer-Encoding: 7bit
-----BEGIN PGP MESSAGE-----
hF4DujWCoRS24dYSAQdAyGDF9Us11JDr8+XPmvlJHsMS7A4UBIcCiresJyZpSxYw
Cqcugy5AX5fgSAiL1Cd2b1zpQ/rYdTWkFYMVbH4jBEoPC3z/aSd+hTnneJFDUdXl
0koBDIw7NQylu6SrW+Y/DmXgalIHtwACuKivJTq/z9jdwFScV7adRR/VO53Inah3
L1+Ho7Zta95AYW3UPu71Gw3rrkfjY4uGDiFAFg==
=yTzD
-----END PGP MESSAGE-----

View File

@ -0,0 +1,43 @@
Date: Sun, 18 Jul 2021 16:53:45 +0200
From: User Alice <alice@disposlab>
To: User Bob <bob@disposlab>
Subject: encrypted
Message-ID: <YPRAeEEc3z2M9BCy@disposlab>
MIME-Version: 1.0
Content-Type: multipart/encrypted; protocol="application/pgp-encrypted";
boundary="95hZs/zeBetwhuEy"
Content-Disposition: inline
Status: RO
Content-Length: 1140
Lines: 30
--95hZs/zeBetwhuEy
Content-Type: application/pgp-encrypted
Content-Disposition: attachment
Version: 1
--95hZs/zeBetwhuEy
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="msg.asc"
-----BEGIN PGP MESSAGE-----
hQGMA/vsqjpkmZurAQwAnb+2kDPgVFWVLkafuzVJGqFWKNtdVsvk7I1zhzFw5Hsr
h4irSHcH0X0QjaHprNiMBDfIZaCx5VVsvGYLiu/iQkdVPXItugTpln8aAvDt8/Bp
Hse69tgG5S9o4fPK4K2bMjNdomclDdz51cu9NXYjk/6OtzVwcSypyEmxgw24Oo1+
Q8KfZN9n6VTXGNlrV9KnAZYs/5aaSABTeC+cDvOcjDbPAmwDHYS3qsbITYoGHnEz
QfPIakYWPtPWkajhm4Z/iyEUSTeqew1/gAJ8sZnJpV0eg1Cr/44XgklZKFr8aJgk
SG8PkQxsyzAZklpwMSWdbb+t9a5nEKvky3zMpdmS1GE7ubTO7nQ1geUdBiv1UUNh
BY9d4nlGirqxX1MZUTGZidJgCy0365xbJSKkU0yFFW2uWtCKzJTEQBk3YZkNmnGH
h8BiVvMhQ8SxKBRPeH6Zb6HHlbcgkPvJAAI4VLqkZPCBvp9irmcdFGmrgCWLxzgk
sIjYGLA+ZuSXOKuAssXE0sAbASPAkUJRTIjzXFrCnr/MB3ZonESH01fsbsX+E/Qi
+2oLrgjjPHcPq76fvdO6fJP6c1pM8TlOoZKn/RkPm1llULtOn4n5JZJjeUA0F2ID
Te/U9i4YtcFZbuvw2bjeu8sAf77U6O3iTTBWkPWQT3H4YMskQc7lS1Mug6A9HL/n
TQvAwh2MIveYyEy/y/dKeFUbpSKxyOInhTg1XtYFiT8bzEF7OEJLU9GyF5oMs67d
o12uYlEnPhWz9oZp11aSdnyeADpVu6BQsPbwfTifcpajQSarH5sG8+rDSPju
=7CnH
-----END PGP MESSAGE-----
--95hZs/zeBetwhuEy--

39
test/msgin/signed.msg Normal file
View File

@ -0,0 +1,39 @@
Date: Sun, 18 Jul 2021 12:08:41 +0200
From: User Alice <alice@disposlab>
To: User Bob <bob@disposlab>
Subject: signed
Message-ID: <YPP9qer9j2u4qXsq@disposlab>
MIME-Version: 1.0
Content-Type: multipart/signed; micalg=pgp-sha256;
protocol="application/pgp-signature"; boundary="U/XjR71RAixRcb28"
Content-Disposition: inline
Status: RO
Content-Length: 870
Lines: 26
--U/XjR71RAixRcb28
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
A signed msg.
--U/XjR71RAixRcb28
Content-Type: application/pgp-signature; name="signature.asc"
-----BEGIN PGP SIGNATURE-----
iQGzBAEBCAAdFiEEHNJFMI8JY9A46INXlzz02Th8RNcFAmDz/aUACgkQlzz02Th8
RNdtOQv/ca8c51KoVq7CyPJUr54n4DEk/LlYniR0W51tL2a4rQxyF2AxqjdI8T4u
bT1+bqPNYgegesyCLokeZKqhLVtCH+UVOTdtUq5bB1J7ALuuVTOIdR5woMBBsazV
ETYEMzL6y2sGPW92ynriEw6B9pPnFKFPhOOZLrnMzM8CpkTfNmGoej+EdV74s0z4
RayKu/WaZ1Dtx2Vy2YDtG36p/Y3n62bnzQJCRyPYfrmCxH5X5i5oibQwxLROCFNE
4X3iVZLPHFg/DS9m4L7mBe0MJewGa1oPFr7t3ZfJ+24aJ/AvUv5uQIO+s6a7AcjD
Pgw/IjeM/uZdPrzniZI2zsWEgsjRCL1fj49XWVNkTHrWCqLvkBg+suucNO2SR0/d
ps+RP5mkJJHaSZyPpxwo9/PHKX67Mkpn/uEXlE8nV6IqKoXRzr1N0qwyhvbZQZLD
FMumxx/eOSiOpaiRhGhoZiUpf+VdnV/1ClpAcdbthy/psx/CMYVblAM8xg74NR9+
Q/WlFbRl
=uMdE
-----END PGP SIGNATURE-----
--U/XjR71RAixRcb28--

87
test/relay.py Normal file
View File

@ -0,0 +1,87 @@
#!/usr/local/bin/python2
#
# This quick-and-dirty script supports only the happy case of SMTP session,
# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email.
#
# It listens on the port given as the only command-line argument and consumes a
# message, then prints it to standard output. The goal is to be able to
# compare that output with expected clear-text or encrypted message body.
#
import sys
import socket
EXIT_UNAVAILABLE = 1
ENCODING = 'utf-8'
BUFFER_SIZE = 4096
EOM = "\r\n.\r\n"
LAST_LINE = -3
def welcome(msg):
return b"220 %b\r\n" % (msg)
def ok(msg = b"OK"):
return b"250 %b\r\n" % (msg)
def bye():
return b"251 Bye"
def provide_message():
return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
def receive_and_confirm(session):
session.recv(BUFFER_SIZE)
session.sendall(ok())
def localhost_at(port):
return ('127.0.0.1', port)
def serve(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(localhost_at(port))
s.listen(1)
except socket.error as e:
print("Cannot connect", e)
sys.exit(EXIT_UNAVAILABLE)
(conn, addr) = s.accept()
conn.sendall(welcome(b"TEST SERVER"))
receive_and_confirm(conn) # Ignore HELO/EHLO
receive_and_confirm(conn) # Ignore sender address
receive_and_confirm(conn) # Ignore recipient address
data = conn.recv(BUFFER_SIZE)
conn.sendall(provide_message())
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
message = ''
while not message.endswith(EOM):
message += conn.recv(BUFFER_SIZE).decode(ENCODING)
conn.sendall(ok(b"OK, id=test"))
conn.recv(BUFFER_SIZE)
conn.sendall(bye())
conn.close()
# Trim EOM marker as we're only interested in the message body.
return message[:-len(EOM)]
def error(msg, exit_code):
print("ERROR: %s" % (msg))
sys.exit(exit_code)
if len(sys.argv) < 2:
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
port = int(sys.argv[1])
body = serve(port)
print(body)

16
test/test_gnupg.py Normal file
View File

@ -0,0 +1,16 @@
import GnuPG
import unittest
class GnuPGUtilitiesTest(unittest.TestCase):
def test_build_default_command(self):
cmd = GnuPG.build_command("test/keyhome")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
def test_build_command_extended_with_args(self):
cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar")
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
if __name__ == '__main__':
unittest.main()