A lot of changes

* New config file
* Incapsulated db
* Update notifier
This commit is contained in:
Egor Guslyancev 2023-12-10 13:37:24 -03:00
parent 4118265d00
commit 74bc59df12
GPG Key ID: D7E709AA465A55F9
4 changed files with 231 additions and 195 deletions

5
.gitignore vendored
View File

@ -565,6 +565,5 @@ FodyWeavers.xsd
# Hide internal database and it's backups # Hide internal database and it's backups
*.db *.db
# Hide token file # Hide config file
token config.ini
token.devel

342
bot.py
View File

@ -3,90 +3,31 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
import telebot import telebot
import re
import time import time
import datetime as dt import datetime as dt
import sys import sys
import signal import signal
import os import os
import subprocess import subprocess
import pickle as p import configparser
from os import path from os import path
from sys import stderr, stdout, stdin from sys import stderr, stdout, stdin
from threading import Thread from threading import Thread
filename = "config.ini"
config = configparser.ConfigParser()
config.read(filename)
db = {} # TODO more backends
from pickle_db import *
def save_db(dbfo = ".db"):
global db
try:
with open(dbfo, "wb") as fo:
p.dump(db, fo)
except:
stderr.write("Не удалось записать базу!")
sys.exit(1)
def load_db(dbfi = ".db"):
global db
if not path.exists(dbfi):
save_db()
return
try:
with open(dbfi, "rb") as fi:
db = p.load(fi)
except:
stderr.write("Не удалось прочитать базу, пробуем сохранить…")
save_db()
def write_db(field: str, value: any, data = None):
global db
head = False
if data == None:
data = db
head = True
field = field.split(".")
if len(field) == 1:
data[field[0]] = value
else:
if field[0] not in data or data[field[0]] is None:
data[field[0]] = {}
if type(data[field[0]]) is not dict:
raise ValueError("Поле не является группой.")
data[field[0]] = write_db('.'.join(field[1:]), value, data[field[0]])
save_db()
if head:
db = data
return data
def read_db(field: str, default = None, data = None) -> any:
global db
if data == None:
data = db
field = field.split(".")
if field[0] not in data:
return default
if len(field) == 1:
return data[field[0]]
else:
return read_db('.'.join(field[1:]), default, data[field[0]])
def pop_db(field: str):
global db
ret = read_db(field)
field = field.split(".")
val = read_db('.'.join(field[:-1]))
if type(val) is not dict:
return
val.pop(field[-1])
write_db('.'.join(field[:-1]), val)
return ret
load_db() load_db()
write_db("about.version", "v1.0rc4") __cur_version = "v1.0rc5"
__version = read_db("about.version", __cur_version)
write_db("about.updatedfrom", __version)
write_db("about.version", __cur_version)
write_db("about.author", "electromagneticcyclone") write_db("about.author", "electromagneticcyclone")
write_db("about.tester", "angelbeautifull") write_db("about.tester", "angelbeautifull")
write_db("about.source", "https://git.disroot.org/electromagneticcyclone/duty-board-dog")
if (read_db("about.host") is None) and __debug__: if (read_db("about.host") is None) and __debug__:
stdout.write("Введите username хоста: ") stdout.write("Введите username хоста: ")
stdout.flush() stdout.flush()
@ -95,15 +36,32 @@ if (read_db("about.host") is None) and __debug__:
try: try:
bot = None bot = None
filename = "token.devel" if __debug__ else "token" if __version != __cur_version:
with open(filename, encoding = "utf-8") as fi: import re
for i in fi: if path.exists("token"):
i = i.strip() with open("token", encoding = "utf-8") as fi:
pattern = re.compile("^[\\d]{10}:[\\w\\d\\-\\+\\*]{35}$") for i in fi:
matches = pattern.fullmatch(i) is not None i = i.strip()
if matches: pattern = re.compile("^[\\d]{10}:[\\w\\d\\-\\+\\*]{35}$")
bot = telebot.TeleBot(i, parse_mode = "MarkdownV2") matches = pattern.fullmatch(i) is not None
break if matches:
config['tokens']['prod'] = i
config.write(filename)
break
os.remove("token")
if path.exists("token.devel"):
with open("token.devel", encoding = "utf-8") as fi:
for i in fi:
i = i.strip()
pattern = re.compile("^[\\d]{10}:[\\w\\d\\-\\+\\*]{35}$")
matches = pattern.fullmatch(i) is not None
if matches:
config['tokens']['devel'] = i
config.write(filename)
break
os.remove("token.devel")
bot = telebot.TeleBot( config['tokens']['devel' if __debug__ else 'prod'],
parse_mode = "MarkdownV2")
if bot is None: if bot is None:
stderr.write("В файле нет токена\n") stderr.write("В файле нет токена\n")
raise Exception raise Exception
@ -111,6 +69,7 @@ except:
stderr.write("Ошибка чтения файла токена\n") stderr.write("Ошибка чтения файла токена\n")
sys.exit(1) sys.exit(1)
def get_time(forum: int): def get_time(forum: int):
return dt.datetime.now(dt.UTC) \ return dt.datetime.now(dt.UTC) \
+ dt.timedelta(hours = read_db(str(forum) + ".settings.timezone", 3)) + dt.timedelta(hours = read_db(str(forum) + ".settings.timezone", 3))
@ -251,6 +210,89 @@ def insert_user_in_current_order(forum: int, uid) -> bool:
write_db(str(forum) + ".rookies.order", list(order.keys())[1:]) write_db(str(forum) + ".rookies.order", list(order.keys())[1:])
return True return True
def parse_dates(forum: int, args):
dates = []
OK = True
for a in args:
if a.lower() == "сегодня":
dates.append(get_time(forum).date())
continue
if a.lower() == "завтра":
dates.append(get_time(forum).date() + dt.timedelta(days=1))
continue
if a.lower() == "послезавтра":
dates.append(get_time(forum).date() + dt.timedelta(days=2))
continue
if a.lower() == "вчера":
dates.append(get_time(forum).date() - dt.timedelta(days=1))
continue
if a.lower() == "позавчера":
dates.append(get_time(forum).date() - dt.timedelta(days=2))
continue
d = a.split('.')
a_dates = []
if len(d) in (2, 3):
try:
d = list(map(int, d))
except:
print("Ne ok")
OK = False
if OK:
cur_date = get_time(forum).date() - dt.timedelta(days=1)
cur_year = cur_date.year
if len(d) == 2:
years = [cur_year, cur_year + 1]
else:
years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]]
for y in years:
try:
a_dates.append(dt.datetime(y, d[1], d[0]).date())
except:
pass
a_dates = sorted(filter(lambda x: cur_date + dt.timedelta(days=120) > x > cur_date, a_dates))
if len(a_dates) == 0:
OK = False
else:
OK = False
if OK:
dates.append(a_dates[0])
else:
return a
return dates
def mod_days(message, target, neighbour):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) == 0:
dates = [get_time(forum).date()]
else:
dates = parse_dates(forum, args)
if type(dates) is str:
bot.reply_to(chat, telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?")
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
t = read_db(target)
if t is None:
t = []
n = read_db(neighbour)
if n is None:
n = []
write_db(neighbour, list(filter(lambda x: x not in dates, n)))
write_db(target, list(sorted(set(t + dates))))
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Добавил " + telebot.formatting.escape_markdown(
", ".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in dates])))
timeout = None timeout = None
caution = True caution = True
def antispam(message, chat, forum): def antispam(message, chat, forum):
@ -320,9 +362,26 @@ def help(message):
+ "/stop — остановить дежурства\n" + "/stop — остановить дежурства\n"
+ "/begin \\[@\\] — начать сначала с определённого студента") + "/begin \\[@\\] — начать сначала с определённого студента")
@bot.message_handler(commands=['exec']) if __debug__:
def exec_bot(message): def pretty(d, indent=0):
if __debug__: for key, value in d.items():
print(' ' * indent + str(key))
if isinstance(value, dict):
pretty(value, indent+1)
else:
print(' ' * (indent+1) + str(value))
@bot.message_handler(commands=['info'])
def info(message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == read_db("about.host"):
pretty(db)
@bot.message_handler(commands=['exec'])
def exec_bot(message):
forum = message.chat.id forum = message.chat.id
chat = get_chat(message, True) chat = get_chat(message, True)
if chat is not None: if chat is not None:
@ -332,24 +391,6 @@ def exec_bot(message):
else: else:
bot.delete_message(forum, message.id) bot.delete_message(forum, message.id)
def pretty(d, indent=0):
for key, value in d.items():
print(' ' * indent + str(key))
if isinstance(value, dict):
pretty(value, indent+1)
else:
print(' ' * (indent+1) + str(value))
@bot.message_handler(commands=['info'])
def info(message):
if __debug__:
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == read_db("about.host"):
pretty(db)
@bot.message_handler(commands=['fuck', 'fuck_you', 'fuck-you', 'fuckyou']) @bot.message_handler(commands=['fuck', 'fuck_you', 'fuck-you', 'fuckyou'])
def rude(message): def rude(message):
forum = message.chat.id forum = message.chat.id
@ -762,89 +803,6 @@ def calendar(message):
+ "*Рабочие:*\n" + telebot.formatting.escape_markdown( + "*Рабочие:*\n" + telebot.formatting.escape_markdown(
"\n".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in work]))) "\n".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in work])))
def parse_dates(forum: int, args):
dates = []
OK = True
for a in args:
if a.lower() == "сегодня":
dates.append(get_time(forum).date())
continue
if a.lower() == "завтра":
dates.append(get_time(forum).date() + dt.timedelta(days=1))
continue
if a.lower() == "послезавтра":
dates.append(get_time(forum).date() + dt.timedelta(days=2))
continue
if a.lower() == "вчера":
dates.append(get_time(forum).date() - dt.timedelta(days=1))
continue
if a.lower() == "позавчера":
dates.append(get_time(forum).date() - dt.timedelta(days=2))
continue
d = a.split('.')
a_dates = []
if len(d) in (2, 3):
try:
d = list(map(int, d))
except:
print("Ne ok")
OK = False
if OK:
cur_date = get_time(forum).date() - dt.timedelta(days=1)
cur_year = cur_date.year
if len(d) == 2:
years = [cur_year, cur_year + 1]
else:
years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]]
for y in years:
try:
a_dates.append(dt.datetime(y, d[1], d[0]).date())
except:
pass
a_dates = sorted(filter(lambda x: cur_date + dt.timedelta(days=120) > x > cur_date, a_dates))
if len(a_dates) == 0:
OK = False
else:
OK = False
if OK:
dates.append(a_dates[0])
else:
return a
return dates
def mod_days(message, target, neighbour):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) == 0:
dates = [get_time(forum).date()]
else:
dates = parse_dates(forum, args)
if type(dates) is str:
bot.reply_to(chat, telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?")
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
t = read_db(target)
if t is None:
t = []
n = read_db(neighbour)
if n is None:
n = []
write_db(neighbour, list(filter(lambda x: x not in dates, n)))
write_db(target, list(sorted(set(t + dates))))
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Добавил " + telebot.formatting.escape_markdown(
", ".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in dates])))
@bot.message_handler(commands=['skip']) @bot.message_handler(commands=['skip'])
def skip_days(message): def skip_days(message):
forum = message.chat.id forum = message.chat.id
@ -1169,13 +1127,6 @@ def set_timezone(message):
def get_hours(forum: int) -> list: def get_hours(forum: int) -> list:
utc_range = (8, 20) utc_range = (8, 20)
return utc_range return utc_range
# Obsoleted by `get_time`
# tz = read_db(str(forum) + ".settings.timezone", 3)
# utc_range = tuple(map(lambda x: (x - tz) % 24, utc_range))
# if utc_range[0] > utc_range[1]:
# return list(range(utc_range[0], 24)) + list(range(0, utc_range[1]))
# else:
# return list(range(utc_range[0], utc_range[1]))
def stack_update(forum: int, force_reset = False): def stack_update(forum: int, force_reset = False):
now = get_time(forum) now = get_time(forum)
@ -1261,6 +1212,17 @@ def update(forum: int):
write_db(str(forum) + ".schedule.last_notification_date", now_date) write_db(str(forum) + ".schedule.last_notification_date", now_date)
remind_users(forum) remind_users(forum)
def update_notify(forum: int):
bot.reply_to(get_chat(forum),
"Обновился до версии " + telebot.formatting.escape_markdown(__version))
if __version != __cur_version:
for i in db.keys():
try:
update_notify(int(i))
except ValueError:
pass
def process1(): def process1():
bot.infinity_polling(none_stop=True) bot.infinity_polling(none_stop=True)

