Added all code

This commit is contained in:
kittyneverdies catofeyev 2023-10-28 17:07:52 +00:00
parent 4e7af16752
commit 27af04b449
111 changed files with 2586 additions and 0 deletions

114
alembic.ini Normal file
View File

@ -0,0 +1,114 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = bozenka/db/migrate
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to bozenka/db/migrate/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:bozenka/db/migrate/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql+asyncpg://postgres:1234@localhost/postgres
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

24
bozenka/__init__.py Normal file
View File

@ -0,0 +1,24 @@
import asyncio
import logging
import g4f
from bozenka.telegram import launch_telegram_instance
from bozenka.db import generate_url, get_async_engine, get_sessions_maker
import threading
def launch_instances() -> None:
"""
Launches bozenka instances, working async
:return:
"""
logging.log(msg="Setting up g4f logging!", level=logging.INFO)
g4f.logging = True
db_url = generate_url()
engine = get_async_engine(db_url)
session_maker = get_sessions_maker(engine)
launch_telegram_instance(session_maker)
logging.log(msg="Launched all instances!", level=logging.INFO)

Binary file not shown.

5
bozenka/db/__init__.py Normal file
View File

@ -0,0 +1,5 @@
__all__ = ["MainModel", "get_async_engine", "get_sessions_maker", "schemas", "Users", "get_user", "generate_url"]
from .main import MainModel
from .engine import get_async_engine, get_sessions_maker, schemas, generate_url
from bozenka.db.tables.telegram import Users, get_user

Binary file not shown.

Binary file not shown.

Binary file not shown.

52
bozenka/db/engine.py Normal file
View File

@ -0,0 +1,52 @@
import os
from sqlalchemy import URL, create_engine, Engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import sessionmaker
def get_async_engine(url: URL | str) -> AsyncEngine:
"""
Creates AsyncEngine
:param url:
:return:
"""
return create_async_engine(url=url, echo=True, pool_pre_ping=True)
@DeprecationWarning
async def schemas(engine: AsyncEngine, metadata) -> None:
"""
Commiting all changes & create databases
:param engine:
:param metadata:
:return:
"""
"""
async with engine.begin() as connect:
await connect.run_sync(metadata.create_all)
"""
def get_sessions_maker(engine: AsyncEngine) -> async_sessionmaker:
"""
Creates SessionMaker (Async!)
:param engine:
:return:
"""
return async_sessionmaker(engine, class_=AsyncSession)
def generate_url() -> URL:
"""
Generates URL for postgresql database
:return:
"""
return URL.create(
"postgresql+asyncpg",
username=os.getenv("db_username"),
host=os.getenv("db_host"),
password=os.getenv("db_password"),
database=os.getenv("db_name"),
port=os.getenv("db_port")
)

3
bozenka/db/main.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
MainModel = declarative_base()

View File

@ -0,0 +1 @@
Generic single-database configuration with an async dbapi.

Binary file not shown.

91
bozenka/db/migrate/env.py Normal file
View File

