Browse Source

Perform automatic migration to Python 3.x

Use lib2to3 automatic migration tool provided by Python 2.x to convert
codebase to new idioms.

Command line:

find . -type f -name '*.py' \
    -exec python2.7 -m lib2to3 \
    -f all -f idioms -f buffer -f set_literal -f ws_comma -w \
    '{}' '+'
pull/58/head
Piotr F. Mieszkowski 7 months ago
parent
commit
5f02223ec7
  1. 6
      GnuPG/__init__.py
  2. 12
      gpg-mailgate-web/cron.py
  3. 38
      gpg-mailgate.py
  4. 12
      register-handler.py
  5. 10
      test/e2e_test.py
  6. 8
      test/relay.py

6
GnuPG/__init__.py

@ -101,7 +101,7 @@ def confirm_key( content, email ):
# adds a key and ensures it has the given email address
def add_key( keyhome, content ):
p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p = subprocess.Popen( ['/usr/bin/gpg', '--homedir', keyhome, '--import', '--batch'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
p.communicate(input=content)
p.wait()
@ -130,7 +130,7 @@ class GPGEncryptor:
self._message += message
def encrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE )
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
encdata = p.communicate(input=self._message)[0]
return (encdata, p.returncode)
@ -158,7 +158,7 @@ class GPGDecryptor:
self._message += message
def decrypt(self):
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE )
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
decdata = p.communicate(input=self._message)[0]
return (decdata, p.returncode)

