duty-board-dog/bot.py
2023-12-06 17:04:41 -03:00

1289 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPDX-FileCopyrightText: 2023 Egor Guslyancev <electromagneticcyclone@disroot.org>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import telebot
import re
import time
import datetime as dt
import sys
import signal
import os
import subprocess
import pickle as p
from os import path
from sys import stderr, stdout, stdin
from threading import Thread
db = {}
def save_db(dbfo = ".db"):
global db
try:
with open(dbfo, "wb") as fo:
p.dump(db, fo)
except:
stderr.write("Не удалось записать базу!")
sys.exit(1)
def load_db(dbfi = ".db"):
global db
if not path.exists(dbfi):
save_db()
return
try:
with open(dbfi, "rb") as fi:
db = p.load(fi)
except:
stderr.write("Не удалось прочитать базу, пробуем сохранить…")
save_db()
def write_db(field: str, value: any, data = None):
global db
head = False
if data == None:
data = db
head = True
field = field.split(".")
if len(field) == 1:
data[field[0]] = value
else:
if field[0] not in data or data[field[0]] is None:
data[field[0]] = {}
if type(data[field[0]]) is not dict:
raise ValueError("Поле не является группой.")
data[field[0]] = write_db('.'.join(field[1:]), value, data[field[0]])
save_db()
if head:
db = data
return data
def read_db(field: str, default = None, data = None) -> any:
global db
if data == None:
data = db
field = field.split(".")
if field[0] not in data:
return default
if len(field) == 1:
return data[field[0]]
else:
return read_db('.'.join(field[1:]), default, data[field[0]])
def pop_db(field: str):
global db
ret = read_db(field)
field = field.split(".")
val = read_db('.'.join(field[:-1]))
if type(val) is not dict:
return
val.pop(field[-1])
write_db('.'.join(field[:-1]), val)
return ret
load_db()
write_db("about.version", "v1.0rc4")
write_db("about.author", "electromagneticcyclone")
write_db("about.tester", "angelbeautifull")
if (read_db("about.host") is None) and __debug__:
stdout.write("Введите username хоста: ")
stdout.flush()
write_db("about.host", stdin.readline()[:-1])
try:
bot = None
filename = "token.devel" if __debug__ else "token"
with open(filename, encoding = "utf-8") as fi:
for i in fi:
i = i.strip()
pattern = re.compile("^[\\d]{10}:[\\w\\d\\-\\+\\*]{35}$")
matches = pattern.fullmatch(i) is not None
if matches:
bot = telebot.TeleBot(i, parse_mode = "MarkdownV2")
break
if bot is None:
stderr.write("В файле нет токена\n")
raise Exception
except:
stderr.write("Ошибка чтения файла токена\n")
sys.exit(1)
def get_time(forum: int):
return dt.datetime.now(dt.UTC) \
+ dt.timedelta(hours = read_db(str(forum) + ".settings.timezone", 3))
def change_phase(forum: int, date = None):
if date is None:
date = get_time(forum).date()
phase = read_db(str(forum) + ".schedule.phase")
phases = read_db(str(forum) + ".schedule.phases")
last_changed = read_db(str(forum) + ".schedule.last_phase_changing_date")
if None in (phase, last_changed, phases):
return
now = date
weekday = now.weekday()
if not (now > last_changed and weekday == 0):
return
phase = (phase + 1) % phases
write_db(str(forum) + ".schedule.phase", phase)
write_db(str(forum) + ".schedule.last_phase_changing_date", now)
def get_status(forum: int, date = None) -> bool:
if date is None:
date = get_time(forum).date()
now = date
weekday = now.weekday()
if weekday == 6:
return None
phase = read_db(str(forum) + ".schedule.phase", 0)
work_days = read_db(str(forum) + ".schedule.work_days", [])
skip_days = read_db(str(forum) + ".schedule.skip_days", [])
days = read_db(str(forum) + ".schedule.days", ([0] * 6, [0] * 6))
is_active = read_db(str(forum) + ".is_active", False)
if is_active:
if now in work_days:
return True
if now in skip_days:
return 0
if days[phase][weekday]:
return True
return False
def get_chat(message, start = False):
if type(message) is int:
is_forum = True
forum = message
else:
is_forum = message.chat.is_forum
forum = message.chat.id
if (read_db(str(forum)) is None or not is_forum) and not start:
return None
chat = read_db(str(forum) + ".settings.chat")
if type(message) is int:
return chat
if (chat is None) or (chat.id == message.reply_to_message.id):
return message.reply_to_message if is_forum else message
else:
return None
def check_if_admin(message) -> bool:
forum = message.chat.id
admin = read_db(str(forum) + ".settings.admin")
if admin is None:
return True
if admin["id"] is None:
return admin["username"] == message.from_user.username
return admin["id"] == message.from_user.id
def mention(forum: int, user_id: int) -> str:
user_id = str(user_id)
if read_db(str(forum) + ".people." + user_id) is None:
stderr.write("Пользователя с ID " + user_id + " нет в базе.\n")
return
return "[" + read_db(str(forum) + ".people." + user_id + ".name") + " " \
+ read_db(str(forum) + ".people." + user_id + ".surname") + \
"](tg://user?id=" + str(user_id) + ")"
def find_uids(forum: int, s: str):
people = read_db(str(forum) + ".people")
if people is None:
return
if people is None:
return None
if s[0] == '@':
s = s[1:]
f = [i for i in people.keys() if len([j for j in people[i].values() if s in j]) > 0]
if len(f) == 0:
return None
return f
def format_user_info(forum: int, uid):
uid = str(uid)
person = read_db(str(forum) + ".people." + uid)
if person is None:
return ''
r = ''
r += "\\#" + uid + '\n'
for k, i in person.items():
r += ("" + ' ' + k + ' \\= '
+ telebot.formatting.escape_markdown(i) + '\n')
return r
def prepend_user(forum: int, ulist_s: str, uid):
uid = str(uid)
ulist = read_db(str(forum) + "." + ulist_s, [])
ulist = list(set([uid] + ulist))
write_db(str(forum) + "." + ulist_s, ulist)
def append_user(forum: int, ulist_s: str, uid):
uid = str(uid)
ulist = read_db(str(forum) + "." + ulist_s, [])
ulist = list(set(ulist + [uid]))
write_db(str(forum) + "." + ulist_s, ulist)
def pop_user(forum: int, ulist_s: str, uid):
uid = str(uid)
ulist = read_db(str(forum) + "." + ulist_s, [])
r = None
if len(ulist) > 0:
r = ulist.pop(0)
write_db(str(forum) + "." + ulist_s, ulist)
return r
def insert_user_in_current_order(forum: int, uid) -> bool:
uid = str(uid)
order = read_db(str(forum) + ".rookies.order", [])
people = read_db(str(forum) + ".people", {})
current = read_db(str(forum) + ".rookies.current")
if uid not in people:
return False
order = list(map(lambda x: (x, people[x]), order))
if current is not None:
order = dict(sorted(list(order.items()) + [(current, people[current])], key = lambda item: item[1]["surname"]))
order = dict(sorted(list(order.items()) + [uid, people[uid]], key = lambda item: item[1]["surname"]))
pos = list(order.keys()).index(current)
if pos == 0:
return False
else:
write_db(str(forum) + ".rookies.order", list(order.keys())[1:])
return True
timeout = None
caution = True
def antispam(message, chat, forum):
if not check_if_admin(message):
global timeout, caution
if timeout is not None:
period = read_db(str(forum) + ".settings.antispam.period")
if period is None:
write_db(str(forum) + ".settings.antispam.period", 600)
period = 600
if time.time() - timeout < period:
if caution:
bot.reply_to(chat, "*Хватит спамить\\!\\!\\!*")
caution = False
bot.delete_message(forum, message.id)
return True
caution = True
timeout = time.time()
return False
@bot.message_handler(commands=['start'])
def start(message):
forum = message.chat.id
chat = get_chat(message, start)
if chat is not None:
if antispam(message, chat, forum):
return
if message.chat.is_forum:
bot.reply_to(chat,
"Привет\\! Я бот для управления дежурствами и напоминания о них\\. Напиши /link, чтобы привязать комнату\\.")
else:
bot.reply_to(chat,
"Я работаю только на форумах \\(супергруппах с комнатами\\)\\. Пригласи меня в один из них и напиши /start")
@bot.message_handler(commands=['help'])
def help(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, ""
+ "/admin \\[@\\] — переопределить *админа*\n"
+ "/link — привязать комнату\n"
+ "/unlink — отвязать комнату\n"
+ "/cleaner 0/1 — удалять команды\n"
+ "/new @ Имя Фамилия — добавить ~салагу~ студента\n"
+ "/readd \\# @ — если студент поменял ник, это нужно отметить\n"
+ "/remind — напомнить дежурным подежурить, а новичкам отметится\n"
+ "/forget — забыть всех новичков\n"
+ "/del @/\\#/Имя — убрать студента\n"
+ "/list \\[@/\\#/Имя\\] — получить информацию по имени/фамиили\n"
+ "/purge — удалить форум из базы *\\!\\!\\!ОПАСНО\\!\\!\\!*\n"
+ "/timezone ±n — установить часовой пояс по UTC\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\) — задать дни недели \\(ПН\\-СБ\\), когда необходимо дежурство, отдельно для числителей и знаменателей\n"
+ "/calendar \\[Числитель/знаменатель\\] — показать календарь дежурств\n"
+ "/phase \\[Числитель/знаменатель\\] — узнать или скорректировать фазу текущей недели\n"
+ "/skip \\[00\\.00\\] — пропустить сегодняшний или заданный день\n"
+ "/work \\[00\\.00\\] — поработать в сегодняшнем или заданном дне\n"
+ "/honor \\[\\-\\]@ — пропуск следующего дежурства, так как студент молодец\n"
+ "/sick @ \\[дата\\] — пропуск дежурства по причине болезни\n"
+ "/force \\[\\-\\]@ — провинившийся дежурит как только так сразу\n"
+ "/order — посмотреть очередь дежурств\n"
+ "/stop — остановить дежурства\n"
+ "/begin \\[@\\] — начать сначала с определённого студента")
@bot.message_handler(commands=['exec'])
def exec_bot(message):
if __debug__:
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if message.from_user.username == read_db("about.host"):
bot.reply_to(chat, telebot.formatting.escape_markdown(
str(eval(' '.join(message.text.split(' ')[1:])))))
else:
bot.delete_message(forum, message.id)
def pretty(d, indent=0):
for key, value in d.items():
print(' ' * indent + str(key))
if isinstance(value, dict):
pretty(value, indent+1)
else:
print(' ' * (indent+1) + str(value))
@bot.message_handler(commands=['info'])
def info(message):
if __debug__:
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == read_db("about.host"):
pretty(db)
@bot.message_handler(commands=['fuck', 'fuck_you', 'fuck-you', 'fuckyou'])
def rude(message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
@bot.message_handler(commands=['backup'])
def backup_db(message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == read_db("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append('')
save_db(args[0] + ".backup.db")
@bot.message_handler(commands=['restore'])
def restore_db(message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
bot.delete_message(forum, message.id)
if message.from_user.username == read_db("about.host"):
args = message.text.split()[1:]
if len(args) == 0:
args.append('')
load_db(args[0] + ".backup.db")
@bot.message_handler(commands=['link'])
def link(message):
forum = message.chat.id
chat = get_chat(message, True)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
write_db(str(forum) + ".settings.chat", message.reply_to_message)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(message.reply_to_message,
"Комната \"" + str(message.reply_to_message.forum_topic_created.name) + "\" привязана\\. "
+ "Чек-лист того, что нужно написать в первую очередь:\n"
+ "/admin \\[@\\], чтобы задать админа\n"
+ "/help, чтобы узнать, что я умею\n"
+ "/tz, чтобы задать часовой пояс\n"
+ "/phase \\[Числитель/знаменатель\\], чтобы задать недельную фазу\n"
+ "/days \\*\\*\\*\\*\\*\\* \\*\\*\\*\\*\\*\\* \\(\\* \\= \\[\\0/1\\]\\), чтобы задать дни дежурств\n"
+ "/new @ Имя Фамилия, чтобы добавить ~салаг~ студетов\n"
+ "/begin \\[@\\], чтобы начать дежурство")
@bot.message_handler(commands=['unlink'])
def unlink(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
write_db(str(forum) + ".settings.chat", None)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Комната отвязана")
@bot.message_handler(commands=['purge'])
def purge_db_people(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
if message.text == "/purge ДА УДАЛЯЙ ДАВАЙ":
write_db(str(forum), None)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"База студентов удалена")
else:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Чтобы подтвердить свои благие намерения, напиши\n"
+ "`/purge ДА УДАЛЯЙ ДАВАЙ`")
@bot.message_handler(commands=['forget'])
def forget_db_pending(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
empty = read_db(str(forum) + ".pending") is None
write_db(str(forum) + ".pending", None)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"База новичков удалена"
+ (", хоть и была уже пустая…" if empty else ""))
@bot.message_handler(commands=['admin'])
def set_admin(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
admin = [i for i in message.text.split() if i[0] == '@']
if len(admin) == 0:
uid = read_db(str(forum) + ".settings.admin.id")
if uid == message.from_user.id:
bot.reply_to(chat, "Ты уже тут главный")
return
write_db(str(forum) + ".settings.admin.id", message.from_user.id)
write_db(str(forum) + ".settings.admin.username", message.from_user.username)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Рад познакомиться, " + message.from_user.first_name + "\\!")
else:
admin = admin[0][1:]
uadmin = read_db(str(forum) + ".settings.admin.username")
if uadmin == admin:
bot.reply_to(chat, "Ты уже тут главный")
return
write_db(str(forum) + ".settings.admin.id", None)
write_db(str(forum) + ".settings.admin.username", admin)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Теперь @" + telebot.formatting.escape_markdown(admin) + " тут царь и бог\\!\n"
+ "Напиши /admin в чат, чтобы я знал тебя в лицо")
@bot.message_handler(commands=['list'])
def list_users(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
s = message.text.split()[1:]
if len(s) == 0:
s = ['']
r = []
for i in s:
f = find_uids(forum, i)
if f is not None:
r += f
if len(r) == 0:
bot.reply_to(chat, "Никого не нашёл")
return
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
res = []
for i in r:
res.append(format_user_info(forum, i))
bot.reply_to(chat, '\n'.join(res))
@bot.message_handler(commands=['remind'])
def remind_users(message):
auto = False
if type(message) is int:
auto = True
forum = message
else:
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if not auto and antispam(message, chat, forum):
return
if auto or check_if_admin(message):
pending = read_db(str(forum) + ".pending", {})
r = ""
if len(pending.keys()) == 0:
if not auto and read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
else:
for i in pending.keys():
r += "@" + telebot.formatting.escape_markdown(i) + " "
r += "нужно нажать /new\n"
status = get_status(forum)
if status:
current = read_db(str(forum) + ".rookies.current")
if current is not None:
rookie = read_db(str(forum) + ".people." + current)
if rookie is not None:
r += mention(forum, current) + " сегодня дежурит"
if not auto and read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
if r == "":
if not auto:
bot.reply_to(chat, "Дежурств сегодня нет")
else:
bot.reply_to(chat, r)
@bot.message_handler(commands=['del'])
def del_person(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) < 1:
bot.reply_to(chat, "Давай хоть кого\\-нибудь удалим")
return
if len(args) > 1:
bot.reply_to(chat, "Давай удалять их по очереди")
return
user = args[0]
if user[1:] == bot.get_me().username:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Скатертью дорога, " + mention(forum, f[0]) + "\\!")
pop_db(str(forum) + ".people." + f[0])
@bot.message_handler(commands=['cleaner'])
def cleaner(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
if args[0] not in ('0', '1'):
bot.reply_to(chat, "Нужно указать `0` или `1`")
return
state = (args[0] == '1')
write_db(str(forum) + ".settings.delete_messages", state)
if state:
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Очистка команд " + ("включена" if state else "отключена") + (
"\nЧтобы чистка работала, мне нужно дать право удалять сообщения" if state else ""))
@bot.message_handler(commands=['new'])
def add_new(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
args = message.text.split()[1:]
if len(args) == 0:
pending = read_db(str(forum) + ".pending")
if pending is None:
antispam(message, chat, forum)
return
user = message.from_user.username
if user not in pending.keys():
antispam(message, chat, forum)
return
uid = str(message.from_user.id)
write_db(str(forum) + ".people." + uid + ".username", user)
write_db(str(forum) + ".people." + uid + ".name", pending[user]["name"])
write_db(str(forum) + ".people." + uid + ".surname", pending[user]["surname"])
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Рад познакомиться, " + mention(forum, uid) + "\\!")
pop_db(str(forum) + ".pending." + user)
elif check_if_admin(message):
if len(args) != 3:
bot.reply_to(chat, "Нужно указать @, имя и фамилию\nНе больше, не меньше")
return
user = args[0]
if user[0] != '@':
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Я тебе не салага\\!")
return
people = read_db(str(forum) + ".people", [])
pending = read_db(str(forum) + ".pending", {})
if any( [people[i]["username"] == user for i in people]
+ [user in pending.keys()]):
bot.reply_to(chat, "Пользователь с таким ником уже в базе")
return
name = args[1]
surname = args[2]
write_db(str(forum) + ".pending." + user + ".name", name)
write_db(str(forum) + ".pending." + user + ".surname", surname)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Добро пожаловать, @" + telebot.formatting.escape_markdown(user)
+ " \\(" + name + " " + surname + "\\)\\!\n"
+ "Первое твоё обязательство — написать /new")
@bot.message_handler(commands=['readd'])
def readd(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Нужно указать id и новый ник\nНе больше, не меньше")
return
uid = args[0]
try:
int(uid)
except:
bot.reply_to(chat, "ID — это число\nНа буквах пока считать не научились")
return
if read_db(str(forum) + ".people." + uid) is None:
bot.reply_to(chat, "Такого пользователя нет в базе")
return
user = args[1]
if user[0] != '@':
bot.reply_to(chat, "Пользователей нужно помянуть через `@`")
return
user = user[1:]
if user == bot.get_me().username:
bot.reply_to(chat, "Я тебе не салага\\!")
return
write_db(str(forum) + ".people." + uid + ".username", user)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "@" + telebot.formatting.escape_markdown(user) + " от меня не скроется\\!")
@bot.message_handler(commands=['days'])
def days(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 2:
bot.reply_to(chat, "Мне нужно два двоичных числа")
return
num1 = args[0]
num2 = args[1]
if len(num1) != 6:
bot.reply_to(chat, "В первом числе не 6 цифр")
return
if len(num2) != 6:
bot.reply_to(chat, "Во втором числе не 6 цифр")
return
if not all([i == '0' or i == '1' for i in num1]):
bot.reply_to(chat, "Цифры двоичные должны быть в первом")
return
if not all([i == '0' or i == '1' for i in num2]):
bot.reply_to(chat, "Цифры двоичные должны быть во втором")
return
write_db(str(forum) + ".schedule.days", ( [True if num1[i] == '1' else False for i in range(6)],
[True if num2[i] == '1' else False for i in range(6)] ))
message.text = ""
calendar(message)
@bot.message_handler(commands=['calendar'])
def calendar(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
en1, en2 = False, False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2 == False:
en1, en2 = True, True
days = read_db(str(forum) + ".schedule.days", ([False] * 6, [False] * 6))
skip = read_db(str(forum) + ".schedule.skip_days", [])
work = read_db(str(forum) + ".schedule.work_days", [])
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"`\\|#\\|Пн \\|Вт \\|Ср \\|Чт \\|Пт \\|Сб \\|\n"
+ (" Ч" + ''.join([" " + ("X" if days[0][i] else " ") + " " for i in range(6)]) + '\n' if en1 else "")
+ (" З" + ''.join([" " + ("X" if days[1][i] else " ") + " " for i in range(6)]) if en2 else "")
+ '`'
+ '\n\n'
+ "*Пропуски:*\n" + telebot.formatting.escape_markdown(
"\n".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in skip]))
+ '\n\n'
+ "*Рабочие:*\n" + telebot.formatting.escape_markdown(
"\n".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in work])))
def parse_dates(forum: int, args):
dates = []
OK = True
for a in args:
if a.lower() == "сегодня":
dates.append(get_time(forum).date())
continue
if a.lower() == "завтра":
dates.append(get_time(forum).date() + dt.timedelta(days=1))
continue
if a.lower() == "послезавтра":
dates.append(get_time(forum).date() + dt.timedelta(days=2))
continue
if a.lower() == "вчера":
dates.append(get_time(forum).date() - dt.timedelta(days=1))
continue
if a.lower() == "позавчера":
dates.append(get_time(forum).date() - dt.timedelta(days=2))
continue
d = a.split('.')
a_dates = []
if len(d) in (2, 3):
try:
d = list(map(int, d))
except:
print("Ne ok")
OK = False
if OK:
cur_date = get_time(forum).date() - dt.timedelta(days=1)
cur_year = cur_date.year
if len(d) == 2:
years = [cur_year, cur_year + 1]
else:
years = [(cur_year // 100 * 100) + d[2] if (d[2] < 100) else d[2]]
for y in years:
try:
a_dates.append(dt.datetime(y, d[1], d[0]).date())
except:
pass
a_dates = sorted(filter(lambda x: cur_date + dt.timedelta(days=120) > x > cur_date, a_dates))
if len(a_dates) == 0:
OK = False
else:
OK = False
if OK:
dates.append(a_dates[0])
else:
return a
return dates
def mod_days(message, target, neighbour):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) == 0:
dates = [get_time(forum).date()]
else:
dates = parse_dates(forum, args)
if type(dates) is str:
bot.reply_to(chat, telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?")
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
t = read_db(target)
if t is None:
t = []
n = read_db(neighbour)
if n is None:
n = []
write_db(neighbour, list(filter(lambda x: x not in dates, n)))
write_db(target, list(sorted(set(t + dates))))
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Добавил " + telebot.formatting.escape_markdown(
", ".join(['.'.join(map(str, (d.day, d.month, d.year))) for d in dates])))
@bot.message_handler(commands=['skip'])
def skip_days(message):
forum = message.chat.id
mod_days(message, str(forum) + ".schedule.skip_days", str(forum) + ".schedule.work_days")
@bot.message_handler(commands=['work'])
def work_days(message):
forum = message.chat.id
mod_days(message, str(forum) + ".schedule.work_days", str(forum) + ".schedule.skip_days")
@bot.message_handler(commands=['stop'])
def stop_queue(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
status = read_db(str(forum) + ".is_active")
if status is None or not status:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже не идёт")
return
status = False
write_db(str(forum) + ".is_active", status)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство остановлено")
@bot.message_handler(commands=['honor'])
def add_honor(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = read_db(str(forum) + ".rookies.force_order", [])
honor = read_db(str(forum) + ".rookies.honor_order", [])
people = read_db(str(forum) + ".people")
honor.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
write_db(str(forum) + ".rookies.force_order", force)
write_db(str(forum) + ".rookies.honor_order", honor)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Вы в почёте, " + telebot.formatting.escape_markdown(
people[user]['name'] + " " + people[user]['surname']))
@bot.message_handler(commands=['force'])
def add_force(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) != 1:
bot.reply_to(chat, "Нужно указать человека")
return
user = args[0]
if user[1:] == bot.get_me().username:
bot.reply_to(chat, "Рад, что всегда в почёте")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
force = read_db(str(forum) + ".rookies.force_order", [])
honor = read_db(str(forum) + ".rookies.honor_order", [])
people = read_db(str(forum) + ".people")
force.append(user)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
write_db(str(forum) + ".rookies.force_order", force)
write_db(str(forum) + ".rookies.honor_order", honor)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Отрабатывай, " + telebot.formatting.escape_markdown(
people[user]['name'] + " " + people[user]['surname']))
@bot.message_handler(commands=['sick'])
def add_sick(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) not in (1, 2):
bot.reply_to(chat, "Нужно указать человека и дату выздоровления")
return
user = args[0]
if user[1:] == bot.get_me().username:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
user = f[0]
if len(args) > 1:
dates = parse_dates(forum, args[1:])
if type(dates) is str:
bot.reply_to(chat, telebot.formatting.escape_markdown(dates)
+ " — это точно дата из ближайшего будущего?")
return
if dates is None:
bot.reply_to(chat, "Нечего добавлять")
return
date = dates[0]
else:
date = get_time(forum).date() + dt.timedelta(days=30)
sicks = read_db(str(forum) + ".rookies.sick_order", {})
people = read_db(str(forum) + ".people")
sicks[user] = (date, False)
write_db(str(forum) + ".rookies.sick_order", sicks)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
telebot.formatting.escape_markdown(
people[user]['name'] + " " + people[user]['surname'])
+ " болеет до " + telebot.formatting.escape_markdown(
'.'.join(map(str, (date.day, date.month, date.year)))))
@bot.message_handler(commands=['order'])
def view_order(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
args = message.text.split()[1:]
if len(args) > 0:
bot.reply_to(chat, "Многа букав")
return
order = read_db(str(forum) + ".rookies.order", [])
force = read_db(str(forum) + ".rookies.force_order", [])
honor = read_db(str(forum) + ".rookies.honor_order", [])
sicks = read_db(str(forum) + ".rookies.sick_order", {})
people = read_db(str(forum) + ".people")
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"*Очередь:*\n" + telebot.formatting.escape_markdown(
"\n".join([people[u]["surname"] + " " + people[u]["name"] for u in order]))
+ '\n\n'
+ "*Почитаемые:*\n" + telebot.formatting.escape_markdown(
"\n".join([people[u]["surname"] + " " + people[u]["name"] for u in honor]))
+ '\n'
+ "*Виноватые:*\n" + telebot.formatting.escape_markdown(
"\n".join([people[u]["surname"] + " " + people[u]["name"] for u in force]))
+ '\n'
+ "*Больные:*\n" + telebot.formatting.escape_markdown(
"\n".join([ people[u]["surname"] + " " + people[u]["name"]
+ " до " + '.'.join(map(str,
(sicks[u][0].day, sicks[u][0].month, sicks[u][0].year)))
for u in sicks])))
@bot.message_handler(commands=['begin'])
def begin_queue(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Нужно указать человека, с которого начнётся дежурство")
return
if len(args) == 1:
user = args[0]
if user[1:] == bot.get_me().username:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Ты сейчас быканул\\?")
return
f = find_uids(forum, user)
if f is None:
bot.reply_to(chat, "Никого не нашёл")
return
if len(f) > 1:
bot.reply_to(chat, "Немогу определиться…")
list_users(message)
bot.reply_to(chat, "Конкретезируй плиз")
return
start_with = f[0]
else:
start_with = None
phase = read_db(str(forum) + ".schedule.phase")
if phase is None:
bot.reply_to(chat, "Задай фазу с помощью /phase")
return
status = read_db(str(forum) + ".is_active")
if status:
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat, "Держурство уже идёт")
return
status = True
write_db(str(forum) + ".is_active", status)
stack_update(forum, start_with)
now_date = get_time(forum).date()
start_with = read_db(str(forum) + ".rookies.order", [None])[0]
if start_with is None:
bot.reply_to(chat, "Людей нет")
return
write_db(str(forum) + ".schedule.last_stack_update_date", now_date)
write_db(str(forum) + ".schedule.last_notification_date", now_date)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Держурство начнёт "
+ read_db(str(forum) + ".people." + start_with + ".name") + " "
+ read_db(str(forum) + ".people." + start_with + ".surname") + " ")
@bot.message_handler(commands=['phase'])
def phase(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
else:
en2 = False
if len(args) == 1:
en1 = args[0].lower() in "числитель"
en2 = args[0].lower() in "знаменатель"
if en1 == en2:
bot.reply_to(chat, "Это числитель или знаменатель?")
return
now = get_time(forum).date()
phase = int(bool(en2))
phases = 2
write_db(str(forum) + ".schedule.phase", phase)
write_db(str(forum) + ".schedule.phases", phases)
write_db(str(forum) + ".schedule.last_phase_changing_date", now)
phase = read_db(str(forum) + ".schedule.phase", 0)
phases = read_db(str(forum) + ".schedule.phase", 2)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Текущая неделя: " + ("знаменатель" if phase else "числитель"))
@bot.message_handler(commands=['timezone', 'tz'])
def set_timezone(message):
forum = message.chat.id
chat = get_chat(message)
if chat is not None:
if antispam(message, chat, forum):
return
if check_if_admin(message):
args = message.text.split()[1:]
if len(args) > 1:
bot.reply_to(chat, "Многа букав")
return
if len(args) == 1:
try:
args[0] = args[0].lower()
if "utc" in args[0]:
args[0] = args[0][3:]
tz = int(args[0])
if tz not in range(-12, 14 + 1):
raise ValueError
except ValueError:
bot.reply_to(chat, "Нужно указать [смещение по UTC](https://ru.wikipedia.org/wiki/%D0%92%D1%81%D0%B5%D0%BC%D0%B8%D1%80%D0%BD%D0%BE%D0%B5_%D0%BA%D0%BE%D0%BE%D1%80%D0%B4%D0%B8%D0%BD%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%BD%D0%BE%D0%B5_%D0%B2%D1%80%D0%B5%D0%BC%D1%8F)")
return
write_db(str(forum) + ".settings.timezone", tz)
tz = read_db(str(forum) + ".settings.timezone", 3)
if read_db(str(forum) + ".settings.delete_messages"):
bot.delete_message(forum, message.id)
bot.reply_to(chat,
"Часовой пояс: UTC" + ("\\+" if tz >= 0 else "") +
telebot.formatting.escape_markdown(str(tz)))
def get_hours(forum: int) -> list:
utc_range = (8, 20)
return utc_range
# Obsoleted by `get_time`
# tz = read_db(str(forum) + ".settings.timezone", 3)
# utc_range = tuple(map(lambda x: (x - tz) % 24, utc_range))
# if utc_range[0] > utc_range[1]:
# return list(range(utc_range[0], 24)) + list(range(0, utc_range[1]))
# else:
# return list(range(utc_range[0], utc_range[1]))
def stack_update(forum: int, force_reset = False):
now = get_time(forum)
now_date = now.date()
order = read_db(str(forum) + ".rookies.order", [])
force = read_db(str(forum) + ".rookies.force_order", [])
honor = read_db(str(forum) + ".rookies.honor_order", [])
sicks = read_db(str(forum) + ".rookies.sick_order", {})
people = read_db(str(forum) + ".people", {})
for i in sicks:
if now_date >= sicks[i][0]:
if sicks[i][1]:
prepend_user(forum, ".rookies.force_order", i)
sicks.pop(i)
write_db(str(forum) + ".rookies.sick_order", sicks)
for i in people:
if (i in force) and (i in honor):
force.remove(i)
honor.remove(i)
write_db(str(forum) + ".rookies.force_order", force)
write_db(str(forum) + ".rookies.honor_order", honor)
for i in order:
if i not in people.keys():
order.remove(i)
if len(order) == 0 or force_reset != False:
order = list(dict(sorted(people.items(), key = lambda item: item[1]["surname"])).keys())
if force_reset != False:
write_db(str(forum) + ".rookies.force_order", [])
write_db(str(forum) + ".rookies.honor_order", [])
write_db(str(forum) + ".rookies.sick_order", {})
if force_reset is not None:
try:
order = order[order.index(force_reset):]
except:
pass
write_db(str(forum) + ".rookies.order", order)
if len(order) == 0:
return
if force_reset == False:
if len(force) > 0:
write_db(str(forum) + ".rookies.current", pop_user(forum, "rookies.force_order"))
else:
current = pop_user(forum, "rookies.order")
if current in honor:
honor.remove(current)
write_db(str(forum) + ".rookies.honor_order", honor)
stack_update(forum)
elif any([(sicks[i][1] == False) and (i == current) for i in sicks]):
skipped = list(sicks[current])
skipped[1] = True
sicks[current] = tuple(skipped)
write_db(str(forum) + ".rookies.sick_order", sicks)
stack_update(forum)
else:
write_db(str(forum) + ".rookies.current", pop_user(forum, "rookies.order"))
def clean_old_dates(date, array: str):
a = read_db(array)
for i in range(len(a)):
if a[i] < date:
a[i] = None
a = [i for i in a if i is not None]
write_db(array, a)
def update(forum: int):
now = get_time(forum)
now_date = now.date()
now_time = now.time()
last_notif = read_db(str(forum) + ".schedule.last_notification_date")
last_upd_stack = read_db(str(forum) + ".schedule.last_stack_update_date")
is_active = get_status(forum, now_date)
hours_range = get_hours(forum)
change_phase(forum, now_date)
clean_old_dates(now_date, str(forum) + ".schedule.work_days")
clean_old_dates(now_date, str(forum) + ".schedule.skip_days")
if is_active and (last_upd_stack is None or now_date > last_upd_stack):
write_db(str(forum) + ".schedule.last_stack_update_date", now_date)
stack_update(forum)
print(now_time.hour, hours_range)
if now_time.hour in hours_range:
if last_notif is None or now_date > last_notif:
print("Notified")
write_db(str(forum) + ".schedule.last_notification_date", now_date)
remind_users(forum)
def process1():
bot.infinity_polling(none_stop=True)
def process2():
period = 120
prev_time = time.time() - period
while True:
cur_time = time.time()
if cur_time - prev_time >= period:
prev_time = cur_time
stdout.write("Update\n")
for i in db.keys():
try:
update(int(i))
except ValueError:
pass
funcs = [process1, process2]
threads = map(lambda x: Thread(target = x), funcs)
for i in threads:
i.daemon = True
i.start()
while True:
time.sleep(1)