# oxen-pyoxenmq/examples/exit_auth.py
# 93 lines, 2.2 KiB, Python
# (web view links: Raw, Normal View, History)

import pylokimq
import base64
import subprocess
import shlex
import argparse
import io
import time
# blame timestamp (scraped view): 2020-09-14 15:12:51 +02:00
def decode_str(data, first=None):
    """Decode one bencoded byte string (``<length>:<payload>``) from *data*.

    Args:
        data: Binary file-like object positioned inside (or at the start of)
            the decimal length prefix.
        first: Leading byte(s) of the length prefix that the caller already
            consumed while dispatching, or ``None`` if nothing was consumed.

    Returns:
        The payload as ``bytes``.

    Raises:
        ValueError: If the stream ends before the ``:`` separator, if the
            length prefix is not a decimal integer, or if fewer payload
            bytes are available than the prefix announces.
    """
    length = b'' if first is None else first
    while True:
        ch = data.read(1)
        if not ch:
            # read() returns b'' at EOF; without this check the loop would
            # never terminate on truncated input.
            raise ValueError("unexpected end of data in string length")
        if ch == b':':
            break
        length += ch
    size = int(length.decode('ascii'))
    payload = data.read(size)
    # Also rejects negative sizes, for which read() would return "everything".
    if len(payload) != size:
        raise ValueError(
            "string truncated: expected {} bytes, got {}".format(size, len(payload)))
    return payload
# blame timestamp (scraped view): 2020-09-14 15:12:51 +02:00
def decode_list(data, first=None):
    """Decode the body of a bencoded list from *data*, up to its ``e`` marker.

    The opening ``l`` must already have been consumed by the caller.

    Args:
        data: Binary file-like object positioned just after the ``l``.
        first: Accepted for signature parity with the other decoders; unused.

    Returns:
        A ``list`` holding each decoded element in order.

    Raises:
        ValueError: If the stream ends before the terminating ``e``.
    """
    items = []
    while True:
        ch = data.read(1)
        if not ch:
            # b'' signals EOF; stop instead of handing it to decode_value.
            raise ValueError("unexpected end of data in list")
        if ch == b'e':
            return items
        # append, not +=: += would splice a bytes element into the list as
        # individual ints and raise TypeError for an int element.
        items.append(decode_value(data, first=ch))
# blame timestamp (scraped view): 2020-09-14 15:12:51 +02:00
def decode_dict(data, first=None):
    """Decode the body of a bencoded dict from *data*, up to its ``e`` marker.

    The opening ``d`` must already have been consumed by the caller: the
    first byte read here is treated either as the terminator or as the first
    digit of a key's length prefix.

    Args:
        data: Binary file-like object positioned just after the ``d``.
        first: Accepted for signature parity with the other decoders; unused.

    Returns:
        A ``dict`` mapping ``bytes`` keys to decoded values.

    Raises:
        ValueError: If the stream ends before the terminating ``e``.
    """
    result = {}
    while True:
        ch = data.read(1)
        if not ch:
            # b'' signals EOF; without this check it would be passed to
            # decode_str as the start of a key and never terminate.
            raise ValueError("unexpected end of data in dict")
        if ch == b'e':
            return result
        key = decode_str(data, first=ch)
        result[key] = decode_value(data)
# blame timestamp (scraped view): 2020-09-14 15:12:51 +02:00
def decode_int(data, first=None):
    """Decode the body of a bencoded integer (``<digits>e``) from *data*.

    The opening ``i`` must already have been consumed by the caller.

    Args:
        data: Binary file-like object positioned just after the ``i``.
        first: Accepted for signature parity with the other decoders; unused.

    Returns:
        The decoded ``int``.

    Raises:
        ValueError: If the stream ends before the terminating ``e`` or the
            collected bytes are not a valid decimal integer.
    """
    digits = b''
    while True:
        ch = data.read(1)
        if not ch:
            # read() returns b'' at EOF; bail out instead of spinning forever.
            raise ValueError("unexpected end of data in integer")
        if ch == b'e':
            return int(digits.decode('ascii'))
        digits += ch
