/stats implementation corrections

2021-01-17 20:55:59 +01:00
11 changed files with 209 additions and 75 deletions

@ -0,0 +1,15 @@
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "app/main.py",
"console": "integratedTerminal"

install:
$(PIPENV) install
# $(PIP) install -r requirements.txt -r requirements-dev.txt
$(PIPENV) install
@echo "Available targets:"
@echo "- clean Clean up the source directory"
@echo "- pep8 Check style with flake8"
@echo "- lint Check style with pylint"
@echo "- black Check style with black"
@echo "- mypy Check type hinting with mypy"
@echo "- test Run tests using pytest"
@echo "- ensure_pipenv
@echo "Available variables:"
@echo "- PYLINT default: $(PYLINT)"
@echo "- PYTEST default: $(PYTEST)"
@echo "- PEP8 default: $(PEP8)"
@echo "- BLACK default: $(BLACK)"
@echo "- MYPY default: $(MYPY)"
@echo "- PIP default: $(PIP)"
@echo "- PIPENV default: $(PIPENV)"
@echo "pipenv already installed in `which pipenv`, pip already installed in `which pip`. Nothing to do." || \
@ -71,4 +71,12 @@ db:
#TODO (supervisor)
rm -rf database/*
rm -rf logs/app/*
rm -rf logs/database/*
free_space: del_db_exports, del_logfiles

@ -3,7 +3,7 @@ import sqlite3
from itertools import repeat
from sqlite3 import Error as DBError
from sqlite3 import Row
from typing import Literal, Union, Optional
# db connections
task_db: sqlite3.Connection = sqlite3.connect('database/task.sqlite3',check_same_thread=False)
def get_table_content(name:str, *, db=task_db) -> list[tuple[str]]:
def get_table_columns(table_name, *, db=task_db) -> list[str]:
columns_query_result: tuple[Row] = db.execute(f"""
PRAGMA table_info(?)""", (table_name,))
return [column[0] for columnn in columns_query_result]
PRAGMA table_info({table_name})""")
return [column[0] for column in columns_query_result]
PERIODS = ['day','week','month','year','all'] #TODO put in constants, shared with text_commands
def get_top_contributors( top_n: int, period: Literal['day','week','month','year','all'], # *PERIODS],
bounties_only:bool=False, *,db=task_db) -> dict[str,str]:
is_sunday = bool(db.execute("SELECT DATE('now') = DATE('now', 'weekday 0')").fetchone()[0])
# if is_monday:
# do_console(garfield)
period_base_date = {
'day': "'now', 'start of day'",
'week': "'now', 'weekday 0'" + ("" if is_sunday else ", '-7 days'"), # start of week
'month': "'now', 'start of month'",
'year': "'now', 'start of year'",
if period != 'all':
if last_n_periods == 0: # today , this month etc...
end_date = "'now'"
begin_date = period_base_date[period]
end_date = period_base_date[period] + "'-1 day'"
begin_date = end_date + (f", '-{last_n_periods} {period}'" if period != 'week' else \
f", '-{last_n_periods*7} day'")
date_condition = f""" AND task.creation_date >= (SELECT DATE({begin_date}))
AND task.creation_date <= (SELECT DATE({end_date}))"""
date_condition = ""
bounties_join = "JOIN bounties ON task.id=bounties.id" if bounties_only else ""
query = f"""
SELECT telegram_id, COUNT(telegram_id)
FROM participants
JOIN task
ON task.id=participants.task_id
WHERE task.deleted = FALSE
ORDER BY COUNT(telegram_id) DESC
query_results = db.execute(query, (top_n,))
return dict(query_results)

@ -4,13 +4,13 @@
from collections import defaultdict, namedtuple
from functools import singledispatch
from sqlite3 import Error as DBError
from typing import Optional, Union
from typing import Optional, Union, Literal
from xlsxwriter import Workbook
from telegram import Chat, ChatMember, Message, MessageEntity, Update, User
from telegram.ext import CallbackContext
import database_interface as db
import dbwrapper as db
import tempfile
return set(map(lambda admin: admin.id, chat.get_administrators()))
return set(map(lambda admin: admin.id, chat.get_administrators()))
def write_sql(file_to_write):
sql = db.dump()
return file_to_write
def export_db(file_path, file_type):
"""create file to export"""
# def write_odf(file_path):
# raise NotImplementedError
def write_sql(file_path):
sql = db.dump()
with open(file_path, 'x') as file_to_write:
def write_xlsx(workbook: Workbook) -> Workbook:
#TODO decide if the file is managed from this layer or not
tables = db.get_db_table_names()
for table in tables:
worksheet = workbook.add_worksheet(table)
records = db.get_table_content(table)
column_names = db.get_table_columns(table) #TODO control relative order of column names and values
for col_n, column_name in enumerate(column_names): # write column headers
worksheet.write(0, col_n, column_name)
for row_n, record in enumerate(records):
for col_n, entry in enumerate(record): #write values
worksheet.write(row_n + 1, col_n, entry) #TODO manage different datatypes
return workbook
def write_xlsx(file_path):
tables = db.get_db_table_names()
with Workbook(file_path) as workbook:
for table in tables:
worksheet = workbook.add_worksheet(table)
records = db.get_table_content(table)
column_names = db.get_table_columns(table) #TODO control relative order of column names and values
for col_n, column_name in enumerate(column_names): # write column headers
worksheet.write(0, col_n, column_name)
for row_n, record in enumerate(records):
for col_n, entry in enumerate(record): #write values
worksheet.write(row_n + 1, col_n, entry) #TODO manage different datatypes
writer = { 'sql': write_sql,
'xlsx': write_xlsx}
# 'odf': write_odf}
except KeyError:
raise NotImplementedError
def is_from_admin(message: Message) -> bool:
return is_admin(user=message.from_user, chat=message.chat)
def get_all_users_from_update(update):
PERIODS = ['day','week','month','year','all'] #TODO put in constants, shared with text_commands
def get_stats(top_n_contributors: int, n_of_periods: int, period: Literal['day','week','month','year','all'], # *PERIODS],
bounties_only:bool=False) -> list[str, int]:
return db.get_top_contributors(top_n_contributors, period, n_of_periods, bounties_only)
def reverse_dict(dictionary:dict) -> dict:
"""dictionary with value:key"""
return dict(map(lambda kv_pair: (kv_pair[1], kv_pair[0]), dictionary.items()))
def dict_type_casting(dictionary: dict, type_generator) -> dict:
cast = type_generator
return dict(map(lambda kv_pair: (cast(kv_pair[0]), cast(kv_pair[1])), dictionary.items()))

@ -5,7 +5,6 @@ import sqlite3
from telegram.ext import CommandHandler, MessageHandler, Updater, PicklePersistence, Filters
import telegram.ext
import text_commands
import event_handlers
# bot run
if __name__ == "__main__":
# bot run
View File

@ -5,12 +5,11 @@ Functions here should be only for user interaction (messages etc).
import time
from functools import wraps
from itertools import repeat
from sqlite3 import Error as DBError
from textwrap import dedent
from typing import Optional, Union
from tempfile import NamedTemporaryFile
from xlsxwriter.workbook import Workbook
from telegram import Chat, ChatMember, Message, MessageEntity, Update, User, ChatAction
from telegram.ext import CallbackContext, Filters
def test(update: Update, context: CallbackContext) -> None:
def export_db(update: Update, context: CallbackContext) -> None:
"""exports db"""
message = update.message
path_dir = "database/exports/" #TODO put in config file
epoch = str(int(time.time()))
path = "database/exports/"
file_name = "db_export_" + epoch
file_name_xls = file_name + ".xlsx"
file_name_sql = file_name + ".sql"
sql_args = dict(zip(["sql", "sqlite", "sqlite3", "db", "database"], repeat("sql"))) # maps all to "sql"
xls_args = dict(zip(["xls", "xlsx", "excel", "excell"], repeat("xlsx")))
# odf_args = dict(zip(["odf", 'libreoffice', 'open'], repeat('odf'))) #TODO add open formats
all_args = sql_args | xls_args # | odf_args)
epoch_timestamp = str(int(time.time()))
file_name_without_extension = "db_export_" + epoch_timestamp
try: #test valid selection
file_format = all_args[context.args[0].lower()]
except KeyError: # invalid selection
f"Invalid command: try '/export_db' followed by one of: {', '.join(all_args)}. \nNothing done.")
#TODO logging
file_name = file_name_without_extension + '.' + file_format
file_path = path_dir + file_name
h.export_db(file_path, file_format) # file operation
with open(file_path, mode="rb") as file_to_send:
context.bot.send_chat_action(chat_id=update.message.chat_id, action=ChatAction.UPLOAD_DOCUMENT)
update.message.reply_document(document=file_to_send, filename=file_name, quote=True)
sql_args = ["sql", "sqlite", "sqlite3", "db", "database"]
xls_args = ["xls", "xlsx", "excel", "excell"] #TODO add open formats
all_args = [*sql_args, *xls_args]
# create and send files
if context.args[0].lower() in sql_args:
with open(path + file_name_sql, mode="x") as file_to_send:
file_to_send = h.write_sql(file_to_send)
with open(path + file_name_sql, mode='r') as file_to_send:
context.bot.send_chat_action(chat_id=message.chat_id, action=ChatAction.UPLOAD_DOCUMENT)
message.reply_document(document=file_to_send, filename=file_name_sql, quote=True)
elif context.args[0].lower() in xls_args:
with Workbook(path + file_name_xls) as workbook:
workbook = h.write_xlsx(workbook)
with open(path +file_name_xls, "rb") as xls_file:
context.bot.send_chat_action(chat_id=message.chat_id, action=ChatAction.UPLOAD_DOCUMENT)
message.reply_document(document=xls_file, filename=file_name_xls, quote=True)
message.reply_text(f"Invalid command: try {', '.join(all_args)}. \nNothing done.")
PERIODS = ['day','week','month','year','all'] #TODO put in constants, shared with text_commands
USERNAME_TO_USEROBJ_MAP = "username_to_userobj_map" #TODO transfer in config or constants
def stats(update: Update, context: CallbackContext) -> None:
"""stats about task done in a specific timeframe
/stats topN [bountyhunters]
[[day | today] | yesterday | Ndays | daysN |
[week | thisweek] | pastweek | Nweeks | weeksN |
[month | thismonth] | pastmonth | Nmonths | monthsN |
[year | thisyear] | pastyear |
[overall | alltimes]]
topN: get only the N top contributors
day: """ #TODO user guide
top_n = int(context.args[0].strip('top'))
period_to_parse = context.args[1]
only_bounties = 'bounty' in context.args[2]
except IndexError: # not third arg
only_bounties = False
period = list(filter(lambda period: period in period_to_parse, PERIODS))[0]
if 'past' in period_to_parse or 'yester' in period_to_parse:
period_multiplicator = 1
elif (digits := list(filter(lambda char: char.isdigit(), period_to_parse))): # contains a number?
period_multiplicator = int(''.join(map(lambda char: str(char), digits))) # concatenate numbers
else: # today, thismonth, etc...
period_multiplicator = 0
result_str: dict[str,str] = h.get_stats(top_n, period_multiplicator, period, only_bounties) # dict[id, task_count]
result: dict[int,int] = h.dict_type_casting(result_str, int)
except TypeError as e: # None, no results
update.message.reply_text("No results.")
raise e
reply = f"Top {top_n} {'bounty hunters:' if only_bounties else ':'}"
username_user_map:dict['str',User] = context.bot_data[USERNAME_TO_USEROBJ_MAP]
#TODO *URGENT* rethink the mapping in order to update based on id!
user_username_map:dict = h.reverse_dict(username_user_map)
id_user_map: dict = dict(map(lambda user: (user.id, user), user_username_map))
for user_id in result:
user:User = id_user_map[user_id]
reply += f"\n{user.mention_markdown_v2()}: {str(result[user_id])}"
def help(update: Update, context: CallbackContext) -> None:

