# SPDX-FileCopyrightText: 2023 Egor Guslyancev # # SPDX-License-Identifier: AGPL-3.0-or-later "Main bot module." import time import ast import datetime as dt import typing from sys import stderr, stdout, stdin from threading import Thread import telebot import config_reader as cr import db_classes import timeout as tmo # TODO more backends (redis at least) db = db_classes.PickleDB(".db") db.load() CURRENT_VERSION = "v1.0rc12" VERSION = db.read("about.version", CURRENT_VERSION) db.write("about.author", "electromagneticcyclone") db.write("about.tester", "angelbeautifull") db.write( "about.source", "https://git.disroot.org/electromagneticcyclone/duty-board-dog" ) if (db.read("about.host") is None) and __debug__: stdout.write("Введите username хоста: ") stdout.flush() db.write("about.host", stdin.readline()[:-1]) MODE = "devel" if __debug__ else "prod" bot = telebot.TeleBot(cr.read(f"tokens.{MODE}"), parse_mode="MarkdownV2") def get_time(forum: int) -> dt.datetime: "Get datetime.now in forum's timezone. Default timezone is UTC+3." return dt.datetime.now(dt.UTC) + dt.timedelta( hours=db.read(f"{forum}.settings.timezone", 3) ) def change_phase(forum: int, date: dt.datetime = None) -> None: "Changes forum's current phase." if date is None: date = get_time(forum).date() phase = db.read(f"{forum}.schedule.phase") phases = db.read(f"{forum}.schedule.phases") last_changed = db.read(f"{forum}.schedule.last_phase_changing_date") if None in (phase, last_changed, phases): return now = date weekday = now.weekday() if not (now > last_changed and weekday == 0): return phase = (phase + 1) % phases db.write(f"{forum}.schedule.phase", phase) db.write(f"{forum}.schedule.last_phase_changing_date", now) def get_status(forum: int, now: dt.datetime = None) -> bool: "Returns forum's duty status. Sundays are free." if now is None: now = get_time(forum).date() weekday = now.weekday() if weekday == 6: return False phase = db.read(f"{forum}.schedule.phase", 0) work_days = db.read(f"{forum}.schedule.work_days", []) skip_days = db.read(f"{forum}.schedule.skip_days", []) days = db.read(f"{forum}.schedule.days", ([0] * 6, [0] * 6)) is_active = db.read(f"{forum}.is_active", False) if is_active: if now in work_days: return True if now in skip_days: return 0 if days[phase][weekday]: return True return False def get_chat( message: telebot.types.Message | int, start: bool = False ) -> telebot.types.Message | None: "Returns forum's root message or user's message if chat is not a forum." if isinstance(message, int): is_forum = True forum = message else: is_forum = message.chat.is_forum forum = message.chat.id if (db.read(str(forum)) is None or not is_forum) and not start: return None chat = db.read(f"{forum}.settings.chat") if isinstance(message, int): return chat if (chat is None) or (chat.id == message.reply_to_message.id): return message.reply_to_message if is_forum else message return None def check_if_admin(message: telebot.types.Message) -> bool | None: "Checks if the message is sent by the forum's admin." forum = message.chat.id admin = db.read(f"{forum}.settings.admin") if admin is None: return True if admin["id"] is None: return admin["username"] == message.from_user.username return admin["id"] == message.from_user.id def mention(forum: int, uid: int) -> str | None: "Returns markdown formatted string with user's mention." uid = str(uid) if db.read(f"{forum}.people.{uid}") is None: stderr.write(f"Пользователя с ID {uid} нет в базе.\n") return None return ( f"[{db.read(f'{forum}.people.{uid}.name')} " + f"{db.read(f'{forum}.people.{uid}.surname')}](tg://user?id={uid})" ) def find_uids(forum: int, s: str) -> list | None: "Find user's id by nickname, name or surname." people = db.read(f"{forum}.people") if people is None: return None if len(s) > 0: if s[0] == "@": s = s[1:] f = list(filter(lambda x: s in people[x].values(), people.keys())) else: f = list(people.keys()) if len(f) == 0: return None return f def format_user_info(forum: int, uid: int) -> str: "Returns markdown formatted string with all user's info by their id." uid = str(uid) person = db.read(f"{forum}.people.{uid}") if person is None: return "" r = "" r += f"\\#{uid}\n" for k, i in person.items(): r += f"‖ {k} \\= {telebot.formatting.escape_markdown(i)}\n" return r def prepend_user(forum: int, ulist_s: str, uid: int) -> None: "Inserts user id at the start of provided db list in forum's context." uid = str(uid) ulist = db.read(f"{forum}.{ulist_s}", []) ulist = list(set([uid] + ulist)) db.write(f"{forum}.{ulist_s}", ulist) def append_user(forum: int, ulist_s: str, uid: int) -> None: "Inserts user id at the end of provided db list in forum's context." uid = str(uid) ulist = db.read(f"{forum}.{ulist_s}", []) ulist = list(set(ulist + [uid])) db.write(f"{forum}.{ulist_s}", ulist) def pop_user(forum: int, ulist_s: str) -> dict | None: "Removes user id from the start of provided db list in forum's context. Returns user id." ulist = db.read(f"{forum}.{ulist_s}", []) r = None if len(ulist) > 0: r = ulist.pop(0) db.write(f"{forum}.{ulist_s}", ulist) return r def insert_user_in_current_order(forum: int, uid: int) -> bool: "Inserts user id into current order list." uid = str(uid) order = db.read(f"{forum}.rookies.order", []) people = db.read(f"{forum}.people", {}) current = db.read(f"{forum}.rookies.current") if uid not in people: return False order = dict(map(lambda x: (x, people[x]), order)) if current is not None: order = dict( sorted( list(order.items()) + [(current, people[current])], key=lambda item: item[1]["surname"], ) ) order = dict( sorted( list(order.items()) + [uid, people[uid]], key=lambda item: item[1]["surname"], ) ) pos = list(order.keys()).index(current) if pos == 0: return False db.write(f"{forum}.rookies.order", list(order.keys())[1:]) return True def parse_dates(forum: int, args: typing.Iterable) -> list | str: """ Translates strings into dates in forum's context. Returns problematic string if it couldn't be parsed. """ dates = [] cur_date = get_time(forum).date() - dt.timedelta(days=1) cur_year = cur_date.year for a in args: human_relative = { "сегодня": get_time(forum).date(), "завтра": get_time(forum).date() + dt.timedelta(days=1), "послезавтра": get_time(forum).date() + dt.timedelta(days=2), "вчера": get_time(forum).date() - dt.timedelta(days=1), "позавчера": get_time(forum).date() - dt.timedelta(days=2), }.get(a.lower()) if human_relative is not None: dates.append(human_relative) d = a.split(".") a_dates = [] if len(d) in (2, 3): try: d = list(map(int, d)) except ValueError: return a if len(d) == 2: years = [cur_year, cur_year + 1] else: years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]] for y in years: try: a_dates.append(dt.datetime(y, d[1], d[0]).date()) except ValueError: pass a_dates = sorted( filter( lambda x: cur_date + dt.timedelta(days=120) > x > cur_date, a_dates, ) ) if len(a_dates) == 0: return a else: return a dates.append(a_dates[0]) return dates def mod_days(message: telebot.types.Message, target: str, neighbour: str) -> None: "Helper function to add skip and work days." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) == 0: dates = [get_time(forum).date()] else: dates = parse_dates(forum, args) if isinstance(dates, str): bot.reply_to( chat, telebot.formatting.escape_markdown(dates) + " — это точно дата из ближайшего будущего?", ) return if dates is None: bot.reply_to(chat, "Нечего добавлять") return t = db.read(target) if t is None: t = [] n = db.read(neighbour) if n is None: n = [] db.write(neighbour, list(filter(lambda x: x not in dates, n))) db.write(target, list(sorted(set(t + dates)))) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Добавил " + telebot.formatting.escape_markdown( ", ".join( [".".join(map(str, (d.day, d.month, d.year))) for d in dates] ) ), ) antispam_tmo = tmo.Timeout(10 * 60) def antispam( message: telebot.types.Message, chat: telebot.types.Message, forum: int ) -> bool: "Removes frequent non admin's commands." if check_if_admin(message): return False antispam_tmo.period = db.read(f"{forum}.settings.antispam.period", 600) return antispam_tmo.check( lambda: bot.reply_to(chat, "*Хватит спамить\\!\\!\\!*"), lambda: bot.delete_message(forum, message.id), ) @bot.message_handler(commands=["start"]) def start_bot(message: telebot.types.Message): "Command to print kickstart info." forum = message.chat.id chat = get_chat(message, True) if chat is not None: if antispam(message, chat, forum): return if message.chat.is_forum: bot.reply_to( chat, "Привет\\! Я бот для управления дежурствами и напоминания о них\\." + " Напиши /link, чтобы привязать комнату\\.", ) else: bot.reply_to( chat, "Я работаю только на форумах \\(супергруппах с комнатами\\)\\." + " Пригласи меня в один из них и напиши /start", ) @bot.message_handler(commands=["help"]) def get_help(message: telebot.types.Message): "Command to print info about all of the available commands." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "" + "/admin \\[@\\] — переопределить *админа*\n" + "/link — привязать комнату\n" + "/unlink — отвязать комнату\n" + "/cleaner 0/1 — удалять команды\n" + "/new @ Имя Фамилия — добавить ~салагу~ студента\n" + "/readd \\# @ — если студент поменял ник, это нужно отметить\n" + "/remind — напомнить дежурным подежурить, а новичкам отметится\n" + "/forget — забыть всех новичков\n" + "/del @/\\#/Имя — убрать студента\n" + "/list \\[@/\\#/Имя\\] — получить информацию по имени/фамиили\n" + "/purge — удалить форум из базы *\\!\\!\\!ОПАСНО\\!\\!\\!*\n" + "/timezone ±n — установить часовой пояс по UTC\n" + "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\*" + " \\(\\* \\= \\[\\0/1\\]\\) — задать дни недели \\(ПН\\-СБ\\)," + " когда необходимо дежурство, отдельно для числителей и знаменателей\n" + "/calendar \\[Числитель/знаменатель\\] — показать календарь дежурств\n" + "/phase \\[Числитель/знаменатель\\] — узнать или скорректировать" + " фазу текущей недели\n" + "/skip \\[00\\.00\\] — пропустить сегодняшний или заданный день\n" + "/work \\[00\\.00\\] — поработать в сегодняшнем или заданном дне\n" + "/honor \\[\\-\\]@ — пропуск следующего дежурства, так как студент молодец\n" + "/sick @ \\[дата\\] — пропуск дежурства по причине болезни\n" + "/force \\[\\-\\]@ — провинившийся дежурит как только так сразу\n" + "/order — посмотреть очередь дежурств\n" + "/stop — остановить дежурства\n" + "/begin \\[@\\] — начать сначала с определённого студента", ) if __debug__: def pretty(d, indent=0): "Print pretty dict." for key, value in d.items(): stderr.write(" " * indent + f"{key}\n") if isinstance(value, dict): pretty(value, indent + 1) else: stderr.write(" " * (indent + 1) + f"{value}\n") @bot.message_handler(commands=["info"]) def info(message: telebot.types.Message): "Command to print db." forum = message.chat.id chat = get_chat(message, True) if chat is not None: bot.delete_message(forum, message.id) if message.from_user.username == db.read("about.host"): pretty(db.read("")) @bot.message_handler(commands=["exec"]) def exec_bot(message: telebot.types.Message): "Command to eval python code." forum = message.chat.id chat = get_chat(message, True) if chat is not None: if message.from_user.username == db.read("about.host"): try: result = ast.literal_eval(" ".join(message.text.split(" ")[1:])) # Disabling W0718 because everything can be excepted with eval except Exception as e: # pylint: disable=broad-exception-caught bot.reply_to(chat, f"Ошибка выполнения: {e}") return bot.reply_to( chat, telebot.formatting.escape_markdown(str(result)), ) else: bot.delete_message(forum, message.id) @bot.message_handler(commands=["backup"]) def backup_db(message: telebot.types.Message): "Command to backup database." forum = message.chat.id chat = get_chat(message, True) if chat is not None: bot.delete_message(forum, message.id) if message.from_user.username == db.read("about.host"): args = message.text.split()[1:] if len(args) == 0: args.append("") db.save(f"{args[0]}.backup.db") @bot.message_handler(commands=["restore"]) def restore_db(message: telebot.types.Message): "Command to restore from backup" forum = message.chat.id chat = get_chat(message, True) if chat is not None: bot.delete_message(forum, message.id) if message.from_user.username == db.read("about.host"): args = message.text.split()[1:] if len(args) == 0: args.append("") db.load(f"{args[0]}.backup.db") @bot.message_handler(commands=["link"]) def link(message: telebot.types.Message): "Command to link forum's room." forum = message.chat.id chat = get_chat(message, True) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): db.write(f"{forum}.settings.chat", message.reply_to_message) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( message.reply_to_message, 'Комната "' + f"{message.reply_to_message.forum_topic_created.name}" + '" привязана\\. ' + "Чек-лист того, что нужно написать в первую очередь:\n" + "/admin \\[@\\], чтобы задать админа\n" + "/help, чтобы узнать, что я умею\n" + "/tz, чтобы задать часовой пояс\n" + "/phase \\[Числитель/знаменатель\\], чтобы задать недельную фазу\n" + "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\)," + " чтобы задать дни дежурств\n" + "/new @ Имя Фамилия, чтобы добавить ~салаг~ студетов\n" + "/begin \\[@\\], чтобы начать дежурство", ) @bot.message_handler(commands=["unlink"]) def unlink(message: telebot.types.Message): "Command to unlink room." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): db.write(f"{forum}.settings.chat", None) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Комната отвязана") @bot.message_handler(commands=["purge"]) def purge_db_people(message: telebot.types.Message): "Command to clear info about people from database." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): if message.text == "/purge ДА УДАЛЯЙ ДАВАЙ": db.write(str(forum), None) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "База студентов удалена") else: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Чтобы подтвердить свои благие намерения, напиши\n" + "`/purge ДА УДАЛЯЙ ДАВАЙ`", ) @bot.message_handler(commands=["forget"]) def forget_db_pending(message: telebot.types.Message): "Command to forget unregistered newbies." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): empty = db.read(f"{forum}.pending") is None db.write(f"{forum}.pending", None) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "База новичков удалена" + (", хоть и была уже пустая…" if empty else ""), ) @bot.message_handler(commands=["admin"]) def set_admin(message: telebot.types.Message): "Command to register admin." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): admin = [i for i in message.text.split() if i[0] == "@"] if len(admin) == 0: uid = db.read(f"{forum}.settings.admin.id") if uid == message.from_user.id: bot.reply_to(chat, "Ты уже тут главный") return db.write(f"{forum}.settings.admin.id", message.from_user.id) db.write(f"{forum}.settings.admin.username", message.from_user.username) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, f"Рад познакомиться, {message.from_user.first_name}\\!" ) else: admin = admin[0][1:] uadmin = db.read(f"{forum}.settings.admin.username") if uadmin == admin: bot.reply_to(chat, "Ты уже тут главный") return db.write(f"{forum}.settings.admin.id", None) db.write(f"{forum}.settings.admin.username", admin) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Теперь @" + telebot.formatting.escape_markdown(admin) + " тут царь и бог\\!\n" + "Напиши /admin в чат, чтобы я знал тебя в лицо", ) @bot.message_handler(commands=["list"]) def list_users(message: telebot.types.Message): "Command to list users." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): s = message.text.split()[1:] if len(s) == 0: s = [""] r = [] for i in s: f = find_uids(forum, i) if f is not None: r += f if len(r) == 0: bot.reply_to(chat, "Никого не нашёл") return if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) res = [] for i in r: res.append(format_user_info(forum, i)) bot.reply_to(chat, "\n".join(res)) @bot.message_handler(commands=["remind"]) def remind_users(message: telebot.types.Message | int): "Command to remind about duty or about registration. Can be called automatically." auto, forum = ( (True, message) if isinstance(message, int) else (False, message.chat.id) ) chat = get_chat(message) if chat is not None: if not auto and antispam(message, chat, forum): return if auto or check_if_admin(message): pending = db.read(f"{forum}.pending", {}) r = "" if len(pending.keys()) != 0: for i in pending.keys(): r += f"@{telebot.formatting.escape_markdown(i)}\n" r += "нужно нажать /new\n\n" status = get_status(forum) if status: current = db.read(f"{forum}.rookies.current") if current is not None: rookie = db.read(f"{forum}.people.{current}") if rookie is not None: r += f"{mention(forum, current)} сегодня дежурит" if not auto and db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) if not auto and r == "": bot.reply_to(chat, "Дежурств сегодня нет") else: bot.reply_to(chat, r) @bot.message_handler(commands=["del"]) def del_person(message: telebot.types.Message): "Command to remove person from forum." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) < 1: bot.reply_to(chat, "Давай хоть кого\\-нибудь удалим") return if len(args) > 1: bot.reply_to(chat, "Давай удалять их по очереди") return user = args[0] if user[1:] == bot.get_me().username: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Ты сейчас быканул\\?") return f = find_uids(forum, user) if f is None: bot.reply_to(chat, "Никого не нашёл") return if len(f) > 1: bot.reply_to(chat, "Немогу определиться…") list_users(message) bot.reply_to(chat, "Конкретезируй плиз") return if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, f"Скатертью дорога, {mention(forum, f[0])}\\!") db.pop(f"{forum}.people.{f[0]}") @bot.message_handler(commands=["cleaner"]) def cleaner(message: telebot.types.Message): "Command to set commands cleaner status." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) != 1: bot.reply_to(chat, "Нужно указать `0` или `1`") return if args[0] not in ("0", "1"): bot.reply_to(chat, "Нужно указать `0` или `1`") return state = args[0] == "1" db.write(f"{forum}.settings.delete_messages", state) if state: bot.delete_message(forum, message.id) bot.reply_to( chat, "Очистка команд " + ("включена" if state else "отключена") + ( "\nЧтобы чистка работала, мне нужно дать право удалять сообщения" if state else "" ), ) @bot.message_handler(commands=["new"]) def add_new(message: telebot.types.Message): "Command to add new person to forum." forum = message.chat.id chat = get_chat(message) if chat is not None: args = message.text.split()[1:] if len(args) == 0: pending = db.read(f"{forum}.pending") if pending is None: antispam(message, chat, forum) return user = message.from_user.username if user not in pending.keys(): antispam(message, chat, forum) return uid = str(message.from_user.id) db.write(f"{forum}.people.{uid}.username", user) db.write(f"{forum}.people.{uid}.name", pending[user]["name"]) db.write(f"{forum}.people.{uid}.surname", pending[user]["surname"]) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, f"Рад познакомиться, {mention(forum, uid)}\\!") db.pop(f"{forum}.pending.{user}") elif check_if_admin(message): if len(args) != 3: bot.reply_to( chat, "Нужно указать @, имя и фамилию\nНе больше, не меньше" ) return user = args[0] if user[0] != "@": bot.reply_to(chat, "Пользователей нужно помянуть через `@`") return user = user[1:] if user == bot.get_me().username: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Я тебе не салага\\!") return people = db.read(f"{forum}.people", []) pending = db.read(f"{forum}.pending", {}) if any( [people[i]["username"] == user for i in people] + [user in pending.keys()] ): bot.reply_to(chat, "Пользователь с таким ником уже в базе") return name = args[1] surname = args[2] db.write(f"{forum}.pending.{user}.name", name) db.write(f"{forum}.pending.{user}.surname", surname) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Добро пожаловать, @" + telebot.formatting.escape_markdown(user) + f" \\({name} {surname}\\)\\!\n" + "Первое твоё обязательство — написать /new", ) @bot.message_handler(commands=["readd"]) def readd(message: telebot.types.Message): "Command to change person's nickname." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) != 2: bot.reply_to(chat, "Нужно указать id и новый ник\nНе больше, не меньше") return uid = args[0] try: int(uid) except ValueError: bot.reply_to( chat, "ID — это число\nНа буквах пока считать не научились" ) return if db.read(f"{forum}.people.{uid}") is None: bot.reply_to(chat, "Такого пользователя нет в базе") return user = args[1] if user[0] != "@": bot.reply_to(chat, "Пользователей нужно помянуть через `@`") return user = user[1:] if user == bot.get_me().username: bot.reply_to(chat, "Я тебе не салага\\!") return db.write(f"{forum}.people.{uid}.username", user) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, f"@{telebot.formatting.escape_markdown(user)}" + " от меня не скроется\\!", ) @bot.message_handler(commands=["days"]) def set_days(message: telebot.types.Message): "Command to define duty days." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) != 2: bot.reply_to(chat, "Мне нужно два двоичных числа") return num1 = args[0] num2 = args[1] if len(num1) != 6: bot.reply_to(chat, "В первом числе не 6 цифр") return if len(num2) != 6: bot.reply_to(chat, "Во втором числе не 6 цифр") return if not all(i in ("0", "1") for i in num1): bot.reply_to(chat, "Цифры двоичные должны быть в первом") return if not all(i in ("0", "1") for i in num2): bot.reply_to(chat, "Цифры двоичные должны быть во втором") return db.write( f"{forum}.schedule.days", ( [num1[i] == "1" for i in range(6)], [num2[i] == "1" for i in range(6)], ), ) message.text = "" calendar(message) @bot.message_handler(commands=["calendar"]) def calendar(message: telebot.types.Message): "Command to display duty calendar." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return args = message.text.split()[1:] if len(args) > 1: bot.reply_to(chat, "Многа букав") return en1, en2 = False, False if len(args) == 1: en1 = args[0].lower() in "числитель" en2 = args[0].lower() in "знаменатель" if en1 == en2 == False: en1, en2 = True, True days = db.read(f"{forum}.schedule.days", ([False] * 6, [False] * 6)) skip = db.read(f"{forum}.schedule.skip_days", []) work = db.read(f"{forum}.schedule.work_days", []) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "`\\|#\\|Пн \\|Вт \\|Ср \\|Чт \\|Пт \\|Сб \\|\n" + ( " Ч" + "".join([" " + ("X" if days[0][i] else " ") + " " for i in range(6)]) + "\n" if en1 else "" ) + ( " З" + "".join([" " + ("X" if days[1][i] else " ") + " " for i in range(6)]) if en2 else "" ) + "`" + "\n\n" + "*Пропуски:*\n" + telebot.formatting.escape_markdown( "\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in skip]) ) + "\n\n" + "*Рабочие:*\n" + telebot.formatting.escape_markdown( "\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in work]) ), ) @bot.message_handler(commands=["skip"]) def set_skip_days(message: telebot.types.Message): "Command to set skipped days." forum = message.chat.id mod_days(message, f"{forum}.schedule.skip_days", f"{forum}.schedule.work_days") @bot.message_handler(commands=["work"]) def set_work_days(message: telebot.types.Message): "Command to set work days." forum = message.chat.id mod_days(message, f"{forum}.schedule.work_days", f"{forum}.schedule.skip_days") @bot.message_handler(commands=["stop"]) def stop_queue(message: telebot.types.Message): "Command to stop duty." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): status = db.read(f"{forum}.is_active") if status is None or not status: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Держурство уже не идёт") return status = False db.write(f"{forum}.is_active", status) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Держурство остановлено") @bot.message_handler(commands=["honor"]) def add_honor(message: telebot.types.Message): "Command to add honored person." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) != 1: bot.reply_to(chat, "Нужно указать человека") return user = args[0] if user[1:] == bot.get_me().username: bot.reply_to(chat, "Рад, что всегда в почёте") return f = find_uids(forum, user) if f is None: bot.reply_to(chat, "Никого не нашёл") return if len(f) > 1: bot.reply_to(chat, "Немогу определиться…") list_users(message) bot.reply_to(chat, "Конкретезируй плиз") return user = f[0] force = db.read(f"{forum}.rookies.force_order", []) honor = db.read(f"{forum}.rookies.honor_order", []) people = db.read(f"{forum}.people") honor.append(user) for i in people: if (i in force) and (i in honor): force.remove(i) honor.remove(i) db.write(f"{forum}.rookies.force_order", force) db.write(f"{forum}.rookies.honor_order", honor) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Вы в почёте, " + telebot.formatting.escape_markdown( f"{people[user]['name']} {people[user]['surname']}" ), ) @bot.message_handler(commands=["force"]) def add_force(message: telebot.types.Message): "Command to add guilty person." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) != 1: bot.reply_to(chat, "Нужно указать человека") return user = args[0] if user[1:] == bot.get_me().username: bot.reply_to(chat, "Рад, что всегда в почёте") return f = find_uids(forum, user) if f is None: bot.reply_to(chat, "Никого не нашёл") return if len(f) > 1: bot.reply_to(chat, "Немогу определиться…") list_users(message) bot.reply_to(chat, "Конкретезируй плиз") return user = f[0] force = db.read(f"{forum}.rookies.force_order", []) honor = db.read(f"{forum}.rookies.honor_order", []) people = db.read(f"{forum}.people") force.append(user) for i in people: if (i in force) and (i in honor): force.remove(i) honor.remove(i) db.write(f"{forum}.rookies.force_order", force) db.write(f"{forum}.rookies.honor_order", honor) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Отрабатывай, " + telebot.formatting.escape_markdown( f"{people[user]['name']} {people[user]['surname']}" ), ) @bot.message_handler(commands=["sick"]) def add_sick(message: telebot.types.Message): "Command to add sick person." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) not in (1, 2): bot.reply_to(chat, "Нужно указать человека и дату выздоровления") return user = args[0] if user[1:] == bot.get_me().username: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Ты сейчас быканул\\?") return f = find_uids(forum, user) if f is None: bot.reply_to(chat, "Никого не нашёл") return if len(f) > 1: bot.reply_to(chat, "Немогу определиться…") list_users(message) bot.reply_to(chat, "Конкретезируй плиз") return user = f[0] if len(args) > 1: dates = parse_dates(forum, args[1:]) if isinstance(dates, str): bot.reply_to( chat, telebot.formatting.escape_markdown(dates) + " — это точно дата из ближайшего будущего?", ) return if dates is None: bot.reply_to(chat, "Нечего добавлять") return date = dates[0] else: date = get_time(forum).date() + dt.timedelta(days=30) sicks = db.read(f"{forum}.rookies.sick_order", {}) people = db.read(f"{forum}.people") sicks[user] = (date, False) db.write(f"{forum}.rookies.sick_order", sicks) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, telebot.formatting.escape_markdown( f"{people[user]['name']} {people[user]['surname']}" ) + " болеет до " + telebot.formatting.escape_markdown( ".".join(map(str, (date.day, date.month, date.year))) ), ) @bot.message_handler(commands=["order"]) def view_order(message: telebot.types.Message): "Command to display duty order, honored, guilty and sick people." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return args = message.text.split()[1:] if len(args) > 0: bot.reply_to(chat, "Многа букав") return order = db.read(f"{forum}.rookies.order", []) force = db.read(f"{forum}.rookies.force_order", []) honor = db.read(f"{forum}.rookies.honor_order", []) sicks = db.read(f"{forum}.rookies.sick_order", {}) people = db.read(f"{forum}.people") if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "*Очередь:*\n" + telebot.formatting.escape_markdown( "\n".join( [f"{people[u]['name']} {people[u]['surname']}" for u in order] ) ) + "\n\n" + "*Почитаемые:*\n" + telebot.formatting.escape_markdown( "\n".join( [f"{people[u]['name']} {people[u]['surname']}" for u in honor] ) ) + "\n" + "*Виноватые:*\n" + telebot.formatting.escape_markdown( "\n".join( [f"{people[u]['name']} {people[u]['surname']}" for u in force] ) ) + "\n" + "*Больные:*\n" + telebot.formatting.escape_markdown( "\n".join( [ f"{people[u]['name']} {people[u]['surname']} до " + ".".join( map( str, (sicks[u][0].day, sicks[u][0].month, sicks[u][0].year), ) ) for u in sicks ] ) ), ) @bot.message_handler(commands=["begin"]) def begin_queue(message: telebot.types.Message): "Command to begin queue." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) > 1: bot.reply_to( chat, "Нужно указать человека, с которого начнётся дежурство" ) return if len(args) == 1: user = args[0] if user[1:] == bot.get_me().username: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Ты сейчас быканул\\?") return f = find_uids(forum, user) if f is None: bot.reply_to(chat, "Никого не нашёл") return if len(f) > 1: bot.reply_to(chat, "Немогу определиться…") list_users(message) bot.reply_to(chat, "Конкретезируй плиз") return start_with = f[0] else: start_with = None phase = db.read(f"{forum}.schedule.phase") if phase is None: bot.reply_to(chat, "Задай фазу с помощью /phase") return status = db.read(f"{forum}.is_active") if status: if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to(chat, "Держурство уже идёт") return status = True db.write(f"{forum}.is_active", status) stack_update(forum, start_with) now_date = get_time(forum).date() start_with = db.read(f"{forum}.rookies.order", [None])[0] if start_with is None: bot.reply_to(chat, "Людей нет") return db.write(f"{forum}.schedule.last_stack_update_date", now_date) db.write(f"{forum}.schedule.last_notification_date", now_date) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Держурство начнёт " + f"{db.read(f'{forum}.people.{start_with}.name')} " + f"{db.read(f'{forum}.people.{start_with}.surname')}", ) @bot.message_handler(commands=["phase"]) def set_phase(message: telebot.types.Message): "Command to set start phase." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) > 1: bot.reply_to(chat, "Многа букав") return en2 = False if len(args) == 1: en1 = args[0].lower() in "числитель" en2 = args[0].lower() in "знаменатель" if en1 == en2: bot.reply_to(chat, "Это числитель или знаменатель?") return now = get_time(forum).date() phase = int(bool(en2)) phases = 2 db.write(f"{forum}.schedule.phase", phase) db.write(f"{forum}.schedule.phases", phases) db.write(f"{forum}.schedule.last_phase_changing_date", now) phase = db.read(f"{forum}.schedule.phase", 0) phases = db.read(f"{forum}.schedule.phase", 2) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Текущая неделя: " + ("знаменатель" if phase else "числитель") ) @bot.message_handler(commands=["timezone", "tz"]) def set_timezone(message: telebot.types.Message): "Command to set forum's timezone." forum = message.chat.id chat = get_chat(message) if chat is not None: if antispam(message, chat, forum): return if check_if_admin(message): args = message.text.split()[1:] if len(args) > 1: bot.reply_to(chat, "Многа букав") return if len(args) == 1: try: args[0] = args[0].lower() if "utc" in args[0]: args[0] = args[0][3:] tz = int(args[0]) if tz not in range(-12, 14 + 1): raise ValueError except ValueError: bot.reply_to( chat, "Нужно указать [смещение по UTC](https:" + "//ru.wikipedia.org/wiki/%D0%92%D1%81%D" + "0%B5%D0%BC%D0%B8%D1%80%D0%BD%D0%BE%D0%" + "B5_%D0%BA%D0%BE%D0%BE%D1%80%D0%B4%D0%B" + "8%D0%BD%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%" + "D0%BD%D0%BD%D0%BE%D0%B5_%D0%B2%D1%80%D" + "0%B5%D0%BC%D1%8F)", ) return db.write(f"{forum}.settings.timezone", tz) tz = db.read(f"{forum}.settings.timezone", 3) if db.read(f"{forum}.settings.delete_messages"): bot.delete_message(forum, message.id) bot.reply_to( chat, "Часовой пояс: UTC" + ("\\+" if tz >= 0 else "") + telebot.formatting.escape_markdown(str(tz)), ) def get_hours() -> tuple: "Returns forum's work hours." # TODO command to set range return tuple(range(8, 20)) def stack_update(forum: int, force_reset: bool = False) -> None: "Updates forum's stacks." now = get_time(forum) now_date = now.date() order = db.read(f"{forum}.rookies.order", []) force = db.read(f"{forum}.rookies.force_order", []) honor = db.read(f"{forum}.rookies.honor_order", []) sicks = db.read(f"{forum}.rookies.sick_order", {}) people = db.read(f"{forum}.people", {}) to_pop = [] for i in sicks: if now_date >= sicks[i][0]: if sicks[i][1]: prepend_user(forum, ".rookies.force_order", i) to_pop.append(i) for i in to_pop: sicks.pop(i) db.write(f"{forum}.rookies.sick_order", sicks) for i in people: if (i in force) and (i in honor): force.remove(i) honor.remove(i) db.write(f"{forum}.rookies.force_order", force) db.write(f"{forum}.rookies.honor_order", honor) for i in order: if i not in people.keys(): order.remove(i) if len(order) == 0 or force_reset is not False: order = list( dict(sorted(people.items(), key=lambda item: item[1]["surname"])).keys() ) if force_reset is not False: db.write(f"{forum}.rookies.force_order", []) db.write(f"{forum}.rookies.honor_order", []) db.write(f"{forum}.rookies.sick_order", {}) if force_reset is not None: try: order = order[order.index(force_reset) :] except ValueError: pass db.write(f"{forum}.rookies.order", order) if len(order) == 0: return if force_reset is False: if len(force) > 0: db.write(f"{forum}.rookies.current", pop_user(forum, "rookies.force_order")) else: current = pop_user(forum, "rookies.order") if current in honor: honor.remove(current) db.write(f"{forum}.rookies.honor_order", honor) stack_update(forum) elif any((sicks[i][1] is False) and (i == current) for i in sicks): skipped = list(sicks[current]) skipped[1] = True sicks[current] = tuple(skipped) db.write(f"{forum}.rookies.sick_order", sicks) stack_update(forum) else: db.write(f"{forum}.rookies.current", pop_user(forum, "rookies.order")) def clean_old_dates(date: dt.datetime, array: str) -> None: "Removes dates from db's array which are older than `date`." a = db.read(array) a = list(filter(lambda x: x >= date, a)) db.write(array, a) def update(forum: int) -> None: """ Updates forum's state: · Cleans old skip/work dates. · Updates order stack. · Notifies about upcoming duty. """ now = get_time(forum) now_date = now.date() now_time = now.time() last_notif = db.read(f"{forum}.schedule.last_notification_date") last_upd_stack = db.read(f"{forum}.schedule.last_stack_update_date") is_active = get_status(forum, now_date) hours_range = get_hours() change_phase(forum, now_date) clean_old_dates(now_date, f"{forum}.schedule.work_days") clean_old_dates(now_date, f"{forum}.schedule.skip_days") if is_active and (last_upd_stack is None or now_date > last_upd_stack): db.write(f"{forum}.schedule.last_stack_update_date", now_date) stack_update(forum) if now_time.hour in hours_range: if last_notif is None or now_date > last_notif: stdout.write("Notified\n") db.write(f"{forum}.schedule.last_notification_date", now_date) remind_users(forum) if db.read("about.updatedfrom") != db.read("about.version"): db.write("about.updatenotified", True) update_notify(forum) def update_notify(forum: int) -> None: "Notifies the forum about bot's new version." bot.reply_to( get_chat(forum), # f"Обновился до версии {telebot.formatting.escape_markdown(CURRENT_VERSION)}", f"Обновился до версии {telebot.formatting.escape_markdown(CURRENT_VERSION)}\n" \ + "Дежурик поздравляет всех бета-тестеров С ноВЫыыыыыМ Г0Йда и напоминает о том," \ + " что на каникулах и сессии дежурства продолжаются", ) def process1(): "The process runs telegram infinite polling." bot.infinity_polling(none_stop=True) p2_tmo = tmo.Timeout(120) def process2(): "The process updates duty order for every forum once a `period` seconds." def p2(): "Helper function." stdout.write("Process 2 update\n") if db.read("about.updatenotified", True): db.write("about.updatedfrom", db.read("about.version")) for f in db.read("").keys(): try: update(int(f)) except ValueError: pass p2_tmo.period = int(cr.read("settings.notify_period")) while True: p2_tmo.check(p2, lambda: None) funcs = [process1, process2] threads = map(lambda x: Thread(target=x), funcs) for thread in threads: thread.daemon = True thread.start() db.write("about.updatenotified", False) db.write("about.updatedfrom", VERSION) db.write("about.version", CURRENT_VERSION) while True: time.sleep(1)