@ -0,0 +1,91 @@
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import bozenka.db.main
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = bozenka.db.main.MainModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,40 @@
"""init
Revision ID: 1a3e461d3f6f
Revises:
Create Date: 2023-09-05 14:16:43.578747
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1a3e461d3f6f'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('chat_id', sa.Integer(), nullable=False),
sa.Column('is_banned', sa.Boolean(), nullable=True),
sa.Column('ban_reason', sa.Text(), nullable=True),
sa.Column('is_muted', sa.Boolean(), nullable=True),
sa.Column('mute_reason', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('user_id', 'chat_id'),
sa.UniqueConstraint('chat_id'),
sa.UniqueConstraint('user_id')
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('users')
# ### end Alembic commands ###

View File

@ -0,0 +1,44 @@
"""fixed int error
Revision ID: 43130cd7eeaf
Revises: 1a3e461d3f6f
Create Date: 2023-09-05 16:52:22.023582
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '43130cd7eeaf'
down_revision: Union[str, None] = '1a3e461d3f6f'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('users', 'user_id',
existing_type=sa.INTEGER(),
type_=sa.BigInteger(),
existing_nullable=False)
op.alter_column('users', 'chat_id',
existing_type=sa.INTEGER(),
type_=sa.BigInteger(),
existing_nullable=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('users', 'chat_id',
existing_type=sa.BigInteger(),
type_=sa.INTEGER(),
existing_nullable=False)
op.alter_column('users', 'user_id',
existing_type=sa.BigInteger(),
type_=sa.INTEGER(),
existing_nullable=False)
# ### end Alembic commands ###

View File

@ -0,0 +1,26 @@
"""empty message
Revision ID: 6abf81176a38
Revises: 43130cd7eeaf
Create Date: 2023-09-07 16:02:00.730073
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '6abf81176a38'
down_revision: Union[str, None] = '43130cd7eeaf'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

@ -0,0 +1,26 @@
"""empty message
Revision ID: d9ac923e0c6e
Revises: 6abf81176a38
Create Date: 2023-09-20 17:10:46.740094
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'd9ac923e0c6e'
down_revision: Union[str, None] = '6abf81176a38'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,84 @@
from typing import Tuple, Any
from sqlalchemy import Column, Integer, VARCHAR, Boolean, Text, select, BigInteger, Row
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker
from bozenka.db.main import MainModel
class Users(MainModel):
"""
Telegram users table, contains:
- Telegram user_id
- Telegram chat_id
- Is user banned status
- Ban reason
- Is user muted status
- Mute reason
"""
__tablename__ = 'users'
user_id = Column(BigInteger, unique=True, nullable=False, primary_key=True)
chat_id = Column(BigInteger, unique=True, nullable=False, primary_key=True)
is_banned = Column(Boolean, nullable=True, unique=False)
ban_reason = Column(Text, nullable=True, unique=False)
is_muted = Column(Boolean, nullable=True, unique=False)
mute_reason = Column(Text, nullable=True, unique=False)
def __str__(self) -> str:
return f"<User:{self.user_id}:{self.chat_id}>"
class ChatSettings(MainModel):
"""
Telegram of chat settings table, contains:
- Telegram chat_id
- Moderation setting
- Pins and Topics setting
- Gpt Conversation setting
- Welcome & Goodbye messages setting
- Hi command setting
- Open AI token (P.S in future)
"""
__tablename__ = 'chats_tg'
chat_id = Column(BigInteger, unique=True, nullable=False, primary_key=True)
moderation = Column(Boolean, default=True, unique=False)
gpt_conversations = Column(Boolean, default=False, unique=False)
topics = Column(Boolean, default=False, unique=False)
pins = Column(Boolean, default=False, unique=False)
welcome_messages = Column(Boolean, default=True, unique=False)
hi_command = Column(Boolean, default=False, unique=False)
invite_generator = Column(Boolean, default=True, unique=False)
chat_info = Column(Boolean, default=True, unique=False)
results_in_dm = Column(Boolean, default=True, unique=False)
restrict_notification = Column(Boolean, default=True, unique=False)
# openai_token = Column(Text)
async def get_settings(user_id: int, chat_id: int, session: async_sessionmaker):
"""
Return settings with sessionmaker by chat_id
:param user_id:
:param chat_id:
:param session:
:return:
"""
async with session() as session:
async with session.begin():
return (await session.execute(select(Users).where(Users.user_id == user_id and Users.chat_id == chat_id))).one_or_none()
async def get_user(user_id: int, chat_id: int, session: async_sessionmaker) -> Row[tuple[Any, ...] | Any] | None:
"""
Return user with sessionmaker by user_id and chat_id.
:param user_id: id of telegram user
:param chat_id: id of telegram chat
:param session: sessionmaker from dispatcher
:return:
"""
async with session() as session:
async with session.begin():
return (await session.execute(select(Users).where(Users.user_id == user_id and Users.chat_id == chat_id))).one_or_none()

0
bozenka/gpt/__init__.py Normal file
View File

View File

@ -0,0 +1,21 @@
import os
import logging
import g4f
from aiogram import Dispatcher, Bot
from sqlalchemy.ext.asyncio import async_sessionmaker
from bozenka.telegram.cmds import register_handlers
async def launch_telegram_instance(session_maker: async_sessionmaker) -> None:
"""
Launches telegram bot with token from enviroment
:return:
"""
logging.basicConfig(level=logging.INFO)
logging.log(msg="Setting up logging!", level=logging.INFO)
g4f.logging = True
bot = Bot(token=os.getenv("tg_bot_token"), parse_mode="HTML")
dp = Dispatcher(bot=bot, session_maker=session_maker)
await dp.start_polling(on_startup=[register_handlers(dp)])

Binary file not shown.

View File

@ -0,0 +1,25 @@
import logging
from aiogram import Dispatcher
from bozenka.telegram.cmds.admin import register_admin_cmd
from bozenka.telegram.queries import register_queries
from bozenka.telegram.cmds.dev import register_dev_cmd
from bozenka.telegram.cmds.main import register_main_cmd
from bozenka.telegram.cmds.user import register_user_cmd
from bozenka.telegram.utils.middleware import register_middlewares
def register_handlers(dp: Dispatcher):
"""
Registers all handlers
:param dp:
:return:
"""
logging.log(msg="Starting registering all handlers", level=logging.INFO)
register_dev_cmd(dp)
register_user_cmd(dp)
register_admin_cmd(dp)
register_main_cmd(dp)
register_queries(dp)
register_middlewares(dp)

View File

@ -0,0 +1,44 @@
__all__ = ["bans", "mutes", "pins", "topics"]
import logging
from aiogram import Router, F
from aiogram.filters import Command
from bozenka.telegram.cmds.admin.mutes import mute
from bozenka.telegram.cmds.admin.pins import *
from bozenka.telegram.cmds.admin.topics import *
from bozenka.telegram.cmds.admin.bans import ban
from bozenka.telegram.utils.filters import (
IsAdminFilter,
UserHasPermissions
)
def register_admin_cmd(router: Router) -> None:
"""
Registers all commands related to administrators in group.
All commands there require access to some group perms.
:param router:
:return:
"""
logging.log(msg="Registering administrator commands", level=logging.INFO)
router.message.register(ban, Command(commands="ban"),
IsAdminFilter(True), F.reply_to_message.text)
router.message.register(pin, Command(commands="pin"), UserHasPermissions(["can_pin_messages"]), F.reply_to_message.text)
router.message.register(unpin, Command(commands="unpin"), UserHasPermissions(["can_pin_messages"]), F.reply_to_message.text)
router.message.register(unpin_all, Command(commands="unpin_all"), IsAdminFilter(True), F.reply_to_message.text)
router.message.register(reopen_topic, Command(commands=["reopen_topic", "open_topic", "open"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)
router.message.register(close_topic, Command(commands=["close_topic", "close"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)
router.message.register(mute, Command(commands=["mute", "re"]), UserHasPermissions(["can_restrict_members"]))
router.message.register(close_general_topic, Command(commands=["close_general"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)
router.message.register(reopen_general_topic, Command(commands=["reopen_general", "open_general"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)
router.message.register(hide_general_topic, Command(commands=["hide_general"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)
router.message.register(unhide_general_topic, Command(commands=["unhide_general", "show_general"]),
UserHasPermissions(["can_pin_messages"]), F.chat.is_forum)

View File

@ -0,0 +1,106 @@
from aiogram.filters import CommandObject
from aiogram.types import Message
from aiogram.enums import ChatMemberStatus
from sqlalchemy.ext.asyncio import async_sessionmaker
from bozenka.telegram.utils.keyboards import ban_keyboard, delete_keyboard
from bozenka.telegram.utils.simpler import SolutionSimpler, ru_cmds
async def ban(msg: Message, command: CommandObject, session_maker: async_sessionmaker):
"""
/ban command function, supports time and reasons.
:param msg: Message telegram object
:param command: Object of telegram command
:param session_maker: Session maker object of SqlAlchemy
:return:
"""
banned_user = await msg.chat.get_member(msg.reply_to_message.from_user.id)
if banned_user.status == ChatMemberStatus.KICKED:
await msg.answer(ru_cmds["ban_6"], reply_markup=delete_keyboard(msg.from_user.id))
return
config = await SolutionSimpler.ban_user(msg, command, session_maker)
if config["reason"] and config["ban_time"]:
await msg.answer(ru_cmds["ban_1"].replace
("banned", msg.reply_to_message.from_user.mention_html()).replace
("admin", msg.from_user.mention_html()).replace
("ban_reason", config["reason"]).replace("ban_time", config["ban_time"]),
reply_markup=ban_keyboard(msg.from_user.id, msg.reply_to_message.from_user.id))
elif config["reason"]:
await msg.answer(
ru_cmds["ban_2"].replace
("banned", msg.reply_to_message.from_user.mention_html()).replace
("admin", msg.from_user.mention_html()).replace
("ban_reason", config["reason"]),
reply_markup=ban_keyboard(admin_id=msg.from_user.id, ban_id=msg.reply_to_message.from_user.id)
)
elif config["ban_time"]:
await msg.answer(
ru_cmds["ban_4"].replace
("banned", msg.reply_to_message.from_user.mention_html()).replace
("admin", msg.from_user.mention_html()).replace("ban_time", config["ban_time"]),
reply_markup=ban_keyboard(admin_id=msg.from_user.id, ban_id=msg.reply_to_message.from_user.id)
)
else:
await msg.answer(
ru_cmds["ban_3"].replace
("banned", msg.reply_to_message.from_user.mention_html()).replace
("admin", msg.from_user.mention_html()),
reply_markup=ban_keyboard(msg.from_user.id, msg.reply_to_message.from_user.id)
)
async def unban(msg: Message, command: CommandObject, session_maker: async_sessionmaker):
"""
/unban command function
:param msg: Message telegram object
:param command: Object of telegram command
:param session_maker: Session maker object of SqlAlchemy
"""
await SolutionSimpler.unban_user(msg, command, session_maker)
unbanned_user = await msg.chat.get_member(msg.reply_to_message.from_user.id)
if unbanned_user.is_member and unbanned_user.status != ChatMemberStatus.KICKED:
await msg.answer(
ru_cmds["unban_3"],
reply_markup=delete_keyboard(admin_id=msg.from_user.id)
)
elif not command.text:
await msg.answer(
ru_cmds["unban_1"]
.replace("unbanned", msg.reply_to_message.from_user.mention_html())
.replace("admin", msg.from_user.mention_html()),
reply_markup=ban_keyboard(admin_id=msg.from_user.id, ban_id=msg.reply_to_message.from_user.id)
)
else:
await msg.answer(
ru_cmds["unban_2"]
.replace("unbanned", msg.reply_to_message.from_user.mention_html())
.replace("admin", msg.from_user.mention_html())
.replace("reason", CommandObject.text),
reply_markup=ban_keyboard(admin_id=msg.from_user.id, ban_id=msg.reply_to_message.from_user.id)
)
async def status(msg: Message, session_maker: async_sessionmaker):
"""
/status command function
Checks is user banned and muted
:param msg:
:param command:
:param session_maker:
:return:
"""
config = await SolutionSimpler.get_status(msg, session_maker)
msg_text = ""
if config["is_banned"]:
msg_text += "Находится в бане"
if config["ban_reason"]:
msg_text += f"по причине <code>{config['ban_reason']}</code>"
msg_text += "🔨\n"
if config["is_muted"]:
msg_text += "Находится в муте"
if config["mute_reason"]:
msg_text += f"по причине <code>{config['mute_reason']}</code>"
msg_text += "🤐\n"
await msg.answer(msg_text, reply_markup=delete_keyboard(msg.from_user.id))

View File

@ -0,0 +1,60 @@
from aiogram.filters import CommandObject
from aiogram.types import Message as Message
from aiogram.enums import ChatMemberStatus
from sqlalchemy.ext.asyncio import async_sessionmaker
from bozenka.telegram.utils.keyboards import mute_keyboard, unmute_keyboard
from bozenka.telegram.utils.simpler import SolutionSimpler
async def mute(msg: Message, command: CommandObject, session_maker: async_sessionmaker):
"""
Handler of command /mute
Restricts member from using chat
:param msg: Message telegram object
:param command: Object of telegram command
:param session_maker: Session maker object of SqlAlchemy
:return:
"""
restricting = await msg.chat.get_member(msg.reply_to_message.from_user.id)
if restricting.status == ChatMemberStatus.LEFT or restricting.status == ChatMemberStatus.KICKED:
return
config = await SolutionSimpler.mute_user(msg, command, session_maker)
if config["mute_time"] and config["reason"] != "":
await msg.answer("Удача ✅\n"
f"Пользователь {msg.from_user.mention_html()} запретил писать "
f"сообщения пользователю {msg.reply_to_message.from_user.mention_html()}.\n"
f"По причине {config['reason']}, до даты {config['mute_time']}",
reply_markup=mute_keyboard(msg.from_user.id, restricting.user.id))
elif config["mute_reason"] != "":
await msg.answer("Удача ✅\n"
f"Пользователь {msg.from_user.mention_html()} запретил писать "
f"сообщения пользователю {msg.reply_to_message.from_user.mention_html()}.\n"
f"По причине {config['reason']}",
reply_markup=mute_keyboard(msg.from_user.id, restricting.user.id))
elif config["mute_time"]:
await msg.answer("Удача ✅\n"
f"Пользователь {msg.from_user.mention_html()} запретил писать "
f"сообщения пользователю {msg.reply_to_message.from_user.mention_html()}.\n"
f"До даты {config['mute_time']}",
reply_markup=mute_keyboard(msg.from_user.id, restricting.user.id))
else:
await msg.answer("Удача ✅\n"
f"Пользователь {msg.from_user.mention_html()} запретил писать "
f"сообщения пользователю {msg.reply_to_message.from_user.mention_html()}.\n",
reply_markup=mute_keyboard(msg.from_user.id, restricting.user.id))
async def unmute(msg: Message, session_maker: async_sessionmaker):
"""
Handler of command /unmute
Gives access member to send messages into chat
:param msg: Message telegram object
:param command: Object of telegram command
:param session_maker: Session maker object of SqlAlchemy
:return:
"""
await SolutionSimpler.unmute_user(msg, session_maker)
await msg.answer("Удача ✅"
f"Пользователь {msg.from_user.mention_html()} разрешил писать "
f"сообщения пользователю {msg.reply_to_message.from_user.mention_html()}",
reply_markup=unmute_keyboard(msg.from_user.id, msg.reply_to_message.from_user.id))

View File

@ -0,0 +1,29 @@
from aiogram.types import Message as Message
async def pin(msg: Message):
"""
/pin command function, pins replied command
:param msg: Message telegram object
:return:
"""
await msg.chat.pin_message(message_id=msg.reply_to_message.message_id)
async def unpin(msg: Message):
"""
/unpin command function, unpins replied command
:param msg: Message telegram object
:return:
"""
await msg.chat.unpin_message(message_id=msg.reply_to_message.message_id)
async def unpin_all(msg: Message):
"""
/unpin_all command function, unpins all messages in chat
:param msg: Message telegram object
:return:
"""
await msg.chat.unpin_all_messages()

View File

@ -0,0 +1,83 @@
from aiogram import Bot
from aiogram.filters import CommandObject
from aiogram.types import Message as Message
from bozenka.telegram.utils.keyboards import delete_keyboard
from bozenka.telegram.utils.simpler import ru_cmds
async def close_topic(msg: Message, bot: Bot) -> None:
"""
/close command function. Closing thread
:param msg: Message telegram object
:param bot: Object of telegram bot
:return:
"""
await bot.close_forum_topic(chat_id=msg.chat.id, message_thread_id=msg.message_thread_id)
await msg.answer(ru_cmds["topic_closed"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def reopen_topic(msg: Message, bot: Bot) -> None:
"""
/open command function. Opens thread
:param msg:
:param bot:
:return:
"""
await bot.reopen_forum_topic(chat_id=msg.chat.id, message_thread_id=msg.message_thread_id)
await msg.answer(ru_cmds["open_topic"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def close_general_topic(msg: Message, bot: Bot):
"""
/close_general command function. Closes general thread
:param msg:
:param bot:
:return:
"""
await bot.close_general_forum_topic(chat_id=msg.chat.id)
await msg.answer(ru_cmds["close_general"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def reopen_general_topic(msg: Message, bot: Bot):
"""
/open_general command function. Opens general thread
:param msg:
:param bot:
:return:
"""
await bot.reopen_general_forum_topic(chat_id=msg.chat.id)
await msg.answer(ru_cmds["open_general"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def hide_general_topic(msg: Message, bot: Bot):
"""
/hide_general command function. Hides general thread
:param msg: Message telegram object
:param bot: Object of telegram bot
:return:
"""
await bot.hide_general_forum_topic(chat_id=msg.chat.id)
await msg.answer(ru_cmds["hide_general"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def unhide_general_topic(msg: Message, bot: Bot):
"""
/show_general command function. Shows back general thread.
:param msg: Message telegram object
:param bot: Object of telegram bot
:return:
"""
await bot.unhide_general_forum_topic(chat_id=msg.chat.id)
await msg.answer(ru_cmds["show_general"].replace("user", msg.from_user.mention_html()),
reply_markup=delete_keyboard(msg.from_user.id))
async def rename_topic(msg: Message, bot: Bot, command: CommandObject):
await msg.general_forum_topic_unhidden
await bot.edit_forum_topic(name=command.text, chat_id=msg.chat.id, message_thread_id=msg.message_thread_id)

View File

@ -0,0 +1,27 @@
__all__ = ["ai", "hello"]
import logging
from aiogram.filters import Command
from bozenka.telegram.cmds.dev.hello import hi, testing
from bozenka.telegram.cmds.dev.ai import *
from bozenka.telegram.utils.simpler import AnsweringGPT4Free, AnsweringGpt4All
from aiogram import Router
def register_dev_cmd(router: Router) -> None:
"""
Registering testing commands in development or planned in future and need much time to realise it.
Don't need any special perms in group.
:param router:
:return:
"""
logging.log(msg="Registering developer commands", level=logging.INFO)
router.message.register(hi, Command(commands=["hi", "welcome", "sup", "wassup", "hello", "priv",
"privet", "хай", "прив", "привет", "ку"]))
router.message.register(start_gpt_cmd, Command(commands=["conversation"]))
router.message.register(g4f_generate_answer, AnsweringGPT4Free.ready_to_answer, ~Command(commands=["cancel"]))
router.message.register(g4a_generate_answer, AnsweringGpt4All.answering)
router.message.register(cancel_answering, Command(commands=["cancel"]))
router.message.register(testing, Command(commands=["testingtest"]))

View File

@ -0,0 +1,91 @@
import g4f
from gpt4all import GPT4All
from aiogram.fsm.context import FSMContext
from aiogram.types import Message as Message
from bozenka.telegram.utils.keyboards import gpt_categories_keyboard, delete_keyboard
from bozenka.telegram.utils.simpler import generate_gpt4free_providers, ru_cmds
async def start_gpt_cmd(msg: Message, state: FSMContext):
"""
/conversation command handler, start
:param msg:
:param state:
:return:
"""
if await state.get_state():
return
await msg.answer("Пожалуста, выберите сервис для ИИ.",
reply_markup=gpt_categories_keyboard
(user_id=msg.from_user.id))
async def cancel_answering(msg: Message, state: FSMContext):
"""
Canceling dialog with ChatGPT
:param msg:
:param state:
:return:
"""
current = await state.get_state()
if current is None:
return
await state.clear()
await msg.answer("Удача ✅\n"
"Диалог отменён!", reply_markup=delete_keyboard(admin_id=msg.from_user.id))
async def g4a_generate_answer(msg: Message, state: FSMContext):
"""
Generating answer if Gpt4All has been selected
:param msg:
:param state:
:return:
"""
model = GPT4All("ggml-model-gpt4all-falcon-q4_0.bin")
model.list_models()
output = model.generate(msg.text, max_tokens=3, )
await msg.answer(text=output)
async def g4f_generate_answer(msg: Message, state: FSMContext):
"""
Generating answer if GPT4Free model and provider has been selected
:param msg:
:param state:
:return:
"""
print("starting")
info = await state.get_data()
"""
if info.get("forum_thread_id") is not None:
if msg.message_thread_id != info["forum_thread_id"] and info["forum_thread_id"] is not None:
return
"""
print("starting 2")
providers = generate_gpt4free_providers()
reply = await msg.reply(ru_cmds["generate_answer"])
# try:
messages = []
messages.append({"role": "user", "content": msg.text})
if info.get("ready_to_answer"):
for message in info["ready_to_answer"]:
messages.append(message)
response = await g4f.ChatCompletion.create_async(
model=info["set_model"],
messages=messages,
provider=providers[info["set_provider"]],
stream=False
)
print(response)
await reply.edit_text(response)
messages.append({"role": "assistant", "content": response})
await state.update_data(ready_to_answer=messages)
# except Exception:
print(Exception)
# await reply.edit_text("Простите, произошла ошибка 😔\n"
# "Если это продолжается, пожалуйста используйте /cancel", reply_markup=delete_keyboard(admin_id=msg.from_user.id))
async def generate_image(msg: Message):
pass

View File

@ -0,0 +1,26 @@
from aiogram import Bot
from aiogram.filters import CommandObject
from aiogram.types import Message as Message, User, Chat
from sqlalchemy.ext.asyncio import async_sessionmaker
from bozenka.telegram.utils.simpler import ru_cmds
async def hi(msg: Message):
"""
Test command, sending welcome message.
Made for testing bot working status.
:param msg:
:return:
"""
await msg.answer(
ru_cmds["hi"].replace("user", msg.from_user.mention_html(ru_cmds["user"])))
async def testing(msg: Message, session_maker: async_sessionmaker, command: CommandObject, user: User, target: User, chat: Chat, bot: Bot):
print(user.full_name)
print(target.full_name)
print(msg)
print(command.args)
print(command.mention)
print(command.command)

View File

@ -0,0 +1,23 @@
__all__ = ["setup", "start"]
import logging
from aiogram import Router, F
from aiogram.enums import ContentType
from aiogram.filters import Command
from bozenka.telegram.cmds.main.setup import after_adding, setup_cmd
def register_main_cmd(router: Router) -> None:
"""
Registers all commands related to basic commands or main commands in bot.
Don't require any special perms for bot in group.
:param router:
:return:
"""
#router.message.register(start_cmd, CommandStart)
router.message.register(setup_cmd, Command(commands=["setup"]))
logging.log(msg="Registering main related commands", level=logging.INFO)
router.message.register(after_adding, F.content_type == ContentType.SUPERGROUP_CHAT_CREATED)
router.message.register(after_adding, F.content_type == ContentType.GROUP_CHAT_CREATED)

View File

@ -0,0 +1,22 @@
from aiogram.types import Message as Message
from bozenka.telegram.utils.simpler import ru_cmds
from bozenka.telegram.utils.keyboards import setup_keyboard
async def setup_cmd(msg: Message):
"""
/setup handler
:param msg:
:return:
"""
await msg.answer("Привет владелец чата 👋\n"
"Чтобы меня настроить, используй меню под данным сообщением", reply_markup=setup_keyboard(msg.from_user.id))
async def after_adding(msg: Message):
"""
Send message after adding bozenka into group chat
:param msg:
:return:
"""
await msg.answer(ru_cmds["after_adding"])

View File

@ -0,0 +1,45 @@
from aiogram.types import Message as Message
from bozenka.telegram.utils.keyboards import start_keyboard
async def start_cmd(msg: Message):
"""
/start command function
:param msg:
:return:
"""
await msg.answer(
'Привет, пользователь, я - Бозенька 👋\n'
'Я мультизадачный телеграм бот, разрабатываемый Bozo Developement\n'
f'Выберите, что будете делать, {msg.from_user.mention_html()}',
reply_markup=start_keyboard.as_markup()
)
async def features_list(msg: Message):
"""
Shows features list from reply keyboard
:param msg:
:return:
"""
await msg.answer("List will be soon")
async def about_devs(msg: Message):
"""
Shows info about devs from reply keyboard
:param msg:
:return:
"""
await msg.answer("Info about developers will be added soon")
async def add_to_chat(msg: Message):
"""
Sends link for adding bot into chat
:param msg:
:return:
"""
await msg.answer("Will be soon")

View File

@ -0,0 +1,25 @@
__all__ = ["about", "invite", "welcome", ""]
from aiogram.enums import ContentType
from aiogram.filters import Command
from aiogram import Router, F
from bozenka.telegram.cmds.user.about import about
from bozenka.telegram.cmds.user.invite import invite
from bozenka.telegram.cmds.user.info import chat_info
from bozenka.telegram.cmds.user.welcome import *
def register_user_cmd(router: Router) -> None:
"""
Registers all commands related to users, and can be used by them.
Some of them require access to some perms for bot.
:param router:
:return:
"""
logging.log(msg="Registering user commands", level=logging.INFO)
router.message.register(invite, Command(commands=["invite"]))
router.message.register(about, Command(commands=["about"]))
router.message.register(leave, F.content_type == ContentType.LEFT_CHAT_MEMBER)
router.message.register(join, F.content_type == ContentType.NEW_CHAT_MEMBERS)
router.message.register(chat_info, Command(commands=["info"]))

View File

@ -0,0 +1,19 @@
import logging
from aiogram.types import Message as Message
from bozenka.telegram.utils.keyboards import about_keyboard
async def about(msg: Message):
"""
Sending information about bot by command `/about`
Will be deleted by its use
:param msg:
:return:
"""
logging.log(msg=f"Sending about information for user_id={msg.from_user.id}",
level=logging.INFO)
await msg.answer("Кто я? 👁"
"\nЯ - многозадачный бот, разрабатываемый Bozo Developement и всё ещё нахожусь в разработке"
"\n(Ссылочки на нас внизу короче)☺️",
reply_markup=about_keyboard.as_markup())

View File

@ -0,0 +1,24 @@
import logging
from aiogram.types import Message
from bozenka.telegram.utils.simpler import ru_cmds
from bozenka.telegram.utils.keyboards import delete_keyboard
async def chat_info(msg: Message):
"""
Shows information about chat by command `/info`
:param msg:
:return:
"""
logging.log(msg=f"Sending information about chat user_id={msg.from_user.id}",
level=logging.INFO)
chat = await msg.bot.get_chat(msg.chat.id)
await msg.answer(ru_cmds["info"].replace
("nameofchathere", "<code>" + chat.title + "</code>").replace
("chattype", ru_cmds["chat_types"][chat.type]).replace
("isforum", ru_cmds["forum_type"][chat.is_forum]).replace
("requiredinvite", ru_cmds["required_invite"][chat.join_by_request]).replace
("ishiddenmembers", ru_cmds["hidden_members"][chat.has_hidden_members]).
replace("isprotected", ru_cmds["isprotected"][chat.has_protected_content]).
replace("descr", "\n" + chat.description), reply_markup=delete_keyboard(admin_id=msg.from_user.id))

View File

@ -0,0 +1,22 @@
import logging
from aiogram.types import Message
from bozenka.telegram.utils.keyboards import invite_keyboard
from bozenka.telegram.utils.simpler import ru_cmds
async def invite(msg: Message):
"""
Generating invite to group by /invite command
:param msg:
:return:
"""
logging.log(msg=f"Generating invite for user_id={msg.from_user.id}",
level=logging.INFO)
link = await msg.chat.create_invite_link()
print(link.invite_link[0])
await msg.answer(
ru_cmds["invite_generation"].replace("user", msg.from_user.mention_html(ru_cmds["sir"])),
reply_markup=invite_keyboard(link=str(link.invite_link), admin_id=msg.from_user.id, chat_name=msg.chat.full_name)
)

View File

@ -0,0 +1,44 @@
import logging
from aiogram import Bot
from aiogram.types import Message as Message
from bozenka.telegram.utils.simpler import ru_cmds
async def join(msg: Message):
"""
Send welcome message, after adding new member to chat.
Also works on adding bot to chat and sending welcome message.
:param msg:
:return:
"""
for new in msg.new_chat_members:
if new.id != msg.bot.id:
logging.log(msg=f"Saing welcome for user_id={new.id}, chat_id={msg.chat.id}",
level=logging.INFO)
await msg.answer(
f"Пользователь {new.mention_html()} переехал в конфу, благодаря {msg.from_user.mention_html()}👋",
)
else:
logging.log(msg=f"Saing welcome to administrators for chat_id={msg.chat.id}",
level=logging.INFO)
await msg.answer(ru_cmds["after_adding"])
await msg.delete()
async def leave(msg: Message, bot: Bot):
"""
Sens goodbye message, after deleting member from chat
:param msg:
:param bot:
:return:
"""
await msg.delete()
if msg.from_user.id == bot.id:
return
logging.log(msg=f"Saing goodbye for user_id={msg.left_chat_member.id}, chat_id={msg.chat.id}",
level=logging.INFO)
await msg.answer(
f"Пользователь {msg.left_chat_member.mention_html()} съехал с конфы, благодаря {msg.from_user.mention_html()}👋"
)

View File

@ -0,0 +1,29 @@
__all__ = ["ban", "delete", "gpt"]
from aiogram import Router, F
from bozenka.telegram.utils.callbacks_factory import *
from bozenka.telegram.queries.ban import inline_ban, inline_unban
from bozenka.telegram.queries.delete import inline_delete
from bozenka.telegram.queries.revoke import inline_revoke
from bozenka.telegram.queries.gpt import *
def register_queries(router: Router) -> None:
"""
Register all callback queries.
:param router:
:return:
"""
logging.log(msg="Registering callback queries", level=logging.INFO)
router.callback_query.register(inline_ban, BanData.filter())
router.callback_query.register(inline_unban, UnbanData.filter())
router.callback_query.register(inline_delete, DeleteCallbackData.filter())
router.callback_query.register(inline_revoke, RevokeCallbackData.filter())
# r.callback_query.register(inline_gpt_menu, GptCategoryCallbackData.filter())
router.callback_query.register(inline_g4f_ready, Gpt4freeResult.filter())
router.callback_query.register(inline_gpt4free_models, Gpt4FreeProvider.filter())
router.callback_query.register(inline_gpt4free, GptCategory.filter(F.category == "Gpt4Free"))
router.callback_query.register(generate_next_page, Gpt4FreePage.filter(), flags={"rate_limit": {"rate": 5}})
router.callback_query.register(inline_return_pages, F.data == "gotpages")
router.callback_query.register(inline_g4a_ready, GptCategory.filter(F.category == "Gpt4All"))

View File

@ -0,0 +1,51 @@
import logging
from aiogram import types
from bozenka.telegram.utils.callbacks_factory import BanData, UnbanData
from bozenka.telegram.utils.simpler import ru_cmds
from aiogram.enums import ChatMemberStatus
from bozenka.telegram.utils.keyboards import ban_keyboard, unban_keyboard
async def inline_ban(call: types.CallbackQuery, callback_data: BanData) -> None:
"""
Query, what bannes users after callback
:param call:
:param callback_data:
:return:
"""
clicked_user = await call.message.chat.get_member(call.from_user.id)
banned_user = await call.message.chat.get_member(int(callback_data.user_id_ban))
if not banned_user.is_member and banned_user.status == ChatMemberStatus.KICKED:
return
elif call.from_user.id == callback_data.user_id_clicked or clicked_user.status == ChatMemberStatus.ADMINISTRATOR:
await call.answer(ru_cmds["ban_success"])
await call.message.edit_text(
ru_cmds["ban_3"].replace("banned", banned_user.user.mention_html()).replace("admin", call.from_user.mention_html()),
reply_markup=ban_keyboard(admin_id=call.from_user.id, ban_id=banned_user.user.id)
)
logging.log(msg=f"Banned user @{banned_user.user.full_name} user_id=f{banned_user.user.id}", level=logging.INFO)
async def inline_unban(call: types.CallbackQuery, callback_data: UnbanData) -> None:
"""
Query, what unbannes users after callback
:param call:
:param callback_data:
:return:
"""
clicked_user = await call.message.chat.get_member(call.from_user.id)
unbanned_user = await call.message.chat.get_member(int(callback_data.user_id_unban))
if unbanned_user.is_member and unbanned_user.status != ChatMemberStatus.KICKED:
return
if call.from_user.id == callback_data.user_id_clicked or clicked_user.status == ChatMemberStatus.ADMINISTRATOR:
await call.answer(ru_cmds["unban_success"])
await call.message.edit_text(
ru_cmds["unban_2"]
.replace("unbanned", unbanned_user.user.mention_html())
.replace("admin", call.from_user.mention_html()),
reply_markup=unban_keyboard(admin_id=call.from_user.id, ban_id=unbanned_user.user.id)
)
logging.log(msg=f"Unbanned user @{unbanned_user.user.full_name} user_id=f{unbanned_user.user.id}", level=logging.INFO)

View File

@ -0,0 +1,21 @@
import logging
from aiogram import types
from bozenka.telegram.utils.callbacks_factory import DeleteCallbackData
from aiogram.enums import ChatMemberStatus
async def inline_delete(call: types.CallbackQuery, callback_data: DeleteCallbackData) -> None:
"""
Deletes messsage, after special callback
:param call:
:param callback_data:
:return:
"""
user_clicked = await call.message.chat.get_member(call.from_user.id)
if call.from_user.id == callback_data.user_id_clicked or user_clicked.status == ChatMemberStatus.ADMINISTRATOR:
await call.answer("Хорошо ✅")
logging.log(msg=f"Deleted message with message_id={call.message.message_id}",
level=logging.INFO)
await call.message.delete()

View File

@ -0,0 +1,137 @@
import logging
from aiogram import types
from aiogram.fsm.context import FSMContext
# Callbacks for GPT
from bozenka.telegram.utils.callbacks_factory import (
GptCategory,
Gpt4FreeProvider,
Gpt4freeResult,
Gpt4FreePage,
Gpt4All
)
# Keyboards for messages
from bozenka.telegram.utils.keyboards import (
gpt4free_models_keyboard,
generate_gpt4free_page,
delete_keyboard
)
# Simpler utlilities
from bozenka.telegram.utils.simpler import (
AnsweringGPT4Free,
AnsweringGpt4All,
ru_cmds
)
async def inline_gpt4free(call: types.CallbackQuery, callback_data: GptCategory, state: FSMContext) -> None:
"""
Query, what creating providers selecting menu.
:param state:
:param call:
:param callback_data:
:return:
"""
if call.from_user.id != callback_data.user_id:
return
logging.log(msg=f"Selected gpt4free category by user_id={call.from_user.id}",
level=logging.INFO)
await state.update_data(set_category=callback_data.category)
await state.set_state(AnsweringGPT4Free.set_provider)
await call.message.edit_text(ru_cmds["select_provider_message"],
reply_markup=generate_gpt4free_page(page=0, user_id=callback_data.user_id))
await call.answer(ru_cmds["select_provider"])
async def inline_gpt4free_models(call: types.CallbackQuery, callback_data: Gpt4FreeProvider, state: FSMContext) -> None:
"""
Query, what creating models selecting menu.
:param state:
:param call:
:param callback_data:
:return:
"""
if call.from_user.id != callback_data.user_id:
return
logging.log(msg=f"Selected gpt4free provider {callback_data.provider} by user_id={call.from_user.id}",
level=logging.INFO)
await state.update_data(set_provider=callback_data.provider)
await state.set_state(AnsweringGPT4Free.set_model)
await call.message.edit_text(ru_cmds["select_model_message"], reply_markup=gpt4free_models_keyboard(
user_id=callback_data.user_id,
provider=callback_data.provider
))
await call.answer(ru_cmds["select_model"])
async def inline_g4f_ready(call: types.CallbackQuery, callback_data: Gpt4freeResult, state: FSMContext) -> None:
"""
Query, what says about getting ready to questions for ChatGPT from Gpt4Free.
:param state:
:param call:
:param callback_data:
:return:
"""
if call.from_user.id != callback_data.user_id:
return
logging.log(msg=f"Selected gpt4free model {callback_data.model} by user_id={call.from_user.id}",
level=logging.INFO)
await state.update_data(set_model=callback_data.model)
await state.set_state(AnsweringGPT4Free.ready_to_answer)
logging.log(msg=f"Loaded GPT answering status for user_id={call.from_user.id}",
level=logging.INFO)
await call.message.edit_text(ru_cmds["finish_gptfree_message"]
.replace("modelname", callback_data.model).
replace("providername", callback_data.provider),
reply_markup=delete_keyboard(admin_id=callback_data.user_id))
await call.answer(ru_cmds["finish_gpt"])
async def inline_g4a_ready(call: types.CallbackQuery, callback_data: Gpt4All, state: FSMContext) -> None:
"""
Query, what says about getting ready for questions for Gpt4All.
:param call:
:param callback_data:
:param state:
:return:
"""
if callback_data.user_id != call.from_user.id:
return
await state.set_state(AnsweringGpt4All.answering)
logging.log(msg=f"Loaded GPT answering status for user_id={call.from_user.id}",
level=logging.INFO)
await call.message.edit_text(ru_cmds["finish_gpt4all_message"],
reply_markup=delete_keyboard(admin_id=callback_data.user_id))
await call.answer(ru_cmds["finish_gpt"])
async def generate_next_page(call: types.CallbackQuery, callback_data: Gpt4FreePage, state: FSMContext) -> None:
"""
Query, what generates a next page for user.
:param call:
:param callback_data:
:param state:
:return:
"""
if call.from_user.id != callback_data.user_id:
return
logging.log(msg=f"Changed page to {str(callback_data.page+1)} user_id={call.from_user.id}",
level=logging.INFO)
await call.message.edit_text(ru_cmds["select_provider_page"].replace("pagecount", str(callback_data.page+1)),
reply_markup=generate_gpt4free_page(user_id=callback_data.user_id, page=callback_data.page))
await call.answer(ru_cmds["moved_page"].replace("pagecount", str(callback_data.page+1)))
async def inline_return_pages(call: types.CallbackQuery) -> None:
"""
Query, made for helping purposes.
Shows current page.
:param call:
:return:
"""
logging.log(msg=f"Showed helping info for user_id={call.from_user.id}",
level=logging.INFO)
await call.answer(ru_cmds["help_notification"])

View File

@ -0,0 +1,24 @@
import logging
from aiogram import types
from bozenka.telegram.utils.callbacks_factory import RevokeCallbackData
from aiogram.enums import ChatMemberStatus
from bozenka.telegram.utils.simpler import ru_cmds
async def inline_revoke(call: types.CallbackQuery, callback_data: RevokeCallbackData):
"""
Revokes invite link
:param call:
:param callback_data:
:return:
"""
user_clicked = await call.message.chat.get_member(call.from_user.id)
if callback_data.admin_id == call.from_user.id or user_clicked.status == ChatMemberStatus.ADMINISTRATOR or user_clicked.status == ChatMemberStatus.CREATOR:
logging.log(msg=f"Revoking link for user_id={call.from_user.id}",
level=logging.INFO)
await call.message.chat.revoke_invite_link(invite_link="https://" + str(callback_data.link))
await call.answer(ru_cmds["success"])
await call.message.delete()

View File

@ -0,0 +1,9 @@
from aiogram.types import *
async def inline_admins(call: CallbackQuery):
"""
List of admins features to enable.
:param call:
:return:
"""

View File

@ -0,0 +1,5 @@
from .admin import *
from .delete import DeleteCallbackData
from .revoke import RevokeCallbackData
from .gpt_selector import *
from .setup import *

View File

@ -0,0 +1,34 @@
from aiogram.filters.callback_data import CallbackData
class BanData(CallbackData, prefix="ban"):
"""
Callback with information to ban user
"""
user_id_ban: int
user_id_clicked: int
class UnbanData(CallbackData, prefix="unban"):
"""
Callback with information to unban user
"""
user_id_unban: int
user_id_clicked: int
class MuteData(CallbackData, prefix="mute"):
"""
Callback with information to mute user
"""
user_id_mute: int
user_id_clicked: int
class UnmuteData(CallbackData, prefix="unmute"):
"""
Callback with information to unmute user
"""
user_id_unmute: int
user_id_clicked: int

View File

@ -0,0 +1,8 @@
from aiogram.filters.callback_data import CallbackData
class DeleteCallbackData(CallbackData, prefix="delete"):
"""
Callback with information to delete message
"""
user_id_clicked: int

View File

@ -0,0 +1,41 @@
from aiogram.filters.callback_data import CallbackData
class Gpt4FreeProvider(CallbackData, prefix="provider"):
"""
Callback with information related to selected provider
"""
user_id: int
provider: str
class GptCategory(CallbackData, prefix="gpt"):
"""
Callback with information to show content, related to gpt category
"""
user_id: int
category: str
class Gpt4freeResult(CallbackData, prefix="endselect"):
"""
Callback with information of selected g4f content
"""
user_id: int
provider: str
model: str
class Gpt4FreePage(CallbackData, prefix="gptpage"):
"""
Callback with information to show next page of providers
"""
user_id: int
page: int
class Gpt4All(CallbackData, prefix="gpt4all"):
"""
Callback with information to show GPT4All content
"""
user_id: int

View File

@ -0,0 +1,6 @@
from aiogram.filters.callback_data import CallbackData
class RevokeCallbackData(CallbackData, prefix="mute"):
admin_id: int
link: str

View File

@ -0,0 +1,17 @@
from aiogram.filters.callback_data import CallbackData
class SetupCategory(CallbackData, prefix="scategory"):
"""
Callback data of setup categories
"""
owner_id: int
category_name: str
class SetupFeature(CallbackData, prefix="sfeature"):
"""
Callback data of features category
"""
owner_id: int
feature_name: str

View File

@ -0,0 +1 @@
from .permissions import *

View File

@ -0,0 +1,103 @@
from typing import Any
from aiogram.filters import Filter
from aiogram.types import Message
from aiogram.enums import ChatMemberStatus
from bozenka.telegram.utils.simpler import ru_cmds
class UserHasPermissions(Filter):
"""
Check, does user have permissions, what user need to work with bot.
"""
# List of permissions avaible to users.
# Basic permissions for administration and user
permissions = [
"can_manage_chat",
"can_delete_messages",
"can_manage_video_chats",
"can_restrict_members",
"can_promote_members",
"can_change_info",
"can_invite_users",
"can_post_messages",
"can_edit_messages",
"can_pin_messages",
"can_manage_topics",
"can_send_messages",
"can_send_audios",
"can_send_documents",
"can_send_photos",
"can_send_videos",
"can_send_video_notes",
"can_send_voice_notes",
"can_send_polls",
"can_send_other_messages",
"can_add_web_page_previews",
]
def __init__(self, perms: list[Any]) -> None:
self.perms = perms
async def check_permissions(self, permission, msg: Message) -> bool:
"""
Checking permissions, included to user.
:return:
"""
if permission.count(False) > 0 or permission.count(None) > 0:
await msg.answer(ru_cmds["no_perms"])
return False
return True
def generate_perms_list(self, user) -> list[Any]:
"""
Generates list of permissions, included to user
:param user:
:return:
"""
permission = []
for rule in self.perms:
if rule in permission:
exec(f"permission.append(user.{rule})")
return permission
async def __call__(self, msg: Message) -> bool:
user = await msg.chat.get_member(msg.from_user.id)
permission = self.generate_perms_list(user)
return True if user.status == ChatMemberStatus.CREATOR else self.check_permissions(permission, msg)
class BotHasPermissions(UserHasPermissions):
"""
Check, does bot have permissions, what user need to work with bot.
"""
async def __call__(self, msg: Message, *args, **kwargs):
bot = await msg.chat.get_member(msg.chat.bot.id)
permission = self.generate_perms_list(bot)
return self.check_permissions(permission, msg)
class IsOwner(Filter):
"""
Checks, is User are owner of chat
"""
def __init__(self, is_admin: bool) -> None:
self.is_admin = is_admin
async def __call__(self, msg: Message) -> bool:
user = await msg.chat.get_member(msg.from_user.id)
if ChatMemberStatus.CREATOR != user.status:
await msg.answer(ru_cmds["no-perms"])
return ChatMemberStatus.CREATOR == user.status
class IsAdminFilter(Filter):
def __init__(self, is_admin: bool) -> None:
self.is_admin = is_admin
async def __call__(self, msg: Message) -> bool:
user = await msg.chat.get_member(msg.from_user.id)
if ChatMemberStatus.CREATOR == user.status:
return True
return ChatMemberStatus.ADMINISTRATOR == user.status

View File

@ -0,0 +1,2 @@
from .inline import *
from .reply import start_keyboard

View File

@ -0,0 +1,237 @@
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from bozenka.telegram.utils.callbacks_factory import *
from bozenka.telegram.utils.simpler import gpt_categories, gpt4free_providers, generate_gpt4free_providers
"""
File, contains inline keyboard & menus and their work.
Right now only on Russian language, multi-language planning soon.
"""
def setup_keyboard(owner_id):
"""
Generate keybaord for /setup command
:param owner_id:
:return:
"""
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Администраторы 👮‍♂",
callback_data=SetupCategory(owner_id=owner_id, category_name="Admins").pack())
], [
InlineKeyboardButton(text="Пользователи 👤",
callback_data=SetupCategory(owner_id=owner_id, category_name="Members").pack())
]])
return kb
def delete_keyboard(admin_id):
"""
Basic keyboard for all messages from bot.
By pressing this button, message from bot will get deleted.
:param admin_id:
:return:
"""
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Спасибо ✅", callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
]])
return kb
def gpt_categories_keyboard(user_id: int):
"""
Create list keyboard list of gpt libraries, available in the bot
:param user_id:
:return: InlineKeyboardMarkup
"""
builder = InlineKeyboardBuilder()
for category in gpt_categories:
builder.button(text=category, callback_data=GptCategory(
user_id=str(user_id),
category=category,
))
return builder.as_markup()
def generate_gpt4free_page(user_id, page):
"""
Generate page of gpt providers, can be used by user.
:param user_id:
:param page:
:return:
"""
names = []
providers = generate_gpt4free_providers()
pages = [len(providers) // 4 - 1 if page - 1 == -1 else page - 1,
0 if page + 1 >= len(providers) // 4 else page + 1]
for provider, count in zip(providers, range(1, len(providers))):
if count not in [0 + page * 4, 1 + page * 4, 2 + page * 4, 3 + page * 4]:
continue
names.append(provider)
generated_page = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=names[0],
callback_data=Gpt4FreeProvider
(user_id=user_id,
provider=names[0]).pack())],
[InlineKeyboardButton(text=names[1],
callback_data=Gpt4FreeProvider
(user_id=user_id,
provider=names[1]).pack())],
[InlineKeyboardButton(text=names[2],
callback_data=Gpt4FreeProvider
(user_id=user_id,
provider=names[2]).pack())],
[InlineKeyboardButton(text=names[3],
callback_data=Gpt4FreeProvider
(user_id=user_id,
provider=names[3]).pack())] if len(names) == 4 else [],
[
InlineKeyboardButton(text=str(len(providers) // 4 if page == 0 else "1"),
callback_data=Gpt4FreePage(
page=str(len(providers) // 4 - 1 if page == 0 else "1"),
user_id=user_id).pack()),
InlineKeyboardButton(text="⬅️", callback_data=Gpt4FreePage(page=pages[0], user_id=user_id).pack()),
InlineKeyboardButton(text=str(page + 1), callback_data="gotpages"),
InlineKeyboardButton(text="➡️", callback_data=Gpt4FreePage(page=pages[1], user_id=user_id).pack()),
InlineKeyboardButton(text=str(len(providers) // 4),
callback_data=Gpt4FreePage(
page=str(len(providers) // 4 - 1),
user_id=user_id).pack())
]
])
return generated_page
def gpt4free_category_keyboard(user_id):
"""
Generating list of GPT4Free providers, can be used to generate text.
Will be reworked.
:param user_id:
:return:
"""
builder = InlineKeyboardBuilder()
for provider in gpt4free_providers:
print(provider)
builder.button(text=provider,
callback_data=Gpt4FreeProvider(user_id=str(user_id), provider=provider).pack())
return builder.as_markup()
def gpt4free_models_keyboard(user_id, provider):
"""
Generating list of GPT4Free provider's models, can be used to generate text.
Will be also reworked.
"""
builder = InlineKeyboardBuilder()
if provider in gpt4free_providers:
for model in gpt4free_providers[provider]:
builder.row(InlineKeyboardButton(
text=model.replace("-", " "),
callback_data=Gpt4freeResult(user_id=str(user_id),
provider=provider,
model=model).pack()))
else:
providers = generate_gpt4free_providers()
if providers[provider].supports_gpt_4:
builder.row(InlineKeyboardButton(text="gpt 4",
callback_data=Gpt4freeResult(user_id=str(user_id),
provider=provider,
model="gpt-4").pack()))
if providers[provider].supports_gpt_35_turbo:
builder.row(InlineKeyboardButton(text="gpt 3.5 turbo",
callback_data=Gpt4freeResult(user_id=str(user_id),
provider=provider,
model="gpt-3.5-turbo").pack()))
return builder.as_markup()
def ban_keyboard(admin_id, ban_id):
"""
Generating menu for /ban command.
:param admin_id:
:param ban_id:
:return:
"""
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Спасибо ✅", callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
], [
InlineKeyboardButton(text="Разбанить 🛠️", callback_data=UnbanData(user_id_unban=str(ban_id),
user_id_clicked=str(admin_id)).pack())
]])
return kb
def unban_keyboard(admin_id, ban_id):
"""
Generating menu for /unban command.
:param admin_id:
:param ban_id:
:return:
"""
print(ban_id)
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Спасибо ✅", callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
], [
InlineKeyboardButton(text="Забанить 🛠️", callback_data=BanData(user_id_ban=str(ban_id),
user_id_clicked=str(admin_id)).pack())
]])
return kb
def mute_keyboard(admin_id, ban_id):
"""
Generating menu for /mute command.
:param admin_id:
:param ban_id:
:return:
"""
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Спасибо ✅", callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
], [
InlineKeyboardButton(text="Размутить 🛠️",
callback_data=UnmuteData(user_id_unmute=ban_id, user_id_clicked=admin_id).pack())
]])
return kb
def unmute_keyboard(admin_id, ban_id):
"""
Generating menu for /unmute command.
:param admin_id:
:param ban_id:
:return:
"""
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Спасибо ✅", callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
], [
InlineKeyboardButton(text="Замутить 🛠️",
callback_data=MuteData(user_id_mute=ban_id, user_id_clicked=admin_id).pack())
]])
return kb
def invite_keyboard(link: str, admin_id, chat_name):
"""
Generating menu for /invite command. Should be reworked.
:param link:
:param admin_id:
:param chat_name:
:return:
"""
link = link.replace("https://", "")
kb = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text=chat_name, url=link)
], [
InlineKeyboardButton(text="Отозвать 🛠️", callback_data=RevokeCallbackData(admin_id=admin_id, link=link).pack())
], [
InlineKeyboardButton(text="Спасибо ✅",
callback_data=DeleteCallbackData(user_id_clicked=str(admin_id)).pack())
]])
return kb
about_keyboard = InlineKeyboardBuilder()
about_keyboard.button(
text="Bozo Development", url="https://t.me/BozoDevelopement"
)

