duty-board-dog/db_classes.py

107 lines
3.4 KiB
Python

# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"The module provides DB backends."
import pickle as p
from os import path
from typing import Any, Optional
class PrototypeDB:
    """Prototype for database classes.

    Declares the interface shared by the database backends. Every method
    except `__init__` raises until a subclass overrides it.
    """

    # NOTE: the stubs raise AssertionError (not NotImplementedError); the
    # exception type is kept as is so existing callers are not affected.

    def __init__(self, dbfile: str):
        """Create a database bound to the file `dbfile`. The prototype ignores it."""

    def save(self, dbfo: Optional[str] = None):
        """Export database to the file `dbfo`."""
        raise AssertionError("Unimplemented method")

    def load(self, dbfi: Optional[str] = None):
        """Import database from the file `dbfi`."""
        raise AssertionError("Unimplemented method")

    def write(self, field: str, value: Any, data: Optional[dict] = None) -> dict:
        """Write `value` to the database's `field` and return the updated mapping.

        `data` is the mapping to write into; callers normally leave it as
        `None` to address the database itself.
        """
        raise AssertionError("Unimplemented method")

    def read(self, field: str, default: Any = None, data: Optional[dict] = None) -> Any:
        """Read `field` from the database.

        `default` is returned when there is no such field. `data` is the
        mapping to read from; callers normally leave it as `None`.
        """
        raise AssertionError("Unimplemented method")

    def pop(self, field: str) -> Any:
        """Remove `field` from the database and return its former value."""
        raise AssertionError("Unimplemented method")


class PickleDB(PrototypeDB):
    """Database that uses pickle as a backend.

    The database is a nested `dict` held in memory. Fields are addressed by
    dot-separated paths (`"group.subgroup.key"`), and every successful
    `write`/`pop` dumps the whole dict to the pickle file.
    """

    def __init__(self, dbfile: str):
        """Bind the database to `dbfile` and load it.

        If `dbfile` does not exist yet, it is created holding an empty database.
        """
        # Storage is created per instance. A class-level dict would be shared
        # by every instance whose file did not exist at construction time.
        self.__db = {}
        self.__dbfile = dbfile
        self.load()

    def save(self, dbfo: Optional[str] = None):
        """Export database to the file `dbfo` (the bound file when omitted)."""
        if dbfo is None:
            dbfo = self.__dbfile
        with open(dbfo, "wb") as fo:
            p.dump(self.__db, fo)

    def load(self, dbfi: Optional[str] = None):
        """Import database from the file `dbfi` (the bound file when omitted).

        When `dbfi` does not exist, nothing is loaded; the current in-memory
        database is saved to the bound file instead.
        """
        if dbfi is None:
            dbfi = self.__dbfile
        if not path.exists(dbfi):
            # NOTE(review): this saves to the bound file even when an explicit,
            # missing `dbfi` was given -- confirm that `dbfi` was not intended.
            self.save()
            return
        # SECURITY: unpickling can execute arbitrary code. Only load files
        # that this application wrote itself.
        with open(dbfi, "rb") as fi:
            self.__db = p.load(fi)

    def write(self, field: str, value: Any, data: Optional[dict] = None) -> dict:
        """Write `value` to the database's `field` and save the database.

        Missing groups along the dotted path (and groups stored as `None`)
        are created as empty dicts.

        Args:
            field: Dot-separated path of the field to set.
            value: Value to store.
            data: Mapping to write into instead of the database itself.
                Callers normally leave it as `None`.

        Returns:
            The mapping that was written into (the whole database when
            `data` is omitted).

        Raises:
            ValueError: A non-final path segment holds a value that is
                neither a dict nor `None`.
        """
        if data is None:
            data = self.__db
        *groups, leaf = field.split(".")
        node = data
        for name in groups:
            if name not in node or node[name] is None:
                node[name] = {}
            elif not isinstance(node[name], dict):
                # Message (Russian): "The field is not a group."
                raise ValueError("Поле не является группой.")
            node = node[name]
        node[leaf] = value
        # One dump per write is enough; the nested dicts are updated in place.
        self.save()
        return data

    def read(self, field: str, default: Any = None, data: Optional[dict] = None) -> Any:
        """Read `field` from the database.

        Args:
            field: Dot-separated path. An empty path, or a path ending with
                a dot, addresses the group itself rather than a key in it.
            default: Returned when the path cannot be resolved, either
                because a key is missing or because an intermediate value
                is not a dict.
            data: Mapping to read from instead of the database itself.
                Callers normally leave it as `None`.

        Returns:
            The stored value, or `default`.
        """
        if data is None:
            data = self.__db
        parts = field.split(".")
        if parts[-1] == "":
            # "" -> the whole mapping, "a." -> the group "a".
            parts.pop()
        node = data
        for name in parts:
            # Stop at non-dict values (including None) instead of raising
            # TypeError or silently restarting the lookup from the root.
            if not isinstance(node, dict) or name not in node:
                return default
            node = node[name]
        return node

    def pop(self, field: str) -> Any:
        """Remove `field` from the database and save the database.

        Returns:
            The removed value, or `None` when the field or its parent group
            does not exist (nothing is saved in that case).
        """
        *groups, leaf = field.split(".")
        parent = self.read(".".join(groups))
        if not isinstance(parent, dict) or leaf not in parent:
            return None
        removed = parent.pop(leaf)
        # `parent` is the live dict inside the database, so no write-back is
        # needed. Writing the root back under the empty path would insert the
        # database into itself.
        self.save()
        return removed