taskbot/app/helpers.py

191 lines
7.2 KiB
Python

"""business logic and database"""
from collections import defaultdict # , namedtuple
from sqlite3 import Error as DBError
from typing import Optional, Union
from telegram import Chat, ChatMember, Message, MessageEntity, Update, User
from telegram.ext import CallbackContext
import database_interface as db
InternaldictType = dict[MessageEntity, str]
ClassifiedEntitiesdictType = dict[str, list[InternaldictType]]
# def classify_entities(entities: InternaldictType) -> ClassifiedEntitiesdictType:
# """classify entities by type and returns a dictionary with types as keys"""
# internaldict: InternaldictType = InternaldictType()
# internaldictkeys = ("entity", "text")
# entities_bytype: ClassifiedEntitiesdictType = defaultdict(list)
# for entity, text in entities.items():
# internaldict = dict(zip(internaldictkeys,
# [entity, text]))
# entities_bytype[entity.type].append(internaldict)
# return entities_bytype
def is_group(chat: Chat) -> bool:
return chat.type in (Chat.GROUP, Chat.SUPERGROUP)
def is_private(chat: Chat) -> bool:
return chat.type == Chat.PRIVATE
def extract_entities_text(type_name: str, *, message: Message, entities: ClassifiedEntitiesdictType) -> list[str]:
if not entities:
entities = classify_entities(message.entities)
entities_of_type = entities[type_name]
return [entity["text"] for entity in entities_of_type]
def telegram_id_of(reference: Union[MessageEntity, User], nobot: bool = True) -> Optional[int]:
"""numerical telegram id of mention if it is not a bot"""
if type(reference) == MessageEntity:
user = reference.user
elif type(reference) != User:
raise NotImplementedError("supported only MessageEntity and User argument type")
user = reference
return user.id if not user.is_bot else None
def person_id_of(telegram_id: int) -> int:
"""for stats reports"""
raise NotImplementedError
def insert_task(update: Update) -> int: # TODO refactor for gettin externally the command, the message, the place and the participants?
message = update.message
if not is_group(update.effective_chat):
raise ValueError
# split command and determine participants, task type and place
entities_bytype = classify_entities(message.parse_entities()) # TODO include captions to images etc"or message.parse_caption_entities())"
task_kind: str = entities_bytype[MessageEntity.BOT_COMMAND][0]["text"][1:]
chat_id: int = update.effective_chat.id #NOTE TBD permit use of a h/cashtag for signaling that the task is in either place?
# participants_id: list[int] = list()
# for mention in entities_bytype[MessageEntity.MENTION]:
# user = mention["entity"].user
# if user and not user.is_bot:
# try:
# participants_id.append(user.id)
# except AttributeError: # the user does not exists
# pass
participants: list[User] = get_participants(entities_bytype)
print(entities_bytype)
participants_id: list[int] = get_ids_of(participants)
participants_id_unique: list[int] = remove_duplicates(participants_id)
print(f"participants: {[participant.first_name for participant in participants]}")
print(f"id: {participants_id}")
# insert in db
inserted_task_id: int = db.insert_task_in_db(task_type=task_kind, chat_id=chat_id,
participants=participants_id_unique, message=message)
return inserted_task_id
def remove_duplicates(in_list: Union[list, tuple]) -> list: # to export
return list(set(in_list))
def get_participants(entities_bytype: ClassifiedEntitiesdictType) -> list[User]:
"""get real, unique users form a list of classified"""
participants: list = []
participants.extend(entities_bytype[MessageEntity.MENTION])
participants.extend(entities_bytype[MessageEntity.TEXT_MENTION])
print(f"mentions: {entities_bytype[MessageEntity.MENTION]}")
print(f"text_mentions: {entities_bytype[MessageEntity.TEXT_MENTION]}")
users = [ participant["entity"].user for participant in participants]
#users = map(lambda d_entity_text: d_entity_text["entity"].user , participants)
real_users = [user for user in users if user and not user.is_bot] # remove inexistent and bot users
unique_users = remove_duplicates([user for user in real_users if user])
return unique_users
def get_mentions(message: Message) -> dict[MessageEntity, str]:
"""get all mentions and textmentions in the text and the caption.
returns a dictionary of the same type of Message.parse_entities() type"""
# mix of parse_caption_entities, parse_entities and MENTION, TEXT_MENTION
# done like this and not with [MessageEntity.MENTION, MessageEntity.TEXT_MENTION] because I no longer trust the parse function
# caption
mentions_caption_at = message.parse_caption_entities(MessageEntity.MENTION)
mentions_caption_textlike = message.parse_caption_entities(MessageEntity.TEXT_MENTION)
mentions_caption = mentions_caption_at | mentions_caption_textlike
# text message
mentions_textmessage_at = message.parse_entities(MessageEntity.MENTION)
mentions_textmessage_textlike = message.parse_entities(MessageEntity.TEXT_MENTION)
mentions_textmessage = mentions_textmessage_at | mentions_textmessage_textlike
mentions = mentions_text | mentions_at
return mentions
def get_ids_of(user_list: list[User]) -> list[int]:
return list(map(lambda user: user.id, user_list))
def get_administrators_id(chat: Chat) -> list[int]: #util
return [admin.id for admin in chat.get_administrators()]
def is_admin(user: Union[User, int], chat: Chat) -> bool:
test = False
if type(user) == User:
test = user in [member.user for member in chat.get_administrators()]
elif type(user) == int: # user is a user_id
test = user in get_administrators_id(chat)
return test
def dump_db(dbs: list[str]) -> str:
#return db.dump(dbs)
raise NotImplementedError
def is_from_admin(message: Message) -> bool:
return is_admin(user=message.from_user, chat=message.chat)
def delete_task(*, deleting_message: Message, original_message: Message, task_id: int=0,) -> int:
return db.delete_task_in_db(deleting_message_id=deleting_message.message_id,
deleting_chat_id=deleting_message.chat.id,
deleting_message_text=deleting_message.text,
original_message_id=original_message.message_id,
original_chat_id=original_message.chat.id)
def command_name_from_message(message: Message) -> str:
return classify_entities(message.entities)[MessageEntity.BOT_COMMAND][0]["text"]
def place_name_from_message(message: Message) -> str:
command = command_name_from_message(message)
return message.text.replace(command, "").casefold().strip()
def register_group_as_place(message: Message) -> bool:
success = True
place_name = place_name_from_message(message)
db.insert_new_group(group_id=message.chat.id, place=place_name)
return success
def list_places() -> list[str]:
return db.list_places()