Compare commits

...

2 Commits

Author SHA1 Message Date
Anedroid 3ae0ff509c
using enum for status codes 2022-12-29 16:50:19 +01:00
Anedroid 6d19b90cc0
migrated from camelCase to snake_case 2022-12-29 14:45:07 +01:00
9 changed files with 171 additions and 171 deletions

View File

@ -18,7 +18,7 @@ Last step open your favourite Gemini client and jump into gemini://localhost
Have fun!
## Screenshots (cooming soon)
## Screenshots (comming soon)
## License

View File

@ -1,46 +1,56 @@
import sqlite3
from enum import Enum
import logging
DEBUG = False
logging.basicConfig(level=logging.DEBUG)
def dict_factory(cursor, row):
fields = [column[0] for column in cursor.description]
return {key: value for key, value in zip(fields, row)}
def generateToken(length):
def generate_token(length):
import random
token = ''
for i in range(0, length):
token += random.choice('1234567890abcdefghijklmnopqrstuvwxyz')
return token
class auth:
class Auth:
ENABLE_REGISTRATION = True
LINK_EXPIRE = 10*60
ANTIC_EXPIRE = 60*60*24
# status codes (I have no idea how to use Enum properly)
STATUS = Enum('STATUS', [
'SUCCESS',
'NAME_IN_USE',
'ACTION_DISABLED',
'BAD_TOKEN',
'KEY_IN_USE',
'NOT_FOUND'
])
hash = None
certName = None
cert_name = None
username = None
anticsrf = False
# User row cache. It's enough to ask database once and update cache only when asked for outdated or missing columns.
user = {}
userOutdated = []
user_outdated = []
# User keys cache indexed by hashes
keys = {}
keysOutdated = ["all"]
# getKeys always returns all keys owned by user.
keys_outdated = ["all"]
# get_keys always returns all keys owned by user.
def __init__(self, dbFile):
def __init__(self, db_file):
"""
database auto-creation, garbage collection
"""
self.con = sqlite3.connect(dbFile)
self.con = sqlite3.connect(db_file)
self.con.row_factory = dict_factory
if (DEBUG):
self.con.set_trace_callback(logging.warning)
self.con.set_trace_callback(logging.debug)
self.cur = self.con.cursor()
self.cur.execute("""
@ -68,37 +78,37 @@ class auth:
""")
# TODO: database migration
# self.migrateDatabase()
self.garbageCollector()
# self.migrate_database()
self.garbage_collector()
def garbageCollector(self):
def garbage_collector(self):
"""
delete all unlinked keys and expired tokens
"""
# garbageCollector is intended to run before caching initialization
# garbage_collector is intended to run before caching initialization
self.cur.execute("DELETE FROM keys WHERE user IS NULL")
self.cur.execute("UPDATE users SET link_token = NULL, link_token_time = NULL WHERE link_token_time + ? - strftime('%s') <= 0", (self.LINK_EXPIRE, ))
# field request_rename expire together with anticsrf
self.cur.execute("UPDATE users SET anticsrf = NULL, anticsrf_time = NULL, request_rename = NULL WHERE anticsrf_time + ? - strftime('%s') <= 0", (self.ANTIC_EXPIRE, ))
self.con.commit()
def passKey(self, hash, name=None):
def pass_key(self, hash, name=None):
"""
pass given key to the object
"""
self.hash = hash
self.certName = name
self.cert_name = name
key = self.fetchKey()
key = self.fetch_key()
if (not key):
self.registerKey()
self.register_key()
return # if key is just registered, there is not username yet
self.username = key['username']
self.updateKey()
self.update_key()
# we do not need to update cache right now
def fetchKey(self):
def fetch_key(self):
"""
get current key and username
"""
@ -115,7 +125,7 @@ class auth:
return res
def getKeys(self, require=[]):
def get_keys(self, require=[]):
"""
get all keys of current user
"require" argument controls which fields must be up to date - if ommited, there are no requirements!
@ -124,13 +134,13 @@ class auth:
return None
outdated = False
if ('all' not in self.keysOutdated):
if ('all' not in self.keys_outdated):
for key in require:
if (key in self.keysOutdated):
if (key in self.keys_outdated):
outdated = True
break
else:
self.keysOutdated.remove('all')
self.keys_outdated.remove('all')
outdated = True
if (outdated):
@ -141,52 +151,50 @@ class auth:
for key in res:
self.keys[key['hash']] = key
del key['hash']
self.keysOutdated.clear()
self.keys_outdated.clear()
if (DEBUG):
logging.warning({"keys": self.keys, "keysOutdated": self.keysOutdated})
logging.debug({"keys": self.keys, "keys_outdated": self.keys_outdated})
return self.keys
def userInfo(self, column):
def user_info(self, column):
"""
get user row from database/cache
"""
if (not self.username):
return None
if (DEBUG):
logging.warning({"user": self.user, "userOutdated": self.userOutdated, "requested": column})
logging.debug({"user": self.user, "user_outdated": self.user_outdated, "requested": column})
if (column in self.user and column not in self.userOutdated):
if (column in self.user and column not in self.user_outdated):
return self.user[column]
res = self.cur.execute("SELECT * FROM users WHERE users.name = ?", (self.username, ))
self.user = res.fetchone()
self.userOutdated.clear()
self.user_outdated.clear()
return self.user[column]
def updateUserInfo(self, column, value):
def update_user_info(self, column, value):
"""
handy function for quick update self.user and self.userOutdated
for outdating it's enough to self.userOutdated.append(key)
handy function for quick update self.user and self.user_outdated
for outdating it's enough to self.user_outdated.append(key)
"""
self.user[column] = value
while (column in self.userOutdated): # remove duplicates
self.userOutdated.remove(column)
while (column in self.user_outdated): # remove duplicates
self.user_outdated.remove(column)
def updateKeyInfo(self, hash, column, value):
def update_key_info(self, hash, column, value):
"""
handy function (at the moment almost inevitable) for quick update self.keys and self.keysOutdated
for outdating it's enough to self.keysOutdated.append(key)
handy function (at the moment almost inevitable) for quick update self.keys and self.keys_outdated
for outdating it's enough to self.keys_outdated.append(key)
"""
if (hash not in self.keys):
self.keys[hash] = {}
self.keys[hash][column] = value
while (column in self.keysOutdated): # remove duplicates
self.keysOutdated.remove(column)
while (column in self.keys_outdated): # remove duplicates
self.keys_outdated.remove(column)
def genAnticSRF(self):
def gen_anticsrf(self):
"""
generate antic cross-site request forgery token
There's one token per session.
@ -196,46 +204,46 @@ class auth:
# skip generating token if already generated for this session
if (self.anticsrf):
return self.userInfo('anticsrf')
return self.user_info('anticsrf')
token = generateToken(4)
token = generate_token(4)
self.cur.execute("UPDATE users SET anticsrf = ?, anticsrf_time = strftime('%s') WHERE name = ?", (token, self.username))
self.con.commit()
self.anticsrf = True
self.updateUserInfo('anticsrf', token)
self.userOutdated.append('anticsrf_time')
self.update_user_info('anticsrf', token)
self.user_outdated.append('anticsrf_time')
return token
def checkAnticSRF(self, token):
def check_anticsrf(self, token):
"""
check antic cross-site request forgery token validity
Remider: there's one token per session
"""
if (not self.username):
return None
validity = token == self.userInfo('anticsrf')
validity = token == self.user_info('anticsrf')
self.cur.execute("UPDATE users SET anticsrf = NULL, anticsrf_time = NULL WHERE name = ?", (self.username, ))
self.con.commit()
self.updateUserInfo('anticsrf', None)
self.updateUserInfo('anticsrf_time', None)
self.update_user_info('anticsrf', None)
self.update_user_info('anticsrf_time', None)
return validity
def registerKey(self):
def register_key(self):
"""
insert new key into database (will be deleted if not linked)
"""
if (not self.hash):
return None
self.cur.execute("INSERT INTO keys (hash, name) VALUES (?, ?)", (self.hash, self.certName))
self.cur.execute("INSERT INTO keys (hash, name) VALUES (?, ?)", (self.hash, self.cert_name))
# unlinked key is deleted anyway, so why don't wait with commit until linking and avoid unneccessary disk IO?
self.updateKeyInfo(self.hash, 'name', self.certName)
self.update_key_info(self.hash, 'name', self.cert_name)
def updateKey(self):
def update_key(self):
"""
touch current key timestamp
"""
@ -244,16 +252,9 @@ class auth:
self.cur.execute("UPDATE keys SET last_seen = strftime('%s') WHERE hash = ?", (self.hash, ))
self.con.commit()
self.keysOutdated.append('last_seen')
self.keys_outdated.append('last_seen')
SUCCESS = 0
NAME_IN_USE = 1
ACTION_DISABLED = 2
BAD_TOKEN = 3
KEY_IN_USE = 4
NOT_FOUND = 5
def registerUser(self, username):
def register_user(self, username):
"""
link new user to the current key
"""
@ -261,11 +262,11 @@ class auth:
return None
if (not self.ENABLE_REGISTRATION):
return self.ACTION_DISABLED
return self.STATUS.ACTION_DISABLED
res = self.cur.execute("SELECT * FROM users WHERE name = ?", (username, ))
if (res.fetchone()):
return self.NAME_IN_USE
return self.STATUS.NAME_IN_USE
self.cur.execute("INSERT INTO users (name) VALUES (?)", (username, ))
uid = self.cur.lastrowid
@ -273,11 +274,11 @@ class auth:
# now the key is protected from autodeletion
self.con.commit()
self.username = username
self.updateKeyInfo(self.hash, 'user', uid)
self.update_key_info(self.hash, 'user', uid)
return self.SUCCESS
return self.STATUS.SUCCESS
def requestLink(self, cancel=False):
def request_link(self, cancel=False):
"""
generate link token
"""
@ -285,7 +286,7 @@ class auth:
return None
if (cancel):
self.burnLink(self.username)
self.burn_link(self.username)
self.con.commit()
return True
@ -293,7 +294,7 @@ class auth:
token = None
while not token:
token = generateToken(16)
token = generate_token(16)
res = self.cur.execute("SELECT * FROM users WHERE link_token = ?", (token, ))
if (res.fetchone()):
token = None
@ -303,18 +304,18 @@ class auth:
self.cur.execute("UPDATE users SET link_token = ?, link_token_time = strftime('%s') WHERE name = ?", (token, self.username))
self.con.commit()
self.updateUserInfo('link_token', token)
self.userOutdated.append('link_token_time')
self.update_user_info('link_token', token)
self.user_outdated.append('link_token_time')
return token
def burnLink(self, username):
def burn_link(self, username):
"""
force link token expiration
"""
self.cur.execute("UPDATE users SET link_token = NULL, link_token_time = NULL WHERE name = ?", (username, ))
self.updateUserInfo('link_token', None)
self.updateUserInfo('link_token_time', None)
self.update_user_info('link_token', None)
self.update_user_info('link_token_time', None)
def link(self, token):
"""
@ -327,13 +328,13 @@ class auth:
res = res.fetchone()
if (res):
self.cur.execute("UPDATE keys SET user = ? WHERE hash = ?", (res['id'], self.hash))
self.burnLink(res['name'])
self.burn_link(res['name'])
self.con.commit()
self.updateKeyInfo(self.hash, 'user', res['id'])
self.update_key_info(self.hash, 'user', res['id'])
self.username = res['name']
return self.SUCCESS
return self.STATUS.SUCCESS
else:
return self.BAD_TOKEN
return self.STATUS.BAD_TOKEN
def unlink(self, hash):
"""
@ -343,49 +344,49 @@ class auth:
return None
if (hash == self.hash):
return self.KEY_IN_USE
return self.STATUS.KEY_IN_USE
if (hash in self.getKeys()):
if (hash in self.get_keys()):
self.cur.execute("DELETE FROM keys WHERE hash = ?", (hash, ))
self.con.commit()
del self.keys[hash]
return self.SUCCESS
return self.STATUS.SUCCESS
return self.NOT_FOUND
return self.STATUS.NOT_FOUND
def requestRename(self, hash):
def request_rename(self, hash):
"""
prepare for changing name of given key
"""
if (not self.username):
return None
if (hash in self.getKeys()):
if (hash in self.get_keys()):
self.cur.execute("UPDATE users SET request_rename = ? WHERE name = ?", (hash, self.username))
self.con.commit()
self.user['request_rename'] = hash
return self.SUCCESS
return self.NOT_FOUND
return self.STATUS.SUCCESS
def renameKey(self, name):
return self.STATUS.NOT_FOUND
def rename_key(self, name):
"""
Change the display name of given key
"""
if (not self.username):
return None
hash = self.userInfo('request_rename')
hash = self.user_info('request_rename')
# key hash in database is probably good, but's better safe than sorry
if (hash in self.getKeys()):
if (hash in self.get_keys()):
self.cur.execute("UPDATE keys SET name = ? WHERE hash = ?", (name, hash))
self.cur.execute("UPDATE users SET request_rename = NULL WHERE name = ?", (self.username, ))
self.con.commit()
self.updateKeyInfo(hash, 'name', name)
self.updateUserInfo('request_rename', None)
return self.SUCCESS
self.update_key_info(hash, 'name', name)
self.update_user_info('request_rename', None)
return self.STATUS.SUCCESS
return self.NOT_FOUND
return self.STATUS.NOT_FOUND
if (__name__ == '__main__'):
@ -393,10 +394,9 @@ if (__name__ == '__main__'):
if (len(sys.argv) > 1):
auth(sys.argv[1])
print({
"enableRegistration": auth.ENABLE_REGISTRATION,
"linkExpire": auth.LINK_EXPIRE,
"anticExpire": auth.ANTIC_EXPIRE,
"debug": DEBUG
"enable_registration": Auth.ENABLE_REGISTRATION,
"link_expire": Auth.LINK_EXPIRE,
"antic_expire": Auth.ANTIC_EXPIRE
})
else:
print('Database file not specified')

