taskbot/app/helpers.py
2021-01-21 16:20:39 +01:00

201 lines
No EOL
7.7 KiB
Python

"""business logic and database"""
import json
from collections import defaultdict, namedtuple
from functools import singledispatch
from sqlite3 import Error as DBError
from typing import Optional, Union, Literal
from xlsxwriter import Workbook
from telegram import Chat, ChatMember, Message, MessageEntity, Update, User
from telegram.ext import CallbackContext
import dbwrapper as db
from constants import ANY_MENTION, USERS, PARSED_MESSAGEENTITIES, CONFIG_FILE_PATH, PERIODS
import tempfile
# Flags and configuration, loaded once when the module is imported.
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on the platform's default locale encoding.
with open(CONFIG_FILE_PATH, encoding="utf-8") as config_file:
    config = json.load(config_file)
# Value of the "development" key; a missing key raises KeyError at import time.
development = config["development"]
# Types
# Three-field record with the fields "added", "unchanged" and "deleted".
Changes = namedtuple("Changes", "added unchanged deleted")
def is_group(chat: Chat) -> bool:
    """Return True when the type of *chat* is Chat.GROUP or Chat.SUPERGROUP."""
    group_types = (Chat.GROUP, Chat.SUPERGROUP)
    return chat.type in group_types
def is_private(chat: Chat) -> bool:
    """Return True when the type of *chat* is Chat.PRIVATE."""
    chat_type = chat.type
    return chat_type == Chat.PRIVATE
def is_admin(user: Union[User, int], chat: Chat) -> bool:
    """Tell whether *user* is one of the administrators of *chat*.

    *user* may be a User object, compared against the users returned by
    ``chat.get_administrators()``, or an integer user id, compared against
    ``get_administrators_id(chat)``. Any other kind of value yields False.
    """
    if isinstance(user, User):
        admin_users = [member.user for member in chat.get_administrators()]
        return user in admin_users
    if isinstance(user, int):  # user is a user_id
        return user in get_administrators_id(chat)
    return False
# REFACTOR: should this be split into one function from mentions to users
# and another from users to ids?
def ids_from_mentions(mentions: PARSED_MESSAGEENTITIES) -> list[int]:
    """Return the ids of the users attached to the entities in *mentions*.

    Iterates *mentions* (for a dictionary, its keys) and reads ``user.id``
    from each entity, keeping the iteration order and any duplicates.
    Entities whose ``user`` is ``None`` carry no id and are skipped instead
    of raising AttributeError.
    """
    return [mention.user.id for mention in mentions if mention.user is not None]
def insert_task(update: Update, context: CallbackContext) -> int:
    """Store the task requested by the command message in *update*.

    The task type is the bot command stripped of its first character, the
    participants are the ids of the users mentioned in the message, and the
    place is looked up from the id of the chat the update came from.

    Returns whatever ``db.insert_task_in_db`` returns (annotated as the id
    of the inserted task).
    """
    # TODO: refactor so that the command, the message, the place and the
    # participants are obtained externally?
    message = update.message
    # WARNING: assumes the command prefix is exactly one character long.
    command = command_name_from_message(message)
    task_kind: str = command[1:]
    # NOTE (TBD): allow a hashtag/cashtag to signal the place of the task?
    chat_id: int = update.effective_chat.id
    participants: set[User] = get_users(get_mentions(message), context)
    participants_id: set[int] = get_ids_of(participants)
    # Insert in the database.
    return db.insert_task_in_db(
        task_type=task_kind,
        chat_id=chat_id,
        participants=participants_id,
        place_name=db.place_from_group_id(chat_id),
        message_id=message.message_id,
        message_text=message.text,
        requesting_user_id=message.from_user.id,
    )
def insert_confirmation_in_db(inserted_task_id: int, message: Message):
    """Forward a confirmation message to ``db.creation_confirm_message``.

    Passes, in this order, the task id, the id of *message* and the text of
    *message*.
    """
    confirmation_id = message.message_id
    confirmation_text = message.text
    db.creation_confirm_message(inserted_task_id, confirmation_id, confirmation_text)
def get_users(mentions: PARSED_MESSAGEENTITIES, context, allow_bots=False) -> set[User]:
    """Collect the unique users referenced by *mentions*.

    An entity that carries a user contributes that user directly. Otherwise
    the mention text, without its leading "@", is matched against the
    ``username`` of the users stored in ``context.bot_data[USERS]``; mentions
    of users that are not registered there are ignored.

    Bots are filtered out unless *allow_bots* is true.
    """
    known_users: set = context.bot_data[USERS]
    found: set = set()
    for entity, text in mentions.items():
        candidate = entity.user
        if not candidate:
            handle = text.lstrip("@")
            candidate = next(
                (known for known in known_users if known.username == handle),
                None,
            )
            if candidate is None:  # do not add users that are not registered
                continue
        if candidate and (allow_bots or not candidate.is_bot):
            found.add(candidate)
    return found
def get_mentions(message: Message) -> PARSED_MESSAGEENTITIES:
    """Return every ANY_MENTION entity found in the text and in the caption.

    The result merges ``message.parse_entities(ANY_MENTION)`` with
    ``message.parse_caption_entities(ANY_MENTION)`` into one dictionary of
    the same shape; on a key present in both, the caption entry wins.
    """
    from_caption = message.parse_caption_entities(ANY_MENTION)
    from_text = message.parse_entities(ANY_MENTION)
    return {**from_text, **from_caption}
