albatrobot/albatrobot.py

493 lines
18 KiB
Python

# Albatrobot - Telegram bot for RPG groups and similar
# Copyright (C) 2019, 2020, 2021, 2022, 2023 bursa-pastoris
#
# This file is part of Albatrobot.
#
# Albatrobot is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Albatrobot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Albatrobot. If not, see <https://www.gnu.org/licenses/>.
import csv
from datetime import datetime
import hashlib
import logging
import os
import pyttsx3
import random
from re import findall as find
import signal
import time
import Levenshtein
import psutil
from rolldice import roll_dice, rolldice
from telegram import BotCommand
from telegram.constants import ChatAction, ChatMemberStatus, ChatType, ParseMode
from constants import *
# Prepare paths
required_paths = [LOGDIR,TMPVOICEDIR]
for i in required_paths:
os.makedirs(i,exist_ok=True)
# Logging
logger = logging.getLogger('albatrobot_logger')
logger.setLevel(logging.DEBUG)
logformat = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
loghandler_toconsole = logging.StreamHandler()
loghandler_toconsole.setLevel(logging.DEBUG)
loghandler_toconsole.setFormatter(logformat)
loghandler_tofile = logging.FileHandler(f'{LOGDIR}/albatrobot.log')
loghandler_tofile.setLevel(logging.DEBUG)
loghandler_tofile.setFormatter(logformat)
logger.addHandler(loghandler_toconsole)
logger.addHandler(loghandler_tofile)
# Internal functions
def closest(good_list, item):
"""Return the item(s) of good_list closest to item.
"Closest" means "with the shortest Levenshtein distance".
"""
distances = {}
for i in good_list:
i_dist = Levenshtein.distance(i,item)
if i_dist not in distances:
distances[i_dist] = list()
distances[i_dist].append(i)
closest_items = distances[min(distances.keys())]
return closest_items
async def unauthorized_user(update, context):
"""Inform the user that he is not authorized to used the bot."""
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('User not authorized.')
)
async def unknown_command(update, context):
"""Inform the user that the command does not exist."""
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('Unknown command.')
)
#def error_handler(update, context): # This should be deeply reworked. Sooner or later.
# system_logger.error(context.error)
# print(context.error)
# Basic functions
async def start(update, context):
"""Show basic info when a user contacts the bot for the first time."""
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_("Hello, I'm Albatrobot\! Type \/help for help\!\n"
"\n"
"\-\-\-\n"
"_Albatrobot is distributed under"
" [AGPL\-3\.0\-only](https:\/\/www.gnu.org\/licenses\/agpl-3.0.txt)"
" license\. Its source code is publicly available on"
" [Disroot](https:\/\/git.disroot.org\/bursapastoris\/albatrobot)\._"
),
parse_mode=ParseMode.MARKDOWN_V2
)
async def stop(update, context):
"""Stop the bot."""
pid = os.getpid()
parent = psutil.Process(pid)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('Albatrobot will be stopped in a few moments.')
)
logger.info('Albatrobot was stopped.')
parent.send_signal(signal.SIGTERM)
async def help(update, context):
"""Show a help message.
The message doesn't correspond to the inline suggestion set in
@BotFather. They can be set issuing the command /init.
"""
# Common commands
com_header = _('*Albatrobot commands*')
com_help = {
_('christmas'):_('get an image'),
_('easter')+_(' `\(\<author\>\)`'):_('get a citation from given author,'
' or a random one if no author is specified\.'),
_('end\_of\_year\_dinner'):_('send all citations in the archive to a'
' random user\.'),
_('epiphony')+_(' `<text\>`'):_('get a voice message from'
' Albatrobot reading given text'),
_('help'):_('get help about bot commands\.'),
_('legal'):_('get legal info about privacy and copyright\.'),
_('roll')+_(' `\<dice\>\(\<modifiers\>\)`'):_('roll dice, with or'
' without modifiers\. Launch without arguments to get help on the'
' modifiers\.'),
_('version'):_("get Albatrobot's version")
}
com_help = [f'\- \/{i}: {com_help[i]}' for i in com_help]
com_help.sort()
com_help = '\n'.join(com_help)
com_help = f'{com_header}\n{com_help}'
# Admin commands
adm_header = _('*Additional commands for bot admins*')
adm_help = {
_('reset'):_('reset the bot\. This updates inline suggestions'
' and purges /epiphony cached files\.'),
_('stop'):_('stop Albatrobot\. To be used _only_ in case of emergency:'
' restarting the bot after this command is used requires direct'
' access to the server\.')
}
adm_help = [f'\- \/{i}: {adm_help[i]}' for i in adm_help]
adm_help.sort()
adm_help = '\n'.join(adm_help)
adm_help = f'{adm_header}\n{adm_help}'
# Addendum
nonadm_addendum = _(
'Bot admins have access to additional commands and can get help on'
' them using /help in a private chat\.'
)
# Actually send stuff
if (update.effective_chat.type == ChatType.PRIVATE
and update.effective_user.id in ADMINS):
help_text = _(f'{com_help}\n\n{adm_help}')
else:
help_text = _(f'{com_help}\n\n{nonadm_addendum}')
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=help_text,
parse_mode=ParseMode.MARKDOWN_V2
)
async def reset(update, context):
"""Reset the bot."""
# Set commands suggestions
commands=[BotCommand(_('christmas'),_('Get an image')),
BotCommand(_('easter'),_('Get a citation')),
BotCommand(_('end_of_year_dinner'),_('Send all citations to a random user')),
BotCommand(_('help'),_('Get help'))]
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('Initializing...'))
await context.bot.set_my_commands(commands)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('Done'))
# Delete /epiphony files
for i in os.listdir(TMPVOICEDIR):
os.remove(f'{TMPVOICEDIR}/{i}')
async def legal(update, context):
"""
Send legal information about privacy and copyright.
"""
logger.info('A user asked for legal info.')
if LANG != "en":
note=_('The only official version is the English one and any'
' imperfect translation would be misleading. Therefore, this'
' document will not be translated.')
else:
note=""
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action=ChatAction.UPLOAD_DOCUMENT)
with open('doc/LEGAL.pdf','rb') as legalnotes:
await context.bot.send_document(
chat_id=update.effective_chat.id,
document=legalnotes,
filename='albatrobot-legal-notes.pdf',
caption=note
)
async def version(update, context):
"""Send Albatrobot's current version."""
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_("*Albatrobot's version:* {}").format(VERSION.replace('.','\.')),
parse_mode=ParseMode.MARKDOWN_V2
)
# Commands
async def christmas(update, context):
"""Send a random image from the archive."""
last_msg = context.user_data.get('last_christmas',
datetime.fromisoformat('1970-01-01T00:00'))
if datetime.now()-last_msg < CHRISTMASCOOLDOWN:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_("Christmas is good, but you can't invoke it so often! "
"Try again in some seconds...")
)
logger.info('A user wants too much Christmas.')
return
with open(f'{IMGPATH}/imgs.csv','r') as imgs_file:
imgs = list(csv.DictReader(imgs_file))
img = random.choice(imgs)
img_file = img['filename']
img_capt = img['caption']
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action=ChatAction.UPLOAD_PHOTO
)
await context.bot.send_photo(
chat_id=update.effective_chat.id,
photo=open(f'{IMGPATH}/{img_file}', 'rb'),
caption=img_capt
)
context.user_data['last_christmas'] = datetime.now()
async def easter(update, context):
"""Send a citation from the archive.
User can either choose the author or let the bot pick a random one.
"""
last_msg = context.user_data.get('last_easter',
datetime.fromisoformat('1970-01-01T00:00'))
if datetime.now()-last_msg < EASTERCOOLDOWN:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_("Easter is good, but you can't invoke it so often!"
" Retry in some seconds...")
)
logger.info('A user wants too much Easter.')
return
with open(CITARCHIVE,'r') as cits_file:
cits = csv.DictReader(cits_file)
authors = tuple({row['author'] for row in cits})
# If the user asked for an author...
if len(context.args) > 0:
# ...and it exists
if context.args[0] in authors:
author = str(context.args[0])
# ...and it does not
else:
closest_authors = closest(authors,context.args[0])
if len(closest_authors) == 1:
suggestion = _('The most similar is {}.').format(closest_authors[0])
else:
last_author = closest_authors[-1]
other_authors = ', '.join(closest_authors[:-1])
suggestion = _('The most similar are {} and {}.').format(
other_authors, last_author)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('Author {required_author} is not present in the archive!'
' {suggestion}\n'
'\n'
'N.b.: CaSe Is ImPoRtAnT!').format(
required_author=context.args[0],
suggestion=suggestion)
)
logger.info('A user wants easter from non registered'
' author {author}.'.format(author=context.args[0]))
return
# If he did not
else:
author = random.choice(authors)
with open(CITARCHIVE,'r') as cits_file:
cits = csv.DictReader(cits_file)
cits_pool = [cit['text'] for cit in cits if cit['author'] == author]
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('{cit}\n\n~{author}').format(
cit=random.choice(cits_pool),
author=author
)
)
context.user_data['last_easter'] = datetime.now()
async def end_of_year_dinner(update, context):
"""Send a user each citation from the archive."""
with open(CITARCHIVE,'r') as cits_file:
cits_archive = list(csv.DictReader(cits_file))
cits_number = 0
dest_user_id = random.choice(USERS)
dest_user = await context.bot.getChatMember(chat_id=dest_user_id,user_id=dest_user_id)
dest_user_firstname = dest_user.user.first_name
await context.bot.send_message(
chat_id=dest_user_id,
text=_('Hello there, here an end of year dinner offered by {}!').format(
update.message.from_user.first_name)
)
for cit in cits_archive:
await context.bot.send_message(
chat_id=dest_user_id,
text=_('{cit}\n\n~{author}'.format(cit=cit['text'],author=cit['author']))
)
cits_number +=1
time.sleep(1.1) # To avoid hitting Telegram's message rate limit
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('End of year dinner of {cits_number} plates delvered to {dest}!').format(
cits_number=cits_number,
dest=dest_user_firstname
)
)
logger.info('A user sent an end of year dinner.')
async def epiphony(update, context):
sentence = ' '.join(context.args).upper()
# Handle command without args
if len(sentence) == 0:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('What should I say?')
)
# Handle command with args
else:
# Generate and send audio
sentence_hash = hashlib.md5(sentence.encode(encoding='UTF-8')).hexdigest()
voice_file = f'{TMPVOICEDIR}/{sentence_hash}.ogg'
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action=ChatAction.RECORD_VOICE)
if not os.path.exists(voice_file):
engine = pyttsx3.init()
engine.setProperty('voice',EPIPHONYLANG)
engine.setProperty('rate',EPIPHONYSPEED)
engine.save_to_file(sentence,voice_file)
engine.runAndWait()
time.sleep(1) # Workaround to prevent ReferenceError due to using
# pyttsx3 with asyncio
await context.bot.send_voice(
chat_id=update.effective_chat.id,
voice=open(voice_file,'rb'),
reply_to_message_id=update.effective_message.reply_to_message.message_id
if update.effective_message.reply_to_message else None,
caption=_('Voice message from {sender_name}.'.format(
sender_name=update.effective_user.first_name))
)
# If in group chat and have permission, delete the message that issued
# the command
bot_data = await context.bot.get_me()
bot_chatmember = await context.bot.get_chat_member(
chat_id=update.effective_chat.id,
user_id=bot_data.id)
if (bot_chatmember.status == ChatMemberStatus.ADMINISTRATOR
and bot_chatmember.can_delete_messages):
await context.bot.delete_message(
chat_id=update.effective_chat.id,
message_id=update.effective_message.message_id)
async def roll(update, context):
"""Roll a dice following DnD notation: <dice number>d<die faces>.
Many modifiers are supported. Dice number and faces are limited to 500
each, to prevent DoS effects.
"""
to_roll = ''.join(context.args)
if len(to_roll) == 0:
roll_modifiers = {
'\+n':_('add `n` to the roll'),
'\-n':_('subtract `n` from the roll'),
'\*n':_('multiply the roll by `n`'),
'/n':_('divide the roll by `n`'),
'//n':_('floor divide the roll by `n`'),
'\*\*n':_('exponentiate the roll to `n`'),
'K\(n\)':_('keep highest `n` rolls \(defaults to `1`\)'),
'k\(n\)':_('keep lowest `n` rolls \(defaults to `1`\)'),
'X\(n\)':_('drop highest `n` rolls \(defaults to `1`\)'),
'x\(n\)':_('drop lowest `n` rolls \(defaults to `1`\)'),
'\!\(n\)':_('explode rolls equal to `n` \(defaults to die size\)'),
'\!\<n':_('explode rolls lower than `n`'),
'\!\>n':_('explode rolls higher than `n`'),
'f\<n':_('count failures as rolls lower than `n`'),
'f\>n':_('count failures as rolls higher than `n`'),
'\<n':_('count successes as rolls lower than `n`'),
'\>n':_('count successes as rolls higher than `n`'),
'\!p':_('penetrate rolls equal to dice size'),
'\!p\<n':_('penetrate rolls lower than `n`'),
'\!p\>n':_('penetrate rolls higher than `n`'),
'an':_('add `n` to each roll'),
'sn':_('subtract `n` from each roll'),
'mn':_('multiply by `n` each roll'),
'R\(n\)':_('reroll each dice that rolled `n` until there are no'
' `n` left\(defaults to `1`\)'),
'r\(n\)':_('reroll once each dice that rolled `n` \(defaults to'
' 1\)'),
'R\<n':_('reroll dice that rolled lower than `n` until there are'
' no results lower than `n` left'),
'R\>n':_('reroll dice that rolled higher than `n` until there'
' no results higher than `n` left'),
'r\<n':_('reroll once dice that roll lower than `n`'),
'r\>n':_('reroll once dice that roll higher than `n`')
}
roll_modifiers = [f'\- `{i}`: {roll_modifiers[i]}' for i in roll_modifiers]
roll_modifiers = '\n'.join(roll_modifiers)
roll_help = _(
"To roll, use the syntax `\<number of dice\>d\<dice size\>`\. You"
" can also use the following modifiers\. Note that:\n"
"\- `\(n\)` means that an integer number may be specified, but if"
" it isn't the specified default value is assumed\n"
"\- `n` means that an integer number is required\n"
"\- any other symbol means exactly that symbol\n"
"\n"
"\n"
)+roll_modifiers # "+" is a workaround to incompatibility between
# f-strings and parse_mode
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=roll_help,
parse_mode=ParseMode.MARKDOWN_V2,
disable_web_page_preview=True
)
elif max([int(i) for i in find('\d+',to_roll)]) > 500: # Prevent DoS
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('You can roll up to 500 dice with up to 500 faces at once!')
)
else:
try:
the_roll = roll_dice(to_roll)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('{user_name} rolled {rolls} = {result}').format(
user_name=update.message.from_user.first_name,
rolls=the_roll[1],result=the_roll[0]
)
)
except rolldice.DiceGroupException:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=_('You used a wrong syntax!')
)