549 lines
20 KiB
Python
Executable File
549 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from secrets import token_urlsafe
|
|
from sys import argv
|
|
from sys import exit as sysexit
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import gpg
|
|
from gpg.gpgme import GPG_ERR_NO_ERROR, GPGME_DELETE_FORCE, gpgme_op_delete_ext
|
|
|
|
import websocket
|
|
from database import DataBase
|
|
|
|
NONCE_BYTES = 128
|
|
ADMIN_ACCESS = 100
|
|
|
|
|
|
class JsonMissingFieldException(Exception):
|
|
def __init__(self, missing):
|
|
self.missing = missing
|
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
_RE_FILES = re.compile(
|
|
r"^(/|/admin\.html|/data\.html|/forms\.html|/config\.js|/public\.js|/registered\.js|/openpgp\.min\.js)$"
|
|
)
|
|
_RE_FORM_NAME = re.compile(r"^([-_0-9a-zA-Z]{1,64})$")
|
|
_RE_KEY_FINGERPRINT = re.compile(r"^([0-9a-fA-F]{40})$")
|
|
_RE_PGP_JSON_REQUEST_FIELD = re.compile(r"^[0-9a-zA-Z_/]{3,100}$")
|
|
_VALID_CONTENT_SUBTYPES = {"html", "javascript", "json", "plain"}
|
|
|
|
def _api_answer_pgp_json(self, payload):
|
|
request_answer = websocket.encrypt_pgp_json(
|
|
{"request": self._pgpjson_pending_request, "payload": payload},
|
|
self._pgpjson_client_key,
|
|
self._gpg_context,
|
|
)
|
|
websocket.send_message(self.wfile, request_answer)
|
|
self._close_websocket()
|
|
|
|
def _api_collect_data(self):
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME, self.path.replace("/post/", "")
|
|
)
|
|
if form_name:
|
|
secret = self._read().decode()
|
|
self.server.db.store_form_data(form_name.group(1), secret)
|
|
self.send_response(200)
|
|
self.end_headers()
|
|
|
|
def _api_get_collected_data(self):
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME,
|
|
self._pgpjson_pending_request.replace("data/get/", ""),
|
|
)
|
|
if form_name:
|
|
form_name = form_name.group(1)
|
|
form_users = self.server.db.get_form_keys_fingerprints(form_name)
|
|
if (
|
|
self._pgpjson_user["fingerprint"]
|
|
not in form_users["fingerprints"]
|
|
):
|
|
self._close_websocket(1008, "Access denied.")
|
|
else:
|
|
collected_data = self.server.db.get_collected_data(form_name)
|
|
self._api_answer_pgp_json(collected_data)
|
|
else:
|
|
self._close_websocket(1002, "Invalid form name.")
|
|
|
|
def _api_get_form_keys_fingerprints(self):
|
|
if self._pgpjson_user["access_level"] != ADMIN_ACCESS:
|
|
self._close_websocket(1008, "Access denied.")
|
|
return
|
|
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME,
|
|
self._pgpjson_pending_request.replace("form/get/", ""),
|
|
)
|
|
if form_name:
|
|
form_key_list = self.server.db.get_form_keys_fingerprints(
|
|
form_name.group(1)
|
|
)
|
|
self._api_answer_pgp_json(form_key_list)
|
|
else:
|
|
self._close_websocket(1002, "Invalid form name.")
|
|
|
|
def _api_get_user(self):
|
|
user_fingerprint = re.match(
|
|
self._RE_KEY_FINGERPRINT,
|
|
self._pgpjson_pending_request.replace("user/", ""),
|
|
)
|
|
if user_fingerprint:
|
|
fpr = user_fingerprint.group(1)
|
|
if (
|
|
self._pgpjson_user["access_level"] == ADMIN_ACCESS
|
|
or self._pgpjson_user["fingerprint"] == fpr
|
|
):
|
|
user = self.server.db.get_user(fpr)
|
|
self._api_answer_pgp_json(user)
|
|
else:
|
|
self._close_websocket(1008, "Access denied.")
|
|
elif self._pgpjson_pending_request == "user/all":
|
|
if self._pgpjson_user["access_level"] == ADMIN_ACCESS:
|
|
users = self.server.db.get_user()
|
|
self._api_answer_pgp_json(users)
|
|
else:
|
|
self._close_websocket(1008, "Access denied.")
|
|
else:
|
|
self._close_websocket(1002, "Invalid user fingerprint.")
|
|
|
|
def _api_get_user_key(self, fingerprints=None):
|
|
keys = list()
|
|
if fingerprints is None:
|
|
for user in self.server.db.get_user():
|
|
keys.append(
|
|
{
|
|
"fingerprint": user["fingerprint"],
|
|
"armored_key": (
|
|
self._gpg_context.key_export(user["fingerprint"])
|
|
).decode(),
|
|
}
|
|
)
|
|
else:
|
|
for fpr in fingerprints:
|
|
if self.server.db.get_user(fpr) is not None:
|
|
keys.append(
|
|
{
|
|
"fingerprint": fpr,
|
|
"armored_key": (
|
|
self._gpg_context.key_export(fpr)
|
|
).decode(),
|
|
}
|
|
)
|
|
else:
|
|
self._close_websocket(1008, "Asking for an unknown key.")
|
|
return
|
|
self._api_answer_pgp_json(keys)
|
|
|
|
def _api_register_user(self):
|
|
try:
|
|
req = self._read_json(["pubKey"])
|
|
except JsonMissingFieldException as missing_field:
|
|
self._api_register_user_answer(
|
|
400, f"Missing {missing_field.missing}."
|
|
)
|
|
return
|
|
results = self.server.gpg_context.key_import(req["pubKey"].encode())
|
|
if results == "IMPORT_PROBLEM" or not results.considered:
|
|
self._api_register_user_answer(400, "Invalid pubKey.")
|
|
elif results.imported:
|
|
access_level = (
|
|
ADMIN_ACCESS
|
|
if len(self.server.db.get_user())
|
|
== 0 # first user get admin access level
|
|
else 0
|
|
)
|
|
self.server.db.add_user(results.imports[0].fpr, access_level)
|
|
print(f"Imported key {results.imports[0].fpr}.")
|
|
self._api_register_user_answer(201, "Key registered.")
|
|
elif results.unchanged:
|
|
self._api_register_user_answer(200, "Key already on server.")
|
|
else:
|
|
self._api_register_user_answer(418, "What happened?")
|
|
print(
|
|
f"Tried to add following pubKey and got strange results:\n{req['pubKey']}\n\n{results}"
|
|
)
|
|
|
|
def _api_register_user_answer(self, code: int, message: str):
|
|
self.send_response(code)
|
|
self._set_content_type("json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"message": message}).encode())
|
|
|
|
def _api_set_collected_data(self):
|
|
if self._pgpjson_user["access_level"] != ADMIN_ACCESS:
|
|
self._close_websocket(1008, "Access denied.")
|
|
return
|
|
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME,
|
|
self._pgpjson_pending_request.replace("data/set/", ""),
|
|
)
|
|
if form_name:
|
|
if self._pgpjson_payload is None:
|
|
self._close_websocket(1002, "Missing payload.")
|
|
return
|
|
for data in self._pgpjson_payload:
|
|
if "secret" not in data or "id" not in data:
|
|
self._close_websocket(1002, "Invalid payload")
|
|
return
|
|
elif not data["secret"].startswith(
|
|
"-----BEGIN PGP MESSAGE-----"
|
|
):
|
|
self._close_websocket(1002, "Invalid PGP message")
|
|
return
|
|
self.server.db.set_collected_data(
|
|
form_name.group(1), self._pgpjson_payload
|
|
)
|
|
self._close_websocket()
|
|
else:
|
|
self._close_websocket(1002, "Invalid form name.")
|
|
|
|
def _api_set_form_keys_fingerprints(self):
|
|
if self._pgpjson_user["access_level"] != ADMIN_ACCESS:
|
|
self._close_websocket(1008, "Access denied.")
|
|
return
|
|
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME,
|
|
self._pgpjson_pending_request.replace("form/set/", ""),
|
|
)
|
|
if form_name:
|
|
if (
|
|
self._pgpjson_payload is not None
|
|
and "fingerprints" in self._pgpjson_payload
|
|
and isinstance(self._pgpjson_payload["fingerprints"], list)
|
|
):
|
|
self.server.db.set_form_keys_fingerprints(
|
|
form_name.group(1), self._pgpjson_payload["fingerprints"]
|
|
)
|
|
self._close_websocket()
|
|
else:
|
|
self._close_websocket(1002, "Invalid request payload.")
|
|
else:
|
|
self._close_websocket(1002, "Invalid form name.")
|
|
|
|
def _api_unregister_user(self):
|
|
if self._pgpjson_user["access_level"] == ADMIN_ACCESS:
|
|
self._close_websocket(1008, "Cannot delete admin account.")
|
|
return
|
|
result = gpgme_op_delete_ext(
|
|
self._gpg_context.wrapped,
|
|
self._pgpjson_client_key,
|
|
GPGME_DELETE_FORCE,
|
|
)
|
|
if result == GPG_ERR_NO_ERROR:
|
|
self.server.db.delete_user(self._pgpjson_client_key.fpr)
|
|
print(f"Key {self._pgpjson_client_key.fpr} deleted.")
|
|
self._close_websocket()
|
|
else:
|
|
print(
|
|
f"Failed to delete key {self._pgpjson_client_key.fpr}, status code: {result}."
|
|
)
|
|
self._close_websocket(1011, "Failed to delete key.")
|
|
|
|
def _close_websocket(self, code=1000, reason=None):
|
|
websocket.close(self.wfile, code, reason)
|
|
self._websocket_connected = False
|
|
print(
|
|
f"Closed WebSocket connection with {self.client_address[0]}:{self.client_address[1]}"
|
|
)
|
|
if code != 1000:
|
|
print(f"Code: {code} reason: {reason}")
|
|
|
|
def _handle_api_request(self, message: websocket.WebSocketMessage):
|
|
signature_key = (
|
|
self._pgpjson_client_key
|
|
if self._pgpjson_client_key is not None
|
|
else None
|
|
)
|
|
json_, signature = websocket.decrypt_pgp_json(
|
|
self.wfile, self._gpg_context, message, signature_key
|
|
)
|
|
|
|
if "request" not in json_ or not re.match(
|
|
self._RE_PGP_JSON_REQUEST_FIELD, json_["request"]
|
|
):
|
|
self._close_websocket(1002, "Missing or invalid request field.")
|
|
return
|
|
|
|
elif self._pgpjson_nonce is None:
|
|
self._pgpjson_pending_request = json_["request"]
|
|
if "payload" in json_:
|
|
self._pgpjson_payload = json_["payload"]
|
|
self._pgpjson_client_key = self._gpg_context.get_key(
|
|
signature[0].fpr
|
|
)
|
|
self._request_signature()
|
|
return
|
|
elif "nonce" not in json_ or json_["nonce"] != self._pgpjson_nonce:
|
|
self._close_websocket(1002, "Authentication failed.")
|
|
return
|
|
|
|
self._pgpjson_user = self.server.db.get_user(
|
|
self._pgpjson_client_key.fpr
|
|
)
|
|
|
|
# API "routes"
|
|
if self._pgpjson_pending_request.startswith("data/get/"):
|
|
self._api_get_collected_data()
|
|
elif self._pgpjson_pending_request.startswith("data/set/"):
|
|
self._api_set_collected_data()
|
|
elif self._pgpjson_pending_request.startswith("form/get/"):
|
|
self._api_get_form_keys_fingerprints()
|
|
elif self._pgpjson_pending_request.startswith("form/set/"):
|
|
self._api_set_form_keys_fingerprints()
|
|
elif self._pgpjson_pending_request == "key/delete":
|
|
self._api_unregister_user()
|
|
elif self._pgpjson_pending_request.startswith("user/"):
|
|
self._api_get_user()
|
|
else:
|
|
print(self._pgpjson_pending_request)
|
|
self._close_websocket(1002, "API function undefined.")
|
|
|
|
def _handle_websocket_request(self):
|
|
self._pgpjson_client_key = None
|
|
self._pgpjson_nonce = None
|
|
self._pgpjson_pending_request = None
|
|
self._pgpjson_payload = None
|
|
self._pgpjson_user = None
|
|
self._gpg_context = gpg.Context(
|
|
armor=True,
|
|
home_dir=self.server.gpg_context.home_dir,
|
|
offline=True,
|
|
signers=[self.server.key],
|
|
)
|
|
|
|
try:
|
|
proto = websocket.handshake(self, ["pgp-json"])
|
|
if proto is None:
|
|
self._close_websocket(1002, "I only speak pgp-json")
|
|
return
|
|
self._websocket_connected = True
|
|
print(
|
|
f"WebSocket connection with {self.client_address[0]}:{self.client_address[1]}"
|
|
)
|
|
except websocket.HandshakeError as error:
|
|
print(
|
|
f"WebSocket handshake failed. {error.get_reason(error.why)}: {error.what}={error.value}"
|
|
)
|
|
return
|
|
|
|
try:
|
|
while self._websocket_connected:
|
|
message = websocket.read_next_message(self.rfile, self.wfile)
|
|
self._handle_api_request(message)
|
|
except websocket.WebSocketCloseException as close:
|
|
print(
|
|
f"WebSocket closed with code {close.code}, reason {close.reason}"
|
|
)
|
|
|
|
def _read(self):
|
|
if not self.headers["Content-Length"]:
|
|
return ""
|
|
length = int(self.headers["Content-Length"])
|
|
return self.rfile.read(length)
|
|
|
|
def _read_json(self, required_fields=list()):
|
|
try:
|
|
data = json.loads(self._read())
|
|
except json.decoder.JSONDecodeError:
|
|
data = {}
|
|
for f in required_fields:
|
|
if f not in data:
|
|
raise JsonMissingFieldException(missing=f)
|
|
return data
|
|
|
|
def _request_signature(self):
|
|
self._pgpjson_nonce = token_urlsafe(NONCE_BYTES)
|
|
signature_request = websocket.encrypt_pgp_json(
|
|
{
|
|
"nonce": self._pgpjson_nonce,
|
|
"request": self._pgpjson_pending_request,
|
|
},
|
|
self._pgpjson_client_key,
|
|
self._gpg_context,
|
|
)
|
|
websocket.send_message(self.wfile, signature_request)
|
|
|
|
def _serve_err404(self):
|
|
self.send_response(404)
|
|
self._set_content_type("plain")
|
|
self.end_headers()
|
|
self.wfile.write("Not found".encode())
|
|
|
|
def _serve_file(self, filepath):
|
|
if filepath == "/":
|
|
filepath = "register.html"
|
|
else:
|
|
filepath = filepath[1:]
|
|
self.send_response(200)
|
|
if filepath.endswith("html"):
|
|
self._set_content_type("html")
|
|
else:
|
|
self._set_content_type("javascript")
|
|
self.end_headers()
|
|
# assume files are utf-8 encoded
|
|
with open(filepath, "rb") as f:
|
|
output = f.read()
|
|
self.wfile.write(output)
|
|
|
|
def _serve_form_keys(self):
|
|
form_name = re.match(
|
|
self._RE_FORM_NAME, self.path.replace("/formkeys/", "")
|
|
)
|
|
if form_name:
|
|
form_name = form_name.group(1)
|
|
keys = []
|
|
keylist = self.server.db.get_form_keys_fingerprints(form_name)
|
|
for fpr in keylist["fingerprints"]:
|
|
keys.append(self.server.gpg_context.key_export(fpr).decode())
|
|
self.send_response(200)
|
|
self._set_cors()
|
|
self._set_content_type("json")
|
|
self.end_headers()
|
|
self.wfile.write(
|
|
json.dumps({"form": form_name, "keys": keys}).encode()
|
|
)
|
|
else:
|
|
self._serve_err404()
|
|
|
|
def _set_content_type(self, subtype):
|
|
if subtype not in self._VALID_CONTENT_SUBTYPES:
|
|
raise ValueError(
|
|
f"RequestHandler._set_content_type: _type must be one of {self._VALID_CONTENT_SUBTYPES}."
|
|
)
|
|
_type = "text"
|
|
if subtype == "json":
|
|
_type = "application"
|
|
self.send_header("Content-Type", f"{_type}/{subtype}; charset=utf-8")
|
|
|
|
def _set_cors(self):
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
|
|
def do_GET(self):
|
|
filepath = re.match(self._RE_FILES, self.path)
|
|
if self.headers.get("Upgrade") == "websocket":
|
|
self._handle_websocket_request()
|
|
elif self.path.startswith("/formkeys/"):
|
|
self._serve_form_keys()
|
|
elif self.path == "/key/srv":
|
|
# May raise GPGMEerror
|
|
key = self.server.gpg_context.key_export(
|
|
self.server.db.get_config("server_key")
|
|
)
|
|
self.send_response(200)
|
|
self._set_cors()
|
|
self._set_content_type("plain")
|
|
self.end_headers()
|
|
self.wfile.write(key)
|
|
elif self.path.startswith("/key/"):
|
|
key_fingerprint = re.match(
|
|
self._RE_KEY_FINGERPRINT, self.path.replace("/key/", "")
|
|
)
|
|
key = self.server.gpg_context.key_export(key_fingerprint.group(1))
|
|
key = key if key is not None else b"Unknown"
|
|
self.send_response(200)
|
|
self._set_cors()
|
|
self._set_content_type("plain")
|
|
self.end_headers()
|
|
self.wfile.write(key)
|
|
elif filepath:
|
|
self._serve_file(filepath.group(1))
|
|
else:
|
|
self._serve_err404()
|
|
|
|
def do_POST(self):
|
|
if self.path == "/key/add":
|
|
self._api_register_user()
|
|
elif self.path.startswith("/post/"):
|
|
self._api_collect_data()
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
|
|
|
|
class Server(HTTPServer):
|
|
def __init__(self, listen_to, gpg_home, db):
|
|
self.gpg_context = gpg.Context(
|
|
armor=True, home_dir=gpg_home, offline=True
|
|
)
|
|
self.db = DataBase(db)
|
|
self.initSrvKeys()
|
|
super().__init__(listen_to, RequestHandler)
|
|
|
|
def initSrvKeys(self):
|
|
fingerprint = self.db.get_config("server_key")
|
|
if not fingerprint:
|
|
try:
|
|
result = self.gpg_context.create_key(
|
|
userid="PoC server",
|
|
algorithm="ed25519",
|
|
expires=False,
|
|
sign=True,
|
|
)
|
|
fingerprint = result.fpr
|
|
result = self.gpg_context.create_subkey(
|
|
key=self.gpg_context.get_key(fingerprint),
|
|
algorithm="cv25519",
|
|
expires=False,
|
|
encrypt=True,
|
|
)
|
|
self.db.set_config("server_key", fingerprint)
|
|
self.key = self.gpg_context.get_key(fingerprint)
|
|
except gpg.errors.GPGMEError as error:
|
|
print("Failed to create server key.")
|
|
sysexit(error)
|
|
except gpg.errors.KeyError as error:
|
|
print("Can't find the key I just created... wtf?")
|
|
sysexit(error)
|
|
else:
|
|
try:
|
|
self.key = self.gpg_context.get_key(fingerprint)
|
|
except KeyError as error:
|
|
print(f"Cannot find server key {fingerprint}.")
|
|
sysexit(error)
|
|
print(f"Loaded server key {fingerprint}.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
def run(listen_to, gpg_home, db):
|
|
httpd = Server(listen_to, gpg_home, db)
|
|
try:
|
|
print(
|
|
f"Starting server… listening to {listen_to[0]}:{listen_to[1]} with keystore {gpg_home}, db {db}"
|
|
)
|
|
httpd.serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
print("Stopping server")
|
|
httpd.server_close()
|
|
|
|
datadir = None
|
|
host = "127.0.0.1"
|
|
port = 8000
|
|
if len(argv) > 3:
|
|
datadir = argv[3]
|
|
if len(argv) > 2:
|
|
host = argv[2]
|
|
if len(argv) > 1:
|
|
port = int(argv[1])
|
|
|
|
if not datadir:
|
|
with TemporaryDirectory(prefix="websocket-pgp-json") as tempdir:
|
|
run((host, port), tempdir, ":memory:")
|
|
else:
|
|
gpg_home = os.path.join(datadir, "gpg")
|
|
db = os.path.join(datadir, "sqlite.db")
|
|
try:
|
|
os.mkdir(gpg_home)
|
|
except FileExistsError:
|
|
print("Using existing GPG home dir.")
|
|
run((host, port), gpg_home, db)
|