duty-board-dog/bot.py

1471 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"Main bot module."
import time
import ast
import datetime as dt
import typing
from sys import stderr, stdout, stdin
from threading import Thread
import telebot
import config_reader as cr
import db_classes
# TODO more backends (redis at least)
db = db_classes.PickleDB(".db")
db.load()
CURRENT_VERSION = "v1.0rc8"
VERSION = db.read("about.version", CURRENT_VERSION)
db.write("about.updatedfrom", VERSION)
db.write("about.version", CURRENT_VERSION)
db.write("about.author", "electromagneticcyclone")
db.write("about.tester", "angelbeautifull")
db.write(
"about.source", "https://git.disroot.org/electromagneticcyclone/duty-board-dog"
)
if (db.read("about.host") is None) and __debug__:
stdout.write("Введите username хоста: ")
stdout.flush()
db.write("about.host", stdin.readline()[:-1])
bot = telebot.TeleBot(
cr.read(f"tokens.{("devel" if __debug__ else "prod")}"), parse_mode="MarkdownV2"
)
def get_time(forum: int) -> dt.datetime:
"Get datetime.now in forum's timezone. Default timezone is UTC+3."
return dt.datetime.now(dt.UTC) + dt.timedelta(
hours=db.read(str(forum) + ".settings.timezone", 3)
)
def change_phase(forum: int, date: dt.datetime = None) -> None:
"Changes forum's current phase."
if date is None:
date = get_time(forum).date()
phase = db.read(str(forum) + ".schedule.phase")
phases = db.read(str(forum) + ".schedule.phases")
last_changed = db.read(str(forum) + ".schedule.last_phase_changing_date")
if None in (phase, last_changed, phases):
return
now = date
weekday = now.weekday()
if not (now > last_changed and weekday == 0):
return
phase = (phase + 1) % phases
db.write(str(forum) + ".schedule.phase", phase)
db.write(str(forum) + ".schedule.last_phase_changing_date", now)
def get_status(forum: int, now: dt.datetime = None) -> bool:
"Returns forum's duty status. Sundays are free."
if now is None:
now = get_time(forum).date()
weekday = now.weekday()
if weekday == 6:
return False
phase = db.read(str(forum) + ".schedule.phase", 0)
work_days = db.read(str(forum) + ".schedule.work_days", [])
skip_days = db.read(str(forum) + ".schedule.skip_days", [])
days = db.read(str(forum) + ".schedule.days", ([0] * 6, [0] * 6))
is_active = db.read(str(forum) + ".is_active", False)
if is_active:
if now in work_days:
return True
if now in skip_days:
return 0
if days[phase][weekday]:
return True
return False
def get_chat(
message: telebot.types.Message | int, start: bool = False
) -> telebot.types.Message | None:
"Returns forum's root message or user's message if chat is not a forum."
if isinstance(message, int):
is_forum = True
forum = message
else:
is_forum = message.chat.is_forum
forum = message.chat.id
if (db.read(str(forum)) is None or not is_forum) and not start:
return None
chat = db.read(str(forum) + ".settings.chat")
if isinstance(message, int):
return chat
if (chat is None) or (chat.id == message.reply_to_message.id):
return message.reply_to_message if is_forum else message
return None
def check_if_admin(message: telebot.types.Message) -> bool | None:
"Checks if the message is sent by the forum's admin."
forum = message.chat.id
admin = db.read(str(forum) + ".settings.admin")
if admin is None:
return True
if admin["id"] is None:
return admin["username"] == message.from_user.username
return admin["id"] == message.from_user.id
def mention(forum: int, uid: int) -> str | None:
"Returns markdown formatted string with user's mention."
uid = str(uid)
if db.read(str(forum) + ".people." + uid) is None:
stderr.write("Пользователя с ID " + uid + " нет в базе.\n")
return None
return (
"["
+ db.read(str(forum) + ".people." + uid + ".name")
+ " "
+ db.read(str(forum) + ".people." + uid + ".surname")
+ "](tg://user?id="
+ str(uid)
+ ")"
)
def find_uids(forum: int, s: str) -> list | None:
"Find user's id by nickname, name or surname."
people = db.read(str(forum) + ".people")
if people is None:
return None
if s[0] == "@":
s = s[1:]
f = [i for i in people.keys() if len([j for j in people[i].values() if s in j]) > 0]
if len(f) == 0:
return None
return f
def format_user_info(forum: int, uid: int) -> str:
"Returns markdown formatted string with all user's info by their id."
uid = str(uid)
person = db.read(str(forum) + ".people." + uid)
if person is None:
return ""
r = ""
r += "\\#" + uid + "\n"
for k, i in person.items():
r += "" + " " + k + " \\= " + telebot.formatting.escape_markdown(i) + "\n"
return r
def prepend_user(forum: int, ulist_s: str, uid: int) -> None:
"Inserts user id at the start of provided db list in forum's context."
uid = str(uid)
ulist = db.read(str(forum) + "." + ulist_s, [])
ulist = list(set([uid] + ulist))
db.write(str(forum) + "." + ulist_s, ulist)
def append_user(forum: int, ulist_s: str, uid: int) -> None:
"Inserts user id at the end of provided db list in forum's context."
uid = str(uid)
ulist = db.read(str(forum) + "." + ulist_s, [])
ulist = list(set(ulist + [uid]))
db.write(str(forum) + "." + ulist_s, ulist)
def pop_user(forum: int, ulist_s: str) -> dict | None:
"Removes user id from the start of provided db list in forum's context. Returns user id."
ulist = db.read(str(forum) + "." + ulist_s, [])
r = None
if len(ulist) > 0:
r = ulist.pop(0)
db.write(str(forum) + "." + ulist_s, ulist)
return r
def insert_user_in_current_order(forum: int, uid: int) -> bool:
uid = str(uid)
order = db.read(str(forum) + ".rookies.order", [])
people = db.read(str(forum) + ".people", {})
current = db.read(str(forum) + ".rookies.current")
if uid not in people:
return False
order = dict(map(lambda x: (x, people[x]), order))
if current is not None:
order = dict(
sorted(
list(order.items()) + [(current, people[current])],
key=lambda item: item[1]["surname"],
)
)
order = dict(
sorted(
list(order.items()) + [uid, people[uid]],
key=lambda item: item[1]["surname"],
)
)
pos = list(order.keys()).index(current)
if pos == 0:
return False
db.write(str(forum) + ".rookies.order", list(order.keys())[1:])
return True
def parse_dates(forum: int, args: typing.Iterable) -> list | str:
dates = []
cur_date = get_time(forum).date() - dt.timedelta(days=1)
cur_year = cur_date.year
for a in args:
if a.lower() == "сегодня":
dates.append(get_time(forum).date())
continue
if a.lower() == "завтра":
dates.append(get_time(forum).date() + dt.timedelta(days=1))
continue
if a.lower() == "послезавтра":
dates.append(get_time(forum).date() + dt.timedelta(days=2))
continue
if a.lower() == "вчера":
dates.append(get_time(forum).date() - dt.timedelta(days=1))
continue
if a.lower() == "позавчера":
dates.append(get_time(forum).date() - dt.timedelta(days=2))
continue
d = a.split(".")
a_dates = []
if len(d) in (2, 3):
try:
d = list(map(int, d))
except ValueError:
return a
if len(d) == 2:
years = [cur_year, cur_year + 1]
else:
years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]]
for y in years:
try:
a_dates.append(dt.datetime(y, d[1], d[0]).date())
except ValueError:
pass
a_dates = sorted(
filter(
lambda x: cur_date + dt.timedelta(days=120) > x > cur_date,
a_dates,
)
)
if len(a_dates) == 0:
return a
else:
return a
dates.append(a_dates[0])
return dates
def mod_days(message: telebot.types.Message, target: str, neighbour: str) -> None:
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) == 0:
dates = [get_time(forum).date()]
else:
dates = parse_dates(forum, args)
if isinstance(dates, str):
bot.reply_to(
chat,
telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?",
)
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
t = db.read(target)
if t is None:
t = []
n = db.read(neighbour)
if n is None:
n = []
db.write(neighbour, list(filter(lambda x: x not in dates, n)))
db.write(target, list(sorted(set(t + dates))))
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Добавил "
+ telebot.formatting.escape_markdown(
", ".join(
[".".join(map(str, (d.day, d.month, d.year))) for d in dates]
)
),
)
class Timeout:
__timeout = None
__caution = True
def __init__(self, period: int):
self.period = period
@property
def period(self) -> int:
return self.__period
@period.setter
def period(self, period: int):
self.__period = period
def check(self, once_func: typing.Callable, always_func: typing.Callable) -> bool:
if self.__timeout is not None:
if time.time() - self.__timeout < self.period:
if self.__caution:
once_func()
self.__caution = False
always_func()
return True
self.__caution = True
self.__timeout = time.time()
return False
tmo = Timeout(600)
def antispam(
message: telebot.types.Message, chat: telebot.types.Message, forum: int
) -> bool:
"Removes frequent non admin's commands."
if check_if_admin(message):
return False
tmo.period = db.read(str(forum) + ".settings.antispam.period", 600)
return tmo.check(
lambda: bot.reply_to(chat, "*Хватит спамить\\!\\!\\!*"),
lambda: bot.delete_message(forum, message.id),
)
@bot.message_handler(commands=["start"])
def start_bot(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if antispam(message, chat, forum):
return
if message.chat.is_forum:
bot.reply_to(
chat,
"Привет\\! Я бот для управления дежурствами и напоминания о них\\. "
+ "Напиши /link, чтобы привязать комнату\\.",
)
else:
bot.reply_to(
chat,
"Я работаю только на форумах \\(супергруппах с комнатами\\)\\. "
+ "Пригласи меня в один из них и напиши /start",
)
@bot.message_handler(commands=["help"])
def get_help(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
""
+ "/admin \\[@\\] — переопределить *админа*\n"
+ "/link — привязать комнату\n"
+ "/unlink — отвязать комнату\n"
+ "/cleaner 0/1 — удалять команды\n"
+ "/new @ Имя Фамилия — добавить ~салагу~ студента\n"
+ "/readd \\# @ — если студент поменял ник, это нужно отметить\n"
+ "/remind — напомнить дежурным подежурить, а новичкам отметится\n"
+ "/forget — забыть всех новичков\n"
+ "/del @/\\#/Имя — убрать студента\n"
+ "/list \\[@/\\#/Имя\\] — получить информацию по имени/фамиили\n"
+ "/purge — удалить форум из базы *\\!\\!\\!ОПАСНО\\!\\!\\!*\n"
+ "/timezone ±n — установить часовой пояс по UTC\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\) — задать дни недели \\(ПН\\-СБ\\), когда необходимо дежурство, отдельно для числителей и знаменателей\n"
+ "/calendar \\[Числитель/знаменатель\\] — показать календарь дежурств\n"
+ "/phase \\[Числитель/знаменатель\\] — узнать или скорректировать фазу текущей недели\n"
+ "/skip \\[00\\.00\\] — пропустить сегодняшний или заданный день\n"
+ "/work \\[00\\.00\\] — поработать в сегодняшнем или заданном дне\n"
+ "/honor \\[\\-\\]@ — пропуск следующего дежурства, так как студент молодец\n"
+ "/sick @ \\[дата\\] — пропуск дежурства по причине болезни\n"
+ "/force \\[\\-\\]@ — провинившийся дежурит как только так сразу\n"
+ "/order — посмотреть очередь дежурств\n"
+ "/stop — остановить дежурства\n"
+ "/begin \\[@\\] — начать сначала с определённого студента",
)
if __debug__:
def pretty(d, indent=0):
for key, value in d.items():
print(" " * indent + str(key))
if isinstance(value, dict):
pretty(value, indent + 1)
else:
print(" " * (indent + 1) + str(value))
@bot.message_handler(commands=["info"])
def info(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
pretty(db.read(""))
@bot.message_handler(commands=["exec"])
def exec_bot(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if message.from_user.username == db.read("about.host"):
try:
result = ast.literal_eval(" ".join(message.text.split(" ")[1:]))
# Disabling W0718 because everything can be excepted with eval
except Exception as e: # pylint: disable=broad-exception-caught
bot.reply_to(chat, "Ошибка выполнения: " + str(e))
return
bot.reply_to(
chat,
telebot.formatting.escape_markdown(str(result)),
)
else:
bot.delete_message(forum, message.id)
@bot.message_handler(commands=["backup"])
def backup_db(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append("")
db.save(args[0] + ".backup.db")
@bot.message_handler(commands=["restore"])
def restore_db(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == db.read("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append("")
db.load(args[0] + ".backup.db")
@bot.message_handler(commands=["link"])
def link(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
db.write(str(forum) + ".settings.chat", message.reply_to_message)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
message.reply_to_message,
'Комната "'
+ str(message.reply_to_message.forum_topic_created.name)
+ '" привязана\\. '
+ "Чек-лист того, что нужно написать в первую очередь:\n"
+ "/admin \\[@\\], чтобы задать админа\n"
+ "/help, чтобы узнать, что я умею\n"
+ "/tz, чтобы задать часовой пояс\n"
+ "/phase \\[Числитель/знаменатель\\], чтобы задать недельную фазу\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\), чтобы задать дни дежурств\n"
+ "/new @ Имя Фамилия, чтобы добавить ~салаг~ студетов\n"
+ "/begin \\[@\\], чтобы начать дежурство",
)
@bot.message_handler(commands=["unlink"])
def unlink(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
db.write(str(forum) + ".settings.chat", None)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Комната отвязана")
@bot.message_handler(commands=["purge"])
def purge_db_people(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if message.text == "/purge ДА УДАЛЯЙ ДАВАЙ":
db.write(str(forum), None)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "База студентов удалена")
else:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Чтобы подтвердить свои благие намерения, напиши\n"
+ "`/purge ДА УДАЛЯЙ ДАВАЙ`",
)
@bot.message_handler(commands=["forget"])
def forget_db_pending(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
empty = db.read(str(forum) + ".pending") is None
db.write(str(forum) + ".pending", None)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"База новичков удалена"
+ (", хоть и была уже пустая…" if empty else ""),
)
@bot.message_handler(commands=["admin"])
def set_admin(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
admin = [i for i in message.text.split() if i[0] == "@"]
if len(admin) == 0:
uid = db.read(str(forum) + ".settings.admin.id")
if uid == message.from_user.id:
bot.reply_to(chat, "Ты уже тут главный")
return
db.write(str(forum) + ".settings.admin.id", message.from_user.id)
db.write(
str(forum) + ".settings.admin.username", message.from_user.username
)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat, "Рад познакомиться, " + message.from_user.first_name + "\\!"
)
else:
admin = admin[0][1:]
uadmin = db.read(str(forum) + ".settings.admin.username")
if uadmin == admin:
bot.reply_to(chat, "Ты уже тут главный")
return
db.write(str(forum) + ".settings.admin.id", None)
db.write(str(forum) + ".settings.admin.username", admin)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Теперь @"
+ telebot.formatting.escape_markdown(admin)
+ " тут царь и бог\\!\n"
+ "Напиши /admin в чат, чтобы я знал тебя в лицо",
)
@bot.message_handler(commands=["list"])
def list_users(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
s = message.text.split()[1:]
if len(s) == 0:
s = [""]
r = []
for i in s:
f = find_uids(forum, i)
if f is not None:
r += f
if len(r) == 0:
bot.reply_to(chat, "Никого не нашёл")
return
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
res = []
for i in r:
res.append(format_user_info(forum, i))
bot.reply_to(chat, "\n".join(res))
@bot.message_handler(commands=["remind"])
def remind_users(message: telebot.types.Message):
auto = False
if isinstance(message, int):
auto = True
forum = message
else:
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if not auto and antispam(message, chat, forum):
return
if auto or check_if_admin(message):
pending = db.read(str(forum) + ".pending", {})
r = ""
if len(pending.keys()) == 0:
if not auto and db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
else:
for i in pending.keys():
r += "@" + telebot.formatting.escape_markdown(i) + " "
r += "нужно нажать /new\n"
status = get_status(forum)
if status:
current = db.read(str(forum) + ".rookies.current")
if current is not None:
rookie = db.read(str(forum) + ".people." + current)
if rookie is not None:
r += mention(forum, current) + " сегодня дежурит"
if not auto and db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
if r == "":
if not auto:
bot.reply_to(chat, "Дежурств сегодня нет")
else:
bot.reply_to(chat, r)
@bot.message_handler(commands=["del"])
def del_person(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) < 1:
bot.reply_to(chat, "Давай хоть кого\\-нибудь удалим")
return
if len(args) > 1:
bot.reply_to(chat, "Давай удалять их по очереди")
return
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Скатертью дорога, " + mention(forum, f[0]) + "\\!")
db.pop(str(forum) + ".people." + f[0])
@bot.message_handler(commands=["cleaner"])
def cleaner(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
if args[0] not in ("0", "1"):
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
state = args[0] == "1"
db.write(str(forum) + ".settings.delete_messages", state)
if state:
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Очистка команд "
+ ("включена" if state else "отключена")
+ (
"\nЧтобы чистка работала, мне нужно дать право удалять сообщения"
if state
else ""
),
)
@bot.message_handler(commands=["new"])
def add_new(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
args = message.text.split()[1:]
if len(args) == 0:
pending = db.read(str(forum) + ".pending")
if pending is None:
antispam(message, chat, forum)
return
user = message.from_user.username
if user not in pending.keys():
antispam(message, chat, forum)
return
uid = str(message.from_user.id)
db.write(str(forum) + ".people." + uid + ".username", user)
db.write(str(forum) + ".people." + uid + ".name", pending[user]["name"])
db.write(
str(forum) + ".people." + uid + ".surname", pending[user]["surname"]
)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Рад познакомиться, " + mention(forum, uid) + "\\!")
db.pop(str(forum) + ".pending." + user)
elif check_if_admin(message):
if len(args) != 3:
bot.reply_to(
chat, "Нужно указать @, имя и фамилию\nНе больше, не меньше"
)
return
user = args[0]
if user[0] != "@":
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Я тебе не салага\\!")
return
people = db.read(str(forum) + ".people", [])
pending = db.read(str(forum) + ".pending", {})
if any(
[people[i]["username"] == user for i in people]
+ [user in pending.keys()]
):
bot.reply_to(chat, "Пользователь с таким ником уже в базе")
return
name = args[1]
surname = args[2]
db.write(str(forum) + ".pending." + user + ".name", name)
db.write(str(forum) + ".pending." + user + ".surname", surname)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Добро пожаловать, @"
+ telebot.formatting.escape_markdown(user)
+ " \\("
+ name
+ " "
+ surname
+ "\\)\\!\n"
+ "Первое твоё обязательство — написать /new",
)
@bot.message_handler(commands=["readd"])
def readd(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Нужно указать id и новый ник\nНе больше, не меньше")
return
uid = args[0]
try:
int(uid)
except ValueError:
bot.reply_to(
chat, "ID — это число\nНа буквах пока считать не научились"
)
return
if db.read(str(forum) + ".people." + uid) is None:
bot.reply_to(chat, "Такого пользователя нет в базе")
return
user = args[1]
if user[0] != "@":
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
bot.reply_to(chat, "Я тебе не салага\\!")
return
db.write(str(forum) + ".people." + uid + ".username", user)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"@"
+ telebot.formatting.escape_markdown(user)
+ " от меня не скроется\\!",
)
@bot.message_handler(commands=["days"])
def set_days(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Мне нужно два двоичных числа")
return
num1 = args[0]
num2 = args[1]
if len(num1) != 6:
bot.reply_to(chat, "В первом числе не 6 цифр")
return
if len(num2) != 6:
bot.reply_to(chat, "Во втором числе не 6 цифр")
return
if not all([i == "0" or i == "1" for i in num1]):
bot.reply_to(chat, "Цифры двоичные должны быть в первом")
return
if not all([i == "0" or i == "1" for i in num2]):
bot.reply_to(chat, "Цифры двоичные должны быть во втором")
return
db.write(
str(forum) + ".schedule.days",
(
[True if num1[i] == "1" else False for i in range(6)],
[True if num2[i] == "1" else False for i in range(6)],
),
)
message.text = ""
calendar(message)
@bot.message_handler(commands=["calendar"])
def calendar(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
en1, en2 = False, False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2 == False:
en1, en2 = True, True
days = db.read(str(forum) + ".schedule.days", ([False] * 6, [False] * 6))
skip = db.read(str(forum) + ".schedule.skip_days", [])
work = db.read(str(forum) + ".schedule.work_days", [])
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"`\\|#\\|Пн \\|Вт \\|Ср \\|Чт \\|Пт \\|Сб \\|\n"
+ (
" Ч"
+ "".join([" " + ("X" if days[0][i] else " ") + " " for i in range(6)])
+ "\n"
if en1
else ""
)
+ (
" З"
+ "".join([" " + ("X" if days[1][i] else " ") + " " for i in range(6)])
if en2
else ""
)
+ "`"
+ "\n\n"
+ "*Пропуски:*\n"
+ telebot.formatting.escape_markdown(
"\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in skip])
)
+ "\n\n"
+ "*Рабочие:*\n"
+ telebot.formatting.escape_markdown(
"\n".join([".".join(map(str, (d.day, d.month, d.year))) for d in work])
),
)
@bot.message_handler(commands=["skip"])
def set_skip_days(message: telebot.types.Message):
forum = message.chat.id
mod_days(
message, str(forum) + ".schedule.skip_days", str(forum) + ".schedule.work_days"
)
@bot.message_handler(commands=["work"])
def set_work_days(message: telebot.types.Message):
forum = message.chat.id
mod_days(
message, str(forum) + ".schedule.work_days", str(forum) + ".schedule.skip_days"
)
@bot.message_handler(commands=["stop"])
def stop_queue(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
status = db.read(str(forum) + ".is_active")
if status is None or not status:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже не идёт")
return
status = False
db.write(str(forum) + ".is_active", status)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство остановлено")
@bot.message_handler(commands=["honor"])
def add_honor(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = db.read(str(forum) + ".rookies.force_order", [])
honor = db.read(str(forum) + ".rookies.honor_order", [])
people = db.read(str(forum) + ".people")
honor.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(str(forum) + ".rookies.force_order", force)
db.write(str(forum) + ".rookies.honor_order", honor)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Вы в почёте, "
+ telebot.formatting.escape_markdown(
people[user]["name"] + " " + people[user]["surname"]
),
)
@bot.message_handler(commands=["force"])
def add_force(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = db.read(str(forum) + ".rookies.force_order", [])
honor = db.read(str(forum) + ".rookies.honor_order", [])
people = db.read(str(forum) + ".people")
force.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(str(forum) + ".rookies.force_order", force)
db.write(str(forum) + ".rookies.honor_order", honor)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Отрабатывай, "
+ telebot.formatting.escape_markdown(
people[user]["name"] + " " + people[user]["surname"]
),
)
@bot.message_handler(commands=["sick"])
def add_sick(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) not in (1, 2):
bot.reply_to(chat, "Нужно указать человека и дату выздоровления")
return
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
if len(args) > 1:
dates = parse_dates(forum, args[1:])
if isinstance(dates, str):
bot.reply_to(
chat,
telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?",
)
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
date = dates[0]
else:
date = get_time(forum).date() + dt.timedelta(days=30)
sicks = db.read(str(forum) + ".rookies.sick_order", {})
people = db.read(str(forum) + ".people")
sicks[user] = (date, False)
db.write(str(forum) + ".rookies.sick_order", sicks)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
telebot.formatting.escape_markdown(
people[user]["name"] + " " + people[user]["surname"]
)
+ " болеет до "
+ telebot.formatting.escape_markdown(
".".join(map(str, (date.day, date.month, date.year)))
),
)
@bot.message_handler(commands=["order"])
def view_order(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 0:
bot.reply_to(chat, "Многа букав")
return
order = db.read(str(forum) + ".rookies.order", [])
force = db.read(str(forum) + ".rookies.force_order", [])
honor = db.read(str(forum) + ".rookies.honor_order", [])
sicks = db.read(str(forum) + ".rookies.sick_order", {})
people = db.read(str(forum) + ".people")
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"*Очередь:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[people[u]["surname"] + " " + people[u]["name"] for u in order]
)
)
+ "\n\n"
+ "*Почитаемые:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[people[u]["surname"] + " " + people[u]["name"] for u in honor]
)
)
+ "\n"
+ "*Виноватые:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[people[u]["surname"] + " " + people[u]["name"] for u in force]
)
)
+ "\n"
+ "*Больные:*\n"
+ telebot.formatting.escape_markdown(
"\n".join(
[
people[u]["surname"]
+ " "
+ people[u]["name"]
+ " до "
+ ".".join(
map(
str,
(sicks[u][0].day, sicks[u][0].month, sicks[u][0].year),
)
)
for u in sicks
]
)
),
)
@bot.message_handler(commands=["begin"])
def begin_queue(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(
chat, "Нужно указать человека, с которого начнётся дежурство"
)
return
if len(args) == 1:
user = args[0]
if user[1:] == bot.get_me().username:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
start_with = f[0]
else:
start_with = None
phase = db.read(str(forum) + ".schedule.phase")
if phase is None:
bot.reply_to(chat, "Задай фазу с помощью /phase")
return
status = db.read(str(forum) + ".is_active")
if status:
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже идёт")
return
status = True
db.write(str(forum) + ".is_active", status)
stack_update(forum, start_with)
now_date = get_time(forum).date()
start_with = db.read(str(forum) + ".rookies.order", [None])[0]
if start_with is None:
bot.reply_to(chat, "Людей нет")
return
db.write(str(forum) + ".schedule.last_stack_update_date", now_date)
db.write(str(forum) + ".schedule.last_notification_date", now_date)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Держурство начнёт "
+ db.read(str(forum) + ".people." + start_with + ".name")
+ " "
+ db.read(str(forum) + ".people." + start_with + ".surname")
+ " ",
)
@bot.message_handler(commands=["phase"])
def set_phase(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
en2 = False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2:
bot.reply_to(chat, "Это числитель или знаменатель?")
return
now = get_time(forum).date()
phase = int(bool(en2))
phases = 2
db.write(str(forum) + ".schedule.phase", phase)
db.write(str(forum) + ".schedule.phases", phases)
db.write(str(forum) + ".schedule.last_phase_changing_date", now)
phase = db.read(str(forum) + ".schedule.phase", 0)
phases = db.read(str(forum) + ".schedule.phase", 2)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat, "Текущая неделя: " + ("знаменатель" if phase else "числитель")
)
@bot.message_handler(commands=["timezone", "tz"])
def set_timezone(message: telebot.types.Message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
if len(args) == 1:
try:
args[0] = args[0].lower()
if "utc" in args[0]:
args[0] = args[0][3:]
tz = int(args[0])
if tz not in range(-12, 14 + 1):
raise ValueError
except ValueError:
bot.reply_to(
chat,
"Нужно указать [смещение по UTC](https:"
+ "//ru.wikipedia.org/wiki/%D0%92%D1%81%D"
+ "0%B5%D0%BC%D0%B8%D1%80%D0%BD%D0%BE%D0%"
+ "B5_%D0%BA%D0%BE%D0%BE%D1%80%D0%B4%D0%B"
+ "8%D0%BD%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%"
+ "D0%BD%D0%BD%D0%BE%D0%B5_%D0%B2%D1%80%D"
+ "0%B5%D0%BC%D1%8F)",
)
return
db.write(str(forum) + ".settings.timezone", tz)
tz = db.read(str(forum) + ".settings.timezone", 3)
if db.read(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(
chat,
"Часовой пояс: UTC"
+ ("\\+" if tz >= 0 else "")
+ telebot.formatting.escape_markdown(str(tz)),
)
def get_hours() -> tuple:
return (8, 20)
def stack_update(forum: int, force_reset: bool = False) -> None:
now = get_time(forum)
now_date = now.date()
order = db.read(str(forum) + ".rookies.order", [])
force = db.read(str(forum) + ".rookies.force_order", [])
honor = db.read(str(forum) + ".rookies.honor_order", [])
sicks = db.read(str(forum) + ".rookies.sick_order", {})
people = db.read(str(forum) + ".people", {})
for i in sicks:
if now_date >= sicks[i][0]:
if sicks[i][1]:
prepend_user(forum, ".rookies.force_order", i)
sicks.pop(i)
db.write(str(forum) + ".rookies.sick_order", sicks)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
db.write(str(forum) + ".rookies.force_order", force)
db.write(str(forum) + ".rookies.honor_order", honor)
for i in order:
if i not in people.keys():
order.remove(i)
if len(order) == 0 or force_reset is not False:
order = list(
dict(sorted(people.items(), key=lambda item: item[1]["surname"])).keys()
)
if force_reset is not False:
db.write(str(forum) + ".rookies.force_order", [])
db.write(str(forum) + ".rookies.honor_order", [])
db.write(str(forum) + ".rookies.sick_order", {})
if force_reset is not None:
try:
order = order[order.index(force_reset) :]
except ValueError:
pass
db.write(str(forum) + ".rookies.order", order)
if len(order) == 0:
return
if force_reset is False:
if len(force) > 0:
db.write(
str(forum) + ".rookies.current", pop_user(forum, "rookies.force_order")
)
else:
current = pop_user(forum, "rookies.order")
if current in honor:
honor.remove(current)
db.write(str(forum) + ".rookies.honor_order", honor)
stack_update(forum)
elif any([(sicks[i][1] is False) and (i == current) for i in sicks]):
skipped = list(sicks[current])
skipped[1] = True
sicks[current] = tuple(skipped)
db.write(str(forum) + ".rookies.sick_order", sicks)
stack_update(forum)
else:
db.write(
str(forum) + ".rookies.current", pop_user(forum, "rookies.order")
)
def clean_old_dates(date: dt.datetime, array: str) -> None:
"Removes dates from db's array which are older than `date`."
a = db.read(array)
a = a.filter(lambda x: x >= date, a)
db.write(array, a)
def update(forum: int) -> None:
now = get_time(forum)
now_date = now.date()
now_time = now.time()
last_notif = db.read(str(forum) + ".schedule.last_notification_date")
last_upd_stack = db.read(str(forum) + ".schedule.last_stack_update_date")
is_active = get_status(forum, now_date)
hours_range = get_hours()
change_phase(forum, now_date)
clean_old_dates(now_date, str(forum) + ".schedule.work_days")
clean_old_dates(now_date, str(forum) + ".schedule.skip_days")
if is_active and (last_upd_stack is None or now_date > last_upd_stack):
db.write(str(forum) + ".schedule.last_stack_update_date", now_date)
stack_update(forum)
print(now_time.hour, hours_range)
if now_time.hour in hours_range:
if last_notif is None or now_date > last_notif:
print("Notified")
db.write(str(forum) + ".schedule.last_notification_date", now_date)
remind_users(forum)
def update_notify(forum: int) -> None:
"Notifies the forum about bot's new version."
bot.reply_to(
get_chat(forum),
"Обновился до версии " + telebot.formatting.escape_markdown(CURRENT_VERSION),
)
def process1():
"The process runs telegram infinite polling."
bot.infinity_polling(none_stop=True)
def process2():
"The process updates duty order for every forum once a `period` seconds."
period = int(cr.read("settings.notify_period"))
prev_time = time.time()
while True:
cur_time = time.time()
if cur_time - prev_time >= period:
prev_time = cur_time
stdout.write("Process 2 update\n")
for i in db.read("").keys():
try:
update(int(i))
except ValueError:
pass
funcs = [process1, process2]
threads = map(lambda x: Thread(target=x), funcs)
for thread in threads:
thread.daemon = True
thread.start()
if VERSION != CURRENT_VERSION:
for FORUM in db.read("").keys():
try:
update_notify(int(FORUM))
print("New version notification", FORUM)
except ValueError:
pass
while True:
time.sleep(1)