Merge pull request 'Implement Advanced Content Filter' (#97) from daemon into main
Reviewed-on: #97 Reviewed-by: muppeth <muppeth@no-reply@disroot.org>
This commit is contained in:
commit
13636bfddd
|
@ -1,22 +1,24 @@
|
|||
#
|
||||
# gpg-mailgate
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
"""GnuPG wrapper module."""
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import subprocess
|
||||
|
@ -24,6 +26,7 @@ import shutil
|
|||
import random
|
||||
import string
|
||||
import sys
|
||||
import logging
|
||||
|
||||
|
||||
LINE_FINGERPRINT = 'fpr'
|
||||
|
@ -31,136 +34,161 @@ LINE_USER_ID = 'uid'
|
|||
|
||||
POS_FINGERPRINT = 9
|
||||
|
||||
def build_command(key_home, *args, **kwargs):
|
||||
cmd = ["gpg", '--homedir', key_home] + list(args)
|
||||
return cmd
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
def public_keys( keyhome ):
|
||||
cmd = build_command(keyhome, '--list-keys', '--with-colons')
|
||||
p = subprocess.Popen( cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
p.wait()
|
||||
|
||||
keys = dict()
|
||||
fingerprint = None
|
||||
email = None
|
||||
for line in p.stdout.readlines():
|
||||
line = line.decode(sys.getdefaultencoding())
|
||||
if line[0:3] == LINE_FINGERPRINT:
|
||||
fingerprint = line.split(':')[POS_FINGERPRINT]
|
||||
if line[0:3] == LINE_USER_ID:
|
||||
if ('<' not in line or '>' not in line):
|
||||
continue
|
||||
email = line.split('<')[1].split('>')[0]
|
||||
if not (fingerprint is None or email is None):
|
||||
keys[fingerprint] = email
|
||||
fingerprint = None
|
||||
email = None
|
||||
return keys
|
||||
def _build_command(key_home, *args, **kwargs):
|
||||
cmd = ["gpg", '--homedir', key_home] + list(args)
|
||||
return cmd
|
||||
|
||||
|
||||
def public_keys(keyhome):
|
||||
"""List public keys from keyring KEYHOME."""
|
||||
cmd = _build_command(keyhome, '--list-keys', '--with-colons')
|
||||
p = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
p.wait()
|
||||
|
||||
keys = dict()
|
||||
fingerprint = None
|
||||
email = None
|
||||
for line in p.stdout.readlines():
|
||||
line = line.decode(sys.getdefaultencoding())
|
||||
if line[0:3] == LINE_FINGERPRINT:
|
||||
fingerprint = line.split(':')[POS_FINGERPRINT]
|
||||
if line[0:3] == LINE_USER_ID:
|
||||
if ('<' not in line or '>' not in line):
|
||||
continue
|
||||
email = line.split('<')[1].split('>')[0]
|
||||
if not (fingerprint is None or email is None):
|
||||
keys[fingerprint] = email
|
||||
fingerprint = None
|
||||
email = None
|
||||
return keys
|
||||
|
||||
|
||||
def _to_bytes(s) -> bytes:
|
||||
if isinstance(s, str):
|
||||
return bytes(s, sys.getdefaultencoding())
|
||||
else:
|
||||
return s
|
||||
|
||||
def to_bytes(s) -> bytes:
|
||||
if isinstance(s, str):
|
||||
return bytes(s, sys.getdefaultencoding())
|
||||
else:
|
||||
return s
|
||||
|
||||
# Confirms a key has a given email address by importing it into a temporary
|
||||
# keyring. If this operation succeeds and produces a message mentioning the
|
||||
# expected email, a key is confirmed.
|
||||
def confirm_key( content, email ):
|
||||
tmpkeyhome = ''
|
||||
content = to_bytes(content)
|
||||
expected_email = to_bytes(email.lower())
|
||||
def confirm_key(content, email):
|
||||
"""Verify that the key CONTENT is assigned to identity EMAIL."""
|
||||
tmpkeyhome = ''
|
||||
content = _to_bytes(content)
|
||||
expected_email = _to_bytes(email.lower())
|
||||
|
||||
while True:
|
||||
tmpkeyhome = '/tmp/' + ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(12))
|
||||
if not os.path.exists(tmpkeyhome):
|
||||
break
|
||||
while True:
|
||||
tmpkeyhome = '/tmp/' + ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(12))
|
||||
if not os.path.exists(tmpkeyhome):
|
||||
break
|
||||
|
||||
# let only the owner access the directory, otherwise gpg would complain
|
||||
os.mkdir(tmpkeyhome, mode=0o700)
|
||||
localized_env = os.environ.copy()
|
||||
localized_env["LANG"] = "C"
|
||||
p = subprocess.Popen( build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env )
|
||||
result = p.communicate(input=content)[1]
|
||||
confirmed = False
|
||||
# let only the owner access the directory, otherwise gpg would complain
|
||||
os.mkdir(tmpkeyhome, mode=0o700)
|
||||
localized_env = os.environ.copy()
|
||||
localized_env["LANG"] = "C"
|
||||
p = subprocess.Popen(_build_command(tmpkeyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=localized_env)
|
||||
result = p.communicate(input=content)[1]
|
||||
confirmed = False
|
||||
|
||||
for line in result.split(b"\n"):
|
||||
if b'imported' in line and b'<' in line and b'>' in line:
|
||||
if line.split(b'<')[1].split(b'>')[0].lower() == expected_email:
|
||||
confirmed = True
|
||||
break
|
||||
else:
|
||||
break # confirmation failed
|
||||
for line in result.split(b"\n"):
|
||||
if b'imported' in line and b'<' in line and b'>' in line:
|
||||
if line.split(b'<')[1].split(b'>')[0].lower() == expected_email:
|
||||
confirmed = True
|
||||
break
|
||||
else:
|
||||
break # confirmation failed
|
||||
|
||||
# cleanup
|
||||
shutil.rmtree(tmpkeyhome)
|
||||
# cleanup
|
||||
shutil.rmtree(tmpkeyhome)
|
||||
|
||||
return confirmed
|
||||
|
||||
return confirmed
|
||||
|
||||
# adds a key and ensures it has the given email address
|
||||
def add_key( keyhome, content ):
|
||||
if isinstance(content, str):
|
||||
content = bytes(content, sys.getdefaultencoding())
|
||||
p = subprocess.Popen( build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
p.communicate(input=content)
|
||||
p.wait()
|
||||
def add_key(keyhome, content):
|
||||
"""Register new key CONTENT in the keyring KEYHOME."""
|
||||
if isinstance(content, str):
|
||||
content = bytes(content, sys.getdefaultencoding())
|
||||
p = subprocess.Popen(_build_command(keyhome, '--import', '--batch'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
p.communicate(input=content)
|
||||
p.wait()
|
||||
|
||||
def delete_key( keyhome, email ):
|
||||
from email.utils import parseaddr
|
||||
result = parseaddr(email)
|
||||
|
||||
if result[1]:
|
||||
# delete all keys matching this email address
|
||||
p = subprocess.Popen( build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
p.wait()
|
||||
return True
|
||||
def delete_key(keyhome, email):
|
||||
"""Remove key assigned to identity EMAIL from keyring KEYHOME."""
|
||||
from email.utils import parseaddr
|
||||
result = parseaddr(email)
|
||||
|
||||
if result[1]:
|
||||
# delete all keys matching this email address
|
||||
p = subprocess.Popen(_build_command(keyhome, '--delete-key', '--batch', '--yes', result[1]), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
p.wait()
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
return False
|
||||
|
||||
class GPGEncryptor:
|
||||
def __init__(self, keyhome, recipients = None, charset = None):
|
||||
self._keyhome = keyhome
|
||||
self._message = b''
|
||||
self._recipients = list()
|
||||
self._charset = charset
|
||||
if recipients != None:
|
||||
self._recipients.extend(recipients)
|
||||
"""A wrapper for 'gpg -e' command."""
|
||||
|
||||
def update(self, message):
|
||||
self._message += message
|
||||
def __init__(self, keyhome, recipients=None, charset=None):
|
||||
"""Initialise the wrapper."""
|
||||
self._keyhome = keyhome
|
||||
self._message = b''
|
||||
self._recipients = list()
|
||||
self._charset = charset
|
||||
if recipients is not None:
|
||||
self._recipients.extend(recipients)
|
||||
|
||||
def encrypt(self):
|
||||
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
encdata = p.communicate(input=self._message)[0]
|
||||
return (encdata, p.returncode)
|
||||
def update(self, message):
|
||||
"""Append MESSAGE to buffer about to be encrypted."""
|
||||
self._message += message
|
||||
|
||||
def _command(self):
|
||||
cmd = build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
|
||||
def encrypt(self):
|
||||
"""Feed GnuPG with the message."""
|
||||
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
encdata = p.communicate(input=self._message)[0]
|
||||
return (encdata, p.returncode)
|
||||
|
||||
# add recipients
|
||||
for recipient in self._recipients:
|
||||
cmd.append("-r")
|
||||
cmd.append(recipient)
|
||||
def _command(self):
|
||||
cmd = _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--pgp7", "--no-secmem-warning", "-a", "-e")
|
||||
|
||||
# add on the charset, if set
|
||||
if self._charset:
|
||||
cmd.append("--comment")
|
||||
cmd.append('Charset: ' + self._charset)
|
||||
# add recipients
|
||||
for recipient in self._recipients:
|
||||
cmd.append("-r")
|
||||
cmd.append(recipient)
|
||||
|
||||
# add on the charset, if set
|
||||
if self._charset:
|
||||
cmd.append("--comment")
|
||||
cmd.append('Charset: ' + self._charset)
|
||||
|
||||
LOG.debug(f'Built command: {cmd!r}')
|
||||
return cmd
|
||||
|
||||
return cmd
|
||||
|
||||
class GPGDecryptor:
|
||||
def __init__(self, keyhome):
|
||||
self._keyhome = keyhome
|
||||
self._message = ''
|
||||
"""A wrapper for 'gpg -d' command."""
|
||||
|
||||
def update(self, message):
|
||||
self._message += message
|
||||
def __init__(self, keyhome):
|
||||
"""Initialise the wrapper."""
|
||||
self._keyhome = keyhome
|
||||
self._message = ''
|
||||
|
||||
def decrypt(self):
|
||||
p = subprocess.Popen( self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
|
||||
decdata = p.communicate(input=self._message)[0]
|
||||
return (decdata, p.returncode)
|
||||
def update(self, message):
|
||||
"""Append encrypted content to be decrypted."""
|
||||
self._message += message
|
||||
|
||||
def _command(self):
|
||||
return build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
|
||||
def decrypt(self):
|
||||
"""Decrypt the message."""
|
||||
p = subprocess.Popen(self._command(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
decdata = p.communicate(input=self._message)[0]
|
||||
return (decdata, p.returncode)
|
||||
|
||||
def _command(self):
|
||||
return _build_command(self._keyhome, "--trust-model", "always", "--batch", "--yes", "--no-secmem-warning", "-a", "-d")
|
||||
|
|
19
Makefile
19
Makefile
|
@ -1,5 +1,5 @@
|
|||
.POSIX:
|
||||
.PHONY: test e2etest unittest crontest pre-clean clean
|
||||
.PHONY: test e2etest unittest crontest daemontest pre-clean clean restore-keyhome
|
||||
|
||||
#
|
||||
# On systems where Python 3.x binary has a different name, just
|
||||
|
@ -17,7 +17,7 @@ TEST_DB = test/lacre.db
|
|||
#
|
||||
# Main goal to run tests.
|
||||
#
|
||||
test: e2etest unittest crontest
|
||||
test: e2etest unittest daemontest crontest
|
||||
|
||||
#
|
||||
# Run a set of end-to-end tests.
|
||||
|
@ -26,7 +26,7 @@ test: e2etest unittest crontest
|
|||
# file. Basically this is just a script that feeds GPG Mailgate with
|
||||
# known input and checks whether output meets expectations.
|
||||
#
|
||||
e2etest: test/tmp test/logs pre-clean
|
||||
e2etest: test/tmp test/logs pre-clean restore-keyhome
|
||||
$(PYTHON) test/e2e_test.py
|
||||
|
||||
#
|
||||
|
@ -40,7 +40,13 @@ crontest: clean-db $(TEST_DB)
|
|||
GPG_MAILGATE_CONFIG=test/gpg-mailgate-cron-test.conf PYTHONPATH=`pwd` $(PYTHON) gpg-mailgate-web/cron.py
|
||||
|
||||
$(TEST_DB):
|
||||
$(PYTHON) test/schema.py $(TEST_DB)
|
||||
$(PYTHON) test/utils/schema.py $(TEST_DB)
|
||||
|
||||
#
|
||||
# Run an e2e test of Advanced Content Filter.
|
||||
#
|
||||
daemontest:
|
||||
$(PYTHON) test/daemon_test.py
|
||||
|
||||
# Before running the crontest goal we need to make sure that the
|
||||
# database gets regenerated.
|
||||
|
@ -51,12 +57,15 @@ clean-db:
|
|||
# Run unit tests
|
||||
#
|
||||
unittest:
|
||||
$(PYTHON) -m unittest discover -s test
|
||||
$(PYTHON) -m unittest discover -s test/modules
|
||||
|
||||
pre-clean:
|
||||
rm -fv test/gpg-mailgate.conf
|
||||
rm -f test/logs/*.log
|
||||
|
||||
restore-keyhome:
|
||||
git restore test/keyhome
|
||||
|
||||
test/tmp:
|
||||
mkdir test/tmp
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
# Advanced Filter
|
||||
|
||||
## Postfix Filters
|
||||
|
||||
There are two types of Postfix mail filters: Simple Filters and Advanced
|
||||
Filters. Simple Filters are executed for each incoming email as a new
|
||||
process, which may turn out to be expensive in terms of resources. Advanced
|
||||
Filters work as a mail-processing proxies.
|
||||
|
||||
For detailed documentation, see [FILTER README](https://www.postfix.org/FILTER_README.html).
|
||||
|
||||
## Installation
|
||||
|
||||
Just use the following command to install dependencies:
|
||||
|
||||
pip install -r requirements.txt
|
||||
|
||||
## Configuration
|
||||
|
||||
Lacre Advanced Filter, also known as daemon, is configured in the `[daemon]`
|
||||
section of configuration file. Two obligatory parameters to be defined there
|
||||
are:
|
||||
|
||||
* `host` -- IP address or a host name;
|
||||
* `port` -- TCP port Lacre should listen on.
|
||||
|
||||
The other very important section is `[relay]`, which by default uses Simple
|
||||
Filter destination. It has to be adjusted for Advanced Filter to work,
|
||||
setting port to `10026`.
|
||||
|
||||
Command to spawn a Lacre daemon process is:
|
||||
|
||||
GPG_MAILGATE_CONFIG=/etc/gpg-mailgate.conf PYTHONPATH=... python -m lacre.daemon
|
||||
|
||||
Two environment variables used here are:
|
||||
|
||||
* `GPG_MAILGATE_CONFIG` (not mandatory) -- path to Lacre configuration,
|
||||
unless it's kept in default location (`/etc/gpg-maillgate.conf`).
|
||||
* `PYTHONPATH` (not mandatory) -- location of Lacre modules. You can place
|
||||
them below your Python's `site-packages` to be reachable by any other
|
||||
Python software.
|
|
@ -8,27 +8,31 @@ feed some input to GPG Mailgate and inspect the output.
|
|||
|
||||
To run tests, use command `make test`.
|
||||
|
||||
There are 3 types of tests:
|
||||
There are 4 types of tests:
|
||||
|
||||
* `make e2etest` -- they cover a complete Lacre flow, from feeding it with
|
||||
an email to accepting its encrypted form;
|
||||
* `make daemontest` -- similar to the original `e2etest` goal, but tests the
|
||||
behaviour of the Lacre Daemon, i.e. key part of the Advanced Content
|
||||
Filter.
|
||||
* `make unittest` -- just small tests of small units of code;
|
||||
* `make crontest` -- execute cron job with a SQLite database.
|
||||
* `make crontest` -- execute cron job with a SQLite database.
|
||||
|
||||
E2E tests (`make e2etest`) should produce some helpful logs, so inspect
|
||||
contents of `test/logs` directory if something goes wrong.
|
||||
|
||||
If your system's Python binary isn't found in your `$PATH` or you want to use
|
||||
a specific binary, use make's macro overriding: `make test
|
||||
a specific binary, use make's macro overriding: `make test
|
||||
PYTHON=/path/to/python`.
|
||||
|
||||
## Key building blocks
|
||||
|
||||
- *Test Script* (`test/e2e_test.py`) that orchestrates the other components.
|
||||
It performs test cases described in the *Test Configuration*. It spawns
|
||||
*Test Mail Relay* and *GPG Mailgate* in appropriate order.
|
||||
- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only
|
||||
supports the happy path. It accepts a mail message and prints it to
|
||||
- *Test Script* (`test/e2e_test.py`) and *Daemon Test Script*
|
||||
(`test/daemon_test.py`) that orchestrate the other components. They perform
|
||||
test cases described in the *Test Configuration*. They spawn *Test Mail
|
||||
Relay* and *GPG Mailgate* in appropriate order.
|
||||
- *Test Mail Relay* (`test/relay.py`), a simplistic mail daemon that only
|
||||
supports the happy path. It accepts a mail message and prints it to
|
||||
stdandard output.
|
||||
- *Test Configuration* (`test/e2e.ini`) specifies test cases: their input,
|
||||
expected results and helpful documentation. It also specifies the port that
|
||||
|
@ -38,7 +42,7 @@ PYTHON=/path/to/python`.
|
|||
|
||||
Currently tests only check if the message has been encrypted, without
|
||||
verifying that the correct key has been used. That's because we don't know
|
||||
(yet) how to have a reproducible encrypted message. Option
|
||||
(yet) how to have a reproducible encrypted message. Option
|
||||
`--faked-system-time` wasn't enough to produce identical output.
|
||||
|
||||
## Troubleshooting
|
||||
|
|
|
@ -75,6 +75,14 @@ mail_templates = /var/gpgmailgate/cron_templates
|
|||
# https://docs.python.org/3/library/logging.config.html#logging-config-fileformat
|
||||
config = /etc/gpg-lacre-logging.conf
|
||||
|
||||
[daemon]
|
||||
# Advanced Content Filter section.
|
||||
#
|
||||
# Advanced filters differ from Simple ones by providing a daemon that handles
|
||||
# requests, instead of starting a new process each time a message arrives.
|
||||
host = 127.0.0.1
|
||||
port = 10025
|
||||
|
||||
[relay]
|
||||
# the relay settings to use for Postfix
|
||||
# gpg-mailgate will submit email to this relay after it is done processing
|
||||
|
|
|
@ -1,22 +1,21 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
#
|
||||
# gpg-mailgate
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import email
|
||||
|
@ -26,23 +25,24 @@ import logging
|
|||
|
||||
import lacre
|
||||
import lacre.config as conf
|
||||
import lacre.mailgate as mailgate
|
||||
|
||||
|
||||
start = time.time()
|
||||
conf.load_config()
|
||||
lacre.init_logging(conf.get_item('logging', 'config'))
|
||||
|
||||
# This has to be executed *after* logging initialisation.
|
||||
import lacre.mailgate as mailgate
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
missing_params = conf.validate_config()
|
||||
if missing_params:
|
||||
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
|
||||
sys.exit(lacre.EX_CONFIG)
|
||||
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
|
||||
sys.exit(lacre.EX_CONFIG)
|
||||
|
||||
# Read e-mail from stdin
|
||||
raw = sys.stdin.read()
|
||||
raw_message = email.message_from_string( raw )
|
||||
raw_message = email.message_from_string(raw)
|
||||
from_addr = raw_message['From']
|
||||
to_addrs = sys.argv[1:]
|
||||
|
||||
|
|
118
lacre/config.py
118
lacre/config.py
|
@ -16,72 +16,102 @@ CONFIG_PATH_ENV = "GPG_MAILGATE_CONFIG"
|
|||
# List of mandatory configuration parameters. Each item on this list should be
|
||||
# a pair: a section name and a parameter name.
|
||||
MANDATORY_CONFIG_ITEMS = [("relay", "host"),
|
||||
("relay", "port")]
|
||||
("relay", "port"),
|
||||
("daemon", "host"),
|
||||
("daemon", "port")]
|
||||
|
||||
# Global dict to keep configuration parameters. It's hidden behind several
|
||||
# utility functions to make it easy to replace it with ConfigParser object in
|
||||
# the future.
|
||||
cfg = dict()
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
"""Parses configuration file.
|
||||
"""Parse configuration file.
|
||||
|
||||
If environment variable identified by CONFIG_PATH_ENV
|
||||
variable is set, its value is taken as a configuration file
|
||||
path. Otherwise, the default is taken
|
||||
('/etc/gpg-mailgate.conf').
|
||||
"""
|
||||
configFile = os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')
|
||||
If environment variable identified by CONFIG_PATH_ENV
|
||||
variable is set, its value is taken as a configuration file
|
||||
path. Otherwise, the default is taken
|
||||
('/etc/gpg-mailgate.conf').
|
||||
"""
|
||||
configFile = os.getenv(CONFIG_PATH_ENV, '/etc/gpg-mailgate.conf')
|
||||
|
||||
parser = read_config(configFile)
|
||||
parser = _read_config(configFile)
|
||||
|
||||
global cfg
|
||||
cfg = copy_to_dict(parser)
|
||||
return cfg
|
||||
global cfg
|
||||
cfg = _copy_to_dict(parser)
|
||||
return cfg
|
||||
|
||||
def read_config(fileName) -> RawConfigParser:
|
||||
cp = RawConfigParser()
|
||||
cp.read(fileName)
|
||||
|
||||
return cp
|
||||
def _read_config(fileName) -> RawConfigParser:
|
||||
cp = RawConfigParser()
|
||||
cp.read(fileName)
|
||||
|
||||
def copy_to_dict(confParser) -> dict:
|
||||
config = dict()
|
||||
return cp
|
||||
|
||||
for sect in confParser.sections():
|
||||
config[sect] = dict()
|
||||
for (name, value) in confParser.items(sect):
|
||||
config[sect][name] = value
|
||||
|
||||
return config
|
||||
def _copy_to_dict(confParser) -> dict:
|
||||
config = dict()
|
||||
|
||||
for sect in confParser.sections():
|
||||
config[sect] = dict()
|
||||
for (name, value) in confParser.items(sect):
|
||||
config[sect][name] = value
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_item(section, key, empty_value=None):
|
||||
global cfg
|
||||
if config_item_set(section, key):
|
||||
return cfg[section][key]
|
||||
else:
|
||||
return empty_value
|
||||
|
||||
def get_item(section, key, empty_value = None):
|
||||
global cfg
|
||||
if config_item_set(section, key):
|
||||
return cfg[section][key]
|
||||
else:
|
||||
return empty_value
|
||||
|
||||
def has_section(section) -> bool:
|
||||
global cfg
|
||||
return section in cfg
|
||||
return section in cfg
|
||||
|
||||
|
||||
def config_item_set(section, key) -> bool:
|
||||
global cfg
|
||||
return section in cfg and (key in cfg[section]) and not (cfg[section][key] is None)
|
||||
return section in cfg and (key in cfg[section]) and not (cfg[section][key] is None)
|
||||
|
||||
|
||||
def config_item_equals(section, key, value) -> bool:
|
||||
global cfg
|
||||
return section in cfg and key in cfg[section] and cfg[section][key] == value
|
||||
return section in cfg and key in cfg[section] and cfg[section][key] == value
|
||||
|
||||
|
||||
def flag_enabled(section, key) -> bool:
|
||||
return config_item_equals(section, key, 'yes')
|
||||
|
||||
|
||||
def validate_config():
|
||||
"""Checks whether the configuration is complete.
|
||||
"""Check if configuration is complete.
|
||||
|
||||
Returns a list of missing parameters, so an empty list means
|
||||
configuration is complete.
|
||||
"""
|
||||
missing = []
|
||||
for (section, param) in MANDATORY_CONFIG_ITEMS:
|
||||
if not config_item_set(section, param):
|
||||
missing.append((section, param))
|
||||
return missing
|
||||
Returns a list of missing parameters, so an empty list means
|
||||
configuration is complete.
|
||||
"""
|
||||
missing = []
|
||||
for (section, param) in MANDATORY_CONFIG_ITEMS:
|
||||
if not config_item_set(section, param):
|
||||
missing.append((section, param))
|
||||
return missing
|
||||
|
||||
|
||||
#
|
||||
# High level access to configuration.
|
||||
#
|
||||
|
||||
def relay_params():
|
||||
"""Return a (HOST, PORT) tuple identifying the mail relay."""
|
||||
return (cfg["relay"]["host"], int(cfg["relay"]["port"]))
|
||||
|
||||
|
||||
def daemon_params():
|
||||
"""Return a (HOST, PORT) tuple to setup a server socket for Lacre daemon."""
|
||||
return (cfg["daemon"]["host"], int(cfg["daemon"]["port"]))
|
||||
|
||||
|
||||
def strict_mode():
|
||||
"""Check if Lacre is configured to support only a fixed list of keys."""
|
||||
return ("default" in cfg and cfg["default"]["enc_keymap_only"] == "yes")
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
"""Lacre Daemon, the Advanced Mail Filter message dispatcher."""
|
||||
|
||||
import logging
|
||||
import lacre
|
||||
import lacre.config as conf
|
||||
import sys
|
||||
from aiosmtpd.controller import Controller
|
||||
from aiosmtpd.smtp import Envelope
|
||||
import asyncio
|
||||
import email
|
||||
import time
|
||||
from watchdog.observers import Observer
|
||||
|
||||
# Mail status constants.
|
||||
#
|
||||
# These are the only values that our mail handler is allowed to return.
|
||||
RESULT_OK = '250 OK'
|
||||
RESULT_ERROR = '500 Could not process your message'
|
||||
RESULT_NOT_IMPLEMENTED = '500 Not implemented yet'
|
||||
|
||||
# Load configuration and init logging, in this order. Only then can we load
|
||||
# the last Lacre module, i.e. lacre.mailgate.
|
||||
conf.load_config()
|
||||
lacre.init_logging(conf.get_item("logging", "config"))
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
import lacre.mailgate as gate
|
||||
import lacre.keyring as kcache
|
||||
|
||||
|
||||
class MailEncryptionProxy:
|
||||
"""A mail handler dispatching to appropriate mail operation."""
|
||||
|
||||
def __init__(self, keyring: kcache.KeyRing):
|
||||
"""Initialise the mail proxy with a reference to the key cache."""
|
||||
self._keyring = keyring
|
||||
|
||||
async def handle_DATA(self, server, session, envelope: Envelope):
|
||||
"""Accept a message and either encrypt it or forward as-is."""
|
||||
start = time.process_time()
|
||||
try:
|
||||
keys = await self._keyring.freeze_identities()
|
||||
message = email.message_from_bytes(envelope.content)
|
||||
for operation in gate.delivery_plan(envelope.rcpt_tos, keys):
|
||||
LOG.debug(f"Sending mail via {operation!r}")
|
||||
new_message = operation.perform(message)
|
||||
gate.send_msg(new_message, operation.recipients(), envelope.mail_from)
|
||||
except TypeError as te:
|
||||
LOG.exception("Got exception while processing", exc_info=te)
|
||||
return RESULT_ERROR
|
||||
|
||||
ellapsed = time.process_time() - start
|
||||
LOG.info(f'Message delivered in {ellapsed} ms')
|
||||
return RESULT_OK
|
||||
|
||||
|
||||
def _init_controller(keys: kcache.KeyRing, tout: float = 5):
|
||||
proxy = MailEncryptionProxy(keys)
|
||||
host, port = conf.daemon_params()
|
||||
LOG.info(f"Initialising a mail Controller at {host}:{port}")
|
||||
return Controller(proxy, hostname=host, port=port, ready_timeout=tout)
|
||||
|
||||
|
||||
def _init_reloader(keyring_dir: str, reloader) -> kcache.KeyringModificationListener:
|
||||
listener = kcache.KeyringModificationListener(reloader)
|
||||
observer = Observer()
|
||||
observer.schedule(listener, keyring_dir, recursive=False)
|
||||
return observer
|
||||
|
||||
|
||||
def _validate_config():
|
||||
missing = conf.validate_config()
|
||||
if missing:
|
||||
params = ", ".join([_full_param_name(tup) for tup in missing])
|
||||
LOG.error(f"Following mandatory parameters are missing: {params}")
|
||||
sys.exit(lacre.EX_CONFIG)
|
||||
|
||||
|
||||
def _full_param_name(tup):
|
||||
return f"[{tup[0]}]{tup[1]}"
|
||||
|
||||
|
||||
async def _sleep():
|
||||
while True:
|
||||
await asyncio.sleep(360)
|
||||
|
||||
|
||||
def _main():
|
||||
_validate_config()
|
||||
|
||||
keyring_path = conf.get_item('gpg', 'keyhome')
|
||||
keyring = kcache.KeyRing(keyring_path)
|
||||
controller = _init_controller(keyring)
|
||||
reloader = _init_reloader(keyring_path, keyring)
|
||||
|
||||
LOG.info(f'Watching keyring directory {keyring_path}...')
|
||||
reloader.start()
|
||||
|
||||
LOG.info('Starting the daemon...')
|
||||
controller.start()
|
||||
|
||||
try:
|
||||
asyncio.run(_sleep())
|
||||
except KeyboardInterrupt:
|
||||
LOG.info("Finishing...")
|
||||
finally:
|
||||
LOG.info('Shutting down keyring watcher and the daemon...')
|
||||
reloader.stop()
|
||||
reloader.join()
|
||||
controller.stop()
|
||||
|
||||
LOG.info("Done")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
_main()
|
|
@ -0,0 +1,147 @@
|
|||
"""Data structures and utilities to make keyring access easier.
|
||||
|
||||
IMPORTANT: This module has to be loaded _after_ initialisation of the logging
|
||||
module.
|
||||
"""
|
||||
|
||||
import lacre.text as text
|
||||
import logging
|
||||
from os import stat
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from asyncio import Semaphore, run
|
||||
import copy
|
||||
|
||||
import GnuPG
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _sanitize(keys):
|
||||
for fingerprint in keys:
|
||||
keys[fingerprint] = text.sanitize_case_sense(keys[fingerprint])
|
||||
|
||||
return keys
|
||||
|
||||
|
||||
class KeyCacheMisconfiguration(Exception):
|
||||
"""Exception used to signal that KeyCache is misconfigured."""
|
||||
|
||||
|
||||
class KeyCache:
|
||||
"""A store for OpenPGP keys.
|
||||
|
||||
Key case is sanitised while loading from GnuPG if so
|
||||
configured. See mail_case_insensitive parameter in section
|
||||
[default].
|
||||
"""
|
||||
|
||||
def __init__(self, keys: dict = None):
|
||||
"""Initialise an empty cache.
|
||||
|
||||
With keyring_dir given, set location of the directory from which keys should be loaded.
|
||||
"""
|
||||
self._keys = keys
|
||||
|
||||
def __getitem__(self, fingerpring):
|
||||
"""Look up email assigned to the given fingerprint."""
|
||||
return self._keys[fingerpring]
|
||||
|
||||
def __setitem__(self, fingerprint, email):
|
||||
"""Assign an email to a fingerpring, overwriting it if it was already present."""
|
||||
self._keys[fingerprint] = email
|
||||
|
||||
def __contains__(self, fingerprint):
|
||||
"""Check if the given fingerprint is assigned to an email."""
|
||||
# This method has to be present for KeyCache to be a dict substitute.
|
||||
# See mailgate, function _identify_gpg_recipients.
|
||||
return fingerprint in self._keys
|
||||
|
||||
def has_email(self, email):
|
||||
"""Check if cache contains a key assigned to the given email."""
|
||||
return email in self._keys.values()
|
||||
|
||||
|
||||
class KeyRing:
|
||||
"""A high-level adapter for GnuPG-maintained keyring directory.
|
||||
|
||||
Its role is to keep a cache of keys present in the keyring,
|
||||
reload it when necessary and produce static copies of
|
||||
fingerprint=>email maps.
|
||||
"""
|
||||
|
||||
def __init__(self, path: str):
|
||||
"""Initialise the adapter."""
|
||||
self._path = path
|
||||
self._keys = self._load_and_sanitize()
|
||||
self._sema = Semaphore()
|
||||
self._last_mod = None
|
||||
|
||||
def _load_and_sanitize(self):
|
||||
keys = self._load_keyring_from(self._path)
|
||||
return _sanitize(keys)
|
||||
|
||||
def _load_keyring_from(self, keyring_dir):
|
||||
return GnuPG.public_keys(keyring_dir)
|
||||
|
||||
async def freeze_identities(self) -> KeyCache:
|
||||
"""Return a static, async-safe copy of the identity map."""
|
||||
async with self._sema:
|
||||
keys = copy.deepcopy(self._keys)
|
||||
return KeyCache(keys)
|
||||
|
||||
def load(self):
|
||||
"""Load keyring, replacing any previous contents of the cache."""
|
||||
LOG.debug('Reloading keys...')
|
||||
run(self._load())
|
||||
|
||||
async def _load(self):
|
||||
last_mod = self._read_mod_time()
|
||||
if self._is_modified(last_mod):
|
||||
async with self._sema:
|
||||
self.replace_keyring(self._load_keyring_from(self._path))
|
||||
|
||||
self._last_mod = self._read_mod_time()
|
||||
|
||||
reload = load
|
||||
|
||||
def replace_keyring(self, keys: dict):
|
||||
"""Overwrite previously stored key cache with KEYS."""
|
||||
keys = _sanitize(keys)
|
||||
|
||||
LOG.info(f'Storing {len(keys)} keys')
|
||||
self._keys = keys
|
||||
|
||||
def _read_mod_time(self):
|
||||
# (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
|
||||
MTIME = 8
|
||||
st = stat(self._path)
|
||||
return st[MTIME]
|
||||
|
||||
def _is_modified(self, last_mod):
|
||||
if self._last_mod is None:
|
||||
LOG.debug('Keyring not loaded before')
|
||||
return True
|
||||
elif self._last_mod != last_mod:
|
||||
LOG.debug('Keyring directory mtime changed')
|
||||
return True
|
||||
else:
|
||||
LOG.debug('Keyring not modified ')
|
||||
return False
|
||||
|
||||
|
||||
class KeyringModificationListener(FileSystemEventHandler):
|
||||
"""A filesystem event listener that triggers key cache reload."""
|
||||
|
||||
def __init__(self, keyring: KeyRing):
|
||||
"""Initialise a listener with a callback to be executed upon each change."""
|
||||
self._keyring = keyring
|
||||
|
||||
def handle(self, event):
|
||||
"""Reload keys upon FS event."""
|
||||
LOG.debug(f'Reloading on event {event!r}')
|
||||
self._keyring.reload()
|
||||
|
||||
# All methods should do the same: reload the key cache.
|
||||
# on_created = handle
|
||||
# on_deleted = handle
|
||||
on_modified = handle
|
|
@ -1,24 +1,28 @@
|
|||
"""Lacre's actual mail-delivery module.
|
||||
|
||||
IMPORTANT: This module has to be loaded _after_ initialisation of the logging
|
||||
module.
|
||||
"""
|
||||
|
||||
#
|
||||
# gpg-mailgate
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
from configparser import RawConfigParser
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
import copy
|
||||
import email
|
||||
|
@ -26,415 +30,538 @@ import email.message
|
|||
import email.utils
|
||||
import GnuPG
|
||||
import os
|
||||
import re
|
||||
import smtplib
|
||||
import sys
|
||||
import syslog
|
||||
import traceback
|
||||
import time
|
||||
|
||||
import asyncio
|
||||
|
||||
# imports for S/MIME
|
||||
from M2Crypto import BIO, Rand, SMIME, X509
|
||||
from email.mime.message import MIMEMessage
|
||||
from M2Crypto import BIO, SMIME, X509
|
||||
|
||||
import logging
|
||||
import lacre
|
||||
import lacre.text as text
|
||||
import lacre.config as conf
|
||||
import lacre.keyring as kcache
|
||||
from lacre.mailop import KeepIntact, InlineOpenPGPEncrypt, MimeOpenPGPEncrypt
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
def gpg_encrypt( raw_message, recipients ):
|
||||
global LOG
|
||||
|
||||
if not conf.config_item_set('gpg', 'keyhome'):
|
||||
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
|
||||
return recipients
|
||||
|
||||
keys = GnuPG.public_keys( conf.get_item('gpg', 'keyhome') )
|
||||
for fingerprint in keys:
|
||||
keys[fingerprint] = sanitize_case_sense(keys[fingerprint])
|
||||
|
||||
# This list will be filled with pairs (M, N), where M is the destination
|
||||
# address we're going to deliver the message to and N is the identity we're
|
||||
# going to encrypt it for.
|
||||
gpg_to = list()
|
||||
|
||||
ungpg_to = list()
|
||||
|
||||
enc_keymap_only = conf.config_item_equals('default', 'enc_keymap_only', 'yes')
|
||||
|
||||
for to in recipients:
|
||||
|
||||
# Check if recipient is in keymap
|
||||
if conf.config_item_set('enc_keymap', to):
|
||||
LOG.info("Encrypt keymap has key '%s'" % conf.get_item('enc_keymap', to) )
|
||||
# Check we've got a matching key!
|
||||
if conf.get_item('enc_keymap', to) in keys:
|
||||
gpg_to.append( (to, conf.get_item('enc_keymap', to)) )
|
||||
continue
|
||||
else:
|
||||
LOG.info("Key '%s' in encrypt keymap not found in keyring for email address '%s'." % (conf.get_item('enc_keymap', to), to))
|
||||
|
||||
# Check if key in keychain is present
|
||||
if not enc_keymap_only:
|
||||
if to in keys.values():
|
||||
gpg_to.append( (to, to) )
|
||||
continue
|
||||
|
||||
# If this is an address with a delimiter (i.e. "foo+bar@example.com"),
|
||||
# then strip whatever is found after the delimiter and try this address.
|
||||
(newto, topic) = text.parse_delimiter(to)
|
||||
if newto in keys.values():
|
||||
gpg_to.append((to, newto))
|
||||
|
||||
# Check if there is a default key for the domain
|
||||
splitted_to = to.split('@')
|
||||
if len(splitted_to) > 1:
|
||||
domain = splitted_to[1]
|
||||
if conf.config_item_set('enc_domain_keymap', domain):
|
||||
LOG.info("Encrypt domain keymap has key '%s'" % conf.get_item('enc_domain_keymap', domain) )
|
||||
# Check we've got a matching key!
|
||||
if conf.get_item('enc_domain_keymap', domain) in keys:
|
||||
LOG.info("Using default domain key for recipient '%s'" % to)
|
||||
gpg_to.append( (to, conf.get_item('enc_domain_keymap', domain)) )
|
||||
continue
|
||||
else:
|
||||
LOG.info("Key '%s' in encrypt domain keymap not found in keyring for email address '%s'." % (conf.get_item('enc_domain_keymap', domain), to))
|
||||
|
||||
# At this point no key has been found
|
||||
LOG.debug("Recipient (%s) not in PGP domain list for encrypting." % to)
|
||||
ungpg_to.append(to)
|
||||
|
||||
if gpg_to:
|
||||
LOG.info("Encrypting email to: %s" % ' '.join( x[0] for x in gpg_to ))
|
||||
|
||||
# Getting PGP style for recipient
|
||||
gpg_to_smtp_mime = list()
|
||||
gpg_to_cmdline_mime = list()
|
||||
|
||||
gpg_to_smtp_inline = list()
|
||||
gpg_to_cmdline_inline = list()
|
||||
|
||||
for rcpt in gpg_to:
|
||||
# Checking pre defined styles in settings first
|
||||
if conf.config_item_equals('pgp_style', rcpt[0], 'mime'):
|
||||
gpg_to_smtp_mime.append(rcpt[0])
|
||||
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
|
||||
elif conf.config_item_equals('pgp_style', rcpt[0], 'inline'):
|
||||
gpg_to_smtp_inline.append(rcpt[0])
|
||||
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
|
||||
else:
|
||||
# Log message only if an unknown style is defined
|
||||
if conf.config_item_set('pgp_style', rcpt[0]):
|
||||
LOG.debug("Style %s for recipient %s is not known. Use default as fallback." % (conf.get_item("pgp_style", rcpt[0]), rcpt[0]))
|
||||
|
||||
# If no style is in settings defined for recipient, use default from settings
|
||||
if conf.config_item_equals('default', 'mime_conversion', 'yes'):
|
||||
gpg_to_smtp_mime.append(rcpt[0])
|
||||
gpg_to_cmdline_mime.extend(rcpt[1].split(','))
|
||||
else:
|
||||
gpg_to_smtp_inline.append(rcpt[0])
|
||||
gpg_to_cmdline_inline.extend(rcpt[1].split(','))
|
||||
|
||||
if gpg_to_smtp_mime:
|
||||
# Encrypt mail with PGP/MIME
|
||||
raw_message_mime = copy.deepcopy(raw_message)
|
||||
|
||||
if conf.config_item_equals('default', 'add_header', 'yes'):
|
||||
raw_message_mime['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
|
||||
|
||||
if 'Content-Transfer-Encoding' in raw_message_mime:
|
||||
raw_message_mime.replace_header('Content-Transfer-Encoding', '8BIT')
|
||||
else:
|
||||
raw_message_mime['Content-Transfer-Encoding'] = '8BIT'
|
||||
|
||||
encrypted_payloads = encrypt_all_payloads_mime( raw_message_mime, gpg_to_cmdline_mime )
|
||||
raw_message_mime.set_payload( encrypted_payloads )
|
||||
|
||||
send_msg( raw_message_mime.as_string(), gpg_to_smtp_mime )
|
||||
|
||||
if gpg_to_smtp_inline:
|
||||
# Encrypt mail with PGP/INLINE
|
||||
raw_message_inline = copy.deepcopy(raw_message)
|
||||
|
||||
if conf.config_item_equals('default', 'add_header', 'yes'):
|
||||
raw_message_inline['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
|
||||
|
||||
if 'Content-Transfer-Encoding' in raw_message_inline:
|
||||
raw_message_inline.replace_header('Content-Transfer-Encoding', '8BIT')
|
||||
else:
|
||||
raw_message_inline['Content-Transfer-Encoding'] = '8BIT'
|
||||
|
||||
encrypted_payloads = encrypt_all_payloads_inline( raw_message_inline, gpg_to_cmdline_inline )
|
||||
raw_message_inline.set_payload( encrypted_payloads )
|
||||
|
||||
send_msg( raw_message_inline.as_string(), gpg_to_smtp_inline )
|
||||
|
||||
return ungpg_to
|
||||
|
||||
def encrypt_all_payloads_inline( message, gpg_to_cmdline ):
|
||||
|
||||
# This breaks cascaded MIME messages. Blame PGP/INLINE.
|
||||
encrypted_payloads = list()
|
||||
if isinstance(message.get_payload(), str):
|
||||
return encrypt_payload( message, gpg_to_cmdline ).get_payload()
|
||||
|
||||
for payload in message.get_payload():
|
||||
if( isinstance(payload.get_payload(), list) ):
|
||||
encrypted_payloads.extend( encrypt_all_payloads_inline( payload, gpg_to_cmdline ) )
|
||||
else:
|
||||
encrypted_payloads.append( encrypt_payload( payload, gpg_to_cmdline ) )
|
||||
|
||||
return encrypted_payloads
|
||||
|
||||
def encrypt_all_payloads_mime( message, gpg_to_cmdline ):
|
||||
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
|
||||
pgp_ver_part = email.message.Message()
|
||||
pgp_ver_part.set_payload("Version: 1"+text.EOL)
|
||||
pgp_ver_part.set_type("application/pgp-encrypted")
|
||||
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description' )
|
||||
|
||||
encrypted_part = email.message.Message()
|
||||
encrypted_part.set_type("application/octet-stream")
|
||||
encrypted_part.set_param('name', "encrypted.asc")
|
||||
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description' )
|
||||
encrypted_part.set_param('inline', "", 'Content-Disposition' )
|
||||
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition' )
|
||||
|
||||
if isinstance(message.get_payload(), str):
|
||||
# WTF! It seems to swallow the first line. Not sure why. Perhaps
|
||||
# it's skipping an imaginary blank line someplace. (ie skipping a header)
|
||||
# Workaround it here by prepending a blank line.
|
||||
# This happens only on text only messages.
|
||||
additionalSubHeader=""
|
||||
encoding = sys.getdefaultencoding()
|
||||
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
|
||||
additionalSubHeader="Content-Type: "+message['Content-Type']+text.EOL
|
||||
(base, encoding) = text.parse_content_type(message['Content-Type'])
|
||||
LOG.debug(f"Identified encoding as {encoding}")
|
||||
encrypted_part.set_payload(additionalSubHeader+text.EOL +message.get_payload(decode=True).decode(encoding))
|
||||
check_nested = True
|
||||
else:
|
||||
processed_payloads = generate_message_from_payloads(message)
|
||||
encrypted_part.set_payload(processed_payloads.as_string())
|
||||
check_nested = False
|
||||
|
||||
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
|
||||
|
||||
# Use this just to generate a MIME boundary string.
|
||||
junk_msg = MIMEMultipart()
|
||||
junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
|
||||
boundary = junk_msg.get_boundary()
|
||||
|
||||
# This also modifies the boundary in the body of the message, ie it gets parsed.
|
||||
if 'Content-Type' in message:
|
||||
message.replace_header('Content-Type', f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL)
|
||||
else:
|
||||
message['Content-Type'] = f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL
|
||||
|
||||
return [ pgp_ver_part, encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested) ]
|
||||
|
||||
def encrypt_payload( payload, gpg_to_cmdline, check_nested = True ):
|
||||
global LOG
|
||||
|
||||
raw_payload = payload.get_payload(decode=True)
|
||||
if check_nested and text.is_pgp_inline(raw_payload):
|
||||
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
|
||||
return payload
|
||||
|
||||
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already done in method gpg_encrypt
|
||||
gpg = GnuPG.GPGEncryptor( conf.get_item('gpg', 'keyhome'), gpg_to_cmdline, payload.get_content_charset() )
|
||||
gpg.update( raw_payload )
|
||||
encrypted_data, returncode = gpg.encrypt()
|
||||
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
|
||||
if returncode != 0:
|
||||
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
|
||||
return payload
|
||||
|
||||
payload.set_payload( encrypted_data )
|
||||
isAttachment = payload.get_param( 'attachment', None, 'Content-Disposition' ) is not None
|
||||
|
||||
if isAttachment:
|
||||
filename = payload.get_filename()
|
||||
if filename:
|
||||
pgpFilename = filename + ".pgp"
|
||||
if not (payload.get('Content-Disposition') is None):
|
||||
payload.set_param( 'filename', pgpFilename, 'Content-Disposition' )
|
||||
if not (payload.get('Content-Type') is None) and not (payload.get_param( 'name' ) is None):
|
||||
payload.set_param( 'name', pgpFilename )
|
||||
if not (payload.get('Content-Transfer-Encoding') is None):
|
||||
payload.replace_header( 'Content-Transfer-Encoding', "7bit" )
|
||||
|
||||
return payload
|
||||
|
||||
def smime_encrypt( raw_message, recipients ):
|
||||
global LOG
|
||||
global from_addr
|
||||
|
||||
if not conf.config_item_set('smime', 'cert_path'):
|
||||
LOG.info("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
|
||||
return recipients
|
||||
|
||||
cert_path = conf.get_item('smime', 'cert_path')+"/"
|
||||
s = SMIME.SMIME()
|
||||
sk = X509.X509_Stack()
|
||||
smime_to = list()
|
||||
unsmime_to = list()
|
||||
|
||||
for addr in recipients:
|
||||
cert_and_email = get_cert_for_email(addr, cert_path)
|
||||
|
||||
if not (cert_and_email is None):
|
||||
(to_cert, normal_email) = cert_and_email
|
||||
LOG.debug("Found cert " + to_cert + " for " + addr + ": " + normal_email)
|
||||
smime_to.append(addr)
|
||||
x509 = X509.load_cert(to_cert, format=X509.FORMAT_PEM)
|
||||
sk.push(x509)
|
||||
else:
|
||||
unsmime_to.append(addr)
|
||||
|
||||
if smime_to:
|
||||
s.set_x509_stack(sk)
|
||||
s.set_cipher(SMIME.Cipher('aes_192_cbc'))
|
||||
p7 = s.encrypt( BIO.MemoryBuffer( raw_message.as_string() ) )
|
||||
# Output p7 in mail-friendly format.
|
||||
out = BIO.MemoryBuffer()
|
||||
out.write('From: ' + from_addr + text.EOL)
|
||||
out.write('To: ' + raw_message['To'] + text.EOL)
|
||||
if raw_message['Cc']:
|
||||
out.write('Cc: ' + raw_message['Cc'] + text.EOL)
|
||||
if raw_message['Bcc']:
|
||||
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL)
|
||||
if raw_message['Subject']:
|
||||
out.write('Subject: '+ raw_message['Subject'] + text.EOL)
|
||||
|
||||
if conf.config_item_equals('default', 'add_header', 'yes'):
|
||||
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL)
|
||||
|
||||
s.write(out, p7)
|
||||
|
||||
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
|
||||
|
||||
send_msg(out.read(), smime_to)
|
||||
if unsmime_to:
|
||||
LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}")
|
||||
|
||||
return unsmime_to
|
||||
|
||||
def get_cert_for_email( to_addr, cert_path ):
|
||||
global LOG
|
||||
|
||||
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
|
||||
|
||||
files_in_directory = os.listdir(cert_path)
|
||||
for filename in files_in_directory:
|
||||
file_path = os.path.join(cert_path, filename)
|
||||
if not os.path.isfile(file_path):
|
||||
continue
|
||||
|
||||
if insensitive:
|
||||
if filename.casefold() == to_addr:
|
||||
return (file_path, to_addr)
|
||||
else:
|
||||
if filename == to_addr:
|
||||
return (file_path, to_addr)
|
||||
|
||||
# support foo+ignore@bar.com -> foo@bar.com
|
||||
(fixed_up_email, topic) = text.parse_delimiter(to_addr)
|
||||
if topic is None:
|
||||
# delimiter not used
|
||||
return None
|
||||
else:
|
||||
LOG.debug(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
|
||||
return get_cert_for_email(fixed_up_email, cert_path)
|
||||
|
||||
def sanitize_case_sense( address ):
|
||||
if conf.config_item_equals('default', 'mail_case_insensitive', 'yes'):
|
||||
address = address.lower()
|
||||
else:
|
||||
splitted_address = address.split('@')
|
||||
if len(splitted_address) > 1:
|
||||
address = splitted_address[0] + '@' + splitted_address[1].lower()
|
||||
|
||||
return address
|
||||
|
||||
def generate_message_from_payloads( payloads, message = None ):
|
||||
if message == None:
|
||||
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
|
||||
|
||||
for payload in payloads.get_payload():
|
||||
if( isinstance(payload.get_payload(), list) ):
|
||||
message.attach(generate_message_from_payloads(payload))
|
||||
else:
|
||||
message.attach(payload)
|
||||
|
||||
return message
|
||||
|
||||
def get_first_payload( payloads ):
|
||||
if payloads.is_multipart():
|
||||
return get_first_payload(payloads.get_payload(0))
|
||||
else:
|
||||
return payloads
|
||||
|
||||
def send_msg( message, recipients ):
|
||||
global LOG
|
||||
global from_addr
|
||||
|
||||
recipients = [_f for _f in recipients if _f]
|
||||
if recipients:
|
||||
LOG.info(f"Sending email to: {recipients!r}")
|
||||
relay = (conf.get_item('relay', 'host'), int(conf.get_item('relay', 'port')))
|
||||
smtp = smtplib.SMTP(relay[0], relay[1])
|
||||
if conf.config_item_equals('relay', 'starttls', 'yes'):
|
||||
smtp.starttls()
|
||||
smtp.sendmail( from_addr, recipients, message )
|
||||
else:
|
||||
LOG.info("No recipient found")
|
||||
|
||||
def deliver_message( raw_message, from_address, to_addrs ):
|
||||
global LOG
|
||||
global from_addr
|
||||
|
||||
# Ugly workaround to keep the code working without too many changes.
|
||||
from_addr = from_address
|
||||
|
||||
recipients_left = [sanitize_case_sense(recipient) for recipient in to_addrs]
|
||||
|
||||
# There is no need for nested encryption
|
||||
first_payload = get_first_payload(raw_message)
|
||||
if first_payload.get_content_type() == 'application/pkcs7-mime':
|
||||
LOG.debug("Message is already encrypted with S/MIME. Encryption aborted.")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
return
|
||||
|
||||
first_payload = first_payload.get_payload(decode=True)
|
||||
if text.is_pgp_inline(first_payload):
|
||||
LOG.debug("Message is already encrypted as PGP/INLINE. Encryption aborted.")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
return
|
||||
|
||||
if raw_message.get_content_type() == 'multipart/encrypted':
|
||||
LOG.debug("Message is already encrypted. Encryption aborted.")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
return
|
||||
|
||||
# Encrypt mails for recipients with known public PGP keys
|
||||
recipients_left = gpg_encrypt(raw_message, recipients_left)
|
||||
if not recipients_left:
|
||||
return
|
||||
|
||||
# Encrypt mails for recipients with known S/MIME certificate
|
||||
recipients_left = smime_encrypt(raw_message, recipients_left)
|
||||
if not recipients_left:
|
||||
return
|
||||
|
||||
# Send out mail to recipients which are left
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
|
||||
def _gpg_encrypt(raw_message, recipients):
|
||||
if not conf.config_item_set('gpg', 'keyhome'):
|
||||
LOG.error("No valid entry for gpg keyhome. Encryption aborted.")
|
||||
return recipients
|
||||
|
||||
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, _load_keys())
|
||||
|
||||
LOG.info(f"Got addresses: gpg_to={gpg_to!r}, ungpg_to={ungpg_to!r}")
|
||||
|
||||
if gpg_to:
|
||||
LOG.info("Encrypting email to: %s" % ' '.join(x.email() for x in gpg_to))
|
||||
|
||||
gpg_to_smtp_mime, gpg_to_cmdline_mime, \
|
||||
gpg_to_smtp_inline, gpg_to_cmdline_inline = \
|
||||
_sort_gpg_recipients(gpg_to)
|
||||
|
||||
if gpg_to_smtp_mime:
|
||||
# Encrypt mail with PGP/MIME
|
||||
_gpg_encrypt_and_deliver(raw_message,
|
||||
gpg_to_cmdline_mime, gpg_to_smtp_mime,
|
||||
_encrypt_all_payloads_mime)
|
||||
|
||||
if gpg_to_smtp_inline:
|
||||
# Encrypt mail with PGP/INLINE
|
||||
_gpg_encrypt_and_deliver(raw_message,
|
||||
gpg_to_cmdline_inline, gpg_to_smtp_inline,
|
||||
_encrypt_all_payloads_inline)
|
||||
|
||||
LOG.info(f"Not processed emails: {ungpg_to}")
|
||||
return ungpg_to
|
||||
|
||||
|
||||
def _sort_gpg_recipients(gpg_to):
|
||||
gpg_to_smtp_mime = list()
|
||||
gpg_to_cmdline_mime = list()
|
||||
|
||||
gpg_to_smtp_inline = list()
|
||||
gpg_to_cmdline_inline = list()
|
||||
|
||||
default_to_pgp_mime = conf.config_item_equals('default', 'mime_conversion', 'yes')
|
||||
|
||||
for rcpt in gpg_to:
|
||||
# Checking pre defined styles in settings first
|
||||
if conf.config_item_equals('pgp_style', rcpt.email(), 'mime'):
|
||||
gpg_to_smtp_mime.append(rcpt.email())
|
||||
gpg_to_cmdline_mime.extend(rcpt.key().split(','))
|
||||
elif conf.config_item_equals('pgp_style', rcpt.email(), 'inline'):
|
||||
gpg_to_smtp_inline.append(rcpt.email())
|
||||
gpg_to_cmdline_inline.extend(rcpt.key().split(','))
|
||||
else:
|
||||
# Log message only if an unknown style is defined
|
||||
if conf.config_item_set('pgp_style', rcpt.email()):
|
||||
LOG.debug("Style %s for recipient %s is not known. Use default as fallback."
|
||||
% (conf.get_item("pgp_style", rcpt.email()), rcpt.email()))
|
||||
|
||||
# If no style is in settings defined for recipient, use default from settings
|
||||
if default_to_pgp_mime:
|
||||
gpg_to_smtp_mime.append(rcpt.email())
|
||||
gpg_to_cmdline_mime.extend(rcpt.key().split(','))
|
||||
else:
|
||||
gpg_to_smtp_inline.append(rcpt.email())
|
||||
gpg_to_cmdline_inline.extend(rcpt.key().split(','))
|
||||
|
||||
return gpg_to_smtp_mime, gpg_to_cmdline_mime, gpg_to_smtp_inline, gpg_to_cmdline_inline
|
||||
|
||||
|
||||
def _gpg_encrypt_and_return(message, cmdline, to, encrypt_f) -> str:
|
||||
msg_copy = copy.deepcopy(message)
|
||||
_customise_headers(msg_copy)
|
||||
encrypted_payloads = encrypt_f(msg_copy, cmdline)
|
||||
msg_copy.set_payload(encrypted_payloads)
|
||||
return msg_copy.as_string()
|
||||
|
||||
|
||||
def _gpg_encrypt_and_deliver(message, cmdline, to, encrypt_f):
|
||||
out = _gpg_encrypt_and_return(message, cmdline, to, encrypt_f)
|
||||
send_msg(out, to)
|
||||
|
||||
|
||||
def _customise_headers(msg_copy):
|
||||
if conf.config_item_equals('default', 'add_header', 'yes'):
|
||||
msg_copy['X-GPG-Mailgate'] = 'Encrypted by GPG Mailgate'
|
||||
|
||||
if 'Content-Transfer-Encoding' in msg_copy:
|
||||
msg_copy.replace_header('Content-Transfer-Encoding', '8BIT')
|
||||
else:
|
||||
msg_copy['Content-Transfer-Encoding'] = '8BIT'
|
||||
|
||||
|
||||
def _load_keys():
|
||||
"""Return a map from a key's fingerprint to email address."""
|
||||
keyring = kcache.KeyRing(conf.get_item('gpg', 'keyhome'))
|
||||
return asyncio.run(keyring.freeze_identities())
|
||||
|
||||
|
||||
class GpgRecipient:
|
||||
"""A tuple-like object that contains GPG recipient data."""
|
||||
|
||||
def __init__(self, left, right):
|
||||
"""Initialise a tuple-like object that contains GPG recipient data."""
|
||||
self._left = left
|
||||
self._right = right
|
||||
|
||||
def __getitem__(self, index):
|
||||
"""Pretend this object is a tuple by returning an indexed tuple element."""
|
||||
if index == 0:
|
||||
return self._left
|
||||
elif index == 1:
|
||||
return self._right
|
||||
else:
|
||||
raise IndexError()
|
||||
|
||||
def __repr__(self):
|
||||
"""Return textual representation of this GPG Recipient."""
|
||||
return f"GpgRecipient({self._left!r}, {self._right!r})"
|
||||
|
||||
def email(self):
|
||||
"""Return this recipient's email address."""
|
||||
return self._left
|
||||
|
||||
def key(self):
|
||||
"""Return this recipient's key ID."""
|
||||
return self._right
|
||||
|
||||
|
||||
def _identify_gpg_recipients(recipients, keys: kcache.KeyCache):
|
||||
# This list will be filled with pairs (M, N), where M is the destination
|
||||
# address we're going to deliver the message to and N is the identity we're
|
||||
# going to encrypt it for.
|
||||
gpg_to = list()
|
||||
|
||||
# This will be the list of recipients that haven't provided us with their
|
||||
# public keys.
|
||||
ungpg_to = list()
|
||||
|
||||
# In "strict mode", only keys included in configuration are used to encrypt
|
||||
# email.
|
||||
strict_mode = conf.strict_mode()
|
||||
|
||||
# GnuPG keys found in our keyring.
|
||||
|
||||
LOG.info(f'Processisng recipients: {recipients!r}; keys: {keys!r}')
|
||||
for to in recipients:
|
||||
LOG.info(f"At to={to!r}")
|
||||
own_key = _try_configured_key(to, keys)
|
||||
if own_key is not None:
|
||||
gpg_to.append(GpgRecipient(own_key[0], own_key[1]))
|
||||
continue
|
||||
|
||||
direct_key = _try_direct_key_lookup(to, keys, strict_mode)
|
||||
if direct_key is not None:
|
||||
gpg_to.append(GpgRecipient(direct_key[0], direct_key[1]))
|
||||
continue
|
||||
|
||||
domain_key = _try_configured_domain_key(to, keys)
|
||||
if domain_key is not None:
|
||||
gpg_to.append(GpgRecipient(domain_key[0], domain_key[1]))
|
||||
continue
|
||||
|
||||
ungpg_to.append(to)
|
||||
|
||||
LOG.debug(f'Collected recipients; GPG: {gpg_to}; UnGPG: {ungpg_to}')
|
||||
return gpg_to, ungpg_to
|
||||
|
||||
|
||||
def _find_key(recipient, keys, strict_mode):
|
||||
own_key = _try_configured_key(recipient, keys)
|
||||
if own_key is not None:
|
||||
return own_key
|
||||
|
||||
direct_key = _try_direct_key_lookup(recipient, keys, strict_mode)
|
||||
if direct_key is not None:
|
||||
return direct_key
|
||||
|
||||
domain_key = _try_configured_domain_key(recipient, keys)
|
||||
if domain_key is not None:
|
||||
return domain_key
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _try_configured_key(recipient, keys):
|
||||
if conf.config_item_set('enc_keymap', recipient):
|
||||
key = conf.get_item('enc_keymap', recipient)
|
||||
if key in keys:
|
||||
LOG.debug(f"Found key {key} configured for {recipient}")
|
||||
return (recipient, key)
|
||||
|
||||
LOG.debug(f"No configured key found for {recipient}")
|
||||
return None
|
||||
|
||||
|
||||
def _try_direct_key_lookup(recipient, keys, strict_mode):
|
||||
if strict_mode:
|
||||
return None
|
||||
|
||||
if keys.has_email(recipient):
|
||||
LOG.info(f"Found key for {recipient}")
|
||||
return recipient, recipient
|
||||
|
||||
(newto, topic) = text.parse_delimiter(recipient)
|
||||
if keys.has_email(newto):
|
||||
LOG.info(f"Found key for {newto}, stripped {recipient}")
|
||||
return recipient, newto
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _try_configured_domain_key(recipient, keys):
|
||||
parts = recipient.split('@')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
|
||||
domain = parts[1]
|
||||
if conf.config_item_set('enc_domain_keymap', domain):
|
||||
domain_key = conf.get_item('enc_domain_keymap', domain)
|
||||
if domain_key in keys:
|
||||
LOG.debug(f"Found domain key {domain_key} for {recipient}")
|
||||
return recipient, domain_key
|
||||
|
||||
LOG.debug(f"No domain key for {recipient}")
|
||||
return None
|
||||
|
||||
|
||||
def _encrypt_all_payloads_inline(message, gpg_to_cmdline):
|
||||
|
||||
# This breaks cascaded MIME messages. Blame PGP/INLINE.
|
||||
encrypted_payloads = list()
|
||||
if isinstance(message.get_payload(), str):
|
||||
return _encrypt_payload(message, gpg_to_cmdline).get_payload()
|
||||
|
||||
for payload in message.get_payload():
|
||||
if(isinstance(payload.get_payload(), list)):
|
||||
encrypted_payloads.extend(_encrypt_all_payloads_inline(payload, gpg_to_cmdline))
|
||||
else:
|
||||
encrypted_payloads.append(_encrypt_payload(payload, gpg_to_cmdline))
|
||||
|
||||
return encrypted_payloads
|
||||
|
||||
|
||||
def _encrypt_all_payloads_mime(message, gpg_to_cmdline):
|
||||
# Convert a plain text email into PGP/MIME attachment style. Modeled after enigmail.
|
||||
pgp_ver_part = email.message.Message()
|
||||
pgp_ver_part.set_payload("Version: 1"+text.EOL)
|
||||
pgp_ver_part.set_type("application/pgp-encrypted")
|
||||
pgp_ver_part.set_param('PGP/MIME version identification', "", 'Content-Description')
|
||||
|
||||
encrypted_part = email.message.Message()
|
||||
encrypted_part.set_type("application/octet-stream")
|
||||
encrypted_part.set_param('name', "encrypted.asc")
|
||||
encrypted_part.set_param('OpenPGP encrypted message', "", 'Content-Description')
|
||||
encrypted_part.set_param('inline', "", 'Content-Disposition')
|
||||
encrypted_part.set_param('filename', "encrypted.asc", 'Content-Disposition')
|
||||
|
||||
if isinstance(message.get_payload(), str):
|
||||
# WTF! It seems to swallow the first line. Not sure why. Perhaps
|
||||
# it's skipping an imaginary blank line someplace. (ie skipping a header)
|
||||
# Workaround it here by prepending a blank line.
|
||||
# This happens only on text only messages.
|
||||
additionalSubHeader = ""
|
||||
encoding = sys.getdefaultencoding()
|
||||
if 'Content-Type' in message and not message['Content-Type'].startswith('multipart'):
|
||||
additionalSubHeader = "Content-Type: " + message['Content-Type'] + text.EOL
|
||||
(base, encoding) = text.parse_content_type(message['Content-Type'])
|
||||
LOG.debug(f"Identified encoding as {encoding}")
|
||||
encrypted_part.set_payload(additionalSubHeader+text.EOL + message.get_payload(decode=True).decode(encoding))
|
||||
check_nested = True
|
||||
else:
|
||||
processed_payloads = _generate_message_from_payloads(message)
|
||||
encrypted_part.set_payload(processed_payloads.as_string())
|
||||
check_nested = False
|
||||
|
||||
message.preamble = "This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)"
|
||||
|
||||
# Use this just to generate a MIME boundary string.
|
||||
junk_msg = MIMEMultipart()
|
||||
junk_str = junk_msg.as_string() # WTF! Without this, get_boundary() will return 'None'!
|
||||
boundary = junk_msg.get_boundary()
|
||||
|
||||
# This also modifies the boundary in the body of the message, ie it gets parsed.
|
||||
if 'Content-Type' in message:
|
||||
message.replace_header('Content-Type', f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL)
|
||||
else:
|
||||
message['Content-Type'] = f"multipart/encrypted; protocol=\"application/pgp-encrypted\"; boundary=\"{boundary}\""+text.EOL
|
||||
|
||||
return [pgp_ver_part, _encrypt_payload(encrypted_part, gpg_to_cmdline, check_nested)]
|
||||
|
||||
|
||||
def _encrypt_payload(payload, gpg_to_cmdline, check_nested=True):
|
||||
raw_payload = payload.get_payload(decode=True)
|
||||
if check_nested and text.is_pgp_inline(raw_payload):
|
||||
LOG.debug("Message is already pgp encrypted. No nested encryption needed.")
|
||||
return payload
|
||||
|
||||
# No check is needed for conf.get_item('gpg', 'keyhome') as this is already
|
||||
# done in method gpg_encrypt
|
||||
gpg = GnuPG.GPGEncryptor(conf.get_item('gpg', 'keyhome'), gpg_to_cmdline,
|
||||
payload.get_content_charset())
|
||||
gpg.update(raw_payload)
|
||||
encrypted_data, returncode = gpg.encrypt()
|
||||
LOG.debug("Return code from encryption=%d (0 indicates success)." % returncode)
|
||||
if returncode != 0:
|
||||
LOG.info("Encrytion failed with return code %d. Encryption aborted." % returncode)
|
||||
return payload
|
||||
|
||||
payload.set_payload(encrypted_data)
|
||||
isAttachment = payload.get_param('attachment', None, 'Content-Disposition') is not None
|
||||
|
||||
if isAttachment:
|
||||
filename = payload.get_filename()
|
||||
if filename:
|
||||
pgpFilename = filename + ".pgp"
|
||||
if not (payload.get('Content-Disposition') is None):
|
||||
payload.set_param('filename', pgpFilename, 'Content-Disposition')
|
||||
if not (payload.get('Content-Type') is None) and not (payload.get_param('name') is None):
|
||||
payload.set_param('name', pgpFilename)
|
||||
if not (payload.get('Content-Transfer-Encoding') is None):
|
||||
payload.replace_header('Content-Transfer-Encoding', "7bit")
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def _smime_encrypt(raw_message, recipients):
|
||||
global LOG
|
||||
global from_addr
|
||||
|
||||
if not conf.config_item_set('smime', 'cert_path'):
|
||||
LOG.info("No valid path for S/MIME certs found in config file. S/MIME encryption aborted.")
|
||||
return recipients
|
||||
|
||||
cert_path = conf.get_item('smime', 'cert_path')+"/"
|
||||
s = SMIME.SMIME()
|
||||
sk = X509.X509_Stack()
|
||||
smime_to = list()
|
||||
unsmime_to = list()
|
||||
|
||||
for addr in recipients:
|
||||
cert_and_email = _get_cert_for_email(addr[0], cert_path)
|
||||
|
||||
if not (cert_and_email is None):
|
||||
(to_cert, normal_email) = cert_and_email
|
||||
LOG.debug("Found cert " + to_cert + " for " + addr + ": " + normal_email)
|
||||
smime_to.append(addr)
|
||||
x509 = X509.load_cert(to_cert, format=X509.FORMAT_PEM)
|
||||
sk.push(x509)
|
||||
else:
|
||||
unsmime_to.append(addr)
|
||||
|
||||
if smime_to:
|
||||
s.set_x509_stack(sk)
|
||||
s.set_cipher(SMIME.Cipher('aes_192_cbc'))
|
||||
p7 = s.encrypt(BIO.MemoryBuffer(raw_message.as_string()))
|
||||
# Output p7 in mail-friendly format.
|
||||
out = BIO.MemoryBuffer()
|
||||
out.write('From: ' + from_addr + text.EOL)
|
||||
out.write('To: ' + raw_message['To'] + text.EOL)
|
||||
if raw_message['Cc']:
|
||||
out.write('Cc: ' + raw_message['Cc'] + text.EOL)
|
||||
if raw_message['Bcc']:
|
||||
out.write('Bcc: ' + raw_message['Bcc'] + text.EOL)
|
||||
if raw_message['Subject']:
|
||||
out.write('Subject: ' + raw_message['Subject'] + text.EOL)
|
||||
|
||||
if conf.config_item_equals('default', 'add_header', 'yes'):
|
||||
out.write('X-GPG-Mailgate: Encrypted by GPG Mailgate' + text.EOL)
|
||||
|
||||
s.write(out, p7)
|
||||
|
||||
LOG.debug(f"Sending message from {from_addr} to {smime_to}")
|
||||
|
||||
send_msg(out.read(), smime_to)
|
||||
if unsmime_to:
|
||||
LOG.debug(f"Unable to find valid S/MIME certificates for {unsmime_to}")
|
||||
|
||||
return unsmime_to
|
||||
|
||||
|
||||
def _get_cert_for_email(to_addr, cert_path):
|
||||
insensitive = conf.config_item_equals('default', 'mail_case_insensitive', 'yes')
|
||||
|
||||
LOG.info(f'Retrieving certificate for {to_addr!r} from {cert_path!r}, sensitivity={insensitive!r}')
|
||||
|
||||
files_in_directory = os.listdir(cert_path)
|
||||
for filename in files_in_directory:
|
||||
file_path = os.path.join(cert_path, filename)
|
||||
if not os.path.isfile(file_path):
|
||||
continue
|
||||
|
||||
if insensitive:
|
||||
if filename.casefold() == to_addr:
|
||||
return (file_path, to_addr)
|
||||
else:
|
||||
if filename == to_addr:
|
||||
return (file_path, to_addr)
|
||||
|
||||
# support foo+ignore@bar.com -> foo@bar.com
|
||||
LOG.info(f"An email with topic? {to_addr}")
|
||||
(fixed_up_email, topic) = text.parse_delimiter(to_addr)
|
||||
LOG.info(f'Got {fixed_up_email!r} and {topic!r}')
|
||||
if topic is None:
|
||||
# delimiter not used
|
||||
LOG.info('Topic not found')
|
||||
return None
|
||||
else:
|
||||
LOG.info(f"Looking up certificate for {fixed_up_email} after parsing {to_addr}")
|
||||
return _get_cert_for_email(fixed_up_email, cert_path)
|
||||
|
||||
|
||||
def _generate_message_from_payloads(payloads, message=None):
|
||||
if message is None:
|
||||
message = email.mime.multipart.MIMEMultipart(payloads.get_content_subtype())
|
||||
|
||||
for payload in payloads.get_payload():
|
||||
if(isinstance(payload.get_payload(), list)):
|
||||
message.attach(_generate_message_from_payloads(payload))
|
||||
else:
|
||||
message.attach(payload)
|
||||
|
||||
return message
|
||||
|
||||
|
||||
def _get_first_payload(payloads):
|
||||
if payloads.is_multipart():
|
||||
return _get_first_payload(payloads.get_payload(0))
|
||||
else:
|
||||
return payloads
|
||||
|
||||
|
||||
def send_msg(message: str, recipients, fromaddr=None):
|
||||
"""Send MESSAGE to RECIPIENTS to the mail relay."""
|
||||
global from_addr
|
||||
|
||||
if fromaddr is not None:
|
||||
from_addr = fromaddr
|
||||
|
||||
recipients = [_f for _f in recipients if _f]
|
||||
if recipients:
|
||||
LOG.info(f"Sending email to: {recipients!r}")
|
||||
relay = conf.relay_params()
|
||||
smtp = smtplib.SMTP(relay[0], relay[1])
|
||||
if conf.flag_enabled('relay', 'starttls'):
|
||||
smtp.starttls()
|
||||
smtp.sendmail(from_addr, recipients, message)
|
||||
else:
|
||||
LOG.info("No recipient found")
|
||||
|
||||
|
||||
def _is_encrypted(raw_message):
|
||||
if raw_message.get_content_type() == 'multipart/encrypted':
|
||||
return True
|
||||
|
||||
first_part = _get_first_payload(raw_message)
|
||||
if first_part.get_content_type() == 'application/pkcs7-mime':
|
||||
return True
|
||||
|
||||
first_payload = first_part.get_payload(decode=True)
|
||||
return text.is_pgp_inline(first_payload)
|
||||
|
||||
|
||||
def delivery_plan(recipients, key_cache: kcache.KeyCache):
|
||||
"""Generate a sequence of delivery strategies."""
|
||||
gpg_to, ungpg_to = _identify_gpg_recipients(recipients, key_cache)
|
||||
|
||||
gpg_mime_to, gpg_mime_cmd, gpg_inline_to, gpg_inline_cmd = \
|
||||
_sort_gpg_recipients(gpg_to)
|
||||
|
||||
keyhome = conf.get_item('gpg', 'keyhome')
|
||||
|
||||
plan = []
|
||||
if gpg_mime_to:
|
||||
plan.append(MimeOpenPGPEncrypt(gpg_mime_to, gpg_mime_cmd, keyhome))
|
||||
if gpg_inline_to:
|
||||
plan.append(InlineOpenPGPEncrypt(gpg_inline_to, gpg_inline_cmd, keyhome))
|
||||
if ungpg_to:
|
||||
plan.append(KeepIntact(ungpg_to))
|
||||
|
||||
return plan
|
||||
|
||||
|
||||
def deliver_message(raw_message: email.message.Message, from_address, to_addrs):
|
||||
"""Send RAW_MESSAGE to all TO_ADDRS using the best encryption method available."""
|
||||
global from_addr
|
||||
|
||||
# Ugly workaround to keep the code working without too many changes.
|
||||
from_addr = from_address
|
||||
|
||||
recipients_left = [text.sanitize_case_sense(recipient) for recipient in to_addrs]
|
||||
|
||||
# There is no need for nested encryption
|
||||
LOG.debug("Seeing if it's already encrypted")
|
||||
if _is_encrypted(raw_message):
|
||||
LOG.debug("Message is already encrypted. Encryption aborted.")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
return
|
||||
|
||||
# Encrypt mails for recipients with known public PGP keys
|
||||
LOG.debug("Encrypting with OpenPGP")
|
||||
recipients_left = _gpg_encrypt(raw_message, recipients_left)
|
||||
if not recipients_left:
|
||||
return
|
||||
|
||||
# Encrypt mails for recipients with known S/MIME certificate
|
||||
LOG.debug("Encrypting with S/MIME")
|
||||
recipients_left = _smime_encrypt(raw_message, recipients_left)
|
||||
if not recipients_left:
|
||||
return
|
||||
|
||||
# Send out mail to recipients which are left
|
||||
LOG.debug("Sending the rest as text/plain")
|
||||
send_msg(raw_message.as_string(), recipients_left)
|
||||
|
||||
|
||||
def exec_time_info(start_timestamp):
|
||||
"""Calculate time since the given timestamp."""
|
||||
elapsed_s = time.time() - start_timestamp
|
||||
process_t = time.process_time()
|
||||
return (elapsed_s, process_t)
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
"""Mail operations for a given recipient.
|
||||
|
||||
There are 3 operations available:
|
||||
|
||||
- OpenPGPEncrypt: to deliver the message to a recipient with an OpenPGP public
|
||||
key available.
|
||||
|
||||
- SMimeEncrypt: to deliver the message to a recipient with an S/MIME
|
||||
certificate.
|
||||
|
||||
- KeepIntact: a no-operation (implementation of the Null Object pattern), used
|
||||
for messages already encrypted or those who haven't provided their keys or
|
||||
certificates.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import lacre.mailgate as mailgate
|
||||
from email.message import Message
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MailOperation:
|
||||
"""Contract for an operation to be performed on a message."""
|
||||
|
||||
def __init__(self, recipients=[]):
|
||||
"""Initialise the operation with a recipient."""
|
||||
self._recipients = recipients
|
||||
|
||||
def perform(self, message: Message):
|
||||
"""Perform this operation on MESSAGE.
|
||||
|
||||
Return target message.
|
||||
"""
|
||||
raise NotImplementedError(self.__class__())
|
||||
|
||||
def recipients(self):
|
||||
"""Return list of recipients of the message."""
|
||||
return self._recipients
|
||||
|
||||
def add_recipient(self, recipient):
|
||||
"""Register another message recipient."""
|
||||
self._recipients.append(recipient)
|
||||
|
||||
|
||||
class OpenPGPEncrypt(MailOperation):
|
||||
"""OpenPGP-encrypt the message."""
|
||||
|
||||
def __init__(self, recipients, keys, keyhome):
|
||||
"""Initialise encryption operation."""
|
||||
super().__init__(recipients)
|
||||
self._keys = keys
|
||||
self._keyhome = keyhome
|
||||
|
||||
def extend_keys(self, keys):
|
||||
"""Register GPG keys to encrypt this message for."""
|
||||
self._keys.extend(keys)
|
||||
|
||||
def __repr__(self):
|
||||
"""Generate a representation with just method and key."""
|
||||
return f"<{type(self).__name__} {self._recipients} {self._keys}>"
|
||||
|
||||
|
||||
class InlineOpenPGPEncrypt(OpenPGPEncrypt):
|
||||
"""Inline encryption strategy."""
|
||||
|
||||
def __init__(self, recipients, keys, keyhome):
|
||||
"""Initialise strategy object."""
|
||||
super().__init__(recipients, keys, keyhome)
|
||||
|
||||
def perform(self, msg: Message):
|
||||
"""Encrypt with PGP Inline."""
|
||||
LOG.debug('Sending PGP/Inline...')
|
||||
return mailgate._gpg_encrypt_and_return(msg,
|
||||
self._keys, self._recipients,
|
||||
mailgate._encrypt_all_payloads_inline)
|
||||
|
||||
|
||||
class MimeOpenPGPEncrypt(OpenPGPEncrypt):
|
||||
"""MIME encryption strategy."""
|
||||
|
||||
def __init__(self, recipients, keys, keyhome):
|
||||
"""Initialise strategy object."""
|
||||
super().__init__(recipients, keys, keyhome)
|
||||
|
||||
def perform(self, msg: Message):
|
||||
"""Encrypt with PGP MIME."""
|
||||
LOG.debug('Sending PGP/MIME...')
|
||||
return mailgate._gpg_encrypt_and_return(msg,
|
||||
self._keys, self._recipients,
|
||||
mailgate._encrypt_all_payloads_mime)
|
||||
|
||||
|
||||
class SMimeEncrypt(MailOperation):
|
||||
"""S/MIME encryption operation."""
|
||||
|
||||
def __init__(self, recipient, email, certificate):
|
||||
"""Initialise S/MIME encryption for a given EMAIL and CERTIFICATE."""
|
||||
super().__init__(recipient)
|
||||
self._email = email
|
||||
self._cert = certificate
|
||||
|
||||
def perform(self, message: Message):
|
||||
"""Encrypt with a certificate."""
|
||||
LOG.warning(f"Delivering clear-text to {self._recipients}")
|
||||
return message
|
||||
|
||||
def __repr__(self):
|
||||
"""Generate a representation with just method and key."""
|
||||
return f"<S/MIME {self._recipients}, {self._cert}>"
|
||||
|
||||
|
||||
class KeepIntact(MailOperation):
|
||||
"""A do-nothing operation (Null Object implementation).
|
||||
|
||||
This operation should be used for mail that's already encrypted.
|
||||
"""
|
||||
|
||||
def __init__(self, recipients):
|
||||
"""Initialise pass-through operation for a given recipient."""
|
||||
super().__init__(recipients)
|
||||
|
||||
def perform(self, message: Message):
|
||||
"""Return MESSAGE unmodified."""
|
||||
return message.as_string()
|
||||
|
||||
def __repr__(self):
|
||||
"""Return representation with just method and email."""
|
||||
return f"<KeepIntact {self._recipients}>"
|
|
@ -1,5 +1,8 @@
|
|||
import sys
|
||||
import re
|
||||
import logging
|
||||
|
||||
import lacre.config as conf
|
||||
|
||||
# The standard way to encode line-ending in email:
|
||||
EOL = "\r\n"
|
||||
|
@ -7,29 +10,58 @@ EOL = "\r\n"
|
|||
PGP_INLINE_BEGIN = b"-----BEGIN PGP MESSAGE-----"
|
||||
PGP_INLINE_END = b"-----END PGP MESSAGE-----"
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_content_type(content_type):
|
||||
parts = [p.strip() for p in content_type.split(';')]
|
||||
if len(parts) == 1:
|
||||
# No additional attributes provided. Use default encoding.
|
||||
return (content_type, sys.getdefaultencoding())
|
||||
"""Analyse Content-Type email header.
|
||||
|
||||
# At least one attribute provided. Find out if any of them is named
|
||||
# 'charset' and if so, use it.
|
||||
ctype = parts[0]
|
||||
encoding = [p for p in parts[1:] if p.startswith('charset=') ]
|
||||
if encoding:
|
||||
eq_idx = encoding[0].index('=')
|
||||
return (ctype, encoding[0][eq_idx+1:])
|
||||
else:
|
||||
return (ctype, sys.getdefaultencoding())
|
||||
Return a pair: type and sub-type.
|
||||
"""
|
||||
parts = [p.strip() for p in content_type.split(';')]
|
||||
if len(parts) == 1:
|
||||
# No additional attributes provided. Use default encoding.
|
||||
return (content_type, sys.getdefaultencoding())
|
||||
|
||||
def parse_delimiter(address):
|
||||
withdelim = re.match('^([^\+]+)\+([^@]+)@(.*)$', address)
|
||||
if withdelim:
|
||||
return (withdelim.group(1) + '@' + withdelim.group(3), withdelim.group(2))
|
||||
else:
|
||||
return (address, None)
|
||||
# At least one attribute provided. Find out if any of them is named
|
||||
# 'charset' and if so, use it.
|
||||
ctype = parts[0]
|
||||
encoding = [p for p in parts[1:] if p.startswith('charset=')]
|
||||
if encoding:
|
||||
eq_idx = encoding[0].index('=')
|
||||
return (ctype, encoding[0][eq_idx+1:])
|
||||
else:
|
||||
return (ctype, sys.getdefaultencoding())
|
||||
|
||||
def is_pgp_inline(payload):
|
||||
"""Finds out if the payload (bytes) contains PGP/INLINE markers."""
|
||||
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
|
||||
|
||||
def parse_delimiter(address: str):
|
||||
"""Parse an email with delimiter and topic.
|
||||
|
||||
Return destination emaili and topic as a tuple.
|
||||
"""
|
||||
withdelim = re.match('^([^\\+]+)\\+([^@]+)@(.*)$', address)
|
||||
LOG.debug(f'Parsed email: {withdelim!r}')
|
||||
|
||||
if withdelim:
|
||||
return (withdelim.group(1) + '@' + withdelim.group(3), withdelim.group(2))
|
||||
else:
|
||||
return (address, None)
|
||||
|
||||
|
||||
def sanitize_case_sense(address):
|
||||
"""Sanitize email case."""
|
||||
# TODO: find a way to make it more unit-testable
|
||||
if conf.flag_enabled('default', 'mail_case_insensitive'):
|
||||
address = address.lower()
|
||||
else:
|
||||
splitted_address = address.split('@')
|
||||
if len(splitted_address) > 1:
|
||||
address = splitted_address[0] + '@' + splitted_address[1].lower()
|
||||
|
||||
return address
|
||||
|
||||
|
||||
def is_pgp_inline(payload) -> bool:
|
||||
"""Find out if the payload (bytes) contains PGP/INLINE markers."""
|
||||
return PGP_INLINE_BEGIN in payload and PGP_INLINE_END in payload
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
SQLAlchemy==1.4.32
|
||||
Markdown==3.4.1
|
||||
M2Crypto==0.38.0
|
||||
requests==2.27.1
|
||||
watchdog==2.1.9
|
|
@ -0,0 +1,117 @@
|
|||
#
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import configparser
|
||||
import logging
|
||||
import subprocess
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
def _spawn(cmd):
|
||||
env_dict = {
|
||||
"PATH": os.getenv("PATH"),
|
||||
"PYTHONPATH": os.getcwd(),
|
||||
"GPG_MAILGATE_CONFIG": "test/gpg-mailgate-daemon-test.conf"
|
||||
}
|
||||
logging.debug(f"Spawning command: {cmd} with environment: {env_dict!r}")
|
||||
return subprocess.Popen(cmd,
|
||||
stdin=None,
|
||||
stdout=subprocess.PIPE,
|
||||
env=env_dict)
|
||||
|
||||
|
||||
def _interrupt(proc):
|
||||
# proc.send_signal(signal.SIGINT)
|
||||
proc.terminate()
|
||||
|
||||
|
||||
def _load(name):
|
||||
logging.debug(f"Loading file {name}")
|
||||
f = open(name, "r")
|
||||
contents = f.read()
|
||||
f.close()
|
||||
return contents
|
||||
|
||||
|
||||
def _send(host, port, mail_from, mail_to, message):
|
||||
logging.debug(f"Sending message to {host}:{port}")
|
||||
_spawn([os.getenv("PYTHON") or "python",
|
||||
"test/utils/sendmail.py",
|
||||
"-f", mail_from,
|
||||
"-t", mail_to,
|
||||
"-m", message])
|
||||
|
||||
|
||||
def _load_test_config():
|
||||
cp = configparser.ConfigParser()
|
||||
cp.read("test/e2e.ini")
|
||||
return cp
|
||||
|
||||
|
||||
def _report_result(message_file, expected, test_output):
|
||||
status = None
|
||||
if expected in test_output:
|
||||
status = "Success"
|
||||
else:
|
||||
status = "Failure"
|
||||
|
||||
print(message_file.ljust(30), status)
|
||||
|
||||
|
||||
def _execute_case(config, case_name):
|
||||
logging.info(f"Executing case {case_name}")
|
||||
python = os.getenv("PYTHON", "python")
|
||||
|
||||
relay_mock = _spawn([python, "test/utils/relay.py", "2500"])
|
||||
time.sleep(1) # Wait for the relay to start up.
|
||||
|
||||
_send("localhost", 10025, "dave@disposlab",
|
||||
config.get(case_name, 'to'), config.get(case_name, 'in'))
|
||||
|
||||
relay_mock.wait()
|
||||
(test_out, _) = relay_mock.communicate()
|
||||
|
||||
test_out = test_out.decode('utf-8')
|
||||
logging.debug(f"Read {len(test_out)} characters of output: '{test_out}'")
|
||||
|
||||
_report_result(config.get(case_name, "in"), config.get(case_name, "out"), test_out)
|
||||
|
||||
|
||||
def _main():
|
||||
conf = _load_test_config()
|
||||
|
||||
logging.basicConfig(filename="test/logs/daemon-test.log",
|
||||
format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
level=logging.DEBUG)
|
||||
|
||||
logging.info("Starting Lacre Daemon tests...")
|
||||
python = os.getenv("PYTHON", "python")
|
||||
|
||||
server = _spawn([python, "-m", "lacre.daemon"])
|
||||
|
||||
for case_no in range(1, conf.getint("tests", "cases")):
|
||||
_execute_case(conf, case_name=f"case-{case_no}")
|
||||
|
||||
_interrupt(server)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
_main()
|
|
@ -22,7 +22,7 @@
|
|||
|
||||
[relay]
|
||||
port: 2500
|
||||
script: test/relay.py
|
||||
script: test/utils/relay.py
|
||||
|
||||
[dirs]
|
||||
keys: test/keyhome
|
||||
|
|
|
@ -1,38 +1,34 @@
|
|||
#
|
||||
# gpg-mailgate
|
||||
# gpg-mailgate
|
||||
#
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
# This file is part of the gpg-mailgate source code.
|
||||
#
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
# gpg-mailgate is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
# gpg-mailgate source code is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with gpg-mailgate source code. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import subprocess
|
||||
|
||||
import difflib
|
||||
|
||||
import configparser
|
||||
import logging
|
||||
|
||||
from time import sleep
|
||||
|
||||
RELAY_SCRIPT = "test/relay.py"
|
||||
RELAY_SCRIPT = "test/utils/relay.py"
|
||||
CONFIG_FILE = "test/gpg-mailgate.conf"
|
||||
|
||||
def build_config(config):
|
||||
|
||||
def _build_config(config):
|
||||
cp = configparser.RawConfigParser()
|
||||
|
||||
cp.add_section("logging")
|
||||
|
@ -48,6 +44,10 @@ def build_config(config):
|
|||
cp.set("relay", "host", "localhost")
|
||||
cp.set("relay", "port", config["port"])
|
||||
|
||||
cp.add_section("daemon")
|
||||
cp.set("daemon", "host", "localhost")
|
||||
cp.set("daemon", "port", "10025")
|
||||
|
||||
cp.add_section("enc_keymap")
|
||||
cp.set("enc_keymap", "alice@disposlab", "1CD245308F0963D038E88357973CF4D9387C44D7")
|
||||
cp.set("enc_keymap", "bob@disposlab", "19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67")
|
||||
|
@ -61,24 +61,27 @@ def build_config(config):
|
|||
logging.debug(f"Created config with keyhome={config['gpg_keyhome']}, cert_path={config['smime_certpath']} and relay at port {config['port']}")
|
||||
return cp
|
||||
|
||||
def write_test_config(outfile, **config):
|
||||
|
||||
def _write_test_config(outfile, **config):
|
||||
logging.debug(f"Generating configuration with {config!r}")
|
||||
|
||||
out = open(outfile, "w+")
|
||||
cp = build_config(config)
|
||||
cp = _build_config(config)
|
||||
cp.write(out)
|
||||
out.close()
|
||||
|
||||
logging.debug(f"Wrote configuration to {outfile}")
|
||||
|
||||
def load_file(name):
|
||||
f = open(name, 'r')
|
||||
contents = f.read()
|
||||
f.close()
|
||||
|
||||
return bytes(contents, 'utf-8')
|
||||
def _load_file(name):
|
||||
f = open(name, 'r')
|
||||
contents = f.read()
|
||||
f.close()
|
||||
|
||||
def report_result(message_file, expected, test_output):
|
||||
return bytes(contents, 'utf-8')
|
||||
|
||||
|
||||
def _report_result(message_file, expected, test_output):
|
||||
status = None
|
||||
if expected in test_output:
|
||||
status = "Success"
|
||||
|
@ -87,7 +90,8 @@ def report_result(message_file, expected, test_output):
|
|||
|
||||
print(message_file.ljust(30), status)
|
||||
|
||||
def execute_e2e_test(case_name, config, config_path):
|
||||
|
||||
def _execute_e2e_test(case_name, config, config_path):
|
||||
"""Read test case configuration from config and run that test case.
|
||||
|
||||
Parameter case_name should refer to a section in test
|
||||
|
@ -107,17 +111,17 @@ def execute_e2e_test(case_name, config, config_path):
|
|||
|
||||
logging.debug(f"Spawning relay: {relay_cmd}")
|
||||
relay_proc = subprocess.Popen(relay_cmd,
|
||||
stdin = None,
|
||||
stdout = subprocess.PIPE)
|
||||
stdin=None,
|
||||
stdout=subprocess.PIPE)
|
||||
|
||||
logging.debug(f"Spawning GPG-Lacre: {gpglacre_cmd}, stdin = {config.get(case_name, 'in')}")
|
||||
|
||||
# pass PATH because otherwise it would be dropped
|
||||
gpglacre_proc = subprocess.run(gpglacre_cmd,
|
||||
input = load_file(config.get(case_name, "in")),
|
||||
capture_output = True,
|
||||
env = {"GPG_MAILGATE_CONFIG": config_path,
|
||||
"PATH": os.getenv("PATH")})
|
||||
input=_load_file(config.get(case_name, "in")),
|
||||
capture_output=True,
|
||||
env={"GPG_MAILGATE_CONFIG": config_path,
|
||||
"PATH": os.getenv("PATH")})
|
||||
|
||||
# Let the relay process the data.
|
||||
relay_proc.wait()
|
||||
|
@ -127,16 +131,17 @@ def execute_e2e_test(case_name, config, config_path):
|
|||
|
||||
logging.debug(f"Read {len(testout)} characters of test output: '{testout}'")
|
||||
|
||||
report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
|
||||
_report_result(config.get(case_name, "in"), config.get(case_name, "out"), testout)
|
||||
|
||||
def load_test_config():
|
||||
|
||||
def _load_test_config():
|
||||
cp = configparser.ConfigParser()
|
||||
cp.read("test/e2e.ini")
|
||||
|
||||
return cp
|
||||
|
||||
|
||||
config = load_test_config()
|
||||
config = _load_test_config()
|
||||
|
||||
logging.basicConfig(filename = config.get("tests", "e2e_log"),
|
||||
# Get raw values of log and date formats because they
|
||||
|
@ -148,7 +153,7 @@ logging.basicConfig(filename = config.get("tests", "e2e_log"),
|
|||
|
||||
config_path = os.getcwd() + "/" + CONFIG_FILE
|
||||
|
||||
write_test_config(config_path,
|
||||
_write_test_config(config_path,
|
||||
port = config.get("relay", "port"),
|
||||
gpg_keyhome = config.get("dirs", "keys"),
|
||||
smime_certpath = config.get("dirs", "certs"),
|
||||
|
@ -158,6 +163,6 @@ for case_no in range(1, config.getint("tests", "cases")+1):
|
|||
case_name = f"case-{case_no}"
|
||||
logging.info(f"Executing {case_name}: {config.get(case_name, 'descr')}")
|
||||
|
||||
execute_e2e_test(case_name, config, config_path)
|
||||
_execute_e2e_test(case_name, config, config_path)
|
||||
|
||||
print("See diagnostic output for details. Tests: '%s', Lacre: '%s'" % (config.get("tests", "e2e_log"), config.get("tests", "lacre_log")))
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
[logging]
|
||||
config = test/gpg-lacre-log.ini
|
||||
file = test/logs/gpg-mailgate.log
|
||||
format = %(asctime)s %(module)s[%(process)d]: %(message)s
|
||||
date_format = ISO
|
||||
|
||||
[gpg]
|
||||
keyhome = test/keyhome
|
||||
cache_refresh_minutes = 1
|
||||
|
||||
[smime]
|
||||
cert_path = test/certs
|
||||
|
||||
[database]
|
||||
enabled = yes
|
||||
url = sqlite:///test/lacre.db
|
||||
|
||||
[relay]
|
||||
host = localhost
|
||||
port = 2500
|
||||
|
||||
[daemon]
|
||||
host = localhost
|
||||
port = 10025
|
||||
|
||||
[cron]
|
||||
send_email = no
|
||||
|
||||
[enc_keymap]
|
||||
alice@disposlab = 1CD245308F0963D038E88357973CF4D9387C44D7
|
||||
bob@disposlab = 19CF4B47ECC9C47AFA84D4BD96F39FDA0E31BB67
|
||||
|
|
@ -4,11 +4,11 @@ import unittest
|
|||
|
||||
class GnuPGUtilitiesTest(unittest.TestCase):
|
||||
def test_build_default_command(self):
|
||||
cmd = GnuPG.build_command("test/keyhome")
|
||||
cmd = GnuPG._build_command("test/keyhome")
|
||||
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome"])
|
||||
|
||||
def test_build_command_extended_with_args(self):
|
||||
cmd = GnuPG.build_command("test/keyhome", "--foo", "--bar")
|
||||
cmd = GnuPG._build_command("test/keyhome", "--foo", "--bar")
|
||||
self.assertEqual(cmd, ["gpg", "--homedir", "test/keyhome", "--foo", "--bar"])
|
||||
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
from lacre.keyring import KeyCache
|
||||
|
||||
import unittest
|
||||
|
||||
class LacreKeyCacheTest(unittest.TestCase):
|
||||
def test_extend_keyring(self):
|
||||
kc = KeyCache({'FINGERPRINT': 'john.doe@example.com'})
|
||||
self.assertTrue('FINGERPRINT' in kc)
|
||||
|
||||
def test_membership_methods(self):
|
||||
kc = KeyCache({
|
||||
'FINGERPRINT': 'alice@example.com',
|
||||
'OTHERPRINT': 'bob@example.com'
|
||||
})
|
||||
|
||||
self.assertTrue('FINGERPRINT' in kc)
|
||||
self.assertFalse('FOOTPRINT' in kc)
|
||||
|
||||
self.assertTrue(kc.has_email('bob@example.com'))
|
||||
self.assertFalse(kc.has_email('dave@example.com'))
|
|
@ -0,0 +1,37 @@
|
|||
import lacre.text
|
||||
import sys
|
||||
|
||||
import unittest
|
||||
|
||||
class LacreTextTest(unittest.TestCase):
|
||||
def test_parse_content_type_without_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
||||
|
||||
def test_parse_content_type_with_other_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_several_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
||||
|
||||
def test_parse_email_without_delimiter(self):
|
||||
addr = "Some.Name@example.com"
|
||||
(addr2, topic) = lacre.text.parse_delimiter(addr)
|
||||
self.assertEqual(addr2, "Some.Name@example.com")
|
||||
self.assertEqual(topic, None)
|
||||
|
||||
def test_parse_email_with_delimiter(self):
|
||||
addr = "Some.Name+some-topic@example.com"
|
||||
(addr2, topic) = lacre.text.parse_delimiter(addr)
|
||||
self.assertEqual(addr2, "Some.Name@example.com")
|
||||
self.assertEqual(topic, "some-topic")
|
|
@ -0,0 +1,15 @@
|
|||
from M2Crypto import BIO
|
||||
import unittest
|
||||
|
||||
class M2CryptoBioMemoryBufferTest(unittest.TestCase):
|
||||
def test_memory_buffer_write_str(self):
|
||||
mb = BIO.MemoryBuffer()
|
||||
mb.write("Foo")
|
||||
mb.close()
|
||||
self.assertEqual(len(mb), 3)
|
||||
|
||||
def test_memory_buffer_write_bytes(self):
|
||||
mb = BIO.MemoryBuffer()
|
||||
mb.write(b"Foo")
|
||||
mb.close()
|
||||
self.assertEqual(len(mb), 3)
|
104
test/relay.py
104
test/relay.py
|
@ -1,104 +0,0 @@
|
|||
#!/usr/local/bin/python2
|
||||
#
|
||||
# This quick-and-dirty script supports only the happy case of SMTP session,
|
||||
# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email.
|
||||
#
|
||||
# It listens on the port given as the only command-line argument and consumes a
|
||||
# message, then prints it to standard output. The goal is to be able to
|
||||
# compare that output with expected clear-text or encrypted message body.
|
||||
#
|
||||
|
||||
import sys
|
||||
import socket
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
EXIT_UNAVAILABLE = 1
|
||||
|
||||
ENCODING = 'utf-8'
|
||||
|
||||
BUFFER_SIZE = 4096
|
||||
EOM = "\r\n.\r\n"
|
||||
LAST_LINE = -3
|
||||
|
||||
|
||||
def welcome(msg):
|
||||
return b"220 %b\r\n" % (msg)
|
||||
|
||||
def ok(msg = b"OK"):
|
||||
return b"250 %b\r\n" % (msg)
|
||||
|
||||
def bye():
|
||||
return b"251 Bye"
|
||||
|
||||
def provide_message():
|
||||
return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
|
||||
|
||||
def receive_and_confirm(session):
|
||||
session.recv(BUFFER_SIZE)
|
||||
session.sendall(ok())
|
||||
|
||||
def localhost_at(port):
|
||||
return ('127.0.0.1', port)
|
||||
|
||||
def serve(port):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.bind(localhost_at(port))
|
||||
logging.info(f"Listening on localhost, port {port}")
|
||||
s.listen(1)
|
||||
logging.info("Listening...")
|
||||
except socket.error as e:
|
||||
print("Cannot connect", e)
|
||||
logging.error(f"Cannot connect {e}")
|
||||
sys.exit(EXIT_UNAVAILABLE)
|
||||
|
||||
logging.debug("About to accept a connection...")
|
||||
(conn, addr) = s.accept()
|
||||
logging.debug(f"Accepting connection from {conn}")
|
||||
conn.sendall(welcome(b"TEST SERVER"))
|
||||
|
||||
receive_and_confirm(conn) # Ignore HELO/EHLO
|
||||
receive_and_confirm(conn) # Ignore sender address
|
||||
receive_and_confirm(conn) # Ignore recipient address
|
||||
|
||||
data = conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(provide_message())
|
||||
|
||||
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
|
||||
message = ''
|
||||
while not message.endswith(EOM):
|
||||
message += conn.recv(BUFFER_SIZE).decode(ENCODING)
|
||||
conn.sendall(ok(b"OK, id=test"))
|
||||
|
||||
conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(bye())
|
||||
|
||||
conn.close()
|
||||
|
||||
logging.debug(f"Received {len(message)} characters of data")
|
||||
|
||||
# Trim EOM marker as we're only interested in the message body.
|
||||
return message[:-len(EOM)]
|
||||
|
||||
def error(msg, exit_code):
|
||||
logging.error(msg)
|
||||
print("ERROR: %s" % (msg))
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
# filename is relative to where we run the tests from, i.e. the project root
|
||||
# directory
|
||||
logging.basicConfig(filename = 'test/logs/relay.log',
|
||||
format = '%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s',
|
||||
datefmt = '%Y-%m-%d %H:%M:%S',
|
||||
level = logging.DEBUG)
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
|
||||
|
||||
port = int(sys.argv[1])
|
||||
body = serve(port)
|
||||
|
||||
print(body)
|
|
@ -1,37 +0,0 @@
|
|||
import lacre.text
|
||||
import sys
|
||||
|
||||
import unittest
|
||||
|
||||
class LacreTextTest(unittest.TestCase):
|
||||
def test_parse_content_type_without_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_charset(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
||||
|
||||
def test_parse_content_type_with_other_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, sys.getdefaultencoding())
|
||||
|
||||
def test_parse_content_type_with_several_attributes(self):
|
||||
(mtype, mcharset) = lacre.text.parse_content_type('text/plain; charset="UTF-8"; some-param="Some Value"')
|
||||
self.assertEqual(mtype, 'text/plain')
|
||||
self.assertEqual(mcharset, '"UTF-8"')
|
||||
|
||||
def test_parse_email_without_delimiter(self):
|
||||
addr = "Some.Name@example.com"
|
||||
(addr2, topic) = lacre.text.parse_delimiter(addr)
|
||||
self.assertEqual(addr2, "Some.Name@example.com")
|
||||
self.assertEqual(topic, None)
|
||||
|
||||
def test_parse_email_with_delimiter(self):
|
||||
addr = "Some.Name+some-topic@example.com"
|
||||
(addr2, topic) = lacre.text.parse_delimiter(addr)
|
||||
self.assertEqual(addr2, "Some.Name@example.com")
|
||||
self.assertEqual(topic, "some-topic")
|
|
@ -0,0 +1,104 @@
|
|||
#!/usr/local/bin/python2
|
||||
#
|
||||
# This quick-and-dirty script supports only the happy case of SMTP session,
|
||||
# i.e. what gpg-mailgate/gpg-lacre needs to deliver encrypted email.
|
||||
#
|
||||
# It listens on the port given as the only command-line argument and consumes a
|
||||
# message, then prints it to standard output. The goal is to be able to
|
||||
# compare that output with expected clear-text or encrypted message body.
|
||||
#
|
||||
|
||||
import sys
|
||||
import socket
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
EXIT_UNAVAILABLE = 1
|
||||
|
||||
ENCODING = 'utf-8'
|
||||
|
||||
BUFFER_SIZE = 4096
|
||||
EOM = "\r\n.\r\n"
|
||||
LAST_LINE = -3
|
||||
|
||||
|
||||
def welcome(msg):
|
||||
return b"220 %b\r\n" % (msg)
|
||||
|
||||
def ok(msg = b"OK"):
|
||||
return b"250 %b\r\n" % (msg)
|
||||
|
||||
def bye():
|
||||
return b"251 Bye"
|
||||
|
||||
def provide_message():
|
||||
return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
|
||||
|
||||
def receive_and_confirm(session):
|
||||
session.recv(BUFFER_SIZE)
|
||||
session.sendall(ok())
|
||||
|
||||
def localhost_at(port):
|
||||
return ('127.0.0.1', port)
|
||||
|
||||
def serve(port):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.bind(localhost_at(port))
|
||||
logging.info(f"Listening on localhost, port {port}")
|
||||
s.listen(1)
|
||||
logging.info("Listening...")
|
||||
except socket.error as e:
|
||||
print("Cannot connect", e)
|
||||
logging.error(f"Cannot connect {e}")
|
||||
sys.exit(EXIT_UNAVAILABLE)
|
||||
|
||||
logging.debug("About to accept a connection...")
|
||||
(conn, addr) = s.accept()
|
||||
logging.debug(f"Accepting connection from {conn}")
|
||||
conn.sendall(welcome(b"TEST SERVER"))
|
||||
|
||||
receive_and_confirm(conn) # Ignore HELO/EHLO
|
||||
receive_and_confirm(conn) # Ignore sender address
|
||||
receive_and_confirm(conn) # Ignore recipient address
|
||||
|
||||
conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(provide_message())
|
||||
|
||||
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
|
||||
message = ''
|
||||
while not message.endswith(EOM):
|
||||
message += conn.recv(BUFFER_SIZE).decode(ENCODING)
|
||||
conn.sendall(ok(b"OK, id=test"))
|
||||
|
||||
conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(bye())
|
||||
|
||||
conn.close()
|
||||
|
||||
logging.debug(f"Received {len(message)} characters of data")
|
||||
|
||||
# Trim EOM marker as we're only interested in the message body.
|
||||
return message[:-len(EOM)]
|
||||
|
||||
def error(msg, exit_code):
|
||||
logging.error(msg)
|
||||
print("ERROR: %s" % (msg))
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
# filename is relative to where we run the tests from, i.e. the project root
|
||||
# directory
|
||||
logging.basicConfig(filename='test/logs/relay.log',
|
||||
format='%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
level=logging.DEBUG)
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
|
||||
|
||||
port = int(sys.argv[1])
|
||||
body = serve(port)
|
||||
|
||||
print(body)
|
|
@ -0,0 +1,56 @@
|
|||
import logging
|
||||
import smtplib
|
||||
import sys
|
||||
import getopt
|
||||
|
||||
|
||||
def _load_file(name):
|
||||
f = open(name, 'r')
|
||||
contents = f.read()
|
||||
f.close()
|
||||
return contents
|
||||
|
||||
|
||||
def _send(host, port, from_addr, recipients, message):
|
||||
logging.info(f"From {from_addr} to {recipients} at {host}:{port}")
|
||||
try:
|
||||
smtp = smtplib.SMTP(host, port)
|
||||
# smtp.starttls()
|
||||
return smtp.sendmail(from_addr, recipients, message)
|
||||
except smtplib.SMTPDataError as e:
|
||||
logging.error(f"Couldn't deliver message. Got error: {e}")
|
||||
return None
|
||||
except ConnectionRefusedError as e:
|
||||
logging.exception(f"Connection refused: {e}")
|
||||
return None
|
||||
|
||||
|
||||
logging.basicConfig(filename="test/logs/sendmail.log",
|
||||
format="%(asctime)s %(pathname)s:%(lineno)d %(levelname)s [%(funcName)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
level=logging.DEBUG)
|
||||
|
||||
sender = recipient = message = None
|
||||
|
||||
opts, _ = getopt.getopt(sys.argv[1:], "f:t:m:")
|
||||
for opt, value in opts:
|
||||
if opt == "-f":
|
||||
sender = value
|
||||
logging.debug(f"Sender is {sender}")
|
||||
if opt == "-t":
|
||||
recipient = value
|
||||
logging.debug(f"Recipient is {recipient}")
|
||||
if opt == "-m":
|
||||
message = _load_file(value)
|
||||
logging.debug(f"Message is {message}")
|
||||
|
||||
if message is None:
|
||||
message = """\
|
||||
From: dave@disposlab
|
||||
To: alice@disposlab
|
||||
Subject: Test message
|
||||
|
||||
Lorem ipsum dolor sit amet.
|
||||
"""
|
||||
|
||||
_send('localhost', 10025, sender, [recipient], message)
|
Loading…
Reference in New Issue