View File

@ -10,13 +10,13 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
print('20 text/gemini\r\n')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
if (not auth.username):
# mismatch
@ -29,10 +29,10 @@ else:
print('Hello, {}!'.format(auth.username))
print('## Your keys')
myKeys = auth.getKeys(['last_seen'])
for hash in myKeys:
key = myKeys[hash]
lastSeen = datetime.fromtimestamp(key['last_seen'])
my_keys = auth.get_keys(['last_seen'])
for hash in my_keys:
key = my_keys[hash]
last_seen = datetime.fromtimestamp(key['last_seen'])
current = hash == auth.hash
name = key['name']
@ -44,14 +44,14 @@ else:
print(label)
print('hash:', hash)
print('last seen:', lastSeen)
print('=> rename-request.gmi?{} rename'.format(auth.genAnticSRF() + hash))
print('last seen:', last_seen)
print('=> rename-request.gmi?{} rename'.format(auth.gen_anticsrf() + hash))
if (not current):
print('=> unlink.gmi?{} unlink'.format(auth.genAnticSRF() + hash))
print('=> unlink.gmi?{} unlink'.format(auth.gen_anticsrf() + hash))
linkToken = auth.userInfo('link_token')
if(linkToken):
expire = datetime.fromtimestamp(auth.userInfo('link_token_time') + auth.LINK_EXPIRE)
link_token = auth.user_info('link_token')
if(link_token):
expire = datetime.fromtimestamp(auth.user_info('link_token_time') + auth.LINK_EXPIRE)
delta = expire - datetime.now()
minutes = delta.seconds // 60
seconds = delta.seconds % 60
@ -59,7 +59,7 @@ else:
def zero(n):
return str(n) if n>10 else '0'+str(n)
print('### Link new key')
print('Token {} will expire in {}:{}'.format(linkToken, zero(minutes), zero(seconds)))
print('Token {} will expire in {}:{}'.format(link_token, zero(minutes), zero(seconds)))
print('=> link.gmi?cancel cancel')
else:
print('---')