# blame timestamp (scraped view): 2020-09-14 15:12:51 +02:00
def decode_value(data, first=None):
    """Decode the next bencoded value from *data*, dispatching on its type byte.

    Args:
        data: Binary file-like object positioned at (or just after) the
            value's type byte.
        first: The type byte if the caller already consumed it; when falsy
            (``None`` or ``b''``) one byte is read from *data* instead.

    Returns:
        ``bytes`` for a string, ``int`` for an integer, ``dict`` for a dict
        or ``list`` for a list.

    Raises:
        ValueError: If the stream is exhausted or the type byte is not one
            of a digit, ``i``, ``d`` or ``l``.
    """
    ch = first if first else data.read(1)
    if not ch:
        # b'' is "in" every bytes object, so without this guard EOF would be
        # misread as the start of a string and never terminate.
        raise ValueError("unexpected end of data")
    if ch in b'0123456789':
        return decode_str(data, first=ch)
    if ch == b'i':
        return decode_int(data, first=ch)
    if ch == b'd':
        return decode_dict(data, first=ch)
    if ch == b'l':
        return decode_list(data, first=ch)
    raise ValueError("invalid char: {}".format(ch))
def decode_address(data):
    """Return the ``.loki`` address encoded in the bencoded dict read from *data*.

    The value at ``[b's'][b's']`` of the decoded dict is base32z-encoded via
    ``pylokimq.base32z_encode`` and suffixed with ``.loki``.

    NOTE(review): ``decode_dict`` is called directly, and it treats the first
    byte it reads as either ``e`` or the start of a key length. *data* is
    therefore presumably positioned after any leading ``d`` marker; confirm
    against what the sender actually transmits.
    """
    outer = decode_dict(data)
    raw_key = outer[b's'][b's']
    encoded = pylokimq.base32z_encode(raw_key)
    return '{}.loki'.format(encoded)
def handle_auth(args, cmd):
    """Run the external auth command for one request and report its verdict.

    The command is invoked as ``cmd + [address, token]``, where *address* is
    the ``.loki`` address decoded from ``args[0]`` and *token* is ``args[1]``
    base64-encoded as ASCII text.

    Args:
        args: Request parts; ``args[0]`` is handed to ``decode_address`` and
            ``args[1]`` is base64-encoded. Presumably both are ``bytes``
            supplied by the message queue; confirm against the caller.
        cmd: Base argv (a list of strings) of the authentication script.
            It is not modified.

    Returns:
        ``"OKAY"`` if the command exits with status 0, otherwise ``"REJECT"``.
    """
    address = decode_address(io.BytesIO(args[0]))
    token = base64.b64encode(args[1]).decode('ascii')
    # Build a fresh argv. The previous `cmd2 = cmd; cmd2 += <str>` aliased the
    # shared list and extended it one character at a time, so every request
    # both corrupted the argv and leaked arguments into later requests.
    argv = list(cmd) + [address, token]
    result = subprocess.run(args=argv, check=False)
    return "OKAY" if result.returncode == 0 else "REJECT"
def main():
    """Parse the command line, start the auth server, then idle forever.

    ``--bind`` is the URL the plain listener binds to and ``--cmd`` is the
    script run for each ``llarp.auth`` request (split with ``shlex``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bind", required=True, help="url to bind auth socket to")
    parser.add_argument("--cmd", required=True, help="script to call for authentication")
    options = parser.parse_args()
    auth_cmd = shlex.split(options.cmd)

    def on_auth(request):
        # Forward the request to handle_auth with the configured command.
        return handle_auth(request, auth_cmd)

    server = pylokimq.LokiMQ()
    server.listen_plain(options.bind)
    server.add_anonymous_category("llarp")
    server.add_request_command("llarp", "auth", on_auth)
    server.start()
    print("server started")
    # Keep this thread alive with a sleep loop after start() returns.
    while True:
        time.sleep(1)
# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()