View File

@ -0,0 +1,10 @@
# This file had inside all reply keyboard / menus related to bot
from aiogram.types import KeyboardButton
from aiogram.utils.keyboard import ReplyKeyboardBuilder
start_keyboard = ReplyKeyboardBuilder()
start_keyboard.row(
KeyboardButton(text="Добавить в чат 🔌"),
KeyboardButton(text="Функционал 🔨")
)
start_keyboard.add(KeyboardButton(text="О разработчиках "))

View File

@ -0,0 +1,15 @@
__all__ = ["registration"]
import logging
from aiogram import Router, Dispatcher
def register_middlewares(dp: Dispatcher):
"""
Registering all middlewares of bot.
:param router:
:return:
"""
logging.log(msg=f"Registering middlewares of bot", level=logging.INFO)
# dp.message.middleware(Registration)

View File

@ -0,0 +1,39 @@
'''
import logging
from typing import Callable, Dict, Any, Awaitable
from aiogram import BaseMiddleware
from aiogram.types import Message, ContentType
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from bozenka.db import Users
class Registration(BaseMiddleware):
"""
Checks, is user & group registered.
"""
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
session_maker: sessionmaker = data["session_maker"]
async with session_maker() as session:
async with session.begin():
result = await session.execute(select(Users).where(Users.user_id == event.from_user.id and Users.chat_id == event.chat.id))
user = result.one_or_none()
logging.log(msg=f"Checking user registration with id={event.from_user.id}", level=logging.INFO)
if user is None:
logging.log(msg=f"Registering user into database with id={event.from_user.id}", level=logging.INFO)
user = Users(
user_id=event.from_user.id,
chat_id=event.chat.id
)
await session.merge(user)
if not data:
return await handler(event, None)
'''

View File

@ -0,0 +1,3 @@
from .solution_simpler import SolutionSimpler
from .texts import *
from .states import *

Some files were not shown because too many files have changed in this diff Show More