Merge pull request 'Migrate to Python 3.x' (#58) from py3-migration into master

Reviewed-on: #58
Reviewed-by: muppeth <muppeth@no-reply@disroot.org>
This commit is contained in:
pfm 2022-03-08 20:47:25 +00:00
commit 968677f1ec
12 changed files with 244 additions and 93 deletions

View File

@ -58,6 +58,7 @@ def public_keys( keyhome ):
fingerprint = None fingerprint = None
email = None email = None
for line in p.stdout.readlines(): for line in p.stdout.readlines():
line = line.decode('utf-8')
if line[0:3] == LINE_FINGERPRINT: if line[0:3] == LINE_FINGERPRINT:
fingerprint = line.split(':')[POS_FINGERPRINT] fingerprint = line.split(':')[POS_FINGERPRINT]
if line[0:3] == LINE_USER_ID: if line[0:3] == LINE_USER_ID:
@ -120,7 +121,7 @@ def delete_key( keyhome, email ):
class GPGEncryptor: class GPGEncryptor:
def __init__(self, keyhome, recipients = None, charset = None): def __init__(self, keyhome, recipients = None, charset = None):
self._keyhome = keyhome self._keyhome = keyhome
self._message = '' self._message = b''
self._recipients = list() self._recipients = list()
self._charset = charset self._charset = charset
if recipients != None: if recipients != None:
@ -130,7 +131,7 @@ class GPGEncryptor:
self._message += message self._message += message
def encrypt(self): def encrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
encdata = p.communicate(input=self._message)[0] encdata = p.communicate(input=self._message)[0]
return (encdata, p.returncode) return (encdata, p.returncode)
@ -158,7 +159,7 @@ class GPGDecryptor:
self._message += message self._message += message
def decrypt(self): def decrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE ) p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
decdata = p.communicate(input=self._message)[0] decdata = p.communicate(input=self._message)[0]
return (decdata, p.returncode) return (decdata, p.returncode)

View File

@ -1,6 +1,16 @@
PYTHON = python2.7 .POSIX:
.PHONY: test unittest pre-clean clean
.PHONY: test pre-clean clean #
# On systems where Python 3.x binary has a different name, just
# overwrite the name/path on the command line, like:
#
# make test PYTHON=/usr/local/bin/python3.8
#
# This marco is passed via environment to test/e2e_test.py, where it's
# used to compute further commands.
#
PYTHON = python3
# #
# Run a set of end-to-end tests. # Run a set of end-to-end tests.

View File

