taskbot/app/database_interface.py

143 lines
4.4 KiB
Python

import sqlite3
from sqlite3 import Error as DBError
from itertools import repeat
from telegram import Message
# Shared module-level connections, opened once when this module is imported.
# check_same_thread=False allows threads other than the creating one to use
# them; sqlite3 leaves serialising concurrent writes to the caller.
task_db: sqlite3.Connection = sqlite3.connect(
    'database/task.sqlite3',
    check_same_thread=False,
)
people_db: sqlite3.Connection = sqlite3.connect(
    'database/people.sqlite3',
    check_same_thread=False,
)
def place_from_group_id(group_id: int, *, db=task_db) -> str:
    """Return the latest place registered for a Telegram group id.

    Queries ``places_group_id`` for rows whose ``telegram_group_id`` equals
    ``group_id`` and takes the first one when ordered by ``inserted``
    descending. Returns an empty string when no row matches.
    """
    query = (
        "SELECT place "
        "FROM places_group_id "
        "WHERE telegram_group_id = ? "
        "ORDER BY inserted DESC "
        "LIMIT 1"
    )
    row = db.execute(query, (group_id,)).fetchone()
    if not row:
        return ""
    return row[0]
def insert_task_in_db(*, db: sqlite3.Connection = task_db,
                      task_table="task", participants_table="participants",
                      task_type: str, chat_id: int, participants: list[int],
                      message: Message) -> int:
    """Insert a task and its participants, and return the new task id.

    The place is resolved from ``message.chat.id`` with
    ``place_from_group_id`` on the same ``db`` connection. The task row and
    all participant rows are written in one transaction, so a failure leaves
    neither behind.

    Args:
        db: Connection to write to.
        task_table: Name of the task table. It is interpolated into the SQL
            text, so it must be a trusted identifier, never user input.
        participants_table: Name of the participants table; same caveat.
        task_type: Value stored in the ``type`` column.
        chat_id: Value stored in the ``creation_chat_id`` column.
        participants: One ``(task_id, participant)`` row is inserted per item.
        message: Creating message; its ``text``, ``message_id`` and
            ``chat.id`` are read.

    Returns:
        The row id of the inserted task.

    Raises:
        sqlite3.Error: If a statement fails. The task and participant
            inserts are rolled back before the error propagates.
    """
    # TODO rethink interface for decoupling
    # TODO check existing
    # Use the caller's connection for the lookup as well, not the default one.
    place = place_from_group_id(message.chat.id, db=db)

    task_sql = (
        f"INSERT INTO {task_table}\n"
        "(type, place, creation_message, creation_message_id, creation_chat_id\n"
        ")\n"
        "VALUES\n"
        "(?,?,?,?,?)"
    )
    task_values = (task_type, place, message.text, message.message_id, chat_id)

    # Flush anything already pending on the shared connection so that a
    # rollback below can only undo the work of this call.
    db.commit()

    # The connection context manager commits on success and rolls back on
    # error, which keeps the task row and its participants consistent.
    with db:
        cursor = db.execute(task_sql, task_values)
        task_id = cursor.lastrowid
        db.executemany(
            f"INSERT INTO {participants_table} VALUES (?,?)",
            [(task_id, participant) for participant in participants],
        )
    return task_id
def task_id_from_message(original_message_id: int, original_chat_id: int,
                         db: sqlite3.Connection=task_db) -> int:
    """Return the id of the task created by the given message.

    The task is matched on both ``creation_message_id`` and
    ``creation_chat_id``; the first row returned is used.

    Raises:
        TypeError: If no task matches, because the missing row (``None``)
            is subscripted.
    """
    lookup_sql = (
        "\n"
        "SELECT id\n"
        "FROM task\n"
        "WHERE creation_message_id = ? AND\n"
        "creation_chat_id = ? ;"
    )
    key = (original_message_id, original_chat_id)
    row = db.execute(lookup_sql, key).fetchone()
    return row[0]
def delete_task_in_db(*, deleting_message_id: int, deleting_chat_id: int, deleting_message_text: str,
                      original_message_id: int, original_chat_id: int, task_id: int=-1,
                      db=task_db) -> int:
    """Mark a task as deleted (soft delete via UPDATE) and return its id.

    The task is selected either by ``task_id`` or by the message that
    created it (``original_message_id`` + ``original_chat_id``). The
    deleting message's text, id and chat id are stored on the row together
    with the current date and time.

    Args:
        deleting_message_id: Stored in ``deletion_message_id``.
        deleting_chat_id: Stored in ``deletion_chat_id``.
        deleting_message_text: Stored in ``deletion_message``.
        original_message_id: ``creation_message_id`` of the task to delete.
        original_chat_id: ``creation_chat_id`` of the task to delete.
        task_id: Id of the task to delete. Leave at the default (or pass any
            non-positive value) to select by the creation message only.
        db: Connection to use for the update and the id lookup.

    Returns:
        ``task_id`` when a positive id was supplied; otherwise the id of the
        task matched by the creation message, or -1 when no row was updated.
    """
    update_sql = (
        "\n"
        "UPDATE task\n"
        "SET deleted = TRUE,\n"
        "deletion_time = CURRENT_TIME,\n"
        "deletion_date = CURRENT_DATE,\n"
        "deletion_message = ? ,\n"
        "deletion_message_id = ? ,\n"
        "deletion_chat_id = ?\n"
        "WHERE (creation_message_id = ? AND\n"
        "creation_chat_id = ?)\n"
        "OR id = ? ;"
    )
    cursor = db.execute(
        update_sql,
        (deleting_message_text, deleting_message_id, deleting_chat_id,
         original_message_id, original_chat_id, task_id),
    )
    db.commit()

    # The caller named the task explicitly: nothing to look up.
    if task_id is not None and task_id > 0:
        return task_id

    # The default -1 is truthy, so a plain ``not task_id`` test never reached
    # the lookup. Only look up when the UPDATE actually matched a row, so a
    # no-op delete keeps returning -1 instead of raising.
    if cursor.rowcount == 0:
        return -1
    return task_id_from_message(original_message_id, original_chat_id, db=db)
def fetch_person_id_from_telegram_id(*, telegram_id: int, db=people_db) -> int:
    """Placeholder: not implemented yet, always raises NotImplementedError.

    NOTE(review): presumably meant to map a Telegram user id to a person id
    stored in the people database -- confirm before implementing.
    """
    raise NotImplementedError
def insert_new_group(*, group_id:int, place:str, db=task_db) -> bool:
    """Insert a ``(place, telegram_group_id)`` row into ``places_group_id``.

    The insert is committed immediately. Always returns True; database
    failures propagate as exceptions raised by sqlite3.
    """
    statement = (
        "INSERT INTO places_group_id (place, telegram_group_id) "
        "VALUES (?,?);"
    )
    values = (place, group_id)
    db.execute(statement, values)
    db.commit()
    return True
def list_places(db=task_db) -> list[str]:
    """Return the distinct place names stored in ``places_group_id``.

    Args:
        db: Connection to query.

    Returns:
        One entry per distinct value of the ``place`` column, in the order
        the database returns them.
    """
    # The keyword was misspelled "DITINCT", which SQLite does not treat as
    # DISTINCT, so the query could not return the intended distinct list.
    rows = db.execute(
        "\n"
        "SELECT DISTINCT place\n"
        "FROM places_group_id\n"
    ).fetchall()
    return [row[0] for row in rows]