# sorryops/server.py
# (repository viewer metadata: 290 lines, 11 KiB, Python)

# SPDX-FileCopyrightText: 2024 Egor Guslyancev <electromagneticcyclone@disroot.org>
# SPDX-FileCopyrightText: 2023 Miel Donkers
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import copy
import json
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from itertools import combinations
from os import listdir
from threading import Thread
from time import sleep, time

from markdown import markdown

import db_classes
import timeout as tmo
# Alphabet of answer symbols. resolve_logs() slices its first a[3] characters to
# enumerate the candidate answers of one entry (presumably one symbol per
# answer option -- TODO confirm against the client).
CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
# Database handle shared by the whole module: a db_classes.PickleDB opened on
# ".db" and loaded once, at import time.
db = db_classes.PickleDB(".db")
db.load()
def comb(s):
    """Return every non-empty combination of the items of *s*, joined into strings.

    Results are ordered by length first (1 up to ``len(s)``) and, within one
    length, in the order ``itertools.combinations`` yields them. An empty *s*
    gives an empty list.
    """
    return [
        ''.join(picked)
        for size in range(1, len(s) + 1)
        for picked in combinations(s, size)
    ]
# Fold the logged submissions of every pending test into that test's tables of
# known correct and known incorrect answers, then write the tables back to db.
#
# Parameters:
#   tries       -- attempt budget; when it is below 1 the "pending" key is
#                  dropped from db and the function returns.
#   backup      -- when true, a timestamped backup file name is chosen and
#                  db.save() is called before any log is processed.
#   backup_name -- name passed to db.load() when recovering from a failure.
#
# Each answer entry `a` is either the string "dummy" or a sequence in which
# a[0] keys the correct/incorrect tables, a[2] is the value compared against
# them and a[3] is an int used to slice CHARSET (presumably question id, given
# answer and number of options -- TODO confirm against the client).
#
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# of the statements below cannot be read from the text (for example whether the
# final recursive call belongs to the except handler or follows it). Restore it
# from the upstream source before changing any logic here.
def resolve_logs(tries = 3, backup = True, backup_name = None):
print("Resolving…")
# Attempt budget exhausted: forget the pending set and stop.
if tries < 1:
try:
db.pop("pending")
except KeyError:
pass
print("Resolved")
return
pending = db.read("pending", [])
# Nothing is queued for resolution.
if len(pending) == 0:
print("Resolved")
return
# NOTE(review): backup_name is computed here but db.save() is called without
# it; whether save() writes that file is not visible in this module -- confirm.
if backup:
backup_name = f"backup{str(int(time()))}.db"
db.save()
try:
for test in pending:
# The logs and both answer tables are taken out of db here and written back
# at the end of the iteration.
logs = db.pop(f"tests.{test}.logs")
if logs is None:
continue
correct = db.pop(f"tests.{test}.correct")
if correct is None:
correct = {}
incorrect = db.pop(f"tests.{test}.incorrect")
if incorrect is None:
incorrect = {}
for l in logs:
# NOTE(review): 'shadow' is bound to the same list object as 'answers', and
# the a[0] = '-' assignments below modify entries reachable from both --
# confirm that this sharing is intended.
if 'shadow' not in l:
l['shadow'] = l['answers']
# Answers the tables already cover are marked with '-' so the filter below
# drops them; a match with a known correct answer also lowers the log's
# 'correct' counter.
for a in l['answers']:
if a == "dummy":
continue
if a[0] in correct:
if a[2] == correct[a[0]]:
l['correct'] -= 1
a[0] = '-'
if a[0] in incorrect:
if a[2] in incorrect[a[0]]:
a[0] = '-'
l['answers'] = list(filter(lambda a: a[0] != '-', l['answers']))
# The counter equals the number of remaining answers: record each non-dummy
# answer as correct.
if len(l['answers']) == l['correct']:
for a in l['answers']:
if a == "dummy":
continue
correct[a[0]] = a[2]
if a[0] in incorrect:
incorrect.pop(a[0])
l['answers'] = []
# The counter is zero: record each non-dummy answer as incorrect.
elif l['correct'] == 0:
for a in l['answers']:
if a == "dummy":
continue
if a[0] not in incorrect:
incorrect[a[0]] = []
incorrect[a[0]] = list(set(incorrect[a[0]] + [a[2]]))
# Dispatch on the first character of the answer value; values starting with
# "[" are left alone (presumably "{" marks a multiple-choice answer and
# anything else a single-choice one -- TODO confirm).
match a[2][0]:
case "[":
pass
case "{":
# When the incorrect list has reached the size checked below, the first
# candidate that is not in it is recorded as the correct answer.
if len(incorrect[a[0]]) == (2**a[3] - 2): # Exclude empty and unknown
for c in comb(CHARSET[:a[3]]):
if c not in incorrect[a[0]]:
correct[a[0]] = c
break
incorrect.pop(a[0])
case _:
# Same elimination over the first a[3] single characters of CHARSET.
if len(incorrect[a[0]]) == (a[3] - 1):
for c in CHARSET[:a[3]]:
if c not in incorrect[a[0]]:
correct[a[0]] = c
break
incorrect.pop(a[0])
l['answers'] = []
# The counter went negative: drop what both tables hold for the log's shadow
# entries and restore the answers from the shadow.
elif l['correct'] < 0:
for a in l['shadow']:
if a[0] in incorrect:
incorrect.pop(a[0])
if a[0] in correct:
correct.pop(a[0])
l['answers'] = l['shadow']
# Keep only logs that still hold non-dummy answers and store them in front of
# whatever logs db holds for this test now.
logs = list(filter(lambda l: (len(l['answers']) != l['answers'].count("dummy")), logs))
new_logs = db.read(f"tests.{test}.logs")
if new_logs is None:
new_logs = []
db.write(f"tests.{test}.logs", logs + new_logs)
db.write(f"tests.{test}.correct", correct)
db.write(f"tests.{test}.incorrect", incorrect)
# Bare except: every exception type is caught here. db is reloaded from
# backup_name when one is set.
except:
print("Something really bad happend. Recovering from backup")
if backup_name is not None:
db.load(backup_name)
resolve_logs(tries - 1, False, backup_name)
def parse_request(r):
try:
results = json.loads(r)
for field in ['type', 'id', 'uid', 'answers', 'correct', 'all']:
if field not in results:
return 400
rtype = results['type']
test_id = results['id']
user_id = results['uid']
blacklist = db.read('users.blacklist', set())
vip = db.read('users.vip', set())
if user_id in blacklist:
return 403
match rtype:
case "test_results":
answers = results['answers']
all_answ = int(results['all'])
while len(answers) != all_answ:
answers.append("dummy")
log = {
'answers': answers,
'shadow': answers,
'correct': int(results['correct']),
}
logs = db.read(f'tests.{test_id}.logs', [])
logs = logs + [log]
db.write(f'tests.{test_id}.logs', logs)
pending = db.read('pending', set())
pending = set(list(pending) + [test_id])
db.write('pending', pending)
contributors = db.read('contributors', set())
contributors = set(list(contributors) + [user_id])
db.write('contributors', contributors)
return 202
case "add_vip":
if user_id not in vip:
return 403
vip = set(list(vip) + [results['user']])
db.write('users.vip', vip)
case "del_vip":
if user_id not in vip:
return 403
vip.remove(results['user'])
db.write('users.vip', vip)
case "add_blacklist":
if user_id not in vip:
return 403
blacklist = set(list(blacklist) + [results['user']])
db.write('users.blacklist', blacklist)
case "del_blacklist":
if user_id not in vip:
return 403
blacklist.remove(results['user'])
db.write('users.blacklist', blacklist)
case _:
raise KeyError()
except KeyError:
print("Invalid request")
return 400
except json.decoder.JSONDecodeError:
print("Bad request")
return 400
except:
print("Something bad happend")
return 400
class S(BaseHTTPRequestHandler):
def _set_response(self, status):
self.send_response(status)
def do_GET(self):
match self.path:
case "/":
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
# TODO Autodetect browser language
with open("README.ru.md", "r", encoding='utf-8') as fi:
self.wfile.write(
markdown(fi.read(), extensions=['fenced_code', 'codehilite'])
.encode('utf-8'))
case "/license":
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
for i in listdir("LICENSES"):
with open(f"LICENSES/{i}", "r", encoding='utf-8') as fi:
self.wfile.write(fi.read().encode('utf-8'))
self.wfile.write("-----------------------------------".encode('utf-8'))
case "/source":
self.send_response(301)
self.send_header(
'Location',
'https://git.disroot.org/electromagneticcyclone/sorryops'
)
self.end_headers()
case _ if self.path.startswith("/TOS."):
self._set_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
path = self.path[1:] + ("" if self.path.endswith(".md") else ".md")
if path in listdir("./"):
with open(path, "r", encoding='utf-8') as fi:
self.wfile.write(
markdown(fi.read(), extensions=['fenced_code', 'codehilite'])
.encode('utf-8')
)
else:
self._set_response(404)
self.end_headers()
self.wfile.write("404 Not found".encode('utf-8'))
case _ if self.path.startswith("/assets/"):
self._set_response(200)
with open(self.path[1:], "rb") as fi:
data = fi.read()
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Content-Disposition', 'attachment')
self.send_header('Content-Length', len(data))
self.end_headers()
self.wfile.write(data)
case _:
db_path = 'tests.' + '.'.join(self.path[1:].split('/'))
data = db.read(db_path, None)
if data is None:
self._set_response(404)
self.end_headers()
self.wfile.write("404 Not found".encode('utf-8'))
else:
send_data = dict()
if 'correct' in data:
send_data['correct'] = data['correct']
if 'incorrect' in data:
send_data['incorrect'] = data['incorrect']
self._set_response(200)
self.send_header('Content-type', 'text/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(send_data).encode('utf-8'))
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
self._set_response(parse_request(post_data))
self.wfile.write(f"POST request for {self.path}".encode('utf-8'))
def run_resolver():
    """Loop forever, offering ``resolve_logs`` to a ``tmo.Timeout`` object.

    The Timeout is built once with ``2 * 60`` (presumably seconds -- the unit
    is defined by the ``timeout`` module) and its ``check`` method is called on
    every iteration with ``resolve_logs`` and a callback that does nothing.
    """
    resolver_timer = tmo.Timeout(2 * 60)

    def do_nothing():
        return None

    while True:
        resolver_timer.check(resolve_logs, do_nothing)
def run(server_class=HTTPServer, handler_class=S, port=8000):
    """Serve HTTP on 127.0.0.1:*port* and run the log resolver until Ctrl+C.

    The HTTP server loop and ``run_resolver`` each run in their own daemon
    thread while the calling thread sleeps. On exit the serving loop is asked
    to stop and the listening socket is closed.

    Args:
        server_class: server type, constructed with ``(address, handler_class)``.
        handler_class: request handler class given to the server.
        port: TCP port to bind on the loopback interface.
    """
    server_address = ('127.0.0.1', port)
    httpd = server_class(server_address, handler_class)
    # A real list, not a lazy map(): the threads are created exactly once.
    threads = [
        Thread(target=func, daemon=True)
        for func in (httpd.serve_forever, run_resolver)
    ]
    for thread in threads:
        thread.start()
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Thread objects cannot be stopped from outside. The serving loop is
        # ended through shutdown(); the resolver is a daemon thread and ends
        # with the process.
        httpd.shutdown()
        httpd.server_close()
# Start the server only when run as a script and not in interactive (-i) mode.
if __name__ == '__main__' and not sys.flags.interactive:
    run()