This repository has been archived on 2024-02-09. You can view files and clone it, but cannot push or open issues or pull requests.
scel-buc/scripts/get_messages.py

105 lines
3.4 KiB
Python
Executable File

#!/bin/env python3
# scel-buc - scripts to send and receive messages through a Telegram bot
# Copyright (C) 2023 bursa-pastoris
#
# This file is part of scel-buc.
#
# scel-buc is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# scel-buc is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with scel-buc. If not, see <https://www.gnu.org/licenses/>.
import urllib.request
import json
from datetime import datetime as dt
from constants import *
def write_msg(msg):
# If a name is registered for the sender it is used, otherwise a "name" is
# simulated with sender's Telegram ID
registered_sender = None
sender_id = str(msg['from']['id'])
if sender_id in USER_NAMES:
sender_name = USER_NAMES[sender_id]
registered_sender = True
else:
sender_name = 'user:'+sender_id
registered_sender = False
# Group log files are named after the group, single chat log files are
# named after the sender (be it registered or not)
chat_id = str(msg['chat']['id'])
if 'title' in msg['chat']:
# Chat is group chat.
# If a name is registered for the chat it is used, otherwise the group
# chat title from Telegram is used.
if chat_id in GROUP_NAMES:
file_id = f'GRP-{chat_id}-'+GROUP_NAMES[chat_id]
else:
file_id = f'GRP-{chat_id}-'+str(msg['chat']['title'])
elif registered_sender:
# Chat is single chat with a registered user
file_id = f'SIN-{chat_id}-{sender_name}'
else:
# Chat is single chat with unregistered user
file_id = f'SIN-{chat_id}'
# Get message txt...
if 'text' in msg:
text = msg['text']
else:
text = '[unsupported message type]'
# ...and date.
time = dt.strftime(dt.utcfromtimestamp(msg['date']), '%Y-%m-%d %H:%M:%S')
# Write the message
message = f'[{time}] {sender_name}\n{text}\n'
with open(CHAT_PATH+file_id, 'a') as f:
f.write('\n')
f.write(message)
return file_id
# Connect to Telegram API
with urllib.request.urlopen(f'https://api.telegram.org/bot{TOKEN}/getMe') as request:
data = json.loads(request.read())
bot = data['result']
bot_name = '@'+bot['username']
print(f'Getting messages for {USER} from {bot_name}... ', end='', flush=True)
# Get new messages
with urllib.request.urlopen(f'https://api.telegram.org/bot{TOKEN}/getUpdates?offset={LAST_MSG+1}') as request:
data = json.loads(request.read())
messages = data['result']
# Write them
updated_chats = set()
for i in messages:
if 'message' in i:
updated_chats.add(write_msg(i['message'])) # write_msg returns
# file_id
LAST_MSG = i['update_id']
print('Done!')
if len(updated_chats) > 0:
print('New messages in the following chats:\n- {chats}'.format(
chats = '\n- '.join(updated_chats)))
else:
print('No new messages.')
# Save status
with open('.last_msg', 'w') as f:
f.write(str(LAST_MSG))