# SPDX-FileCopyrightText: 2023 Egor Guslyancev
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"""The module provides DB backends."""

import pickle as p
from os import path
from typing import Any, Optional


class PrototypeDB:
    """Prototype for database classes.

    Every method except ``__init__`` raises ``AssertionError`` until a
    subclass overrides it.
    """

    def __init__(self, dbfile: str):
        pass

    def save(self, dbfo: Optional[str] = None):
        """Export database to the file."""
        raise AssertionError("Unimplemented method")

    def load(self, dbfi: Optional[str] = None):
        """Import database from the file."""
        raise AssertionError("Unimplemented method")

    def write(self, field: str, value: Any, data=None):
        """Write `value` to the database's `field`. Returns new db."""
        raise AssertionError("Unimplemented method")

    def read(self, field: str, default: Any = None, data=None) -> Any:
        """Read `field` from the database.

        `default` is returned when there is no such field.
        """
        raise AssertionError("Unimplemented method")

    def pop(self, field: str) -> Any:
        """Remove `field` from the database."""
        raise AssertionError("Unimplemented method")


class PickleDB(PrototypeDB):
    """Database that uses pickle as a backend.

    The data is one nested ``dict``; a field is addressed with a
    dot-separated path such as ``"group.subgroup.name"``.
    """

    def __init__(self, dbfile: str):
        """Bind the database to `dbfile` and load it.

        The file is created (holding an empty database) when it does not
        exist yet.
        """
        super().__init__(dbfile)
        # Per-instance storage: a class-level dict would be shared by every
        # instance whose file did not exist at construction time.
        self.__db = {}
        self.__dbfile = dbfile
        self.load()

    def save(self, dbfo: Optional[str] = None):
        """Export database to the file.

        `dbfo` is the target path; the path given to the constructor is used
        when it is omitted.
        """
        target = self.__dbfile if dbfo is None else dbfo
        with open(target, "wb") as fo:
            p.dump(self.__db, fo)

    def load(self, dbfi: Optional[str] = None):
        """Import database from the file.

        `dbfi` is the source path; the path given to the constructor is used
        when it is omitted. When the source file does not exist nothing is
        loaded and the current in-memory state is saved to the default file
        instead.
        """
        source = self.__dbfile if dbfi is None else dbfi
        if not path.exists(source):
            self.save()
            return
        # NOTE(security): unpickling can execute arbitrary code, so only
        # files written by this class (or otherwise trusted) must be loaded.
        with open(source, "rb") as fi:
            self.__db = p.load(fi)

    def write(self, field: str, value: Any, data=None):
        """Write `value` to the database's `field` and save the database.

        Missing groups along the dotted path are created; a group currently
        holding ``None`` is replaced by an empty group.

        `data` is the dict to write into; the database root is used when it
        is omitted. The dict that was written into is returned (the whole
        database when `data` is omitted).

        Raises ``ValueError`` when a non-final path segment holds a value
        that is neither a dict nor ``None``.
        """
        root = self.__db if data is None else data
        *groups, leaf = field.split(".")
        node = root
        for name in groups:
            child = node.get(name)
            if child is None:
                child = node[name] = {}
            elif not isinstance(child, dict):
                raise ValueError("Поле не является группой.")
            node = child
        node[leaf] = value
        # One save per write, instead of one per path segment.
        self.save()
        return root

    def read(self, field: str, default: Any = None, data=None) -> Any:
        """Read `field` from the database.

        `default` is returned when there is no such field, including when a
        segment of the path leads into a value that is not a group.

        `data` is the dict to read from; the database root is used when it is
        omitted. An empty `field` returns that dict itself.
        """
        node = self.__db if data is None else data
        segments = field.split(".")
        last = len(segments) - 1
        for index, name in enumerate(segments):
            # An empty final segment ("" or a trailing dot) addresses the
            # group reached so far rather than a key named "".
            if index == last and name == "":
                return node
            if not isinstance(node, dict) or name not in node:
                return default
            node = node[name]
        return node

    def pop(self, field: str) -> Any:
        """Remove `field` from the database and save it.

        Returns the removed value, or ``None`` (leaving the database
        untouched) when the parent of `field` is missing or is not a group.

        Raises ``KeyError`` when the parent group exists but does not contain
        the final path segment.
        """
        parent_path, _, leaf = field.rpartition(".")
        parent = self.read(parent_path)
        if not isinstance(parent, dict):
            return None
        # `parent` is the live dict inside the database, so removing the key
        # in place is enough; writing it back through write("") would store
        # the database inside itself under the key "".
        removed = parent.pop(leaf)
        self.save()
        return removed