493 lines
18 KiB
Python
493 lines
18 KiB
Python
# Albatrobot - Telegram bot for RPG groups and similar
|
|
# Copyright (C) 2019, 2020, 2021, 2022, 2023 bursa-pastoris
|
|
#
|
|
# This file is part of Albatrobot.
|
|
#
|
|
# Albatrobot is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, version 3 of the License.
|
|
#
|
|
# Albatrobot is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with Albatrobot. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
import csv
|
|
from datetime import datetime
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import pyttsx3
|
|
import random
|
|
from re import findall as find
|
|
import signal
|
|
import time
|
|
|
|
import Levenshtein
|
|
import psutil
|
|
from rolldice import roll_dice, rolldice
|
|
from telegram import BotCommand
|
|
from telegram.constants import ChatAction, ChatMemberStatus, ChatType, ParseMode
|
|
|
|
from constants import *
|
|
|
|
# Prepare paths
|
|
required_paths = [LOGDIR, TMPVOICEDIR]
# Create the log and voice-cache directories up front; exist_ok turns this
# into a no-op when they are already there.
for i in required_paths:
    os.makedirs(i, exist_ok=True)
|
|
|
|
# Logging
|
|
# One formatter shared by both handlers, so console and file records look
# the same.
logformat = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')

# Console output
loghandler_toconsole = logging.StreamHandler()
loghandler_toconsole.setFormatter(logformat)
loghandler_toconsole.setLevel(logging.DEBUG)

# File output
loghandler_tofile = logging.FileHandler(f'{LOGDIR}/albatrobot.log')
loghandler_tofile.setFormatter(logformat)
loghandler_tofile.setLevel(logging.DEBUG)

# The logger itself lets everything through: both handlers are set to DEBUG.
logger = logging.getLogger('albatrobot_logger')
logger.setLevel(logging.DEBUG)
logger.addHandler(loghandler_toconsole)
logger.addHandler(loghandler_tofile)
|
|
|
|
|
|
# Internal functions
|
|
def closest(good_list, item):
    """Return the item(s) of good_list closest to item.

    "Closest" means "with the shortest Levenshtein distance". Every
    candidate tied for the shortest distance is returned, in the order in
    which it appears in good_list.

    Args:
        good_list: iterable of candidate strings.
        item: string the candidates are compared to.

    Returns:
        A list with the closest candidate(s), or an empty list when
        good_list holds no candidates.
    """
    candidates = list(good_list)
    # min() raises ValueError on an empty sequence: nothing to compare means
    # nothing is close.
    if not candidates:
        return []
    distances = [Levenshtein.distance(candidate, item)
                 for candidate in candidates]
    shortest = min(distances)
    return [candidate
            for candidate, distance in zip(candidates, distances)
            if distance == shortest]
|
|
|
|
|
|
async def unauthorized_user(update, context):
    """Tell the user that they are not authorized to use the bot."""
    chat_id = update.effective_chat.id
    notice = _('User not authorized.')
    await context.bot.send_message(chat_id=chat_id, text=notice)
|
|
|
|
|
|
async def unknown_command(update, context):
    """Tell the user that the command they sent does not exist."""
    chat_id = update.effective_chat.id
    notice = _('Unknown command.')
    await context.bot.send_message(chat_id=chat_id, text=notice)
|
|
|
|
|
|
#def error_handler(update, context): # This should be deeply reworked. Sooner or later.
|
|
# system_logger.error(context.error)
|
|
# print(context.error)
|
|
|
|
|
|
# Basic functions
|
|
async def start(update, context):
    """Greet a user contacting the bot for the first time.

    The greeting points to /help and carries the license and source code
    notice. It is sent as MarkdownV2, hence the escaped punctuation.
    """
    chat_id = update.effective_chat.id
    greeting = _("Hello, I'm Albatrobot\! Type \/help for help\!\n"
                 "\n"
                 "\-\-\-\n"
                 "_Albatrobot is distributed under"
                 " [AGPL\-3\.0\-only](https:\/\/www.gnu.org\/licenses\/agpl-3.0.txt)"
                 " license\. Its source code is publicly available on"
                 " [Disroot](https:\/\/git.disroot.org\/bursapastoris\/albatrobot)\._"
                 )
    await context.bot.send_message(
        chat_id=chat_id,
        text=greeting,
        parse_mode=ParseMode.MARKDOWN_V2,
    )