View File

@ -2,5 +2,6 @@
# #
# SPDX-License-Identifier: Unlicense # SPDX-License-Identifier: Unlicense
# Enter your token in the `token` file [tokens]
0000000000:ooooooooooooooooooooooooooooooooooo # Enter your token in the `config.ini` file
prod = 0000000000:ooooooooooooooooooooooooooooooooooo

74
pickle_db.py Normal file
View File

@ -0,0 +1,74 @@
# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import sys
import pickle as p
from os import path
from sys import stderr, stdout, stdin
db = {}
def save_db(dbfo = ".db"):
global db
try:
with open(dbfo, "wb") as fo:
p.dump(db, fo)
except:
stderr.write("Не удалось записать базу!")
sys.exit(1)
def load_db(dbfi = ".db"):
global db
if not path.exists(dbfi):
save_db()
return
try:
with open(dbfi, "rb") as fi:
db = p.load(fi)
except:
stderr.write("Не удалось прочитать базу, пробуем сохранить…")
save_db()
def write_db(field: str, value: any, data = None):
global db
head = False
if data == None:
data = db
head = True
field = field.split(".")
if len(field) == 1:
data[field[0]] = value
else:
if field[0] not in data or data[field[0]] is None:
data[field[0]] = {}
if type(data[field[0]]) is not dict:
raise ValueError("Поле не является группой.")
data[field[0]] = write_db('.'.join(field[1:]), value, data[field[0]])
save_db()
if head:
db = data
return data
def read_db(field: str, default = None, data = None) -> any:
global db
if data == None:
data = db
field = field.split(".")
if field[0] not in data:
return default
if len(field) == 1:
return data[field[0]]
else:
return read_db('.'.join(field[1:]), default, data[field[0]])
def pop_db(field: str):
global db
ret = read_db(field)
field = field.split(".")
val = read_db('.'.join(field[:-1]))
if type(val) is not dict:
return
val.pop(field[-1])
write_db('.'.join(field[:-1]), val)
return ret