12
gpg-mailgate-web/cron.py

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
from ConfigParser import RawConfigParser
from configparser import RawConfigParser
import GnuPG
import MySQLdb
import smtplib
@ -64,7 +64,7 @@ for sect in _cfg.sections():
for (name, value) in _cfg.items(sect):
cfg[sect][name] = value
if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['database']['enabled'] == 'yes' and cfg['database'].has_key('name') and cfg['database'].has_key('host') and cfg['database'].has_key('username') and cfg['database'].has_key('password'):
if 'database' in cfg and 'enabled' in cfg['database'] and cfg['database']['enabled'] == 'yes' and 'name' in cfg['database'] and 'host' in cfg['database'] and 'username' in cfg['database'] and 'password' in cfg['database']:
connection = MySQLdb.connect(host = cfg['database']['host'], user = cfg['database']['username'], passwd = cfg['database']['password'], db = cfg['database']['name'], port = 3306)
cursor = connection.cursor()
@ -83,17 +83,17 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
GnuPG.add_key(cfg['gpg']['keyhome'], row[0]) # import the key to gpg
cursor.execute("UPDATE gpgmw_keys SET status = 1 WHERE id = %s", (row[1],)) # mark key as accepted
appendLog('Imported key from <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes':
if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration successful", "registrationSuccess.md", row[2] )
else:
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],)) # delete key
appendLog('Import confirmation failed for <' + row[2] + '>')
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes':
if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key registration failed", "registrationError.md", row[2] )
else:
# delete key so we don't continue processing it
cursor.execute("DELETE FROM gpgmw_keys WHERE id = %s", (row[1],))
if cfg['cron'].has_key('send_email') and cfg['cron']['send_email'] == 'yes':
if 'send_email' in cfg['cron'] and cfg['cron']['send_email'] == 'yes':
send_msg( "PGP key deleted", "keyDeleted.md", row[2])
connection.commit()
@ -108,4 +108,4 @@ if cfg.has_key('database') and cfg['database'].has_key('enabled') and cfg['datab
appendLog('Deleted key for <' + row[0] + '>')
connection.commit()
else:
print "Warning: doing nothing since database settings are not configured!"
print("Warning: doing nothing since database settings are not configured!")

38
gpg-mailgate.py

@ -19,7 +19,7 @@
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
#
from ConfigParser import RawConfigParser
from configparser import RawConfigParser
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
import copy
@ -94,7 +94,7 @@ def gpg_decrypt( raw_message, recipients ):
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
for to in recipients:
if to in keys.values() and not get_bool_from_cfg('default', 'dec_keymap_only', 'yes'):
if to in list(keys.values()) and not get_bool_from_cfg('default', 'dec_keymap_only', 'yes'):
gpg_to.append(to)
# Is this recipient defined in regex for default decryption?
elif not (dec_regex is None) and not (re.match(dec_regex, to) is None):
@ -106,7 +106,7 @@ def gpg_decrypt( raw_message, recipients ):
if not cfg['dec_keymap'][to] in keys:
log("Key '%s' in decryption keymap not found in keyring for email address '%s'. Won't decrypt." % (cfg['dec_keymap'][to], to))
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
if to in list(keys.values()) and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
@ -116,7 +116,7 @@ def gpg_decrypt( raw_message, recipients ):
if verbose:
log("Recipient (%s) not in PGP domain list for decrypting." % to)
# Avoid unwanted encryption if set
if to in keys.values() and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
if to in list(keys.values()) and get_bool_from_cfg('default', 'failsave_dec', 'yes'):
noenc_to.append(to)
else:
ungpg_to.append(to)
@ -206,7 +206,7 @@ def decrypt_inline_with_attachments( payloads, success, message = None ):
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ):
if( isinstance(payload.get_payload(), list) ):
# Take care of cascaded MIME messages
submessage, subsuccess = decrypt_inline_with_attachments( payload, success )
message.attach(submessage)
@ -317,7 +317,7 @@ def gpg_encrypt( raw_message, recipients ):
log("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (cfg['enc_keymap'][to], to))
# Check if key in keychain is present
if to in keys.values() and not get_bool_from_cfg('default', 'enc_keymap_only', 'yes'):
if to in list(keys.values()) and not get_bool_from_cfg('default', 'enc_keymap_only', 'yes'):
gpg_to.append( (to, to) )
continue
@ -341,7 +341,7 @@ def gpg_encrypt( raw_message, recipients ):
ungpg_to.append(to)
if gpg_to != list():
log("Encrypting email to: %s" % ' '.join( map(lambda x: x[0], gpg_to) ))
log("Encrypting email to: %s" % ' '.join( [x[0] for x in gpg_to] ))
# Getting PGP style for recipient
gpg_to_smtp_mime = list()
@ -378,8 +378,8 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_mime.has_key('Content-Transfer-Encoding'):
raw_message_mime.replace_header('Content-Transfer-Encoding','8BIT')
if 'Content-Transfer-Encoding' in raw_message_mime:
raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT')
else:
raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
@ -395,8 +395,8 @@ def gpg_encrypt( raw_message, recipients ):
if get_bool_from_cfg('default', 'add_header', 'yes'):
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
if raw_message_inline.has_key('Content-Transfer-Encoding'):
raw_message_inline.replace_header('Content-Transfer-Encoding','8BIT')
if 'Content-Transfer-Encoding' in raw_message_inline:
raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT')
else:
raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
@ -411,11 +411,11 @@ def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
# This breaks cascaded MIME messages. Blame PGP/INLINE.
encrypted_payloads = list()
if type( message.get_payload() ) == str:
if isinstance(message.get_payload(), str):
return encrypt_payload( message, gpg_to_cmdline ).get_payload()
for payload in message.get_payload():
if( type( payload.get_payload() ) == list ):
if( isinstance(payload.get_payload(), list) ):
encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) )
else:
encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) )
@ -437,13 +437,13 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
submsg2.set_param('inline', "", 'Content-Disposition' )
submsg2.set_param('filename', "encrypted.asc", 'Content-Disposition' )
if type ( message.get_payload() ) == str:
if isinstance(message.get_payload(), str):
# WTF! It seems to swallow the first line. Not sure why. Perhaps
# it's skipping an imaginary blank line someplace. (ie skipping a header)
# Workaround it here by prepending a blank line.
# This happens only on text only messages.
additionalSubHeader=""
if message.has_key('Content-Type') and not message['Content-Type'].startswith('multipart'):
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
additionalSubHeader="Content-Type: "+message['Content-Type']+"\n"
submsg2.set_payload(additionalSubHeader+"\n" +message.get_payload(decode=True))
check_nested = True
@ -460,7 +460,7 @@ def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
boundary = junk_msg.get_boundary()
# This also modifies the boundary in the body of the message, ie it gets parsed.
if message.has_key('Content-Type'):
if 'Content-Type' in message:
message.replace_header('Content-Type', "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary)
else:
message['Content-Type'] = "multipart/encrypted; protocol=\"application/pgp-encrypted\";\nboundary=\"%s\"\n" % boundary
@ -608,7 +608,7 @@ def generate_message_from_payloads( payloads, message = None ):
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
for payload in payloads.get_payload():
if( type( payload.get_payload() ) == list ):
if( isinstance(payload.get_payload(), list) ):
message.attach(generate_message_from_payloads(payload))
else:
message.attach(payload)
@ -624,7 +624,7 @@ def get_first_payload( payloads ):
def send_msg( message, recipients ):
recipients = filter(None, recipients)
recipients = [_f for _f in recipients if _f]
if recipients:
if not (get_bool_from_cfg('relay', 'host') and get_bool_from_cfg('relay', 'port')):
log("Missing settings for relay. Sending email aborted.")
@ -632,7 +632,7 @@ def send_msg( message, recipients ):
log("Sending email to: <%s>" % '> <'.join( recipients ))
relay = (cfg['relay']['host'], int(cfg['relay']['port']))
smtp = smtplib.SMTP(relay[0], relay[1])
if cfg.has_key('relay') and cfg['relay'].has_key('starttls') and cfg['relay']['starttls'] == 'yes':
if 'relay' in cfg and 'starttls' in cfg['relay'] and cfg['relay']['starttls'] == 'yes':
smtp.starttls()
smtp.sendmail( from_addr, recipients, message )
else:

12
register-handler.py

@ -1,6 +1,6 @@
#!/usr/bin/python
from ConfigParser import RawConfigParser
from configparser import RawConfigParser
import email, os, smtplib, sys, traceback, markdown, syslog, requests
from M2Crypto import BIO, Rand, SMIME, X509
@ -17,7 +17,7 @@ for sect in _cfg.sections():
cfg[sect][name] = value
def log(msg):
if cfg.has_key('logging') and cfg['logging'].has_key('file'):
if 'logging' in cfg and 'file' in cfg['logging']:
if cfg['logging']['file'] == "syslog":
syslog.syslog(syslog.LOG_INFO | syslog.LOG_MAIL, msg)
else:
@ -78,9 +78,9 @@ if __name__ == "__main__":
sys.exit(0)
if sign_type == 'smime':
raw_sig = sign_part.get_payload().replace("\n","")
raw_sig = sign_part.get_payload().replace("\n", "")
# re-wrap signature so that it fits base64 standards
cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in xrange(0, len(raw_sig), 76))
cooked_sig = '\n'.join(raw_sig[pos:pos+76] for pos in range(0, len(raw_sig), 76))
# now, wrap the signature in a PKCS7 block
sig = """
@ -106,7 +106,7 @@ if __name__ == "__main__":
# format in user-specific data
# sending success mail only for S/MIME as GPGMW handles this on its own
success_msg = file(cfg['mailregister']['mail_templates']+"/registrationSuccess.md").read()
success_msg = success_msg.replace("[:FROMADDRESS:]",from_addr)
success_msg = success_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email']
@ -128,7 +128,7 @@ if __name__ == "__main__":
if r.status_code != 200:
log("Could not hand registration over to GPGMW. Error: %s" % r.status_code)
error_msg = file(cfg['mailregister']['mail_templates']+"/gpgmwFailed.md").read()
error_msg = error_msg.replace("[:FROMADDRESS:]",from_addr)
error_msg = error_msg.replace("[:FROMADDRESS:]", from_addr)
msg = MIMEMultipart("alternative")
msg["From"] = cfg['mailregister']['register_email']

10
test/e2e_test.py

@ -24,7 +24,7 @@ import sys
import difflib
import ConfigParser
import configparser
import logging
from time import sleep
@ -35,7 +35,7 @@ CONFIG_FILE = "test/gpg-mailgate.conf"
PYTHON_BIN = "python2.7"
def build_config(config):
cp = ConfigParser.ConfigParser()
cp = configparser.ConfigParser()
cp.add_section("logging")
cp.set("logging", "file", config["log_file"])
@ -83,7 +83,7 @@ def report_result(message_file, expected, test_output):
else:
status = "Failure"
print message_file.ljust(30), status
print(message_file.ljust(30), status)
def execute_e2e_test(case_name, config, config_path):
"""Read test case configuration from config and run that test case.
@ -116,7 +116,7 @@ def execute_e2e_test(case_name, config, config_path):
report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
def load_test_config():
cp = ConfigParser.ConfigParser()
cp = configparser.ConfigParser()
cp.read("test/e2e.ini")
return cp
@ -146,4 +146,4 @@ for case_no in range(1, config.getint("tests", "cases")+1):
execute_e2e_test(case_name, config, config_path)
print "See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log"))
print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")))

8
test/relay.py

@ -40,8 +40,8 @@ def serve(port):
try:
s.bind(('', port))
s.listen(1)
except socket.error, e:
print "Cannot connect", e
except socket.error as e:
print("Cannot connect", e)
sys.exit(EXIT_UNAVAILABLE)
(conn, addr) = s.accept()
@ -69,7 +69,7 @@ def serve(port):
return message[:-len(EOM)]
def error(msg):
print "ERROR: %s" % (msg)
print("ERROR: %s" % (msg))
sys.exit(1)
@ -79,4 +79,4 @@ if len(sys.argv) < 2:
port = int(sys.argv[1])
body = serve(port)
print body
print(body)

Loading…
Cancel
Save