|
|
|
|
|
|
async def stop(update, context):
    """Stop the bot.

    Warns the chat, logs the shutdown and then sends SIGTERM to the bot's
    own process.
    """
    own_process = psutil.Process(os.getpid())
    chat_id = update.effective_chat.id
    farewell = _('Albatrobot will be stopped in a few moments.')
    await context.bot.send_message(chat_id=chat_id, text=farewell)
    logger.info('Albatrobot was stopped.')
    own_process.send_signal(signal.SIGTERM)
|
|
|
|
|
|
async def help(update, context):
    """Show a help message.

    The message doesn't correspond to the inline suggestions set in
    @BotFather. They can be set issuing the command /reset.

    Bot admins writing in a private chat also get the admin-only commands;
    in any other case a note pointing out that such commands exist is
    appended instead. The message is sent as MarkdownV2, hence the escaped
    punctuation in every string.
    """
    # Common commands
    com_header = _('*Albatrobot commands*')
    com_help = {
        _('christmas'):_('get an image'),
        _('easter')+_(' `\(\<author\>\)`'):_('get a citation from given author,'
            ' or a random one if no author is specified\.'),
        _('end\_of\_year\_dinner'):_('send all citations in the archive to a'
            ' random user\.'),
        _('epiphony')+_(' `<text\>`'):_('get a voice message from'
            ' Albatrobot reading given text'),
        _('help'):_('get help about bot commands\.'),
        _('legal'):_('get legal info about privacy and copyright\.'),
        _('roll')+_(' `\<dice\>\(\<modifiers\>\)`'):_('roll dice, with or'
            ' without modifiers\. Launch without arguments to get help on the'
            ' modifiers\.'),
        _('version'):_("get Albatrobot's version")
    }
    # Sorting happens on the rendered lines, i.e. after translation.
    com_help = sorted(f'\- \/{name}: {description}'
                      for name, description in com_help.items())
    com_help = '\n'.join(com_help)
    com_help = f'{com_header}\n{com_help}'

    # Admin commands
    adm_header = _('*Additional commands for bot admins*')
    adm_help = {
        _('reset'):_('reset the bot\. This updates inline suggestions'
            ' and purges /epiphony cached files\.'),
        _('stop'):_('stop Albatrobot\. To be used _only_ in case of emergency:'
            ' restarting the bot after this command is used requires direct'
            ' access to the server\.')
    }
    adm_help = sorted(f'\- \/{name}: {description}'
                      for name, description in adm_help.items())
    adm_help = '\n'.join(adm_help)
    adm_help = f'{adm_header}\n{adm_help}'

    # Addendum
    nonadm_addendum = _(
        'Bot admins have access to additional commands and can get help on'
        ' them using /help in a private chat\.'
    )

    # Actually send stuff. Every piece above is already translated, so the
    # assembled text is not passed through gettext again: a string built at
    # runtime can never match a catalog entry.
    if (update.effective_chat.type == ChatType.PRIVATE
            and update.effective_user.id in ADMINS):
        help_text = f'{com_help}\n\n{adm_help}'
    else:
        help_text = f'{com_help}\n\n{nonadm_addendum}'

    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=help_text,
        parse_mode=ParseMode.MARKDOWN_V2
    )
|
|
|
|
|
|
async def reset(update, context):
    """Reset the bot.

    Publishes the command suggestions, reporting progress in the chat, and
    then deletes every file found in the /epiphony voice cache directory.
    """
    # Command suggestions
    suggestions = [
        BotCommand(_('christmas'), _('Get an image')),
        BotCommand(_('easter'), _('Get a citation')),
        BotCommand(_('end_of_year_dinner'),
                   _('Send all citations to a random user')),
        BotCommand(_('help'), _('Get help')),
    ]
    chat_id = update.effective_chat.id
    await context.bot.send_message(chat_id=chat_id, text=_('Initializing...'))
    await context.bot.set_my_commands(suggestions)
    await context.bot.send_message(chat_id=chat_id, text=_('Done'))

    # /epiphony cache
    for cached_name in os.listdir(TMPVOICEDIR):
        os.remove(f'{TMPVOICEDIR}/{cached_name}')
