duty-board-dog/bot.py

1463 lines
56 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"Main bot module."
import time
import ast
import datetime as dt
import typing
from sys import stderr, stdout, stdin
from threading import Thread
import telebot
import config_reader as cr
import db_classes
import timeout as tmo
# TODO more backends (redis at least)
db = db_classes.PickleDB(".db")
db.load()
CURRENT_VERSION = "v1.0rc12"
VERSION = db.read("about.version", CURRENT_VERSION)
db.write("about.author", "electromagneticcyclone")
db.write("about.tester", "angelbeautifull")
db.write(
"about.source", "https://git.disroot.org/electromagneticcyclone/duty-board-dog"
)
if (db.read("about.host") is None) and __debug__:
stdout.write("Введите username хоста: ")
stdout.flush()
db.write("about.host", stdin.readline()[:-1])
MODE = "devel" if __debug__ else "prod"
bot = telebot.TeleBot(cr.read(f"tokens.{MODE}"), parse_mode="MarkdownV2")
def get_time(forum: int) -> dt.datetime:
"Get datetime.now in forum's timezone. Default timezone is UTC+3."
return dt.datetime.now(dt.UTC) + dt.timedelta(
hours=db.read(f"{forum}.settings.timezone", 3)
)
def change_phase(forum: int, date: dt.datetime = None) -> None:
"Changes forum's current phase."
if date is None:
date = get_time(forum).date()
phase = db.read(f"{forum}.schedule.phase")
phases = db.read(f"{forum}.schedule.phases")
last_changed = db.read(f"{forum}.schedule.last_phase_changing_date")
if None in (phase, last_changed, phases):
return
now = date
weekday = now.weekday()
if not (now > last_changed and weekday == 0):
return
phase = (phase + 1) % phases
db.write(f"{forum}.schedule.phase", phase)
db.write(f"{forum}.schedule.last_phase_changing_date", now)
def get_status(forum: int, now: dt.datetime = None) -> bool:
"Returns forum's duty status. Sundays are free."
if now is None:
now = get_time(forum).date()
weekday = now.weekday()
if weekday == 6:
return False
phase = db.read(f"{forum}.schedule.phase", 0)
work_days = db.read(f"{forum}.schedule.work_days", [])
skip_days = db.read(f"{forum}.schedule.skip_days", [])
days = db.read(f"{forum}.schedule.days", ([0] * 6, [0] * 6))
is_active = db.read(f"{forum}.is_active", False)
if is_active:
if now in work_days:
return True
if now in skip_days:
return 0
if days[phase][weekday]:
return True
return False
def get_chat(
message: telebot.types.Message | int, start: bool = False
) -> telebot.types.Message | None:
"Returns forum's root message or user's message if chat is not a forum."
if isinstance(message, int):
is_forum = True
forum = message
else:
is_forum = message.chat.is_forum
forum = message.chat.id
if (db.read(str(forum)) is None or not is_forum) and not start:
return None
chat = db.read(f"{forum}.settings.chat")
if isinstance(message, int):
return chat
if (chat is None) or (chat.id == message.reply_to_message.id):
return message.reply_to_message if is_forum else message
return None
def check_if_admin(message: telebot.types.Message) -> bool | None:
"Checks if the message is sent by the forum's admin."
forum = message.chat.id
admin = db.read(f"{forum}.settings.admin")
if admin is None:
return True
if admin["id"] is None:
return admin["username"] == message.from_user.username
return admin["id"] == message.from_user.id
def mention(forum: int, uid: int) -> str | None:
"Returns markdown formatted string with user's mention."
uid = str(uid)
if db.read(f"{forum}.people.{uid}") is None:
stderr.write(f"Пользователя с ID {uid} нет в базе.\n")
return None
return (
f"[{db.read(f'{forum}.people.{uid}.name')} "
+ f"{db.read(f'{forum}.people.{uid}.surname')}](tg://user?id={uid})"
)
def find_uids(forum: int, s: str) -> list | None:
"Find user's id by nickname, name or surname."
people = db.read(f"{forum}.people")
if people is None:
return None
if len(s) > 0:
if s[0] == "@":
s = s[1:]
f = list(filter(lambda x: s in people[x].values(), people.keys()))
else:
f = list(people.keys())
if len(f) == 0:
return None
return f
def format_user_info(forum: int, uid: int) -> str:
"Returns markdown formatted string with all user's info by their id."
uid = str(uid)
person = db.read(f"{forum}.people.{uid}")
if person is None:
return ""
r = ""
r += f"\\#{uid}\n"
for k, i in person.items():
r += f"{k} \\= {telebot.formatting.escape_markdown(i)}\n"
return r
def prepend_user(forum: int, ulist_s: str, uid: int) -> None:
"Inserts user id at the start of provided db list in forum's context."
uid = str(uid)
ulist = db.read(f"{forum}.{ulist_s}", [])
ulist = list(set([uid] + ulist))
db.write(f"{forum}.{ulist_s}", ulist)
def append_user(forum: int, ulist_s: str, uid: int) -> None:
"Inserts user id at the end of provided db list in forum's context."
uid = str(uid)
ulist = db.read(f"{forum}.{ulist_s}", [])
ulist = list(set(ulist + [uid]))
db.write(f"{forum}.{ulist_s}", ulist)
def pop_user(forum: int, ulist_s: str) -> dict | None:
"Removes user id from the start of provided db list in forum's context. Returns user id."
ulist = db.read(f"{forum}.{ulist_s}", [])
r = None
if len(ulist) > 0:
r = ulist.pop(0)
db.write(f"{forum}.{ulist_s}", ulist)
return r
def insert_user_in_current_order(forum: int, uid: int) -> bool:
"Inserts user id into current order list."
uid = str(uid)
order = db.read(f"{forum}.rookies.order", [])
people = db.read(f"{forum}.people", {})
current = db.read(f"{forum}.rookies.current")
if uid not in people:
return False
order = dict(map(lambda x: (x, people[x]), order))
if current is not None:
order = dict(
sorted(
list(order.items()) + [(current, people[current])],
key=lambda item: item[1]["surname"],
)
)
order = dict(
sorted(
list(order.items()) + [uid, people[uid]],
key=lambda item: item[1]["surname"],
)
)
pos = list(order.keys()).index(current)
if pos == 0:
return False
db.write(f"{forum}.rookies.order", list(order.keys())[1:])
return True
def parse_dates(forum: int, args: typing.Iterable) -> list | str:
"""
Translates strings into dates in forum's context.
Returns problematic string if it couldn't be parsed.
"""
dates = []
cur_date = get_time(forum).date() - dt.timedelta(days=1)
cur_year = cur_date.year
for a in args:
human_relative = {
"сегодня": get_time(forum).date(),
"завтра": get_time(forum).date() + dt.timedelta(days=1),
"послезавтра": get_time(forum).date() + dt.timedelta(days=2),
"вчера": get_time(forum).date() - dt.timedelta(days=1),
"позавчера": get_time(forum).date() - dt.timedelta(days=2),
}.get(a.lower())
if human_relative is not None:
dates.append(human_relative)
d = a.split(".")
a_dates = []
if len(d) in (2, 3):
try:
d = list(map(int, d))
except ValueError:
return a
if len(d) == 2:
years = [cur_year, cur_year + 1]
else:
years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]]
for y in years:
try:
a_dates.append(dt.datetime(y, d[1], d[0]).date())
except ValueError:
pass
a_dates = sorted(
filter(
lambda x: cur_date + dt.timedelta(days=120) > x > cur_date,
a_dates,
)
)
if len(a_dates) == 0:
return a
else:
return a
dates.append(a_dates[0])
return dates
def mod_days(message: telebot.types.Message, target: str, neighbour: str) -> None:
"Helper function to add skip and work days."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) == 0:
dates = [get_time(forum).date()]
else:
dates = parse_dates(forum, args)
if isinstance(dates, str):
bot.reply_to(
chat,
telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?",
)
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
t = db.read(target)
if t is None:
t = []
n = db.read(neighbour)
if n is None:
n = []
db.write(neighbour, list(filter(lambda x: x not in dates, n)))
db.write(target, list(sorted(set(t + dates))))
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Добавил "
+ telebot.formatting.escape_markdown(
", ".join(
[".".join(map(str, (d.day, d.month, d.year))) for d in dates]
)
),
)
antispam_tmo = tmo.Timeout(10 * 60)
def antispam(
message: telebot.types.Message, chat: telebot.types.Message, forum: int
) -> bool:
"Removes frequent non admin's commands."
if check_if_admin(message):
return False
antispam_tmo.period = db.read(f"{forum}.settings.antispam.period", 600)
return antispam_tmo.check(
lambda: bot.reply_to(chat, "*Хватит спамить\\!\\!\\!*"),
lambda: bot.delete_message(forum, message.id),
)
@bot.message_handler(commands=["start"])
def start_bot(message: telebot.types.Message):
"Command to print kickstart info."
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if antispam(message, chat, forum):
return
if message.chat.is_forum:
bot.reply_to(
chat,
"Привет\\! Я бот для управления дежурствами и напоминания о них\\."
+ " Напиши /link, чтобы привязать комнату\\.",
)
else:
bot.reply_to(
chat,
"Я работаю только на форумах \\(супергруппах с комнатами\\)\\."
+ " Пригласи меня в один из них и напиши /start",
)
@bot.message_handler(commands=["help"])
def get_help(message: telebot.types.Message):
"Command to print info about all of the available commands."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
""
+ "/admin \\[@\\] — переопределить *админа*\n"
+ "/link — привязать комнату\n"
+ "/unlink — отвязать комнату\n"
+ "/cleaner 0/1 — удалять команды\n"
+ "/new @ Имя Фамилия — добавить ~салагу~ студента\n"
+ "/readd \\# @ — если студент поменял ник, это нужно отметить\n"
+ "/remind — напомнить дежурным подежурить, а новичкам отметится\n"
+ "/forget — забыть всех новичков\n"
+ "/del @/\\#/Имя — убрать студента\n"
+ "/list \\[@/\\#/Имя\\] — получить информацию по имени/фамиили\n"
+ "/purge — удалить форум из базы *\\!\\!\\!ОПАСНО\\!\\!\\!*\n"
+ "/timezone ±n — установить часовой пояс по UTC\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\*"
+ " \\(\\* \\= \\[\\0/1\\]\\) — задать дни недели \\(ПН\\-СБ\\),"
+ " когда необходимо дежурство, отдельно для числителей и знаменателей\n"
+ "/calendar \\[Числитель/знаменатель\\] — показать календарь дежурств\n"
+ "/phase \\[Числитель/знаменатель\\] — узнать или скорректировать"
+ " фазу текущей недели\n"
+ "/skip \\[00\\.00\\] — пропустить сегодняшний или заданный день\n"
+ "/work \\[00\\.00\\] — поработать в сегодняшнем или заданном дне\n"
+ "/honor \\[\\-\\]@ — пропуск следующего дежурства, так как студент молодец\n"
+ "/sick @ \\[дата\\] — пропуск дежурства по причине болезни\n"
+ "/force \\[\\-\\]@ — провинившийся дежурит как только так сразу\n"
+ "/order — посмотреть очередь дежурств\n"
+ "/stop — остановить дежурства\n"
+ "/begin \\[@\\] — начать сначала с определённого студента",
)
if __debug__:
def pretty(d, indent=0):
"Print pretty dict."
for key, value in d.items():
stderr.write(" " * indent + f"{key}\n")
if isinstance(value, dict):
pretty(value, indent + 1)
else:
stderr.write(" " * (indent + 1) + f"{value}\n")
@bot.message_handler(commands=["info"])
def info(message: telebot.types.Message):
"Command to print db."
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
pretty(db.read(""))
@bot.message_handler(commands=["exec"])
def exec_bot(message: telebot.types.Message):
"Command to eval python code."
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if message.from_user.username == db.read("about.host"):
try:
result = ast.literal_eval(" ".join(message.text.split(" ")[1:]))
# Disabling W0718 because everything can be excepted with eval
except Exception as e: # pylint: disable=broad-exception-caught
bot.reply_to(chat, f"Ошибка выполнения: {e}")
return
bot.reply_to(
chat,
telebot.formatting.escape_markdown(str(result)),
)
else:
bot.delete_message(forum, message.id)
@bot.message_handler(commands=["backup"])
def backup_db(message: telebot.types.Message):
"Command to backup database."
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append("")
db.save(f"{args[0]}.backup.db")
@bot.message_handler(commands=["restore"])
def restore_db(message: telebot.types.Message):
"Command to restore from backup"
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append("")
db.load(f"{args[0]}.backup.db")
@bot.message_handler(commands=["link"])
def link(message: telebot.types.Message):
"Command to link forum's room."
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
db.write(f"{forum}.settings.chat", message.reply_to_message)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
message.reply_to_message,
'Комната "'
+ f"{message.reply_to_message.forum_topic_created.name}"
+ '" привязана\\. '
+ "Чек-лист того, что нужно написать в первую очередь:\n"
+ "/admin \\[@\\], чтобы задать админа\n"
+ "/help, чтобы узнать, что я умею\n"
+ "/tz, чтобы задать часовой пояс\n"
+ "/phase \\[Числитель/знаменатель\\], чтобы задать недельную фазу\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\),"
+ " чтобы задать дни дежурств\n"
+ "/new @ Имя Фамилия, чтобы добавить ~салаг~ студетов\n"
+ "/begin \\[@\\], чтобы начать дежурство",
)
@bot.message_handler(commands=["unlink"])
def unlink(message: telebot.types.Message):
"Command to unlink room."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
db.write(f"{forum}.settings.chat", None)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Комната отвязана")
@bot.message_handler(commands=["purge"])
def purge_db_people(message: telebot.types.Message):
"Command to clear info about people from database."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if message.text == "/purge ДА УДАЛЯЙ ДАВАЙ":
db.write(str(forum), None)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "База студентов удалена")
else:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Чтобы подтвердить свои благие намерения, напиши\n"
+ "`/purge ДА УДАЛЯЙ ДАВАЙ`",
)
@bot.message_handler(commands=["forget"])
def forget_db_pending(message: telebot.types.Message):
"Command to forget unregistered newbies."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
empty = db.read(f"{forum}.pending") is None
db.write(f"{forum}.pending", None)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"База новичков удалена"
+ (", хоть и была уже пустая…" if empty else ""),
)
@bot.message_handler(commands=["admin"])
def set_admin(message: telebot.types.Message):
"Command to register admin."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
admin = [i for i in message.text.split() if i[0] == "@"]
if len(admin) == 0:
uid = db.read(f"{forum}.settings.admin.id")
if uid == message.from_user.id:
bot.reply_to(chat, "Ты уже тут главный")
return
db.write(f"{forum}.settings.admin.id", message.from_user.id)
db.write(f"{forum}.settings.admin.username", message.from_user.username)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat, f"Рад познакомиться, {message.from_user.first_name}\\!"
)
else:
admin = admin[0][1:]
uadmin = db.read(f"{forum}.settings.admin.username")
if uadmin == admin:
bot.reply_to(chat, "Ты уже тут главный")
return
db.write(f"{forum}.settings.admin.id", None)
db.write(f"{forum}.settings.admin.username", admin)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Теперь @"
+ telebot.formatting.escape_markdown(admin)
+ " тут царь и бог\\!\n"
+ "Напиши /admin в чат, чтобы я знал тебя в лицо",
)
@bot.message_handler(commands=["list"])
def list_users(message: telebot.types.Message):
"Command to list users."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
s = message.text.split()[1:]
if len(s) == 0:
s = [""]
r = []
for i in s:
f = find_uids(forum, i)
if f is not None:
r += f
if len(r) == 0:
bot.reply_to(chat, "Никого не нашёл")
return
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
res = []
for i in r:
res.append(format_user_info(forum, i))
bot.reply_to(chat, "\n".join(res))
@bot.message_handler(commands=["remind"])
def remind_users(message: telebot.types.Message | int):
"Command to remind about duty or about registration. Can be called automatically."
auto, forum = (
(True, message) if isinstance(message, int) else (False, message.chat.id)
)
chat = get_chat(message)
if chat is not None:
if not auto and antispam(message, chat, forum):
return
if auto or check_if_admin(message):
pending = db.read(f"{forum}.pending", {})
r = ""
if len(pending.keys()) != 0:
for i in pending.keys():
r += f"@{telebot.formatting.escape_markdown(i)}\n"
r += "нужно нажать /new\n\n"
status = get_status(forum)
if status:
current = db.read(f"{forum}.rookies.current")
if current is not None:
rookie = db.read(f"{forum}.people.{current}")
if rookie is not None:
r += f"{mention(forum, current)} сегодня дежурит"
if not auto and db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
if not auto and r == "":
bot.reply_to(chat, "Дежурств сегодня нет")
else:
bot.reply_to(chat, r)
@bot.message_handler(commands=["del"])
def del_person(message: telebot.types.Message):
"Command to remove person from forum."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) < 1:
bot.reply_to(chat, "Давай хоть кого\\-нибудь удалим")
return
if len(args) > 1:
bot.reply_to(chat, "Давай удалять их по очереди")
return
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, f"Скатертью дорога, {mention(forum, f[0])}\\!")
db.pop(f"{forum}.people.{f[0]}")
@bot.message_handler(commands=["cleaner"])
def cleaner(message: telebot.types.Message):
"Command to set commands cleaner status."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
if args[0] not in ("0", "1"):
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
state = args[0] == "1"
db.write(f"{forum}.settings.delete_messages", state)
if state:
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Очистка команд "
+ ("включена" if state else "отключена")
+ (
"\nЧтобы чистка работала, мне нужно дать право удалять сообщения"
if state
else ""
),
)
@bot.message_handler(commands=["new"])
def add_new(message: telebot.types.Message):
"Command to add new person to forum."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
args = message.text.split()[1:]
if len(args) == 0:
pending = db.read(f"{forum}.pending")
if pending is None:
antispam(message, chat, forum)
return
user = message.from_user.username
if user not in pending.keys():
antispam(message, chat, forum)
return
uid = str(message.from_user.id)
db.write(f"{forum}.people.{uid}.username", user)
db.write(f"{forum}.people.{uid}.name", pending[user]["name"])
db.write(f"{forum}.people.{uid}.surname", pending[user]["surname"])
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, f"Рад познакомиться, {mention(forum, uid)}\\!")
db.pop(f"{forum}.pending.{user}")
elif check_if_admin(message):
if len(args) != 3:
bot.reply_to(
chat, "Нужно указать @, имя и фамилию\nНе больше, не меньше"
)
return
user = args[0]
if user[0] != "@":
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Я тебе не салага\\!")
return
people = db.read(f"{forum}.people", [])
pending = db.read(f"{forum}.pending", {})
if any(
[people[i]["username"] == user for i in people]
+ [user in pending.keys()]
):
bot.reply_to(chat, "Пользователь с таким ником уже в базе")
return
name = args[1]
surname = args[2]
db.write(f"{forum}.pending.{user}.name", name)
db.write(f"{forum}.pending.{user}.surname", surname)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Добро пожаловать, @"
+ telebot.formatting.escape_markdown(user)
+ f" \\({name} {surname}\\)\\!\n"
+ "Первое твоё обязательство — написать /new",
)
@bot.message_handler(commands=["readd"])
def readd(message: telebot.types.Message):
"Command to change person's nickname."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Нужно указать id и новый ник\nНе больше, не меньше")
return
uid = args[0]
try:
int(uid)
except ValueError:
bot.reply_to(
chat, "ID — это число\nНа буквах пока считать не научились"
)
return
if db.read(f"{forum}.people.{uid}") is None:
bot.reply_to(chat, "Такого пользователя нет в базе")
return
user = args[1]
if user[0] != "@":
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
bot.reply_to(chat, "Я тебе не салага\\!")
return
db.write(f"{forum}.people.{uid}.username", user)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
f"@{telebot.formatting.escape_markdown(user)}"
+ " от меня не скроется\\!",
)
@bot.message_handler(commands=["days"])
def set_days(message: telebot.types.Message):
"Command to define duty days."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Мне нужно два двоичных числа")
return
num1 = args[0]
num2 = args[1]
if len(num1) != 6:
bot.reply_to(chat, "В первом числе не 6 цифр")
return
if len(num2) != 6:
bot.reply_to(chat, "Во втором числе не 6 цифр")
return
if not all(i in ("0", "1") for i in num1):
bot.reply_to(chat, "Цифры двоичные должны быть в первом")
return
if not all(i in ("0", "1") for i in num2):
bot.reply_to(chat, "Цифры двоичные должны быть во втором")
return
db.write(
f"{forum}.schedule.days",
(
[num1[i] == "1" for i in range(6)],
[num2[i] == "1" for i in range(6)],
),
)
message.text = ""
calendar(message)
@bot.message_handler(commands=["calendar"])
def calendar(message: telebot.types.Message):
"Command to display duty calendar."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
en1, en2 = False, False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2 == False:
en1, en2 = True, True
days = db.read(f"{forum}.schedule.days", ([False] * 6, [False] * 6))
skip = db.read(f"{forum}.schedule.skip_days", [])
work = db.read(f"{forum}.schedule.work_days", [])
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"`\\|#\\|Пн \\|Вт \\|Ср \\|Чт \\|Пт \\|Сб \\|\n"
+ (
" Ч"
+ "".join([" " + ("X" if days[0][i] else " ") + " " for i in range(6)])
+ "\n"
if en1
else ""
)
+ (
" З"
+ "".join([" " + ("X" if days[1][i] else " ") + " " for i in range(6)])
if en2
else ""
)
+ "`"
+ "\n\n"
+ "*Пропуски:*\n"
+ telebot.formatting.escape_markdown(
"\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in skip])
)
+ "\n\n"
+ "*Рабочие:*\n"
+ telebot.formatting.escape_markdown(
"\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in work])
),
)
@bot.message_handler(commands=["skip"])
def set_skip_days(message: telebot.types.Message):
"Command to set skipped days."
forum = message.chat.id
mod_days(message, f"{forum}.schedule.skip_days", f"{forum}.schedule.work_days")
@bot.message_handler(commands=["work"])
def set_work_days(message: telebot.types.Message):
"Command to set work days."
forum = message.chat.id
mod_days(message, f"{forum}.schedule.work_days", f"{forum}.schedule.skip_days")
@bot.message_handler(commands=["stop"])
def stop_queue(message: telebot.types.Message):
"Command to stop duty."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
status = db.read(f"{forum}.is_active")
if status is None or not status:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже не идёт")
return
status = False
db.write(f"{forum}.is_active", status)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство остановлено")
@bot.message_handler(commands=["honor"])
def add_honor(message: telebot.types.Message):
"Command to add honored person."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = db.read(f"{forum}.rookies.force_order", [])
honor = db.read(f"{forum}.rookies.honor_order", [])
people = db.read(f"{forum}.people")
honor.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(f"{forum}.rookies.force_order", force)
db.write(f"{forum}.rookies.honor_order", honor)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Вы в почёте, "
+ telebot.formatting.escape_markdown(
f"{people[user]['name']} {people[user]['surname']}"
),
)
@bot.message_handler(commands=["force"])
def add_force(message: telebot.types.Message):
"Command to add guilty person."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = db.read(f"{forum}.rookies.force_order", [])
honor = db.read(f"{forum}.rookies.honor_order", [])
people = db.read(f"{forum}.people")
force.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(f"{forum}.rookies.force_order", force)
db.write(f"{forum}.rookies.honor_order", honor)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Отрабатывай, "
+ telebot.formatting.escape_markdown(
f"{people[user]['name']} {people[user]['surname']}"
),
)
@bot.message_handler(commands=["sick"])
def add_sick(message: telebot.types.Message):
"Command to add sick person."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) not in (1, 2):
bot.reply_to(chat, "Нужно указать человека и дату выздоровления")
return
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
if len(args) > 1:
dates = parse_dates(forum, args[1:])
if isinstance(dates, str):
bot.reply_to(
chat,
telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?",
)
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
date = dates[0]
else:
date = get_time(forum).date() + dt.timedelta(days=30)
sicks = db.read(f"{forum}.rookies.sick_order", {})
people = db.read(f"{forum}.people")
sicks[user] = (date, False)
db.write(f"{forum}.rookies.sick_order", sicks)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
telebot.formatting.escape_markdown(
f"{people[user]['name']} {people[user]['surname']}"
)
+ " болеет до "
+ telebot.formatting.escape_markdown(
".".join(map(str, (date.day, date.month, date.year)))
),
)
@bot.message_handler(commands=["order"])
def view_order(message: telebot.types.Message):
"Command to display duty order, honored, guilty and sick people."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 0:
bot.reply_to(chat, "Многа букав")
return
order = db.read(f"{forum}.rookies.order", [])
force = db.read(f"{forum}.rookies.force_order", [])
honor = db.read(f"{forum}.rookies.honor_order", [])
sicks = db.read(f"{forum}.rookies.sick_order", {})
people = db.read(f"{forum}.people")
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"*Очередь:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[f"{people[u]['name']} {people[u]['surname']}" for u in order]
)
)
+ "\n\n"
+ "*Почитаемые:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[f"{people[u]['name']} {people[u]['surname']}" for u in honor]
)
)
+ "\n"
+ "*Виноватые:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[f"{people[u]['name']} {people[u]['surname']}" for u in force]
)
)
+ "\n"
+ "*Больные:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[
f"{people[u]['name']} {people[u]['surname']} до "
+ ".".join(
map(
str,
(sicks[u][0].day, sicks[u][0].month, sicks[u][0].year),
)
)
for u in sicks
]
)
),
)
@bot.message_handler(commands=["begin"])
def begin_queue(message: telebot.types.Message):
"Command to begin queue."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(
chat, "Нужно указать человека, с которого начнётся дежурство"
)
return
if len(args) == 1:
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
start_with = f[0]
else:
start_with = None
phase = db.read(f"{forum}.schedule.phase")
if phase is None:
bot.reply_to(chat, "Задай фазу с помощью /phase")
return
status = db.read(f"{forum}.is_active")
if status:
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже идёт")
return
status = True
db.write(f"{forum}.is_active", status)
stack_update(forum, start_with)
now_date = get_time(forum).date()
start_with = db.read(f"{forum}.rookies.order", [None])[0]
if start_with is None:
bot.reply_to(chat, "Людей нет")
return
db.write(f"{forum}.schedule.last_stack_update_date", now_date)
db.write(f"{forum}.schedule.last_notification_date", now_date)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Держурство начнёт "
+ f"{db.read(f'{forum}.people.{start_with}.name')} "
+ f"{db.read(f'{forum}.people.{start_with}.surname')}",
)
@bot.message_handler(commands=["phase"])
def set_phase(message: telebot.types.Message):
"Command to set start phase."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
en2 = False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2:
bot.reply_to(chat, "Это числитель или знаменатель?")
return
now = get_time(forum).date()
phase = int(bool(en2))
phases = 2
db.write(f"{forum}.schedule.phase", phase)
db.write(f"{forum}.schedule.phases", phases)
db.write(f"{forum}.schedule.last_phase_changing_date", now)
phase = db.read(f"{forum}.schedule.phase", 0)
phases = db.read(f"{forum}.schedule.phase", 2)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat, "Текущая неделя: " + ("знаменатель" if phase else "числитель")
)
@bot.message_handler(commands=["timezone", "tz"])
def set_timezone(message: telebot.types.Message):
"Command to set forum's timezone."
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
if len(args) == 1:
try:
args[0] = args[0].lower()
if "utc" in args[0]:
args[0] = args[0][3:]
tz = int(args[0])
if tz not in range(-12, 14 + 1):
raise ValueError
except ValueError:
bot.reply_to(
chat,
"Нужно указать [смещение по UTC](https:"
+ "//ru.wikipedia.org/wiki/%D0%92%D1%81%D"
+ "0%B5%D0%BC%D0%B8%D1%80%D0%BD%D0%BE%D0%"
+ "B5_%D0%BA%D0%BE%D0%BE%D1%80%D0%B4%D0%B"
+ "8%D0%BD%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%"
+ "D0%BD%D0%BD%D0%BE%D0%B5_%D0%B2%D1%80%D"
+ "0%B5%D0%BC%D1%8F)",
)
return
db.write(f"{forum}.settings.timezone", tz)
tz = db.read(f"{forum}.settings.timezone", 3)
if db.read(f"{forum}.settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Часовой пояс: UTC"
+ ("\\+" if tz >= 0 else "")
+ telebot.formatting.escape_markdown(str(tz)),
)
def get_hours() -> tuple:
"Returns forum's work hours."
# TODO command to set range
return tuple(range(8, 20))
def stack_update(forum: int, force_reset: bool = False) -> None:
"Updates forum's stacks."
now = get_time(forum)
now_date = now.date()
order = db.read(f"{forum}.rookies.order", [])
force = db.read(f"{forum}.rookies.force_order", [])
honor = db.read(f"{forum}.rookies.honor_order", [])
sicks = db.read(f"{forum}.rookies.sick_order", {})
people = db.read(f"{forum}.people", {})
to_pop = []
for i in sicks:
if now_date >= sicks[i][0]:
if sicks[i][1]:
prepend_user(forum, ".rookies.force_order", i)
to_pop.append(i)
for i in to_pop:
sicks.pop(i)
db.write(f"{forum}.rookies.sick_order", sicks)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(f"{forum}.rookies.force_order", force)
db.write(f"{forum}.rookies.honor_order", honor)
for i in order:
if i not in people.keys():
order.remove(i)
if len(order) == 0 or force_reset is not False:
order = list(
dict(sorted(people.items(), key=lambda item: item[1]["surname"])).keys()
)
if force_reset is not False:
db.write(f"{forum}.rookies.force_order", [])
db.write(f"{forum}.rookies.honor_order", [])
db.write(f"{forum}.rookies.sick_order", {})
if force_reset is not None:
try:
order = order[order.index(force_reset) :]
except ValueError:
pass
db.write(f"{forum}.rookies.order", order)
if len(order) == 0:
return
if force_reset is False:
if len(force) > 0:
db.write(f"{forum}.rookies.current", pop_user(forum, "rookies.force_order"))
else:
current = pop_user(forum, "rookies.order")
if current in honor:
honor.remove(current)
db.write(f"{forum}.rookies.honor_order", honor)
stack_update(forum)
elif any((sicks[i][1] is False) and (i == current) for i in sicks):
skipped = list(sicks[current])
skipped[1] = True
sicks[current] = tuple(skipped)
db.write(f"{forum}.rookies.sick_order", sicks)
stack_update(forum)
else:
db.write(f"{forum}.rookies.current", pop_user(forum, "rookies.order"))
def clean_old_dates(date: dt.datetime, array: str) -> None:
"Removes dates from db's array which are older than `date`."
a = db.read(array)
a = list(filter(lambda x: x >= date, a))
db.write(array, a)
def update(forum: int) -> None:
"""
Updates forum's state:
· Cleans old skip/work dates.
· Updates order stack.
· Notifies about upcoming duty.
"""
now = get_time(forum)
now_date = now.date()
now_time = now.time()
last_notif = db.read(f"{forum}.schedule.last_notification_date")
last_upd_stack = db.read(f"{forum}.schedule.last_stack_update_date")
is_active = get_status(forum, now_date)
hours_range = get_hours()
change_phase(forum, now_date)
clean_old_dates(now_date, f"{forum}.schedule.work_days")
clean_old_dates(now_date, f"{forum}.schedule.skip_days")
if is_active and (last_upd_stack is None or now_date > last_upd_stack):
db.write(f"{forum}.schedule.last_stack_update_date", now_date)
stack_update(forum)
if now_time.hour in hours_range:
if last_notif is None or now_date > last_notif:
stdout.write("Notified\n")
db.write(f"{forum}.schedule.last_notification_date", now_date)
remind_users(forum)
if db.read("about.updatedfrom") != db.read("about.version"):
db.write("about.updatenotified", True)
update_notify(forum)
def update_notify(forum: int) -> None:
"Notifies the forum about bot's new version."
bot.reply_to(
get_chat(forum),
# f"Обновился до версии {telebot.formatting.escape_markdown(CURRENT_VERSION)}",
f"Обновился до версии {telebot.formatting.escape_markdown(CURRENT_VERSION)}\n" \
+ "Дежурик поздравляет всех бета-тестеров С ноВЫыыыыыМ Г0Йда и напоминает о том," \
+ " что на каникулах и сессии дежурства продолжаются",
)
def process1():
"The process runs telegram infinite polling."
bot.infinity_polling(none_stop=True)
p2_tmo = tmo.Timeout(120)
def process2():
"The process updates duty order for every forum once a `period` seconds."
def p2():
"Helper function."
stdout.write("Process 2 update\n")
if db.read("about.updatenotified", True):
db.write("about.updatedfrom", db.read("about.version"))
for f in db.read("").keys():
try:
update(int(f))
except ValueError:
pass
p2_tmo.period = int(cr.read("settings.notify_period"))
while True:
p2_tmo.check(p2, lambda: None)
funcs = [process1, process2]
threads = map(lambda x: Thread(target=x), funcs)
for thread in threads:
thread.daemon = True
thread.start()
db.write("about.updatenotified", False)
db.write("about.updatedfrom", VERSION)
db.write("about.version", CURRENT_VERSION)
while True:
time.sleep(1)