Pylint (2)
This commit is contained in:
parent
7fe6115e61
commit
c9274de4bc
|
@ -0,0 +1,99 @@
|
|||
# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
import pickle as p
|
||||
from os import path
|
||||
|
||||
|
||||
class PrototypeDB:
|
||||
"Prototype for database classes."
|
||||
|
||||
def __init__(self, dbfile: str):
|
||||
pass
|
||||
|
||||
def save(self, dbfo: str = None):
|
||||
"Export database to the file."
|
||||
|
||||
def load(self, dbfi: str = None):
|
||||
"Import database from the file."
|
||||
|
||||
def write(self, field: str, value: any, data=None):
|
||||
"Write `value` to the database's `field`. `data` argument is not used. Returns new db."
|
||||
|
||||
def read(self, field: str, default: any = None, data=None) -> any:
|
||||
"Read `field` from the database. `default` argument returned when where's no such a field."
|
||||
|
||||
def pop(self, field: str) -> any | None:
|
||||
"Remove `field` from the database."
|
||||
|
||||
|
||||
class PickleDB(PrototypeDB):
|
||||
"Database that uses pickle as a backend."
|
||||
__db = {}
|
||||
__dbfile = ""
|
||||
|
||||
def __init__(self, dbfile: str):
|
||||
self.__dbfile = dbfile
|
||||
self.load()
|
||||
|
||||
def save(self, dbfo: str = None):
|
||||
"Export database to the file."
|
||||
if dbfo is None:
|
||||
dbfo = self.__dbfile
|
||||
with open(dbfo, "wb") as fo:
|
||||
p.dump(self.__db, fo)
|
||||
|
||||
def load(self, dbfi: str = None):
|
||||
"Import database from the file."
|
||||
if dbfi is None:
|
||||
dbfi = self.__dbfile
|
||||
if not path.exists(dbfi):
|
||||
self.save()
|
||||
return
|
||||
with open(dbfi, "rb") as fi:
|
||||
self.__db = p.load(fi)
|
||||
|
||||
def write(self, field: str, value: any, data=None):
|
||||
"Write `value` to the database's `field`. `data` argument is not used. Returns new db."
|
||||
head = False
|
||||
if data is None:
|
||||
data = self.__db
|
||||
head = True
|
||||
field = field.split(".")
|
||||
if len(field) == 1:
|
||||
data[field[0]] = value
|
||||
else:
|
||||
if field[0] not in data or data[field[0]] is None:
|
||||
data[field[0]] = {}
|
||||
if not isinstance(type(data[field[0]]), dict):
|
||||
raise ValueError("Поле не является группой.")
|
||||
data[field[0]] = self.write(".".join(field[1:]), value, data[field[0]])
|
||||
self.save()
|
||||
if head:
|
||||
self.__db = data
|
||||
return data
|
||||
|
||||
def read(self, field: str, default: any = None, data=None) -> any:
|
||||
"Read `field` from the database. `default` argument returned when where's no such a field."
|
||||
if data is None:
|
||||
data = self.__db
|
||||
if field == "":
|
||||
return data
|
||||
field = field.split(".")
|
||||
if field[0] not in data:
|
||||
return default
|
||||
if len(field) == 1:
|
||||
return data[field[0]]
|
||||
return self.read(".".join(field[1:]), default, data[field[0]])
|
||||
|
||||
def pop(self, field: str) -> any:
|
||||
"Remove `field` from the database."
|
||||
ret = self.read(field)
|
||||
field = field.split(".")
|
||||
val = self.read(".".join(field[:-1]))
|
||||
if not isinstance(val, dict):
|
||||
return None
|
||||
val.pop(field[-1])
|
||||
self.write(".".join(field[:-1]), val)
|
||||
return ret
|
76
pickle_db.py
76
pickle_db.py
|
@ -1,76 +0,0 @@
|
|||
# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
import sys
|
||||
import pickle as p
|
||||
from os import path
|
||||
from sys import stderr, stdout, stdin
|
||||
|
||||
db = {}
|
||||
|
||||
def save_db(dbfo = ".db"):
|
||||
global db
|
||||
try:
|
||||
with open(dbfo, "wb") as fo:
|
||||
p.dump(db, fo)
|
||||
except:
|
||||
stderr.write("Не удалось записать базу!")
|
||||
sys.exit(1)
|
||||
|
||||
def load_db(dbfi = ".db"):
|
||||
global db
|
||||
if not path.exists(dbfi):
|
||||
save_db()
|
||||
return
|
||||
try:
|
||||
with open(dbfi, "rb") as fi:
|
||||
db = p.load(fi)
|
||||
except:
|
||||
stderr.write("Не удалось прочитать базу, пробуем сохранить…")
|
||||
save_db()
|
||||
|
||||
def write_db(field: str, value: any, data = None):
|
||||
global db
|
||||
head = False
|
||||
if data == None:
|
||||
data = db
|
||||
head = True
|
||||
field = field.split(".")
|
||||
if len(field) == 1:
|
||||
data[field[0]] = value
|
||||
else:
|
||||
if field[0] not in data or data[field[0]] is None:
|
||||
data[field[0]] = {}
|
||||
if type(data[field[0]]) is not dict:
|
||||
raise ValueError("Поле не является группой.")
|
||||
data[field[0]] = write_db('.'.join(field[1:]), value, data[field[0]])
|
||||
save_db()
|
||||
if head:
|
||||
db = data
|
||||
return data
|
||||
|
||||
def read_db(field: str, default = None, data = None) -> any:
|
||||
global db
|
||||
if data == None:
|
||||
data = db
|
||||
if field == "":
|
||||
return data
|
||||
field = field.split(".")
|
||||
if field[0] not in data:
|
||||
return default
|
||||
if len(field) == 1:
|
||||
return data[field[0]]
|
||||
else:
|
||||
return read_db('.'.join(field[1:]), default, data[field[0]])
|
||||
|
||||
def pop_db(field: str):
|
||||
global db
|
||||
ret = read_db(field)
|
||||
field = field.split(".")
|
||||
val = read_db('.'.join(field[:-1]))
|
||||
if not isinstance(val, dict):
|
||||
return
|
||||
val.pop(field[-1])
|
||||
write_db('.'.join(field[:-1]), val)
|
||||
return ret
|
Loading…
Reference in New Issue