bot/bozenka/instances/telegram/utils/simpler/solution_simpler.py

321 lines
13 KiB
Python
Raw Normal View History

2023-10-28 19:25:10 +02:00
import asyncio
import logging
import re
import time
from datetime import datetime
from typing import Any

from aiogram import types, Bot
from aiogram.enums import ChatMemberStatus
from aiogram.exceptions import TelegramRetryAfter, TelegramNotFound
from aiogram.filters import CommandObject
from aiogram.types import ChatPermissions, ChatAdministratorRights
from sqlalchemy import select, insert, Select, Insert, Update
from sqlalchemy.ext.asyncio import async_sessionmaker

from bozenka.database import get_user, Users
from bozenka.database.tables.telegram import get_settings, ChatSettings
2023-11-25 17:49:27 +01:00
async def _record_ban(user_id: int, chat_id: int, reason: str | None, session: async_sessionmaker) -> None:
    """
    Writes the ban of a user into the database.
    Creates the row when get_user finds none, otherwise updates the existing one
    :param user_id: Telegram id of the banned user
    :param chat_id: Telegram id of the chat where the ban happened
    :param reason: Ban reason or None
    :param session: Session maker object of SqlAlchemy
    :return: Nothing
    """
    known_user = await get_user(user_id=user_id, chat_id=chat_id, session=session)
    # A separate name keeps the session maker parameter from being shadowed
    async with session() as db:
        async with db.begin():
            if not known_user:
                await db.merge(Users(
                    user_id=user_id,
                    chat_id=chat_id,
                    is_banned=True,
                    ban_reason=reason,
                    is_muted=None,
                    mute_reason=None
                ))
            else:
                # Chained where() calls are joined with SQL AND. Python's `and`
                # between two column expressions silently drops one of them.
                await db.execute(
                    Update(Users)
                    .values(is_banned=True, ban_reason=reason)
                    .where(Users.user_id == user_id)
                    .where(Users.chat_id == chat_id))


async def basic_ban(msg: types.Message, user: types.User, config: dict[str, None | str | bool], session: async_sessionmaker) -> None:
    """
    Bans a user by user_id and writes everything in database.
    Protects from getting limited as flood: on TelegramRetryAfter it waits
    for the requested delay and repeats the ban once
    :param msg: Message telegram object
    :param user: User telegram object to ban
    :param config: Dictionary with information about ban
                   (keys "ban_time", "revert_msg" and "reason" are read)
    :param session: Session maker object of SqlAlchemy
    :return: Nothing
    """
    try:
        try:
            await msg.chat.ban(user.id, config["ban_time"], config["revert_msg"])
        except TelegramRetryAfter as ex:
            # asyncio.sleep lets other handlers run while waiting;
            # time.sleep would freeze the whole event loop.
            await asyncio.sleep(ex.retry_after)
            await msg.chat.ban(user.id, config["ban_time"], config["revert_msg"])
    except TelegramNotFound:
        logging.info("Can't ban user, something was not avaible or got disabled")
        return

    logging.info(f"Banned user @{user.full_name} id={user.id}")
    # The banned user is stored, not the sender of the command
    await _record_ban(
        user_id=user.id,
        chat_id=msg.chat.id,
        reason=config["reason"] or None,
        session=session,
    )
2023-10-28 19:25:10 +02:00
# Length of every supported time unit in seconds ("y" is counted as 365 days)
_SECONDS_PER_UNIT = {
    "s": 1,
    "m": 60,
    "h": 60 * 60,
    "d": 60 * 60 * 24,
    "w": 60 * 60 * 24 * 7,
    "y": 60 * 60 * 24 * 365,
}


def count_time(counted_time: str) -> int:
    """
    Counts unix time, for seconds, minutes, hours, days, weeks, years.
    The text has to start with a number (one or more digits) directly
    followed by the unit letter, anything after the unit letter is ignored
    :param counted_time: Time, needs to be converted, for example "5m" or "12h"
    :return: Current unix time plus the given amount of time, in seconds
    :raises ValueError: If the text does not start with a number
    :raises KeyError: If the unit letter is missing or not supported
    """
    parsed = re.match(r"(\d+)(.?)", counted_time)
    if parsed is None:
        raise ValueError(f"Time has to start with a number: {counted_time!r}")
    amount, unit = parsed.groups()
    return int(time.time()) + int(amount) * _SECONDS_PER_UNIT[unit]
