bot/bozenka/instances/telegram/utils/simpler/solution_simpler.py

323 lines
13 KiB
Python

import logging
import re
import time
from datetime import datetime
from typing import Any
from aiogram import types, Bot
from aiogram.exceptions import TelegramRetryAfter, TelegramNotFound
from sqlalchemy import select, insert, Select, Insert, Update
from aiogram.filters import CommandObject
from aiogram.types import ChatPermissions
from aiogram.enums import ChatMemberStatus
from aiogram.types import ChatPermissions, ChatAdministratorRights
from sqlalchemy.ext.asyncio import async_sessionmaker
from bozenka.database import get_user, Users
from bozenka.database.tables.telegram import get_settings, ChatSettings
async def basic_ban(msg: types.Message, user: types.User, config: dict[str, None | str | bool], session: async_sessionmaker) -> None:
"""
Bans a user by user_id and writes everything in database.
Protects from getting limited as flood
:param msg: Message telegram object
:param user: User telegram object to ban
:param config: Dictionary with information about ban
:param session: Session maker object of SqlAlchemy
:return: Nothing
"""
try:
await msg.chat.ban(user.id, config["ban_time"], config["revert_msg"])
logging.log(
msg=f"Banned user @{msg.reply_to_message.from_user.full_name} id={msg.reply_to_message.from_user.id}",
level=logging.INFO)
if not await get_user(user_id=user.id, chat_id=msg.chat.id, session=session):
new_user = Users(
user_id=user.id,
chat_id=msg.chat.id,
is_banned=True,
ban_reason=None if config["reason"] == "" else config["reason"],
is_muted=None,
mute_reason=None
)
async with session() as session:
async with session.begin():
await session.merge(new_user)
else:
async with session() as session:
async with session.begin():
await session.execute(
Update(Users)
.values(is_banned=True, ban_reason=None if config["reason"] == "" else config["reason"])
.where(Users.user_id == msg.from_user.id and Users.chat_id == msg.chat.id))
except TelegramRetryAfter as ex:
time.sleep(ex.retry_after)
await msg.chat.ban(msg.reply_to_message.from_user.id, config["ban_time"], config["revert_msg"])
logging.log(
msg=f"Banned user @{msg.reply_to_message.from_user.full_name} id={msg.reply_to_message.from_user.id}",
level=logging.INFO)
user = await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session)
if not user:
new_user = Users(
user_id=msg.from_user.id,
chat_id=msg.chat.id,
is_banned=True,
ban_reason=None if config["reason"] == "" else config["reason"],
is_muted=None,
mute_reason=None
)
async with session() as session:
async with session.begin():
await session.merge(new_user)
else:
async with session() as session:
async with session.begin():
await session.execute(
Update(Users)
.values(is_banned=True, ban_reason=None if config["reason"] == "" else config["reason"])
.where(Users.user_id == msg.from_user.id and Users.chat_id == msg.chat.id))
except TelegramNotFound:
logging.log(
msg=f"Can't ban user, something was not avaible or got disabled",
level=logging.INFO)
def count_time(counted_time: str) -> int:
"""
Counts unix time, for seconds, minutes, hours, days, weeks
:param counted_time: Time, needs to be converted
:return:
"""
mute_times = {"d": int(time.time()) + int(counted_time[0]) * 60 * 60 * 24,
"s": int(time.time()) + int(counted_time[0]),
"m": int(time.time()) + int(counted_time[0]) * 60,
"h": int(time.time()) + int(counted_time[0]) * 60 * 60,
"w": int(time.time()) + int(counted_time[0]) * 60 * 60 * 24 * 7}
return mute_times[counted_time[1]]
class SolutionSimpler:
"""
Making feature 'result in your direct message' easy and cleaner to complete.
Including logging and debugging.
"""
@staticmethod
async def ban_user(msg: types.Message, cmd: CommandObject, session: async_sessionmaker) -> dict[str, None | str | bool]:
"""
Bans user or user, returns config, by config you can send special message.
Support multibans & antiflood telegram protection
:param msg: Message telegram object
:param cmd: Object of telegram command
:param session: Session maker object of SqlAlchemy
:return:
"""
config = {
"revert_msg": None,
"ban_time": None,
"reason": ""
}
if cmd.args:
data = re.split(" ", cmd.args, 2)
for item in data:
if re.match(r"^n\w", item) and not config["revert_msg"]:
config["revert_msg"] = False
elif re.match(r"^ye\w", item) and not config["revert_msg"]:
config["revert_msg"] = True
elif re.match(r"^\d[wdysmh]", item) and not config["ban_time"]:
config["ban_time"] = datetime.utcfromtimestamp(count_time(item)).strftime('%Y-%m-%d %H:%M:%S')
else:
config["reason"] += item + " "
if msg.reply_to_message.text:
await basic_ban(msg, msg.reply_to_message.from_user, config, session)
return config
@staticmethod
async def unban_user(msg: types.Message, session: async_sessionmaker) -> None:
"""
Unbans user by reply, returns nothing
:param msg: Message telegram object
:param session: Session maker object of SqlAlchemy
:return:
"""
await msg.chat.unban(msg.reply_to_message.from_user.id)
logging.log(
msg=f"Unbanned user @{msg.reply_to_message.from_user.full_name} id={msg.reply_to_message.from_user.id}",
level=logging.INFO)
if await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session):
async with session() as session:
async with session.begin():
await session.execute(
Update(Users)
.values(is_banned=False, ban_reason=None)
.where(Users.user_id == msg.from_user.id and Users.chat_id == msg.chat.id)
)
@staticmethod
async def get_status(msg: types.Message, session: async_sessionmaker) -> dict[str, bool | None | Any]:
"""
Get status of user, is it muted or banned.
:param msg: Message telegram object
:param session: Session maker object of SqlAlchemy
:return:
"""
config = {
"is_banned": None,
"ban_reason": None,
"is_muted": None,
"mute_reason": None,
}
user_tg = await msg.chat.get_member(msg.from_user.id)
user_db = await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session)
config["is_banned"] = True if user_tg.status == ChatMemberStatus.KICKED else False
config["ban_reason"] = user_db.ban_reason if user_db.ban_reason else None
config["is_muted"] = True if not user_tg.can_send_messages else False
config["mute_reason"] = user_db.mute_reason if user_db.mute_reason else None
return config
@staticmethod
async def mute_user(msg: types.Message, cmd: CommandObject, session: async_sessionmaker):
"""
Mutes user, returns config, by config you can send special message.
:param msg: Message telegram object
:param cmd: Object of telegram command
:param session: Session maker object of SqlAlchemy
:return:
"""
config = {
"mute_time": None,
"reason": ""
}
if cmd.args:
data = re.split(" ", cmd.args, 2)
for item in data:
if re.match(r"^\d[wdysmh]", item) and not config["mute_time"]:
config["mute_time"] = datetime.utcfromtimestamp(count_time(item)).strftime('%Y-%m-%d %H:%M:%S')
else:
config["reason"] += item + " "
perms = ChatPermissions(can_send_messages=False)
await msg.chat.restrict(msg.reply_to_message.from_user.id, until_date=config["mute_time"], permissions=perms)
logging.log(
msg=f"Muted user @{msg.reply_to_message.from_user.full_name} id={msg.reply_to_message.from_user.id}",
level=logging.INFO)
user = await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session)
user
if not user:
new_user = Users(
user_id=msg.from_user.id,
chat_id=msg.chat.id,
is_banned=None,
ban_reason=None,
is_muted=True,
mute_reason=None if config["reason"] == "" else config["reason"]
)
async with session() as session:
async with session.begin():
await session.merge(new_user)
else:
async with session() as session:
async with session.begin():
await session.execute(
Update(Users)
.values(is_muted=True, mute_reason=None if config["reason"] == "" else config["reason"])
.where(Users.user_id == msg.from_user.id and Users.chat_id == msg.chat.id)
)
return config
@staticmethod
async def unmute_user(msg: types.Message, session: async_sessionmaker):
"""
Unmutes user, returns nothing
:param msg: Message telegram object
:param session: Session maker object of SqlAlchemy
:return:
"""
perms = ChatPermissions(can_send_messages=True)
await msg.chat.restrict(msg.reply_to_message.from_user.id, permissions=perms)
user = await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session)
logging.log(
msg=f"Unmuted user @{msg.reply_to_message.from_user.full_name} id={msg.reply_to_message.from_user.id}",
level=logging.INFO)
async with session() as session:
async with session.begin():
await session.execute(
Update(Users)
.values(is_muted=False, mute_reason=None, )
.where(Users.user_id == msg.from_user.id and Users.chat_id == msg.chat.id)
)
@staticmethod
async def pin_msg(msg: types.Message) -> None:
"""
Pins replied message, returns nothing
:param msg: Message telegram object
:return:
"""
await msg.chat.pin_message(message_id=msg.reply_to_message.message_id)
logging.log(
msg=f"Pinned message id={msg.reply_to_message.message_id} chat_id={msg.chat.id}",
level=logging.INFO)
@staticmethod
async def unpin_msg(msg: types.Message) -> None:
"""
Unpins replied message, if it pinned, always returns nothing.
:param msg: Message telegram object
:return:
"""
await msg.chat.unpin_message(message_id=msg.reply_to_message.message_id)
logging.log(
msg=f"Unpinned message id={msg.reply_to_message.message_id} chat_id={msg.chat.id}",
level=logging.INFO)
@staticmethod
async def unpin_all_msg(msg: types.Message) -> None:
"""
Unpins all pinned messages, returns nothing
:param msg: Message telegram object
:return:
"""
logging.log(
msg=f"Unpinned all messages chat_id={msg.chat.id}",
level=logging.INFO)
await msg.chat.unpin_all_messages()
@staticmethod
async def create_invite(msg: types.Message, cmd: CommandObject):
"""
Creating invite, returning invite link
:param msg: Message telegram object
:param cmd: Object of telegram command
:return:
"""
if cmd.args and re.match(r"^\d[wdysmh]", cmd.args):
link = await msg.chat.create_invite_link()
link = await msg.chat.create_invite_link()
logging.log(
msg=f"Created invite into chat by @{msg.from_user.full_name} chat_id={msg.chat.id}",
level=logging.INFO)
return link.invite_link
@staticmethod
async def auto_settings(msg: types.Message, session: async_sessionmaker):
"""
Creating settings related to chat automaticly
after adding
:param msg: Message telegram object
:param session: Object of telegram command
"""
print("123")
chat_data = await get_settings(msg.chat.id, session)
print(chat_data)
if not chat_data:
new_chat_data = ChatSettings(chat_id=msg.chat.id)
async with session() as session:
async with session.begin():
await session.merge(new_chat_data)