|
|
|
|
|
|
async def legal(update, context):
    """Send the legal notes about privacy and copyright as a PDF.

    When the bot language is not English, the caption explains why the
    document is not translated; otherwise the caption is empty.
    """
    logger.info('A user asked for legal info.')
    note = "" if LANG == "en" else _(
        'The only official version is the English one and any'
        ' imperfect translation would be misleading. Therefore, this'
        ' document will not be translated.')
    chat_id = update.effective_chat.id
    await context.bot.send_chat_action(
        chat_id=chat_id,
        action=ChatAction.UPLOAD_DOCUMENT,
    )
    with open('doc/LEGAL.pdf', 'rb') as legalnotes:
        await context.bot.send_document(
            chat_id=chat_id,
            document=legalnotes,
            filename='albatrobot-legal-notes.pdf',
            caption=note,
        )
|
|
|
|
|
|
async def version(update, context):
    """Send Albatrobot's current version.

    The message is sent as MarkdownV2, so every reserved character of
    VERSION is escaped, not only the dots: a version such as "1.0-rc1"
    would otherwise be rejected by Telegram.
    """
    # Local import so that the block brings its own dependency into scope.
    from telegram.helpers import escape_markdown

    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=_("*Albatrobot's version:* {}").format(
            escape_markdown(VERSION, version=2)),
        parse_mode=ParseMode.MARKDOWN_V2
    )
|
|
|
|
|
|
# Commands
|
|
async def christmas(update, context):
    """Send a random image from the archive.

    The archive is the imgs.csv index in IMGPATH, whose `filename` and
    `caption` columns are used. The time of the last image sent is kept in
    context.user_data: a new request arriving before CHRISTMASCOOLDOWN has
    elapsed is refused with a message.
    """
    last_msg = context.user_data.get(
        'last_christmas', datetime.fromisoformat('1970-01-01T00:00'))
    if datetime.now() - last_msg < CHRISTMASCOOLDOWN:
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=_("Christmas is good, but you can't invoke it so often! "
                   "Try again in some seconds...")
        )
        logger.info('A user wants too much Christmas.')
        return

    with open(f'{IMGPATH}/imgs.csv', 'r') as imgs_file:
        imgs = list(csv.DictReader(imgs_file))
    img = random.choice(imgs)
    img_file = img['filename']
    img_capt = img['caption']

    await context.bot.send_chat_action(
        chat_id=update.effective_chat.id,
        action=ChatAction.UPLOAD_PHOTO
    )
    # Open the picture in a context manager so that the handle is closed
    # even when the upload fails.
    with open(f'{IMGPATH}/{img_file}', 'rb') as photo:
        await context.bot.send_photo(
            chat_id=update.effective_chat.id,
            photo=photo,
            caption=img_capt
        )
    context.user_data['last_christmas'] = datetime.now()
|
|
|
|
|
|
async def easter(update, context):
    """Send a citation from the archive.

    User can either choose the author or let the bot pick a random one.
    Only the first argument of the command is considered, and it must match
    the author's name exactly (case included); when it doesn't, the most
    similar authors are suggested instead.

    The time of the last citation sent is kept in context.user_data: a new
    request arriving before EASTERCOOLDOWN has elapsed is refused with a
    message.
    """
    last_msg = context.user_data.get(
        'last_easter', datetime.fromisoformat('1970-01-01T00:00'))
    if datetime.now() - last_msg < EASTERCOOLDOWN:
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=_("Easter is good, but you can't invoke it so often!"
                   " Retry in some seconds...")
        )
        logger.info('A user wants too much Easter.')
        return

    # Read the archive only once, grouping the citations by author.
    cits_by_author = {}
    with open(CITARCHIVE, 'r') as cits_file:
        for row in csv.DictReader(cits_file):
            cits_by_author.setdefault(row['author'], []).append(row['text'])
    authors = tuple(cits_by_author)

    if context.args:
        # The user asked for an author...
        requested = context.args[0]
        if requested not in cits_by_author:
            # ...which is not in the archive
            closest_authors = closest(authors, requested)
            if len(closest_authors) == 1:
                suggestion = _('The most similar is {}.').format(
                    closest_authors[0])
            else:
                last_author = closest_authors[-1]
                other_authors = ', '.join(closest_authors[:-1])
                suggestion = _('The most similar are {} and {}.').format(
                    other_authors, last_author)
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text=_('Author {required_author} is not present in the archive!'
                       ' {suggestion}\n'
                       '\n'
                       'N.b.: CaSe Is ImPoRtAnT!').format(
                           required_author=requested,
                           suggestion=suggestion)
            )
            logger.info(
                'A user wants easter from non registered author %s.',
                requested)
            return
        # ...which is in the archive
        author = str(requested)
    else:
        # The user did not ask for any author
        author = random.choice(authors)

    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=_('{cit}\n\n~{author}').format(
            cit=random.choice(cits_by_author[author]),
            author=author
        )
    )
    context.user_data['last_easter'] = datetime.now()