class SolutionSimpler:
    """
    Making feature 'result in your direct message' easy and cleaner to complete.
    Including logging and debugging.
    """

    @staticmethod
    async def _update_user(session: async_sessionmaker, user_id: int, chat_id: int, **values: Any) -> None:
        """
        Updates columns of the Users row that belongs to the user in the chat
        :param session: Session maker object of SqlAlchemy
        :param user_id: Telegram id of the user
        :param chat_id: Telegram id of the chat
        :param values: Column names and their new values
        :return: Nothing
        """
        # A separate name keeps the session maker parameter from being shadowed
        async with session() as db:
            async with db.begin():
                # Chained where() calls are joined with SQL AND. Python's `and`
                # between two column expressions silently drops one of them.
                await db.execute(
                    Update(Users)
                    .values(**values)
                    .where(Users.user_id == user_id)
                    .where(Users.chat_id == chat_id)
                )

    @staticmethod
    async def ban_user(msg: types.Message, cmd: CommandObject, session: async_sessionmaker) -> dict[str, None | str | bool]:
        """
        Bans user or user, returns config, by config you can send special message.
        Support multibans & antiflood telegram protection.
        Command arguments are split into at most three parts; a part can be
        a "n.." / "ye.." flag (revert messages), a time like "5d", or reason text
        :param msg: Message telegram object
        :param cmd: Object of telegram command
        :param session: Session maker object of SqlAlchemy
        :return: Config with keys "revert_msg", "ban_time" and "reason"
        """
        config = {
            "revert_msg": None,
            "ban_time": None,
            "reason": ""
        }
        if cmd.args:
            reason_parts = []
            for item in re.split(" ", cmd.args, 2):
                # `is None` means "not set yet": once a flag is stored (even False),
                # later words that look like a flag stay part of the reason.
                if re.match(r"^n\w", item) and config["revert_msg"] is None:
                    config["revert_msg"] = False
                elif re.match(r"^ye\w", item) and config["revert_msg"] is None:
                    config["revert_msg"] = True
                elif re.match(r"^\d[wdysmh]", item) and config["ban_time"] is None:
                    # NOTE(review): the time is kept as formatted UTC text and later
                    # handed to the ban call as until_date - confirm this is accepted.
                    config["ban_time"] = datetime.utcfromtimestamp(count_time(item)).strftime('%Y-%m-%d %H:%M:%S')
                else:
                    reason_parts.append(item)
            config["reason"] = " ".join(reason_parts).strip()

        reply = msg.reply_to_message
        # NOTE(review): only replies that carry text trigger the ban (kept from the
        # previous behaviour) - confirm replies to media should really be skipped.
        if reply is not None and reply.text:
            await basic_ban(msg, reply.from_user, config, session)
        else:
            logging.info("Ban skipped, command is not a reply to a text message")
        return config

    @staticmethod
    async def unban_user(msg: types.Message, session: async_sessionmaker) -> None:
        """
        Unbans user by reply, returns nothing
        :param msg: Message telegram object, has to be a reply to the user
        :param session: Session maker object of SqlAlchemy
        :return: Nothing
        """
        target = msg.reply_to_message.from_user
        await msg.chat.unban(target.id)
        logging.info(f"Unbanned user @{target.full_name} id={target.id}")
        # The unbanned user is updated, not the sender of the command
        if await get_user(user_id=target.id, chat_id=msg.chat.id, session=session):
            await SolutionSimpler._update_user(
                session, target.id, msg.chat.id,
                is_banned=False, ban_reason=None
            )

    @staticmethod
    async def get_status(msg: types.Message, session: async_sessionmaker) -> dict[str, bool | None | Any]:
        """
        Get status of user who sent the message, is it muted or banned.
        :param msg: Message telegram object
        :param session: Session maker object of SqlAlchemy
        :return: Config with keys "is_banned", "ban_reason", "is_muted", "mute_reason"
        """
        user_tg = await msg.chat.get_member(msg.from_user.id)
        user_db = await get_user(user_id=msg.from_user.id, chat_id=msg.chat.id, session=session)
        return {
            "is_banned": user_tg.status == ChatMemberStatus.KICKED,
            # No database row means there is no stored reason
            "ban_reason": (user_db.ban_reason or None) if user_db else None,
            # Presumably not every chat member object has can_send_messages;
            # a member without that field is treated as not muted.
            "is_muted": not getattr(user_tg, "can_send_messages", True),
            "mute_reason": (user_db.mute_reason or None) if user_db else None,
        }

    @staticmethod
    async def mute_user(msg: types.Message, cmd: CommandObject, session: async_sessionmaker) -> dict[str, None | str]:
        """
        Mutes user, returns config, by config you can send special message.
        :param msg: Message telegram object, has to be a reply to the user
        :param cmd: Object of telegram command
        :param session: Session maker object of SqlAlchemy
        :return: Config with keys "mute_time" and "reason"
        """
        config = {
            "mute_time": None,
            "reason": ""
        }
        if cmd.args:
            reason_parts = []
            for item in re.split(" ", cmd.args, 2):
                if re.match(r"^\d[wdysmh]", item) and config["mute_time"] is None:
                    # NOTE(review): kept as formatted UTC text and handed to restrict()
                    # as until_date - confirm this is accepted.
                    config["mute_time"] = datetime.utcfromtimestamp(count_time(item)).strftime('%Y-%m-%d %H:%M:%S')
                else:
                    reason_parts.append(item)
            config["reason"] = " ".join(reason_parts).strip()

        target = msg.reply_to_message.from_user
        perms = ChatPermissions(can_send_messages=False)
        await msg.chat.restrict(target.id, until_date=config["mute_time"], permissions=perms)
        logging.info(f"Muted user @{target.full_name} id={target.id}")

        reason = config["reason"] or None
        # The muted user is stored, not the sender of the command
        if not await get_user(user_id=target.id, chat_id=msg.chat.id, session=session):
            new_user = Users(
                user_id=target.id,
                chat_id=msg.chat.id,
                is_banned=None,
                ban_reason=None,
                is_muted=True,
                mute_reason=reason
            )
            async with session() as db:
                async with db.begin():
                    await db.merge(new_user)
        else:
            await SolutionSimpler._update_user(
                session, target.id, msg.chat.id,
                is_muted=True, mute_reason=reason
            )
        return config

    @staticmethod
    async def unmute_user(msg: types.Message, session: async_sessionmaker) -> None:
        """
        Unmutes user, returns nothing
        :param msg: Message telegram object, has to be a reply to the user
        :param session: Session maker object of SqlAlchemy
        :return: Nothing
        """
        target = msg.reply_to_message.from_user
        perms = ChatPermissions(can_send_messages=True)
        await msg.chat.restrict(target.id, permissions=perms)
        logging.info(f"Unmuted user @{target.full_name} id={target.id}")
        # The unmuted user is updated, not the sender of the command
        await SolutionSimpler._update_user(
            session, target.id, msg.chat.id,
            is_muted=False, mute_reason=None
        )

    @staticmethod
    async def pin_msg(msg: types.Message) -> None:
        """
        Pins replied message, returns nothing
        :param msg: Message telegram object
        :return: Nothing
        """
        await msg.chat.pin_message(message_id=msg.reply_to_message.message_id)
        logging.info(f"Pinned message id={msg.reply_to_message.message_id} chat_id={msg.chat.id}")

    @staticmethod
    async def unpin_msg(msg: types.Message) -> None:
        """
        Unpins replied message, if it pinned, always returns nothing.
        :param msg: Message telegram object
        :return: Nothing
        """
        await msg.chat.unpin_message(message_id=msg.reply_to_message.message_id)
        logging.info(f"Unpinned message id={msg.reply_to_message.message_id} chat_id={msg.chat.id}")

    @staticmethod
    async def unpin_all_msg(msg: types.Message) -> None:
        """
        Unpins all pinned messages, returns nothing
        :param msg: Message telegram object
        :return: Nothing
        """
        # Log only after the request succeeded, same as pin_msg / unpin_msg
        await msg.chat.unpin_all_messages()
        logging.info(f"Unpinned all messages chat_id={msg.chat.id}")

    @staticmethod
    async def create_invite(msg: types.Message, cmd: CommandObject) -> str:
        """
        Creating invite, returning invite link.
        When the command argument is a time like "1d", the link expires after it
        :param msg: Message telegram object
        :param cmd: Object of telegram command
        :return: Invite link text
        """
        expire_date = None
        if cmd.args and re.match(r"^\d[wdysmh]", cmd.args):
            try:
                expire_date = count_time(cmd.args)
            except (KeyError, ValueError, IndexError):
                # Time could not be converted, fall back to a link without expiry
                logging.info(f"Can't count invite expire time from {cmd.args!r}, link will not expire")
        # Exactly one link is created per call
        link = await msg.chat.create_invite_link(expire_date=expire_date)
        logging.info(f"Created invite into chat by @{msg.from_user.full_name} chat_id={msg.chat.id}")
        return link.invite_link

    @staticmethod
    async def auto_settings(msg: types.Message, session: async_sessionmaker) -> None:
        """
        Creating settings related to chat automaticly
        after adding. Does nothing when get_settings already returns settings
        :param msg: Message telegram object
        :param session: Session maker object of SqlAlchemy
        :return: Nothing
        """
        chat_data = await get_settings(msg.chat.id, session)
        if chat_data:
            return
        new_chat_data = ChatSettings(chat_id=msg.chat.id)
        async with session() as db:
            async with db.begin():
                await db.merge(new_chat_data)