View File

@ -10,11 +10,11 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
query = os.environ.get('QUERY_STRING')
@ -24,7 +24,7 @@ if (auth.username):
# empty
from datetime import datetime
token = auth.requestLink()
token = auth.request_link()
if (not token):
print('40 Unknown error\r\n')
exit()
@ -36,9 +36,9 @@ if (auth.username):
print('=> ?cancel cancel')
elif (query == 'cancel'):
# cancel
auth.requestLink(cancel=True)
auth.request_link(cancel=True)
print('30 index.gmi\r\n')
elif (query == auth.userInfo('link_token')):
elif (query == auth.user_info('link_token')):
print('20 text/gemini\r\n')
print('Tip: open this link on new device in order to link new key to your account')
else:
@ -51,11 +51,11 @@ else:
else:
# token
res = auth.link(query)
if (res == auth.SUCCESS):
if (res == auth.STATUS.SUCCESS):
print('20 text/gemini\r\n')
print('Successfully linked to {}!'.format(auth.username))
print('=> index.gmi back to home')
elif (res == auth.BAD_TOKEN):
elif (res == auth.STATUS.BAD_TOKEN):
print('20 text/gemini\r\n')
print('It seems have you entered invalid or expired token. Try to generate a new one.')
else:

View File

@ -10,11 +10,11 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
if (auth.username):
# match
@ -33,10 +33,10 @@ else:
print('10 Choose your name\r\n')
else:
# string
res = auth.registerUser(username)
if (res == auth.SUCCESS):
res = auth.register_user(username)
if (res == auth.STATUS.SUCCESS):
print('31 index.gmi\r\n')
elif (res == auth.NAME_IN_USE):
elif (res == auth.STATUS.NAME_IN_USE):
print('10 Chose your name (name already in use)\r\n')
# Skipped ACTION_DISABLED because we already checked that
else:

View File

@ -10,11 +10,11 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
if (not auth.username):
# mismatch
@ -29,22 +29,22 @@ else:
print('20 text/gemini\r\n')
print('Which key would you like to rename?')
myKeys = auth.getKeys(['last_seen'])
for hash in myKeys:
key = myKeys['hash']
lastSeen = datetime.fromtimestamp(key['last_seen'])
my_keys = auth.get_keys(['last_seen'])
for hash in my_keys:
key = my_keys['hash']
last_seen = datetime.fromtimestamp(key['last_seen'])
current = hash == auth.hash
name = key['name']
name = '"' + name + '"' if name else '[no name]'
label = '=> rename-request.gmi?{} {}'.format(auth.genAnticSRF() + hash, name)
label = '=> rename-request.gmi?{} {}'.format(auth.gen_anticsrf() + hash, name)
if (current):
label += ' (currently used)'
print(label)
print('hash:', hash)
print('last seen:', lastSeen)
print('last seen:', last_seen)
else:
anticsrf = query[:4]
hash = query[4:]
@ -54,11 +54,11 @@ else:
exit()
# anticsrf+hash
if (auth.checkAnticSRF(anticsrf)):
res = auth.requestRename(hash)
if (res == auth.SUCCESS):
if (auth.check_anticsrf(anticsrf)):
res = auth.request_rename(hash)
if (res == auth.STATUS.SUCCESS):
print('30 rename.gmi\r\n')
elif (res == auth.NOT_FOUND):
elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n')
print('Failed to rename non-existing key, or key which does not belong to you.')
print('=> index.gmi back to home')