|
|
|
|
|
|
async def end_of_year_dinner(update, context):
    """Send a user each citation from the archive.

    The recipient is picked at random from USERS. They get an opening
    message naming the sender, then one message per citation; the chat the
    command came from is finally told how many citations went to whom.
    """
    # Local import so that the block brings its own dependency into scope.
    import asyncio

    with open(CITARCHIVE, 'r') as cits_file:
        cits_archive = list(csv.DictReader(cits_file))
    dest_user_id = random.choice(USERS)
    # The recipient's own id doubles as chat id, as in the original call.
    dest_user = await context.bot.get_chat_member(
        chat_id=dest_user_id, user_id=dest_user_id)
    dest_user_firstname = dest_user.user.first_name

    await context.bot.send_message(
        chat_id=dest_user_id,
        text=_('Hello there, here an end of year dinner offered by {}!').format(
            update.message.from_user.first_name)
    )

    for cit in cits_archive:
        await context.bot.send_message(
            chat_id=dest_user_id,
            # Translate the template first and fill it afterwards: an
            # already filled string never matches a catalog entry.
            text=_('{cit}\n\n~{author}').format(
                cit=cit['text'], author=cit['author'])
        )
        # To avoid hitting Telegram's message rate limit. asyncio.sleep is
        # used because time.sleep would freeze the whole event loop.
        await asyncio.sleep(1.1)

    # NOTE(review): the message id below is kept verbatim, typo included,
    # so that existing translations keep matching.
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=_('End of year dinner of {cits_number} plates delvered to {dest}!').format(
            cits_number=len(cits_archive),
            dest=dest_user_firstname
        )
    )
    logger.info('A user sent an end of year dinner.')
|
|
|
|
|
|
async def epiphony(update, context):
    """Send a voice message of the bot reading the given text.

    The text is upper-cased and synthesized with pyttsx3 into TMPVOICEDIR,
    in a file named after the MD5 hash of the text: a sentence already
    synthesized is sent again without regenerating it. If the command was
    issued replying to a message, the voice message replies to that same
    message.

    After sending, the message that issued the command is deleted, provided
    that the bot is an administrator of the chat allowed to delete messages.
    Without arguments, the bot just asks what to say.
    """
    # Local import so that the block brings its own dependency into scope.
    import asyncio

    sentence = ' '.join(context.args).upper()

    # Handle command without args
    if not sentence:
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=_('What should I say?')
        )
        return

    # Handle command with args: generate and send audio
    sentence_hash = hashlib.md5(sentence.encode(encoding='UTF-8')).hexdigest()
    voice_file = f'{TMPVOICEDIR}/{sentence_hash}.ogg'
    await context.bot.send_chat_action(
        chat_id=update.effective_chat.id,
        action=ChatAction.RECORD_VOICE)
    if not os.path.exists(voice_file):
        engine = pyttsx3.init()
        engine.setProperty('voice', EPIPHONYLANG)
        engine.setProperty('rate', EPIPHONYSPEED)
        engine.save_to_file(sentence, voice_file)
        engine.runAndWait()
        # Workaround to prevent ReferenceError due to using pyttsx3 with
        # asyncio. The pause is awaited so that it doesn't freeze the event
        # loop; `engine` stays referenced by this frame in the meanwhile.
        await asyncio.sleep(1)

    replied_to = update.effective_message.reply_to_message
    # The context manager closes the audio file even when the upload fails.
    with open(voice_file, 'rb') as voice:
        await context.bot.send_voice(
            chat_id=update.effective_chat.id,
            voice=voice,
            reply_to_message_id=replied_to.message_id if replied_to else None,
            # Translate the template first and fill it afterwards: an
            # already filled string never matches a catalog entry.
            caption=_('Voice message from {sender_name}.').format(
                sender_name=update.effective_user.first_name)
        )

    # If in group chat and have permission, delete the message that issued
    # the command
    bot_data = await context.bot.get_me()
    bot_chatmember = await context.bot.get_chat_member(
        chat_id=update.effective_chat.id,
        user_id=bot_data.id)
    if (bot_chatmember.status == ChatMemberStatus.ADMINISTRATOR
            and bot_chatmember.can_delete_messages):
        await context.bot.delete_message(
            chat_id=update.effective_chat.id,
            message_id=update.effective_message.message_id)
