using enum for status codes

This commit is contained in:
Anedroid 2022-12-29 16:50:19 +01:00
parent 6d19b90cc0
commit 3ae0ff509c
Signed by: anedroid
GPG Key ID: F149EE15E69C7F45
8 changed files with 60 additions and 60 deletions

View File

@ -1,7 +1,8 @@
import sqlite3 import sqlite3
from enum import Enum
import logging import logging
DEBUG = False logging.basicConfig(level=logging.DEBUG)
def dict_factory(cursor, row): def dict_factory(cursor, row):
fields = [column[0] for column in cursor.description] fields = [column[0] for column in cursor.description]
@ -14,11 +15,21 @@ def generate_token(length):
token += random.choice('1234567890abcdefghijklmnopqrstuvwxyz') token += random.choice('1234567890abcdefghijklmnopqrstuvwxyz')
return token return token
class auth: class Auth:
ENABLE_REGISTRATION = True ENABLE_REGISTRATION = True
LINK_EXPIRE = 10*60 LINK_EXPIRE = 10*60
ANTIC_EXPIRE = 60*60*24 ANTIC_EXPIRE = 60*60*24
# status codes (I have no idea how to use Enum properly)
STATUS = Enum('STATUS', [
'SUCCESS',
'NAME_IN_USE',
'ACTION_DISABLED',
'BAD_TOKEN',
'KEY_IN_USE',
'NOT_FOUND'
])
hash = None hash = None
cert_name = None cert_name = None
username = None username = None
@ -39,8 +50,7 @@ class auth:
""" """
self.con = sqlite3.connect(db_file) self.con = sqlite3.connect(db_file)
self.con.row_factory = dict_factory self.con.row_factory = dict_factory
if (DEBUG): self.con.set_trace_callback(logging.debug)
self.con.set_trace_callback(logging.warning)
self.cur = self.con.cursor() self.cur = self.con.cursor()
self.cur.execute(""" self.cur.execute("""
@ -143,8 +153,7 @@ class auth:
del key['hash'] del key['hash']
self.keys_outdated.clear() self.keys_outdated.clear()
if (DEBUG): logging.debug({"keys": self.keys, "keys_outdated": self.keys_outdated})
logging.warning({"keys": self.keys, "keys_outdated": self.keys_outdated})
return self.keys return self.keys
def user_info(self, column): def user_info(self, column):
@ -154,8 +163,7 @@ class auth:
if (not self.username): if (not self.username):
return None return None
if (DEBUG): logging.debug({"user": self.user, "user_outdated": self.user_outdated, "requested": column})
logging.warning({"user": self.user, "user_outdated": self.user_outdated, "requested": column})
if (column in self.user and column not in self.user_outdated): if (column in self.user and column not in self.user_outdated):
return self.user[column] return self.user[column]
@ -206,7 +214,7 @@ class auth:
self.user_outdated.append('anticsrf_time') self.user_outdated.append('anticsrf_time')
return token return token
def check_anticsrf(self, token): def check_anticsrf(self, token):
""" """
check antic cross-site request forgery token validity check antic cross-site request forgery token validity
@ -214,7 +222,7 @@ class auth:
""" """
if (not self.username): if (not self.username):
return None return None
validity = token == self.user_info('anticsrf') validity = token == self.user_info('anticsrf')
self.cur.execute("UPDATE users SET anticsrf = NULL, anticsrf_time = NULL WHERE name = ?", (self.username, )) self.cur.execute("UPDATE users SET anticsrf = NULL, anticsrf_time = NULL WHERE name = ?", (self.username, ))
@ -246,13 +254,6 @@ class auth:
self.con.commit() self.con.commit()
self.keys_outdated.append('last_seen') self.keys_outdated.append('last_seen')
SUCCESS = 0
NAME_IN_USE = 1
ACTION_DISABLED = 2
BAD_TOKEN = 3
KEY_IN_USE = 4
NOT_FOUND = 5
def register_user(self, username): def register_user(self, username):
""" """
link new user to the current key link new user to the current key
@ -261,11 +262,11 @@ class auth:
return None return None
if (not self.ENABLE_REGISTRATION): if (not self.ENABLE_REGISTRATION):
return self.ACTION_DISABLED return self.STATUS.ACTION_DISABLED
res = self.cur.execute("SELECT * FROM users WHERE name = ?", (username, )) res = self.cur.execute("SELECT * FROM users WHERE name = ?", (username, ))
if (res.fetchone()): if (res.fetchone()):
return self.NAME_IN_USE return self.STATUS.NAME_IN_USE
self.cur.execute("INSERT INTO users (name) VALUES (?)", (username, )) self.cur.execute("INSERT INTO users (name) VALUES (?)", (username, ))
uid = self.cur.lastrowid uid = self.cur.lastrowid
@ -275,7 +276,7 @@ class auth:
self.username = username self.username = username
self.update_key_info(self.hash, 'user', uid) self.update_key_info(self.hash, 'user', uid)
return self.SUCCESS return self.STATUS.SUCCESS
def request_link(self, cancel=False): def request_link(self, cancel=False):
""" """
@ -331,9 +332,9 @@ class auth:
self.con.commit() self.con.commit()
self.update_key_info(self.hash, 'user', res['id']) self.update_key_info(self.hash, 'user', res['id'])
self.username = res['name'] self.username = res['name']
return self.SUCCESS return self.STATUS.SUCCESS
else: else:
return self.BAD_TOKEN return self.STATUS.BAD_TOKEN
def unlink(self, hash): def unlink(self, hash):
""" """
@ -343,15 +344,15 @@ class auth:
return None return None
if (hash == self.hash): if (hash == self.hash):
return self.KEY_IN_USE return self.STATUS.KEY_IN_USE
if (hash in self.get_keys()): if (hash in self.get_keys()):
self.cur.execute("DELETE FROM keys WHERE hash = ?", (hash, )) self.cur.execute("DELETE FROM keys WHERE hash = ?", (hash, ))
self.con.commit() self.con.commit()
del self.keys[hash] del self.keys[hash]
return self.SUCCESS return self.STATUS.SUCCESS
return self.NOT_FOUND return self.STATUS.NOT_FOUND
def request_rename(self, hash): def request_rename(self, hash):
""" """
@ -359,14 +360,14 @@ class auth:
""" """
if (not self.username): if (not self.username):
return None return None
if (hash in self.get_keys()): if (hash in self.get_keys()):
self.cur.execute("UPDATE users SET request_rename = ? WHERE name = ?", (hash, self.username)) self.cur.execute("UPDATE users SET request_rename = ? WHERE name = ?", (hash, self.username))
self.con.commit() self.con.commit()
self.user['request_rename'] = hash self.user['request_rename'] = hash
return self.SUCCESS return self.STATUS.SUCCESS
return self.NOT_FOUND return self.STATUS.NOT_FOUND
def rename_key(self, name): def rename_key(self, name):
""" """
@ -383,9 +384,9 @@ class auth:
self.con.commit() self.con.commit()
self.update_key_info(hash, 'name', name) self.update_key_info(hash, 'name', name)
self.update_user_info('request_rename', None) self.update_user_info('request_rename', None)
return self.SUCCESS return self.STATUS.SUCCESS
return self.NOT_FOUND return self.STATUS.NOT_FOUND
if (__name__ == '__main__'): if (__name__ == '__main__'):
@ -393,10 +394,9 @@ if (__name__ == '__main__'):
if (len(sys.argv) > 1): if (len(sys.argv) > 1):
auth(sys.argv[1]) auth(sys.argv[1])
print({ print({
"enable_registration": auth.ENABLE_REGISTRATION, "enable_registration": Auth.ENABLE_REGISTRATION,
"link_expire": auth.LINK_EXPIRE, "link_expire": Auth.LINK_EXPIRE,
"antic_expire": auth.ANTIC_EXPIRE, "antic_expire": Auth.ANTIC_EXPIRE
"debug": DEBUG
}) })
else: else:
print('Database file not specified') print('Database file not specified')

View File

@ -14,8 +14,8 @@ cert_name = os.environ.get('REMOTE_USER')
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
if (not auth.username): if (not auth.username):

View File

@ -12,8 +12,8 @@ if (not hash):
exit() exit()
cert_name = os.environ.get('REMOTE_USER') cert_name = os.environ.get('REMOTE_USER')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
query = os.environ.get('QUERY_STRING') query = os.environ.get('QUERY_STRING')
@ -51,11 +51,11 @@ else:
else: else:
# token # token
res = auth.link(query) res = auth.link(query)
if (res == auth.SUCCESS): if (res == auth.STATUS.SUCCESS):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('Successfully linked to {}!'.format(auth.username)) print('Successfully linked to {}!'.format(auth.username))
print('=> index.gmi back to home') print('=> index.gmi back to home')
elif (res == auth.BAD_TOKEN): elif (res == auth.STATUS.BAD_TOKEN):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('It seems have you entered invalid or expired token. Try to generate a new one.') print('It seems have you entered invalid or expired token. Try to generate a new one.')
else: else:

View File

@ -12,8 +12,8 @@ if (not hash):
exit() exit()
cert_name = os.environ.get('REMOTE_USER') cert_name = os.environ.get('REMOTE_USER')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
if (auth.username): if (auth.username):
@ -34,9 +34,9 @@ else:
else: else:
# string # string
res = auth.register_user(username) res = auth.register_user(username)
if (res == auth.SUCCESS): if (res == auth.STATUS.SUCCESS):
print('31 index.gmi\r\n') print('31 index.gmi\r\n')
elif (res == auth.NAME_IN_USE): elif (res == auth.STATUS.NAME_IN_USE):
print('10 Chose your name (name already in use)\r\n') print('10 Chose your name (name already in use)\r\n')
# Skipped ACTION_DISABLED because we already checked that # Skipped ACTION_DISABLED because we already checked that
else: else:

View File

@ -12,8 +12,8 @@ if (not hash):
exit() exit()
cert_name = os.environ.get('REMOTE_USER') cert_name = os.environ.get('REMOTE_USER')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
if (not auth.username): if (not auth.username):
@ -56,9 +56,9 @@ else:
# anticsrf+hash # anticsrf+hash
if (auth.check_anticsrf(anticsrf)): if (auth.check_anticsrf(anticsrf)):
res = auth.request_rename(hash) res = auth.request_rename(hash)
if (res == auth.SUCCESS): if (res == auth.STATUS.SUCCESS):
print('30 rename.gmi\r\n') print('30 rename.gmi\r\n')
elif (res == auth.NOT_FOUND): elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('Failed to rename non-existing key, or key which does not belong to you.') print('Failed to rename non-existing key, or key which does not belong to you.')
print('=> index.gmi back to home') print('=> index.gmi back to home')

View File

@ -12,8 +12,8 @@ if (not hash):
exit() exit()
cert_name = os.environ.get('REMOTE_USER') cert_name = os.environ.get('REMOTE_USER')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
if (not auth.username): if (not auth.username):
@ -32,9 +32,9 @@ else:
else: else:
# string # string
res = auth.rename_key(name) res = auth.rename_key(name)
if (res == auth.SUCCESS): if (res == auth.STATUS.SUCCESS):
print('30 index.gmi\r\n') print('30 index.gmi\r\n')
elif (res == auth.NOT_FOUND): elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('Failed to rename non-existing key, or key which does not belong to you.') print('Failed to rename non-existing key, or key which does not belong to you.')
print('=> index.gmi back to home') print('=> index.gmi back to home')

View File

@ -12,8 +12,8 @@ if (not hash):
exit() exit()
cert_name = os.environ.get('REMOTE_USER') cert_name = os.environ.get('REMOTE_USER')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
if (not auth.username): if (not auth.username):
@ -37,14 +37,14 @@ else:
# anticsrf+hash # anticsrf+hash
if (auth.check_anticsrf(anticsrf)): if (auth.check_anticsrf(anticsrf)):
res = auth.unlink(hash) res = auth.unlink(hash)
if (res == auth.SUCCESS): if (res == auth.STATUS.SUCCESS):
print('30 index.gmi\r\n') print('30 index.gmi\r\n')
elif (res == auth.KEY_IN_USE): elif (res == auth.STATUS.KEY_IN_USE):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('You have requested to delete the key, which is being used by you RIGHT NOW.') print('You have requested to delete the key, which is being used by you RIGHT NOW.')
print('This could lead to the loss of your account access. If you want to proceed, authenticate with another key and try again.') print('This could lead to the loss of your account access. If you want to proceed, authenticate with another key and try again.')
print('=> index.gmi back to home') print('=> index.gmi back to home')
elif (res == auth.NOT_FOUND): elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
print('Failed to delete non-existing key, or key which does not belong to you.') print('Failed to delete non-existing key, or key which does not belong to you.')
print('Maybe you\'re trying to delete already deleted key?') print('Maybe you\'re trying to delete already deleted key?')

View File

@ -14,8 +14,8 @@ cert_name = os.environ.get('REMOTE_USER')
print('20 text/gemini\r\n') print('20 text/gemini\r\n')
from auth import auth from auth import Auth
auth = auth('data/data.db') auth = Auth('data/data.db')
auth.pass_key(hash, cert_name) auth.pass_key(hash, cert_name)
print('Your hash:', auth.hash) print('Your hash:', auth.hash)