Improve error-handling for simple filter and test relay
This commit is contained in:
parent
8def4b40dc
commit
8a42f3fea1
3 changed files with 87 additions and 42 deletions
|
@ -41,15 +41,20 @@ if missing_params:
|
|||
LOG.error(f"Aborting delivery! Following mandatory config parameters are missing: {missing_params!r}")
|
||||
sys.exit(lacre.EX_CONFIG)
|
||||
|
||||
# Read e-mail from stdin, parse it
|
||||
raw = sys.stdin.read()
|
||||
raw_message = email.message_from_string(raw, policy=SMTPUTF8)
|
||||
from_addr = raw_message['From']
|
||||
# Read recipients from the command-line
|
||||
to_addrs = sys.argv[1:]
|
||||
try:
|
||||
|
||||
# Let's start
|
||||
core.deliver_message(raw_message, from_addr, to_addrs)
|
||||
process_t = (time.process_time() - start) * 1000
|
||||
# Read e-mail from stdin, parse it
|
||||
raw = sys.stdin.read()
|
||||
raw_message = email.message_from_string(raw, policy=SMTPUTF8)
|
||||
from_addr = raw_message['From']
|
||||
# Read recipients from the command-line
|
||||
to_addrs = sys.argv[1:]
|
||||
|
||||
LOG.info("Message delivered in {process:.2f} ms".format(process=process_t))
|
||||
# Let's start
|
||||
core.deliver_message(raw_message, from_addr, to_addrs)
|
||||
process_t = (time.process_time() - start) * 1000
|
||||
|
||||
LOG.info("Message delivered in {process:.2f} ms".format(process=process_t))
|
||||
except:
|
||||
LOG.exception('Could not handle message')
|
||||
core.failover_delivery(raw_message, to_addrs)
|
||||
|
|
|
@ -605,6 +605,20 @@ def send_msg_bytes(message: bytes, recipients, fromaddr=None):
|
|||
LOG.info("No recipient found")
|
||||
|
||||
|
||||
def failover_delivery(message: EmailMessage, recipients):
|
||||
"""Try delivering message just one last time."""
|
||||
LOG.debug('Failover delivery')
|
||||
if message.get_content_maintype() == 'text':
|
||||
LOG.debug('Re-packing message content of: %s', message)
|
||||
message.set_content(message.get_payload())
|
||||
b = message.as_bytes(policy=SMTPUTF8)
|
||||
LOG.debug('Sending... %s', b)
|
||||
send_msg_bytes(b, recipients)
|
||||
LOG.debug('Sent!')
|
||||
else:
|
||||
LOG.warning('No failover strategy')
|
||||
|
||||
|
||||
def _is_encrypted(raw_message: EmailMessage):
|
||||
if raw_message.get_content_type() == 'multipart/encrypted':
|
||||
return True
|
||||
|
|
|
@ -17,6 +17,8 @@ import email.policy
|
|||
|
||||
|
||||
EXIT_UNAVAILABLE = 1
|
||||
EXIT_NETWORK = 2
|
||||
EXIT_UNKNOWN = 3
|
||||
|
||||
ENCODING = 'utf-8'
|
||||
|
||||
|
@ -25,69 +27,85 @@ EOM = b"\r\n.\r\n"
|
|||
LAST_LINE = -3
|
||||
|
||||
|
||||
def welcome(msg):
|
||||
def _welcome(msg):
|
||||
return b"220 %b\r\n" % (msg)
|
||||
|
||||
def ok(msg = b"OK"):
|
||||
|
||||
def _ok(msg=b"OK"):
|
||||
return b"250 %b\r\n" % (msg)
|
||||
|
||||
def bye():
|
||||
|
||||
def _bye():
|
||||
return b"251 Bye"
|
||||
|
||||
def provide_message():
|
||||
|
||||
def _provide_message():
|
||||
return b"354 Enter a message, ending it with a '.' on a line by itself\r\n"
|
||||
|
||||
def receive_and_confirm(session):
|
||||
session.recv(BUFFER_SIZE)
|
||||
session.sendall(ok())
|
||||
|
||||
def receive_and_ignore(session):
|
||||
def _receive_and_confirm(session):
|
||||
session.recv(BUFFER_SIZE)
|
||||
session.sendall(_ok())
|
||||
|
||||
|
||||
def _receive_and_ignore(session):
|
||||
session.recv(BUFFER_SIZE)
|
||||
|
||||
def localhost_at(port):
|
||||
|
||||
def _localhost_at(port):
|
||||
return ('127.0.0.1', port)
|
||||
|
||||
def serve(port) -> bytes:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
def _receive_bytes(conn) -> bytes:
|
||||
return conn.recv(BUFFER_SIZE)
|
||||
|
||||
|
||||
def _listen(port, sock):
|
||||
try:
|
||||
s.bind(localhost_at(port))
|
||||
logging.info(f"Listening on localhost, port {port}")
|
||||
s.listen(1)
|
||||
logging.info("Listening...")
|
||||
sock.bind(_localhost_at(port))
|
||||
sock.listen(1)
|
||||
except socket.error as e:
|
||||
print("Cannot connect", e)
|
||||
logging.error(f"Cannot connect {e}")
|
||||
sys.exit(EXIT_UNAVAILABLE)
|
||||
logging.exception('Cannot connect')
|
||||
sys.exit(EXIT_NETWORK)
|
||||
|
||||
|
||||
def _serve(port) -> bytes:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
_listen(port, s)
|
||||
|
||||
logging.debug("About to accept a connection...")
|
||||
(conn, addr) = s.accept()
|
||||
logging.debug(f"Accepting connection from {conn}")
|
||||
conn.sendall(welcome(b"TEST SERVER"))
|
||||
conn.sendall(_welcome(b"TEST SERVER"))
|
||||
|
||||
receive_and_confirm(conn) # Ignore HELO/EHLO
|
||||
receive_and_confirm(conn) # Ignore sender address
|
||||
receive_and_confirm(conn) # Ignore recipient address
|
||||
_receive_and_confirm(conn) # Ignore HELO/EHLO
|
||||
_receive_and_confirm(conn) # Ignore sender address
|
||||
_receive_and_confirm(conn) # Ignore recipient address
|
||||
|
||||
receive_and_ignore(conn)
|
||||
conn.sendall(provide_message())
|
||||
_receive_and_ignore(conn)
|
||||
conn.sendall(_provide_message())
|
||||
|
||||
# Consume until we get <CR><LF>.<CR><LF>, the end-of-message marker.
|
||||
message = b''
|
||||
while not message.endswith(EOM):
|
||||
message += conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(ok(b"OK, id=test"))
|
||||
buf = _receive_bytes(conn)
|
||||
logging.debug('Received data: %s', buf)
|
||||
message += buf
|
||||
conn.sendall(_ok(b"OK, id=test"))
|
||||
|
||||
conn.recv(BUFFER_SIZE)
|
||||
conn.sendall(bye())
|
||||
conn.sendall(_bye())
|
||||
|
||||
conn.close()
|
||||
|
||||
logging.debug(f"Received {len(message)} bytes of data")
|
||||
logging.debug('Received %d bytes of data', len(message))
|
||||
|
||||
# Trim EOM marker as we're only interested in the message body.
|
||||
return message[:-len(EOM)]
|
||||
|
||||
def error(msg, exit_code):
|
||||
|
||||
def _error(msg, exit_code):
|
||||
logging.error(msg)
|
||||
print("ERROR: %s" % (msg))
|
||||
sys.exit(exit_code)
|
||||
|
@ -101,10 +119,18 @@ logging.basicConfig(filename='test/logs/relay.log',
|
|||
level=logging.DEBUG)
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
|
||||
_error("Usage: relay.py PORT_NUMBER", EXIT_UNAVAILABLE)
|
||||
|
||||
port = int(sys.argv[1])
|
||||
body = serve(port)
|
||||
|
||||
msg = email.message_from_bytes(body, policy=email.policy.SMTP)
|
||||
print(msg)
|
||||
try:
|
||||
body = _serve(port)
|
||||
logging.debug('Parsing message')
|
||||
msg = email.message_from_bytes(body, policy=email.policy.SMTP)
|
||||
print(msg)
|
||||
except ConnectionResetError:
|
||||
logging.exception('Communication issue')
|
||||
_error('Could not receive complete message', EXIT_NETWORK)
|
||||
except BrokenPipeError:
|
||||
logging.exception('Pipe error')
|
||||
_error('Pipe error', EXIT_UNKNOWN)
|
||||
|
|
Loading…
Reference in a new issue