@ -11,6 +11,10 @@ To run tests, use command `make test` or `make unittest`.
Tests produce some helpful logs, so inspect contents of `test/logs` directory Tests produce some helpful logs, so inspect contents of `test/logs` directory
if something goes wrong. if something goes wrong.
If your system's Python binary isn't found in your `$PATH` or you want to use
a specific binary, use make's macro overriding: `make test
PYTHON=/path/to/python`.
## Key building blocks ## Key building blocks
- *Test Script* (`test/e2e_test.py`) that orchestrates the other components. - *Test Script* (`test/e2e_test.py`) that orchestrates the other components.
@ -29,3 +33,8 @@ Currently tests only check if the message has been encrypted, without
verifying that the correct key has been used. That's because we don't know verifying that the correct key has been used. That's because we don't know
(yet) how to have a reproducible encrypted message. Option (yet) how to have a reproducible encrypted message. Option
`--faked-system-time` wasn't enough to produce identical output. `--faked-system-time` wasn't enough to produce identical output.
## Troubleshooting
When things go wrong, be sure to study `test/logs/e2e.log` and
`test/logs/gpg-mailgate.log` files -- they contain some useful information.

View File

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
from ConfigParser import RawConfigParser from configparser import RawConfigParser
import GnuPG import GnuPG
import MySQLdb import MySQLdb
import smtplib import smtplib
@ -64,7 +64,7 @@ for sect in _cfg.sections():
for (name, value) in _cfg.items(sect): for (name, value) in _cfg.items(sect):
cfg[sect][name] = value cfg[sect][name] = value
if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['database']['enabled'] == 'yes' and cfg['database'].has_key('name') and cfg['database'].has_key('host') and cfg['database'].has_key('username') and cfg['database'].has_key('password'): if 'database' in cfg and 'enabled' in cfg['database'] and cfg['database']['enabled'] == 'yes' and 'name' in cfg['database'] and 'host' in cfg['database'] and 'username' in cfg['database'] and 'password' in cfg['database']:
connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306) connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306)
cursor = connection.cursor() cursor = connection.cursor()
@ -83,17 +83,17 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg
cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted
appendLog('Imported key from <' + row[2] + '>') appendLog('Imported key from <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] ) send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] )
else: else:
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key
appendLog('Import confirmation failed for <' + row[2] + '>') appendLog('Import confirmation failed for <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration failed", "registrationError.md", row[2] ) send_msg( "PGP key registration failed", "registrationError.md", row[2] )
else: else:
# delete key so we don't continue processing it # delete key so we don't continue processing it
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],))
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes': if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key deleted", "keyDeleted.md", row[2]) send_msg( "PGP key deleted", "keyDeleted.md", row[2])
connection.commit() connection.commit()
@ -108,4 +108,4 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
appendLog('Deleted key for <' + row[0] + '>') appendLog('Deleted key for <' + row[0] + '>')
connection.commit() connection.commit()
else: else:
print "Warning: doing nothing since database settings are not configured!" print("Warning: doing nothing since database settings are not configured!")

View File

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>. # along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
# #
from ConfigParser import RawConfigParser from configparser import RawConfigParser
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
import copy import copy
@ -206,7 +206,7 @@ def decrypt_inline_with_attachments( payloads, success, message = None ):
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload(): for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ): if( isinstance(payload.get_payload(), list) ):
# Take care of cascaded MIME messages # Take care of cascaded MIME messages
submessage, subsuccess = decrypt_inline_with_attachments( payload, success ) submessage, subsuccess = decrypt_inline_with_attachments( payload, success )
message.attach(submessage) message.attach(submessage)
@ -255,7 +255,7 @@ def decrypt_inline_with_attachments( payloads, success, message = None ):
# There was no encrypted payload found, so the original payload is attached # There was no encrypted payload found, so the original payload is attached
message.attach(payload) message.attach(payload)
return message, success return message, success
def decrypt_inline_without_attachments( decrypted_message ): def decrypt_inline_without_attachments( decrypted_message ):
@ -341,7 +341,7 @@ def gpg_encrypt( raw_message, recipients ):
ungpg_to.append(to) ungpg_to.append(to)
if gpg_to != list(): if gpg_to != list():
log("Encrypting email to: %s" % ' '.join( map(lambda x: x[0], gpg_to) )) log("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
# Getting PGP style for recipient # Getting PGP style for recipient
gpg_to_smtp_mime = list() gpg_to_smtp_mime = list()
@ -378,10 +378,10 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'): if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_mime.has_key('Content-Transfer-Encoding'): if 'Content-Transfer-Encoding' in raw_message_mime:
raw_message_mime.replace_header('Content-Transfer-Encoding','8BIT') raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT')
else: else:
raw_message_mime['Content-Transfer-Encoding'] = '8BIT' raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime ) encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime )
raw_message_mime.set_payload( encrypted_payloads ) raw_message_mime.set_payload( encrypted_payloads )
@ -395,10 +395,10 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'): if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate' raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_inline.has_key('Content-Transfer-Encoding'): if 'Content-Transfer-Encoding' in raw_message_inline:
raw_message_inline.replace_header('Content-Transfer-Encoding','8BIT') raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT')
else: else:
raw_message_inline['Content-Transfer-Encoding'] = '8BIT' raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline ) encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline )
raw_message_inline.set_payload( encrypted_payloads ) raw_message_inline.set_payload( encrypted_payloads )
@ -411,11 +411,11 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
# This breaks cascaded MIME messages. Blame PGP/INLINE. # This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list() encrypted_payloads = list()
if type( message.get_payload() ) == str: if isinstance(message.get_payload(), str):
return encrypt_payload( message, gpg_to_cmdline ).get_payload() return encrypt_payload( message, gpg_to_cmdline ).get_payload()
for payload in message.get_payload(): for payload in message.get_payload():
if( type( payload.get_payload() ) == list ): if( isinstance(payload.get_payload(), list) ):
encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) ) encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) )
else: else:
encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) ) encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) )
@ -437,13 +437,13 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
submsg2.set_param('inline', "", 'Content-Disposition' ) submsg2.set_param('inline', "", 'Content-Disposition' )
submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' ) submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' )
if type ( message.get_payload() ) == str: if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps # WTF! It seems to swallow the first line. Not sure why. Perhaps
# it's skipping an imaginary blank line someplace. (ie skipping a header) # it's skipping an imaginary blank line someplace. (ie skipping a header)
# Workaround it here by prepending a blank line. # Workaround it here by prepending a blank line.
# This happens only on text only messages. # This happens only on text only messages.
additionalSubHeader="" additionalSubHeader=""
if message.has_key('Content-Type') and not message['Content-Type'].startswith('multipart'): if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader="Content-Type: "+message['Content-Type']+"\n" additionalSubHeader="Content-Type: "+message['Content-Type']+"\n"
submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True)) submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True))
check_nested = True check_nested = True
@ -460,7 +460,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
boundary = junk_msg.get_boundary() boundary = junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed. # This also modifies the boundary in the body of the message, ie it gets parsed.
if message.has_key('Content-Type'): if 'Content-Type' in message:
message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary) message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary)
else: else:
message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary
@ -470,7 +470,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ): def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
raw_payload = payload.get_payload(decode=True) raw_payload = payload.get_payload(decode=True)
if check_nested and "-----BEGIN PGP MESSAGE-----" in raw_payload and "-----END PGP MESSAGE-----" in raw_payload: if check_nested and b"-----BEGIN PGP MESSAGE-----" in raw_payload and b"-----END PGP MESSAGE-----" in raw_payload:
if verbose: if verbose:
log("Message is already pgp encrypted. No nested encryption needed.") log("Message is already pgp encrypted. No nested encryption needed.")
return payload return payload
@ -596,9 +596,13 @@ def sanitize_case_sense( address ):
if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'): if get_bool_from_cfg('default', 'mail_case_insensitive', 'yes'):
address = address.lower() address = address.lower()
else: else:
splitted_address = address.split('@') if isinstance(address, str):
sep = '@'
else:
sep = b'@'
splitted_address = address.split(sep)
if len(splitted_address) > 1: if len(splitted_address) > 1:
address = splitted_address[0] + '@' + splitted_address[1].lower() address = splitted_address[0] + sep + splitted_address[1].lower()
return address return address
@ -608,7 +612,7 @@ def generate_message_from_payloads( payloads, message = None ):
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype()) message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload(): for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ): if( isinstance(payload.get_payload(), list) ):
message.attach(generate_message_from_payloads(payload)) message.attach(generate_message_from_payloads(payload))
else: else:
message.attach(payload) message.attach(payload)
@ -624,7 +628,7 @@ def get_first_payload( payloads ):
def send_msg( message, recipients ): def send_msg( message, recipients ):
recipients = filter(None, recipients) recipients = [_f for _f in recipients if _f]
if recipients: if recipients:
if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')): if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')):
log("Missing settings for relay. Sending email aborted.") log("Missing settings for relay. Sending email aborted.")
@ -632,8 +636,8 @@ def send_msg( message, recipients ):
log("Sending email to: <%s>" % '> <'.join( recipients )) log("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (cfg['relay']['host'], int(cfg['relay']['port'])) relay = (cfg['relay']['host'], int(cfg['relay']['port']))
smtp = smtplib.SMTP(relay[0], relay[1]) smtp = smtplib.SMTP(relay[0], relay[1])
if cfg.has_key('relay') and cfg['relay'].has_key('starttls') and cfg['relay']['starttls'] == 'yes': if 'relay' in cfg and 'starttls' in cfg['relay'] and cfg['relay']['starttls'] == 'yes':
smtp.starttls() smtp.starttls()
smtp.sendmail( from_addr, recipients, message ) smtp.sendmail( from_addr, recipients, message )
else: else:
log("No recipient found") log("No recipient found")
@ -658,7 +662,7 @@ def sort_recipients( raw_message, from_addr, to_addrs ):
return return
first_payload = first_payload.get_payload(decode=True) first_payload = first_payload.get_payload(decode=True)
if "-----BEGIN PGP MESSAGE-----" in first_payload and "-----END PGP MESSAGE-----" in first_payload: if b"-----BEGIN PGP MESSAGE-----" in first_payload and b"-----END PGP MESSAGE-----" in first_payload:
if verbose: if verbose:
log("Message is already encrypted as PGP/INLINE. Encryption aborted.") log("Message is already encrypted as PGP/INLINE. Encryption aborted.")
send_msg(raw_message.as_string(), recipients_left) send_msg(raw_message.as_string(), recipients_left)

View File

@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
from ConfigParser import RawConfigParser from configparser import RawConfigParser
import email, os, smtplib, sys, traceback, markdown, syslog, requests import email, os, smtplib, sys, traceback, markdown, syslog, requests
from M2Crypto import BIO, Rand, SMIME, X509 from M2Crypto import BIO, Rand, SMIME, X509
@ -17,7 +17,7 @@ for sect in _cfg.sections():
cfg[sect][name] = value cfg[sect][name] = value
def log(msg): def log(msg):
if cfg.has_key('logging') and cfg['logging'].has_key('file'): if 'logging' in cfg and 'file' in cfg['logging']:
if cfg['logging']['file'] == "syslog": if cfg['logging']['file'] == "syslog":
syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg) syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg)
else: else:
@ -78,9 +78,9 @@ if __name__ == "__main__":
sys.exit(0) sys.exit(0)
if sign_type == 'smime': if sign_type == 'smime':
raw_sig = sign_part.get_payload().replace("\n","") raw_sig = sign_part.get_payload().replace("\n", "")
# re-wrap signature so that it fits base64 standards # re-wrap signature so that it fits base64 standards
cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in xrange(0, len(raw_sig), 76)) cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in range(0, len(raw_sig), 76))
# now, wrap the signature in a PKCS7 block # now, wrap the signature in a PKCS7 block
sig = """ sig = """
@ -106,7 +106,7 @@ if __name__ == "__main__":
# format in user-specific data # format in user-specific data
# sending success mail only for S/MIME as GPGMW handles this on its own # sending success mail only for S/MIME as GPGMW handles this on its own
success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read() success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read()
success_msg = success_msg.replace("[:FROMADDRESS:]",from_addr) success_msg = success_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email'] msg["From"] = cfg['mailregister']['register_email']
@ -128,7 +128,7 @@ if __name__ == "__main__":
if r.status_code != 200: if r.status_code != 200:
log("Could not hand registration over to GPGMW. Error: %s" % r.status_code) log("Could not hand registration over to GPGMW. Error: %s" % r.status_code)
error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read() error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read()
error_msg = error_msg.replace("[:FROMADDRESS:]",from_addr) error_msg = error_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email'] msg["From"] = cfg['mailregister']['register_email']

View File

@ -30,7 +30,7 @@ certs: test/certs
[tests] [tests]
# Number of "test-*" sections in this file, describing test cases. # Number of "test-*" sections in this file, describing test cases.
cases: 3 cases: 6
e2e_log: test/logs/e2e.log e2e_log: test/logs/e2e.log
e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s e2e_log_format: %(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s
e2e_log_datefmt: %Y-%m-%d %H:%M:%S e2e_log_datefmt: %Y-%m-%d %H:%M:%S
@ -53,3 +53,21 @@ descr: Clear text message to a user with an Ed25519 key
to: bob@disposlab to: bob@disposlab
in: test/msgin/clear2ed.msg in: test/msgin/clear2ed.msg
out: -----BEGIN PGP MESSAGE----- out: -----BEGIN PGP MESSAGE-----
[case-4]
descr: Encrypted message to a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/ed2ed.msg
out: -----BEGIN PGP MESSAGE-----
[case-5]
descr: Signed message to a user with an Ed25519 key
to: bob@disposlab
in: test/msgin/signed.msg
out: -----BEGIN PGP MESSAGE-----
[case-6]
descr: Multipart encrypted message to a user with an Ed25519 key.
to: bob@disposlab
in: test/msgin/multipart2rsa.msg
out: -----BEGIN PGP MESSAGE-----

View File

@ -1,5 +1,3 @@
#!/usr/local/bin/python2
# #
# gpg-mailgate # gpg-mailgate
# #
@ -22,9 +20,11 @@
import os import os
import sys import sys
import subprocess
import difflib import difflib
import ConfigParser import configparser
import logging import logging
from time import sleep from time import sleep
@ -32,10 +32,8 @@ from time import sleep
RELAY_SCRIPT = "test/relay.py" RELAY_SCRIPT = "test/relay.py"
CONFIG_FILE = "test/gpg-mailgate.conf" CONFIG_FILE = "test/gpg-mailgate.conf"
PYTHON_BIN = "python2.7"
def build_config(config): def build_config(config):
cp = ConfigParser.ConfigParser() cp = configparser.ConfigParser()
cp.add_section("logging") cp.add_section("logging")
cp.set("logging", "file", config["log_file"]) cp.set("logging", "file", config["log_file"])
@ -55,26 +53,25 @@ def build_config(config):
cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7") cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7")
cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67") cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67")
logging.debug("Created config with keyhome=%s, cert_path=%s and relay at port %d" % logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}")
(config["gpg_keyhome"], config["smime_certpath"], config["port"]))
return cp return cp
def write_test_config(outfile, **config): def write_test_config(outfile, **config):
logging.debug("Generating configuration with %s" % repr(config)) logging.debug(f"Generating configuration with {config!r}")
out = open(outfile, "w+") out = open(outfile, "w+")
cp = build_config(config) cp = build_config(config)
cp.write(out) cp.write(out)
out.close() out.close()
logging.debug("Wrote configuration to %s" % outfile) logging.debug(f"Wrote configuration to {outfile}")
def load_file(name): def load_file(name):
f = open(name, 'r') f = open(name, 'r')
contents = f.read() contents = f.read()
f.close() f.close()
return contents return bytes(contents, 'utf-8')
def report_result(message_file, expected, test_output): def report_result(message_file, expected, test_output):
status = None status = None
@ -83,7 +80,7 @@ def report_result(message_file, expected, test_output):
else: else:
status = "Failure" status = "Failure"
print message_file.ljust(30), status print(message_file.ljust(30), status)
def execute_e2e_test(case_name, config, config_path): def execute_e2e_test(case_name, config, config_path):
"""Read test case configuration from config and run that test case. """Read test case configuration from config and run that test case.
@ -92,31 +89,43 @@ def execute_e2e_test(case_name, config, config_path):
config file. Each of these sections should contain config file. Each of these sections should contain
following properties: 'descr', 'to', 'in' and 'out'. following properties: 'descr', 'to', 'in' and 'out'.
""" """
# This environment variable is set in Makefile.
python_path = os.getenv('PYTHON', 'python3')
test_command = "GPG_MAILGATE_CONFIG=%s %s gpg-mailgate.py %s < %s" % ( gpglacre_cmd = [python_path,
config_path, "gpg-mailgate.py",
PYTHON_BIN, config.get(case_name, "to")]
config.get(case_name, "to"),
config.get(case_name, "in"))
result_command = "%s %s %d" % (PYTHON_BIN, config.get("relay", "script"), config.getint("relay", "port"))
logging.debug("Spawning relay: '%s'" % (result_command)) relay_cmd = [python_path,
pipe = os.popen(result_command, 'r') config.get("relay", "script"),
config.get("relay", "port")]
logging.debug("Spawning GPG-Lacre: '%s'" % (test_command)) logging.debug(f"Spawning relay: {relay_cmd}")
msgin = os.popen(test_command, 'w') relay_proc = subprocess.Popen(relay_cmd,
msgin.write(load_file(config.get(case_name, "in"))) stdin = None,
msgin.close() stdout = subprocess.PIPE)
testout = pipe.read() logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {config.get(case_name, 'in')}")
pipe.close()
logging.debug("Read %d characters of test output: '%s'" % (len(testout), testout)) # pass PATH because otherwise it would be dropped
gpglacre_proc = subprocess.run(gpglacre_cmd,
input = load_file(config.get(case_name, "in")),
capture_output = True,
env = {"GPG_MAILGATE_CONFIG": config_path,
"PATH": os.getenv("PATH")})
# Let the relay process the data.
relay_proc.wait()
(testout, _) = relay_proc.communicate()
testout = testout.decode('utf-8')
logging.debug(f"Read {len(testout)} characters of test output: '{testout}'")
report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout) report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
def load_test_config(): def load_test_config():
cp = ConfigParser.ConfigParser() cp = configparser.ConfigParser()
cp.read("test/e2e.ini") cp.read("test/e2e.ini")
return cp return cp
@ -128,22 +137,22 @@ logging.basicConfig(filename = config.get("tests", "e2e_log"),
# Get raw values of log and date formats because they # Get raw values of log and date formats because they
# contain %-sequences and we don't want them to be expanded # contain %-sequences and we don't want them to be expanded
# by the ConfigParser. # by the ConfigParser.
format = config.get("tests", "e2e_log_format", True), format = config.get("tests", "e2e_log_format", raw=True),
datefmt = config.get("tests", "e2e_log_datefmt", True), datefmt = config.get("tests", "e2e_log_datefmt", raw=True),
level = logging.DEBUG) level = logging.DEBUG)
config_path = os.getcwd() + "/" + CONFIG_FILE config_path = os.getcwd() + "/" + CONFIG_FILE
write_test_config(config_path, write_test_config(config_path,
port = config.getint("relay", "port"), port = config.get("relay", "port"),
gpg_keyhome = config.get("dirs", "keys"), gpg_keyhome = config.get("dirs", "keys"),
smime_certpath = config.get("dirs", "certs"), smime_certpath = config.get("dirs", "certs"),
log_file = config.get("tests", "lacre_log")) log_file = config.get("tests", "lacre_log"))
for case_no in range(1, config.getint("tests", "cases")+1): for case_no in range(1, config.getint("tests", "cases")+1):
case_name = "case-%d" % (case_no) case_name = f"case-{case_no}"
logging.info("Executing %s: %s", case_name, config.get(case_name, "descr")) logging.info(f"Executing {case_name}: {config.get(case_name, 'descr')}")
execute_e2e_test(case_name, config, config_path) execute_e2e_test(case_name, config, config_path)
print "See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")) print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")))

13
test/msgin/ed2ed.msg Normal file
View File

@ -0,0 +1,13 @@
From: Dave <dave@localhost>
To: Bob <bob@localhost>
Subject: Test
Content-Transfer-Encoding: 7bit
-----BEGIN PGP MESSAGE-----
hF4DujWCoRS24dYSAQdAyGDF9Us11JDr8+XPmvlJHsMS7A4UBIcCiresJyZpSxYw
Cqcugy5AX5fgSAiL1Cd2b1zpQ/rYdTWkFYMVbH4jBEoPC3z/aSd+hTnneJFDUdXl
0koBDIw7NQylu6SrW+Y/DmXgalIHtwACuKivJTq/z9jdwFScV7adRR/VO53Inah3
L1+Ho7Zta95AYW3UPu71Gw3rrkfjY4uGDiFAFg==
=yTzD
-----END PGP MESSAGE-----

View File

@ -0,0 +1,43 @@
Date: Sun, 18 Jul 2021 16:53:45 +0200
From: User Alice <alice@disposlab>
To: User Bob <bob@disposlab>
Subject: encrypted
Message-ID: <YPRAeEEc3z2M9BCy@disposlab>
MIME-Version: 1.0
Content-Type: multipart/encrypted; protocol="application/pgp-encrypted";
boundary="95hZs/zeBetwhuEy"
Content-Disposition: inline
Status: RO
Content-Length: 1140
Lines: 30
--95hZs/zeBetwhuEy
Content-Type: application/pgp-encrypted
Content-Disposition: attachment
Version: 1
--95hZs/zeBetwhuEy
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="msg.asc"
-----BEGIN PGP MESSAGE-----
hQGMA/vsqjpkmZurAQwAnb+2kDPgVFWVLkafuzVJGqFWKNtdVsvk7I1zhzFw5Hsr
h4irSHcH0X0QjaHprNiMBDfIZaCx5VVsvGYLiu/iQkdVPXItugTpln8aAvDt8/Bp
Hse69tgG5S9o4fPK4K2bMjNdomclDdz51cu9NXYjk/6OtzVwcSypyEmxgw24Oo1+
Q8KfZN9n6VTXGNlrV9KnAZYs/5aaSABTeC+cDvOcjDbPAmwDHYS3qsbITYoGHnEz
QfPIakYWPtPWkajhm4Z/iyEUSTeqew1/gAJ8sZnJpV0eg1Cr/44XgklZKFr8aJgk
SG8PkQxsyzAZklpwMSWdbb+t9a5nEKvky3zMpdmS1GE7ubTO7nQ1geUdBiv1UUNh
BY9d4nlGirqxX1MZUTGZidJgCy0365xbJSKkU0yFFW2uWtCKzJTEQBk3YZkNmnGH
h8BiVvMhQ8SxKBRPeH6Zb6HHlbcgkPvJAAI4VLqkZPCBvp9irmcdFGmrgCWLxzgk
sIjYGLA+ZuSXOKuAssXE0sAbASPAkUJRTIjzXFrCnr/MB3ZonESH01fsbsX+E/Qi
+2oLrgjjPHcPq76fvdO6fJP6c1pM8TlOoZKn/RkPm1llULtOn4n5JZJjeUA0F2ID
Te/U9i4YtcFZbuvw2bjeu8sAf77U6O3iTTBWkPWQT3H4YMskQc7lS1Mug6A9HL/n
TQvAwh2MIveYyEy/y/dKeFUbpSKxyOInhTg1XtYFiT8bzEF7OEJLU9GyF5oMs67d
o12uYlEnPhWz9oZp11aSdnyeADpVu6BQsPbwfTifcpajQSarH5sG8+rDSPju
=7CnH
-----END PGP MESSAGE-----
--95hZs/zeBetwhuEy--

39
test/msgin/signed.msg Normal file
View File

@ -0,0 +1,39 @@
Date: Sun, 18 Jul 2021 12:08:41 +0200
From: User Alice <alice@disposlab>
To: User Bob <bob@disposlab>
Subject: signed
Message-ID: <YPP9qer9j2u4qXsq@disposlab>
MIME-Version: 1.0
Content-Type: multipart/signed; micalg=pgp-sha256;
protocol="application/pgp-signature"; boundary="U/XjR71RAixRcb28"
Content-Disposition: inline
Status: RO
Content-Length: 870
Lines: 26
--U/XjR71RAixRcb28
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
A signed msg.
--U/XjR71RAixRcb28
Content-Type: application/pgp-signature; name="signature.asc"
-----BEGIN PGP SIGNATURE-----
iQGzBAEBCAAdFiEEHNJFMI8JY9A46INXlzz02Th8RNcFAmDz/aUACgkQlzz02Th8
RNdtOQv/ca8c51KoVq7CyPJUr54n4DEk/LlYniR0W51tL2a4rQxyF2AxqjdI8T4u
bT1+bqPNYgegesyCLokeZKqhLVtCH+UVOTdtUq5bB1J7ALuuVTOIdR5woMBBsazV
ETYEMzL6y2sGPW92ynriEw6B9pPnFKFPhOOZLrnMzM8CpkTfNmGoej+EdV74s0z4
RayKu/WaZ1Dtx2Vy2YDtG36p/Y3n62bnzQJCRyPYfrmCxH5X5i5oibQwxLROCFNE
4X3iVZLPHFg/DS9m4L7mBe0MJewGa1oPFr7t3ZfJ+24aJ/AvUv5uQIO+s6a7AcjD
Pgw/IjeM/uZdPrzniZI2zsWEgsjRCL1fj49XWVNkTHrWCqLvkBg+suucNO2SR0/d
ps+RP5mkJJHaSZyPpxwo9/PHKX67Mkpn/uEXlE8nV6IqKoXRzr1N0qwyhvbZQZLD
FMumxx/eOSiOpaiRhGhoZiUpf+VdnV/1ClpAcdbthy/psx/CMYVblAM8xg74NR9+
Q/WlFbRl
=uMdE
-----END PGP SIGNATURE-----
--U/XjR71RAixRcb28--

View File

@ -14,38 +14,43 @@ import socket
EXIT_UNAVAILABLE = 1 EXIT_UNAVAILABLE = 1
ENCODING = 'utf-8'
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
EOM = "\r\n.\r\n" EOM = "\r\n.\r\n"
LAST_LINE = -3 LAST_LINE = -3
def welcome(msg): def welcome(msg):
return "220 %s\r\n" % (msg) return b"220 %b\r\n" % (msg)
def ok(msg = "OK"): def ok(msg = b"OK"):
return "250 %s\r\n" % (msg) return b"250 %b\r\n" % (msg)
def bye(): def bye():
return "251 Bye" return b"251 Bye"
def provide_message(): def provide_message():
return "354 Enter a message, ending it with a '.' on a line by itself\r\n" return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
def receive_and_confirm(session): def receive_and_confirm(session):
session.recv(BUFFER_SIZE) session.recv(BUFFER_SIZE)
session.sendall(ok()) session.sendall(ok())
def localhost_at(port):
return ('127.0.0.1', port)
def serve(port): def serve(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
s.bind(('', port)) s.bind(localhost_at(port))
s.listen(1) s.listen(1)
except socket.error, e: except socket.error as e:
print "Cannot connect", e print("Cannot connect", e)
sys.exit(EXIT_UNAVAILABLE) sys.exit(EXIT_UNAVAILABLE)
(conn, addr) = s.accept() (conn, addr) = s.accept()
conn.sendall(welcome("TEST SERVER")) conn.sendall(welcome(b"TEST SERVER"))
receive_and_confirm(conn) # Ignore HELO/EHLO receive_and_confirm(conn) # Ignore HELO/EHLO
receive_and_confirm(conn) # Ignore sender address receive_and_confirm(conn) # Ignore sender address
@ -57,8 +62,8 @@ def serve(port):
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker. # Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
message = '' message = ''
while not message.endswith(EOM): while not message.endswith(EOM):
message += conn.recv(BUFFER_SIZE) message += conn.recv(BUFFER_SIZE).decode(ENCODING)
conn.sendall(ok("OK, id=test")) conn.sendall(ok(b"OK, id=test"))
conn.recv(BUFFER_SIZE) conn.recv(BUFFER_SIZE)
conn.sendall(bye()) conn.sendall(bye())
@ -68,15 +73,15 @@ def serve(port):
# Trim EOM marker as we're only interested in the message body. # Trim EOM marker as we're only interested in the message body.
return message[:-len(EOM)] return message[:-len(EOM)]
def error(msg): def error(msg, exit_code):
print "ERROR: %s" % (msg) print("ERROR: %s" % (msg))
sys.exit(1) sys.exit(exit_code)
if len(sys.argv) < 2: if len(sys.argv) < 2:
error("Usage: relay.py PORT_NUMBER") error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
port = int(sys.argv[1]) port = int(sys.argv[1])
body = serve(port) body = serve(port)
print body print(body)