sorryops/server.py
2024-04-29 02:30:45 -03:00

239 lines
9.1 KiB
Python

# SPDX-FileCopyrightText: 2024 Egor Guslyancev <electromagneticcyclone@disroot.org>
# SPDX-FileCopyrightText: 2023 Miel Donkers
#
# SPDX-License-Identifier: AGPL-3.0-or-later
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import sys
from threading import Thread
from os import listdir
from time import sleep, time
from markdown import markdown
import db_classes
import timeout as tmo
# Ordered alphabet of answer labels: lowercase letters, uppercase letters,
# then digits. resolve_logs() treats the first N characters as the possible
# answers of an N-option question.
CHARSET = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
)

# Module-wide database handle, constructed for ".db" and loaded at import
# time; every function in this module reads and writes through it.
db = db_classes.PickleDB(".db")
db.load()
def resolve_logs(tries = 3, backup = True):
print("Resolving…")
if tries < 1:
try:
db.pop("pending")
except KeyError:
pass
print("Resolved")
return
pending = db.read("pending", [])
if len(pending) == 0:
print("Resolved")
return
if backup:
db.save(f"backup{str(int(time()))}.db")
for test in pending:
logs = db.pop(f"tests.{test}.logs")
if logs is None:
continue
correct = db.pop(f"tests.{test}.correct")
if correct is None:
correct = {}
incorrect = db.pop(f"tests.{test}.incorrect")
if incorrect is None:
incorrect = {}
for l in logs:
for a in l['answers']:
if a[0] in correct:
if a[2] == correct[a[0]]:
l['correct'] -= 1
a[0] = '-'
if a[0] in incorrect:
if a[2] in incorrect[a[0]]:
a[0] = '-'
l['answers'] = list(filter(lambda a: a[0] != '-', l['answers']))
if len(l['answers']) == l['correct']:
for a in l['answers']:
correct[a[0]] = a[2]
if a[0] in incorrect:
incorrect.pop(a[0])
l['answers'] = []
elif l['correct'] == 0:
for a in l['answers']:
if a[0] not in incorrect:
incorrect[a[0]] = []
incorrect[a[0]] = list(set(incorrect[a[0]] + [a[2]]))
# TODO implement for checkboxes and text
if a[2][0] != "[":
if a[2][0] != "{":
if len(incorrect[a[0]]) == (a[3] - 1):
for c in CHARSET[:a[3]]:
if c not in incorrect[a[0]]:
correct[a[0]] = c
break
incorrect.pop(a[0])
l['answers'] = []
logs = list(filter(lambda l: len(l['answers']) != 0, logs))
new_logs = db.read(f"tests.{test}.logs")
if new_logs is None:
new_logs = []
db.write(f"tests.{test}.logs", logs + new_logs)
db.write(f"tests.{test}.correct", correct)
db.write(f"tests.{test}.incorrect", incorrect)
resolve_logs(tries - 1, False)
def parse_request(r):
try:
results = json.loads(r)
rtype = results['type']
test_id = results['id']
user_id = results['uid']
blacklist = db.read('users.blacklist', set())
vip = db.read('users.vip', set())
if user_id in blacklist:
return
match (rtype):
case "test_results":
log = {
'answers': results['answers'],
'correct': int(results['correct']),
}
logs = db.read(f'tests.{test_id}.logs', [])
logs = logs + [log]
db.write(f'tests.{test_id}.logs', logs)
pending = db.read('pending', set())
pending = set(list(pending) + [test_id])
db.write('pending', pending)
return 202
case "add_vip":
if user_id not in vip:
return 403
vip = set(list(vip) + [results['user']])
db.write('users.vip', vip)
case "del_vip":
if user_id not in vip:
return 403
vip.remove(results['user'])
db.write('users.vip', vip)
case "add_blacklist":
if user_id not in vip:
return 403
blacklist = set(list(blacklist) + [results['user']])
db.write('users.blacklist', blacklist)
case "del_blacklist":
if user_id not in vip:
return 403
blacklist.remove(results['user'])
db.write('users.blacklist', blacklist)
case _:
raise KeyError()
except KeyError:
print("Invalid request")
return 400
except:
print("Something bad happend")
return 400
class S(BaseHTTPRequestHandler):
def _set_response(self, status):
self.send_response(status)
def do_GET(self):
match (self.path):
case "/":
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
# TODO Autodetect browser language
with open("README.ru.md", "r", encoding='utf-8') as fi:
self.wfile.write(
markdown(fi.read(), extensions=['fenced_code', 'codehilite'])
.encode('utf-8'))
case "/license":
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
for i in listdir("LICENSES"):
with open(f"LICENSES/{i}", "r", encoding='utf-8') as fi:
self.wfile.write(fi.read().encode('utf-8'))
self.wfile.write("-----------------------------------".encode('utf-8'))
case "/source":
self.send_response(301)
self.send_header(
'Location',
'https://git.disroot.org/electromagneticcyclone/sorryops'
)
self.end_headers()
case _ if self.path.startswith("/TOS."):
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
path = self.path[1:] + ("" if self.path.endswith(".md") else ".md")
if path in listdir("./"):
with open(path, "r", encoding='utf-8') as fi:
self.wfile.write(
markdown(fi.read(), extensions=['fenced_code', 'codehilite'])
.encode('utf-8')
)
else:
self._set_response(404)
self.end_headers()
self.wfile.write("404 Not found".encode('utf-8'))
case _ if self.path.startswith("/assets/"):
self._set_response(200)
with open(self.path[1:], "rb") as fi:
data = fi.read()
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Content-Disposition', 'attachment')
self.send_header('Content-Length', len(data))
self.end_headers()
self.wfile.write(data)
case _:
db_path = 'tests.' + '.'.join(self.path[1:].split('/'))
data = db.read(db_path, None)
if data is None:
self._set_response(404)
self.end_headers()
self.wfile.write("404 Not found".encode('utf-8'))
else:
send_data = dict()
if 'correct' in data:
send_data['correct'] = data['correct']
if 'incorrect' in data:
send_data['incorrect'] = data['incorrect']
self._set_response(200)
self.send_header('Content-type', 'text/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(send_data).encode('utf-8'))
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
self._set_response(parse_request(post_data))
self.wfile.write(f"POST request for {self.path}".encode('utf-8'))
def run_resolver():
    """Run resolve_logs periodically, forever.

    A ``Timeout`` built with 2 * 60 is polled in an endless loop;
    ``check`` is handed ``resolve_logs`` and a no-op callable.  Never
    returns, so it is meant to run in its own thread.
    """
    p2_tmo = tmo.Timeout(2 * 60)
    while True:
        p2_tmo.check(resolve_logs, lambda: None)
        # NOTE(review): check() takes a no-op fallback, which suggests it
        # returns immediately when the timeout has not elapsed - confirm
        # against timeout.Timeout.  Pause between polls so this loop does
        # not spin and compete with the HTTP server thread for the CPU.
        sleep(1)
def run(server_class=HTTPServer, handler_class=S, port=8000):
    """Serve HTTP on 127.0.0.1 and run the resolver until interrupted.

    server_class  -- server type, instantiated with the address and handler.
    handler_class -- request handler class given to the server.
    port          -- TCP port to listen on.

    The server loop and ``run_resolver`` each run in a daemon thread while
    the calling thread sleeps until KeyboardInterrupt.  On the way out the
    server loop is stopped and its socket closed.  The resolver loop has no
    stop mechanism; as a daemon thread it ends with the process.
    """
    server_address = ('127.0.0.1', port)
    httpd = server_class(server_address, handler_class)
    # A real list, not a lazy map(): every thread is created exactly once.
    threads = [
        Thread(target=func, daemon=True)
        for func in (httpd.serve_forever, run_resolver)
    ]
    for thread in threads:
        thread.start()
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # threading.Thread has no stop(); serve_forever() is ended through
        # the server's own shutdown() before the socket is closed.
        httpd.shutdown()
        httpd.server_close()
# Start the server only when this file is executed as a script and the
# interpreter was not started in interactive mode (sys.flags.interactive).
if sys.flags.interactive == 0 and __name__ == '__main__':
    run()