This repository has been archived on 2024-05-17. You can view files and clone it, but cannot push or open issues or pull requests.
gemini-auth/lib/auth/__init__.py

232 lines
7.2 KiB
Python

import sqlite3
def dict_factory(cursor, row):
fields = [column[0] for column in cursor.description]
return {key: value for key, value in zip(fields, row)}
def generateToken(length):
import random
token = ''
for i in range(0, length):
token += random.choice('1234567890abcdefghijklmnopqrstuvwxyz')
return token
class auth:
ENABLE_REGISTRATION = True
LINK_EXPIRE = 100*60
hash = None
username = None
# If outdated, userInfo will fetch data from database. Instead, it can be updated directly by this library to reduce disk IO.
user = {"outdated": True}
def __init__(self, dbFile):
"""
database auto-creation, garbage collection
"""
self.con = sqlite3.connect(dbFile)
self.con.row_factory = dict_factory
self.cur = self.con.cursor()
self.cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(255) UNIQUE,
link_token VARCHAR(16) UNIQUE,
link_token_time INTEGER,
request_delete VARCHAR(16),
request_delete_time INTEGER,
anticsrf VARCHAR(4),
anticsrf_time INTEGER
)
""")
self.cur.execute("""
CREATE TABLE IF NOT EXISTS keys (
hash VARCHAR(255) PRIMARY KEY,
user INTEGER,
last_seen INTEGER NOT NULL DEFAULT (strftime('%s')),
name VARCHAR(255),
FOREIGN KEY (user) REFERENCES users (id)
ON DELETE CASCADE
)
""")
self.garbageCollector()
def garbageCollector(self):
"""
delete all unlinked keys
"""
self.cur.execute("DELETE FROM keys WHERE user IS NULL")
self.cur.execute("UPDATE users SET link_token = NULL, link_token_time = NULL WHERE link_token_time + ? - strftime('%s') <= 0", (self.LINK_EXPIRE, ))
self.con.commit()
def passKey(self, hash):
"""
pass given key to the object
"""
self.hash = hash
key = self.fetchKey()
if (not key):
self.registerKey()
return # if key is just registered, there is not username yet
self.username = key['username']
self.updateKey()
# we do not need to update cache right now
def fetchKey(self):
"""
get current key and username
"""
if (not self.hash):
return None
res = self.cur.execute("SELECT keys.*, users.name as username FROM users, keys WHERE users.id = keys.user AND keys.hash = ?", (self.hash, ))
return res.fetchone()
def getKeys(self):
"""
get all keys of current user
"""
if (not self.username):
return None
res = self.cur.execute("SELECT * FROM users, keys WHERE users.id = keys.user AND users.name = ?", (self.username, ))
return res.fetchall()
def userInfo(self, key=None):
"""
get user row from database/cache
"""
if (not self.username):
return None
if (not self.user['outdated']):
return self.user if not key else self.user[key]
res = self.cur.execute("SELECT * FROM users WHERE users.name = ?", (self.username, ))
self.user = res.fetchone()
self.user['outdated'] = False
return self.user if not key else self.user[key]
def registerKey(self):
"""
insert new key into database (will be deleted if not linked)
TODO: autoset common name
"""
if (not self.hash):
return None
self.cur.execute("INSERT INTO keys (hash) VALUES (?)", (self.hash, ))
# unlinked key is deleted anyway, so why don't wait with commit until linking and avoid unneccessary disk IO?
def updateKey(self):
"""
touch current key timestamp
"""
if (not self.hash):
return None
self.cur.execute("UPDATE keys SET last_seen = strftime('%s') WHERE hash = ?", (self.hash, ))
self.con.commit()
SUCCESS = 0
NAME_IN_USE = 1
ACTION_DISABLED = 2
BAD_TOKEN = 1
def registerUser(self, username):
"""
link new user to the current key
response codes:
0 - registered successfully
1 - name in use
2 - registration disabled
3 - (future) captcha challenge
"""
if (not self.hash):
return None
if (not self.ENABLE_REGISTRATION):
return self.ACTION_DISABLED
res = self.cur.execute("SELECT * FROM users WHERE name = ?", (username, ))
if (res.fetchone()):
return self.NAME_IN_USE
self.cur.execute("INSERT INTO users (name) VALUES (?)", (username, ))
uid = self.cur.lastrowid
self.cur.execute("UPDATE keys SET user = ? WHERE hash = ?", (uid, self.hash))
# now the key is protected from autodeletion
self.con.commit()
self.username = username
return self.SUCCESS
def requestLink(self, cancel=False):
"""
generate link token
"""
if (not self.username):
return None
if (cancel):
# self.cur.execute("UPDATE users SET link_token = NULL, link_token_time = NULL WHERE name = ?", (self.username, ))
self.burnLink(self.username)
self.con.commit()
if (self.user):
self.user['link_token'] = self.user['link_token_time'] = None
return True
trials = 3
token = None
while not token:
token = generateToken(16)
res = self.cur.execute("SELECT * FROM users WHERE link_token = ?", (token, ))
if (res.fetchone()):
tokenSet = None
trials -= 1
if (trials < 0):
return False
self.cur.execute("UPDATE users SET link_token = ?, link_token_time = strftime('%s') WHERE name = ?", (token, self.username))
self.con.commit()
self.user['outdated'] = True
return token
def burnLink(self, username):
"""
force link token expiration
"""
self.cur.execute("UPDATE users SET link_token = NULL, link_token_time = NULL WHERE name = ?", (username, ))
def link(self, token):
"""
link current key to user of given token
"""
if (not self.hash):
return None
res = self.cur.execute("SELECT id, name FROM users WHERE link_token = ?", (token, ))
res = res.fetchone()
if (res):
self.cur.execute("UPDATE keys SET user = ? WHERE hash = ?", (res['id'], self.hash))
self.burnLink(res['name'])
self.con.commit()
self.username = res['name']
return auth.SUCCESS
else:
return auth.BAD_TOKEN
if (__name__ == '__main__'):
import sys
if (len(sys.argv) > 1):
auth(sys.argv[1])
print({"enableRegistration": auth.ENABLE_REGISTRATION})
else:
print('Database file not specified')