108 lines
3.4 KiB
Python
108 lines
3.4 KiB
Python
# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
|
|
#
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
"The module provides DB backends."
|
|
|
|
import pickle as p
|
|
from os import path
|
|
|
|
|
|
class PrototypeDB:
    """Prototype for database classes.

    Declares the interface every backend provides.  Apart from the
    constructor, each method here raises ``AssertionError`` with the
    message "Unimplemented method" and is meant to be overridden.
    """

    def __init__(self, dbfile: str):
        """Accept the database file path; the prototype does nothing with it."""

    def save(self, dbfo: str = None):
        """Export database to the file."""
        raise AssertionError("Unimplemented method")

    def load(self, dbfi: str = None):
        """Import database from the file."""
        raise AssertionError("Unimplemented method")

    def write(self, field: str, value: any, data=None):
        """Write `value` to the database's `field` and return the new db.

        The `data` argument is not meant to be supplied by callers.
        """
        raise AssertionError("Unimplemented method")

    def read(self, field: str, default: any = None, data=None) -> any:
        """Read `field` from the database.

        `default` is returned when there is no such field.
        """
        raise AssertionError("Unimplemented method")

    def pop(self, field: str) -> any:
        """Remove `field` from the database."""
        raise AssertionError("Unimplemented method")
|
|
|
|
|
|
class PickleDB(PrototypeDB):
    """Database that uses pickle as a backend.

    The whole database is a single (possibly nested) dict held in memory
    and pickled to one file.  Fields are addressed by dot-separated paths:
    ``"a.b.c"`` refers to ``db["a"]["b"]["c"]``; every component except the
    last names a group (a nested dict).
    """

    # Class-level fallbacks only.  __init__ gives each instance its own
    # values so that instances never share one mutable dict.
    __db = {}
    __dbfile = ""

    def __init__(self, dbfile: str):
        """Bind the database to `dbfile` and load it.

        If `dbfile` does not exist yet, `load` creates it with an empty
        database.
        """
        self.__dbfile = dbfile
        # A fresh dict per instance: without it, every instance whose file
        # is missing would keep mutating the shared class-level dict.
        self.__db = {}
        self.load()

    def save(self, dbfo: str = None):
        """Export database to the file.

        `dbfo` is the target path; the file given to the constructor is
        used when it is omitted.
        """
        if dbfo is None:
            dbfo = self.__dbfile
        with open(dbfo, "wb") as fo:
            p.dump(self.__db, fo)

    def load(self, dbfi: str = None):
        """Import database from the file.

        `dbfi` is the source path; the file given to the constructor is
        used when it is omitted.  When the source file does not exist,
        nothing is loaded and the current in-memory database is saved to
        the constructor's file instead.
        """
        if dbfi is None:
            dbfi = self.__dbfile
        if not path.exists(dbfi):
            self.save()
            return
        with open(dbfi, "rb") as fi:
            # NOTE: unpickling can execute arbitrary code; only load
            # database files that come from a trusted source.
            self.__db = p.load(fi)

    def write(self, field: str, value: any, data=None):
        """Write `value` to the database's `field` and save the database.

        Missing groups along the path (and groups currently set to
        ``None``) are created as empty dicts.

        `data` is the dict the path is resolved against; callers normally
        omit it, in which case the root of the database is used.

        Returns the dict the write started from (the whole database when
        `data` is omitted).

        Raises ``ValueError`` when a non-final path component exists but is
        not a group.
        """
        if data is None:
            data = self.__db
        *groups, leaf = field.split(".")
        node = data
        for name in groups:
            if name not in node or node[name] is None:
                node[name] = {}
            elif not isinstance(node[name], dict):
                # Message (Russian): "The field is not a group."
                raise ValueError("Поле не является группой.")
            node = node[name]
        node[leaf] = value
        # Persist once per write rather than once per path component.
        self.save()
        return data

    def read(self, field: str, default: any = None, data=None) -> any:
        """Read `field` from the database.

        `default` is returned when there is no such field, including when
        the path runs through a value that is not a group.

        `data` is the dict the path is resolved against; the root of the
        database is used when it is omitted.

        An empty final component addresses the group itself: ``read("")``
        returns the whole database and ``read("a.")`` returns ``a``.
        """
        if data is None:
            data = self.__db
        *groups, leaf = field.split(".")
        node = data
        for name in groups:
            # Walk iteratively: a ``None`` or scalar on the way is simply a
            # miss, never a reason to restart from the root or to crash.
            if not isinstance(node, dict) or name not in node:
                return default
            node = node[name]
        if leaf == "":
            return node
        if not isinstance(node, dict) or leaf not in node:
            return default
        return node[leaf]

    def pop(self, field: str) -> any:
        """Remove `field` from the database.

        Returns the removed value, or ``None`` when the field (or the
        group that should contain it) does not exist.  The database is
        saved after a successful removal.
        """
        *groups, leaf = field.split(".")
        parent = self.read(".".join(groups))
        if not isinstance(parent, dict) or leaf not in parent:
            return None
        # `parent` is the live group inside the database, so removing the
        # key in place is enough.  Writing it back would, for a top-level
        # field, store the database inside itself under the key "".
        removed = parent.pop(leaf)
        self.save()
        return removed
|