View File

@ -10,11 +10,11 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
if (not auth.username):
# mismatch
@ -24,17 +24,17 @@ else:
name = os.environ.get('QUERY_STRING')
if (not name):
# empty
if (auth.userInfo('request_rename')):
if (auth.user_info('request_rename')):
# TODO: tell which key are you renaming
print('10 Choose new name for your key\r\n')
else:
print('30 rename-request.gmi\r\n')
else:
# string
res = auth.renameKey(name)
if (res == auth.SUCCESS):
res = auth.rename_key(name)
if (res == auth.STATUS.SUCCESS):
print('30 index.gmi\r\n')
elif (res == auth.NOT_FOUND):
elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n')
print('Failed to rename non-existing key, or key which does not belong to you.')
print('=> index.gmi back to home')

View File

@ -10,11 +10,11 @@ if (not hash):
# no CC
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
if (not auth.username):
# mismatch
@ -35,16 +35,16 @@ else:
exit()
# anticsrf+hash
if (auth.checkAnticSRF(anticsrf)):
if (auth.check_anticsrf(anticsrf)):
res = auth.unlink(hash)
if (res == auth.SUCCESS):
if (res == auth.STATUS.SUCCESS):
print('30 index.gmi\r\n')
elif (res == auth.KEY_IN_USE):
elif (res == auth.STATUS.KEY_IN_USE):
print('20 text/gemini\r\n')
print('You have requested to delete the key, which is being used by you RIGHT NOW.')
print('This could lead to the loss of your account access. If you want to proceed, authenticate with another key and try again.')
print('=> index.gmi back to home')
elif (res == auth.NOT_FOUND):
elif (res == auth.STATUS.NOT_FOUND):
print('20 text/gemini\r\n')
print('Failed to delete non-existing key, or key which does not belong to you.')
print('Maybe you\'re trying to delete already deleted key?')

View File

@ -10,16 +10,16 @@ hash = os.environ.get('TLS_CLIENT_HASH')
if (not hash):
print('60 Authentication is required\r\n')
exit()
certName = os.environ.get('REMOTE_USER')
cert_name = os.environ.get('REMOTE_USER')
print('20 text/gemini\r\n')
from auth import auth
auth = auth('data/data.db')
auth.passKey(hash, certName)
from auth import Auth
auth = Auth('data/data.db')
auth.pass_key(hash, cert_name)
print('Your hash:', auth.hash)
print('Your common name:', certName)
print('Your common name:', cert_name)
if (auth.username):
print('Your username:', auth.username)
print('=> account/index.gmi manage your account')