def get_ids_of(user_list: Union[set[User], list[User]]) -> set[int]:
    """Return the set of ``id`` attributes of the users in *user_list*."""
    return {member.id for member in user_list}
def get_administrators_id(chat: Chat) -> set[int]:  # util
    """Return the user ids of the administrators of *chat*.

    Each item returned by ``chat.get_administrators()`` wraps the actual
    user in its ``user`` attribute, so the id is read from ``member.user.id``
    rather than from the member object itself.
    """
    return {member.user.id for member in chat.get_administrators()}
def export_db(file_path, file_type):
    """Export the database to *file_path* in the format named by *file_type*.

    Supported values of *file_type* are ``'sql'`` and ``'xlsx'``.

    Raises:
        NotImplementedError: when no writer exists for *file_type*.
    """
    # TODO: add an ODF writer ('odf').

    def write_sql(file_path):
        sql = db.dump()
        # Mode 'x' creates the file and fails if it already exists.
        with open(file_path, 'x') as file_to_write:
            file_to_write.writelines(sql)

    def write_xlsx(file_path):
        tables = db.get_db_table_names()
        with Workbook(file_path) as workbook:
            for table in tables:
                # One worksheet per table, named after the table.
                worksheet = workbook.add_worksheet(table)
                records = db.get_table_content(table)
                # TODO: check the relative order of column names and values
                column_names = db.get_table_columns(table)
                for col_n, column_name in enumerate(column_names):  # header row
                    worksheet.write(0, col_n, column_name)
                for row_n, record in enumerate(records, start=1):  # data rows
                    for col_n, entry in enumerate(record):
                        # TODO: manage different datatypes
                        worksheet.write(row_n, col_n, entry)

    writers = {
        'sql': write_sql,
        'xlsx': write_xlsx,
        # 'odf': write_odf,
    }
    # Guard only the lookup: a KeyError raised while a writer runs is a real
    # error and must not be reported as an unsupported file type.
    try:
        write = writers[file_type]
    except KeyError:
        raise NotImplementedError(
            f"unsupported export file type: {file_type!r}"
        ) from None
    write(file_path)
def is_from_admin(message: Message) -> bool:
    """Tell whether the sender of *message* is an administrator of its chat."""
    sender = message.from_user
    return is_admin(user=sender, chat=message.chat)
def delete_task(*, deleting_message: Message, original_message: Optional[Message]=None, task_id:int=0,) -> int:
    """Request the deletion of a task through ``db.delete_task_in_db``.

    The task is identified either by *task_id* (when non-zero) or by the
    message that originally created it. In the latter case the id and chat
    id of *original_message* are passed, or 0 for both when no original
    message is given.

    The id, chat id, text and sender id of *deleting_message* are always
    passed along. Returns whatever ``db.delete_task_in_db`` returns.
    """
    request = {
        "deleting_message_id": deleting_message.message_id,
        "deleting_chat_id": deleting_message.chat.id,
        "deleting_message_text": deleting_message.text,
        "deletion_request_by_id": deleting_message.from_user.id,
    }
    if task_id:  # identify the task by its id
        request["task_id"] = task_id
    elif original_message:  # identify the task by its original message
        request["original_message_id"] = original_message.message_id
        request["original_chat_id"] = original_message.chat.id
    else:
        request["original_message_id"] = 0
        request["original_chat_id"] = 0
    return db.delete_task_in_db(**request)
def command_name_from_message(message: Message) -> str:
    """Return the first bot command found in *message*, prefix included.

    Raises IndexError when the message holds no bot-command entity.
    """
    # NOTE(review): the text is returned exactly as parse_entities yields it;
    # presumably it can carry a "@botname" suffix in group chats - confirm.
    commands: dict = message.parse_entities([MessageEntity.BOT_COMMAND])
    command_texts = list(commands.values())
    return command_texts[0]
def register_group_as_place(group_id: int, place_name: str):
    """Pass *group_id* and *place_name* (as ``place``) to ``db.insert_new_group``."""
    db.insert_new_group(place=place_name, group_id=group_id)
def list_places() -> list[str]:
    """Return the places as reported by ``db.list_places()``."""
    places = db.list_places()
    return places
def get_all_users_from_update(update):
    """Placeholder that is not implemented yet; always raises NotImplementedError."""
    # TODO: move the logic here from event_handlers
    raise NotImplementedError()
def get_stats(top_n_contributors: int, n_of_periods: int,
              period: Literal['day', 'week', 'month', 'year', 'all'],  # idea: Literal[*PERIODS]
              bounties_only: bool = False) -> list[str, int]:
    """Return the top contributors as computed by ``db.get_top_contributors``.

    NOTE(review): the declared return type ``list[str, int]`` is not a
    well-formed list annotation; presumably the result is a list of
    (name, count) rows - confirm against ``db.get_top_contributors``.
    """
    # The database helper receives the period BEFORE the number of periods,
    # i.e. in the opposite order of this function's own parameters.
    ranking = db.get_top_contributors(top_n_contributors, period, n_of_periods, bounties_only)
    return ranking
def reverse_dict(dictionary: dict) -> dict:
    """Return a new dictionary mapping each value of *dictionary* to its key.

    When several keys share the same value, the key that comes last in
    iteration order is the one kept.
    """
    return {value: key for key, value in dictionary.items()}
def dict_type_casting(dictionary: dict, type_generator) -> dict:
    """Return a new dictionary with every key and value converted.

    *type_generator* is called on each key and on each value of
    *dictionary*; the results become the keys and values of the new
    dictionary.
    """
    return {
        type_generator(key): type_generator(value)
        for key, value in dictionary.items()
    }