|
|
|
|
|
|
async def roll(update, context):
    """Roll a dice following DnD notation: <dice number>d<die faces>.

    Many modifiers are supported. Dice number and faces are limited to 500
    each, to prevent DoS effects: the limit is enforced refusing any
    expression that contains a number greater than 500.

    Without arguments, a help message about syntax and modifiers is sent.
    An expression rejected by the dice parser with DiceGroupException is
    answered with a "wrong syntax" message.
    """
    to_roll = ''.join(context.args)
    if len(to_roll) == 0:
        roll_modifiers = {
            '\+n':_('add `n` to the roll'),
            '\-n':_('subtract `n` from the roll'),
            '\*n':_('multiply the roll by `n`'),
            '/n':_('divide the roll by `n`'),
            '//n':_('floor divide the roll by `n`'),
            '\*\*n':_('exponentiate the roll to `n`'),
            'K\(n\)':_('keep highest `n` rolls \(defaults to `1`\)'),
            'k\(n\)':_('keep lowest `n` rolls \(defaults to `1`\)'),
            'X\(n\)':_('drop highest `n` rolls \(defaults to `1`\)'),
            'x\(n\)':_('drop lowest `n` rolls \(defaults to `1`\)'),
            '\!\(n\)':_('explode rolls equal to `n` \(defaults to die size\)'),
            '\!\<n':_('explode rolls lower than `n`'),
            '\!\>n':_('explode rolls higher than `n`'),
            'f\<n':_('count failures as rolls lower than `n`'),
            'f\>n':_('count failures as rolls higher than `n`'),
            '\<n':_('count successes as rolls lower than `n`'),
            '\>n':_('count successes as rolls higher than `n`'),
            '\!p':_('penetrate rolls equal to dice size'),
            '\!p\<n':_('penetrate rolls lower than `n`'),
            '\!p\>n':_('penetrate rolls higher than `n`'),
            'an':_('add `n` to each roll'),
            'sn':_('subtract `n` from each roll'),
            'mn':_('multiply by `n` each roll'),
            'R\(n\)':_('reroll each dice that rolled `n` until there are no'
                ' `n` left\(defaults to `1`\)'),
            'r\(n\)':_('reroll once each dice that rolled `n` \(defaults to'
                ' 1\)'),
            'R\<n':_('reroll dice that rolled lower than `n` until there are'
                ' no results lower than `n` left'),
            'R\>n':_('reroll dice that rolled higher than `n` until there'
                ' no results higher than `n` left'),
            'r\<n':_('reroll once dice that roll lower than `n`'),
            'r\>n':_('reroll once dice that roll higher than `n`')
        }
        roll_modifiers = '\n'.join(
            f'\- `{modifier}`: {meaning}'
            for modifier, meaning in roll_modifiers.items())
        roll_help = _(
            "To roll, use the syntax `\<number of dice\>d\<dice size\>`\. You"
            " can also use the following modifiers\. Note that:\n"
            "\- `\(n\)` means that an integer number may be specified, but if"
            " it isn't the specified default value is assumed\n"
            "\- `n` means that an integer number is required\n"
            "\- any other symbol means exactly that symbol\n"
            "\n"
            "\n"
            )+roll_modifiers # "+" is a workaround to incompatibility between
                             # f-strings and parse_mode
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=roll_help,
            parse_mode=ParseMode.MARKDOWN_V2,
            disable_web_page_preview=True
        )
    # Prevent DoS. default=0 covers input without any digit (e.g. "abc"),
    # which would otherwise make max() raise ValueError: such input falls
    # through to the dice parser below.
    elif max((int(i) for i in find(r'\d+', to_roll)), default=0) > 500:
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=_('You can roll up to 500 dice with up to 500 faces at once!')
        )
    else:
        try:
            the_roll = roll_dice(to_roll)
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text=_('{user_name} rolled {rolls} = {result}').format(
                    user_name=update.message.from_user.first_name,
                    rolls=the_roll[1],result=the_roll[0]
                )
            )
        except rolldice.DiceGroupException:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text=_('You used a wrong syntax!')
            )
|