This commit is contained in:
Lorenzo Carbonell 2024-02-11 20:15:21 +01:00
parent b14118e4d4
commit ebdc7ecc91
No known key found for this signature in database
GPG Key ID: B5E8FC9484B82CA9
4 changed files with 0 additions and 205 deletions

View File

@ -8,13 +8,9 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
httpx = "^0.26.0"
python-dotenv = "^1.0.1"
docker = "^7.0.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"
pytest-asyncio = "^0.23.5"
[build-system]
requires = ["poetry-core"]

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Lorenzo Carbonell <a.k.a. atareao>
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Main process
"""
import asyncio
import logging
import os
import sys
import docker
from dotenv import load_dotenv
from telegram import Telegram
logging.basicConfig(
stream=sys.stdout,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
async def main():
"""Main function
"""
logger.debug("main")
load_dotenv()
url = os.getenv("TELEGRAM_URL")
token = os.getenv("TELEGRAM_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
thread_id = os.getenv("TELEGRAM_THREAD_ID")
if url and token and chat_id and thread_id:
chat_id = int(chat_id)
thread_id = int(thread_id)
telegram_client = Telegram(url, token)
client = docker.DockerClient(
base_url="unix:///run/user/1000/docker.sock")
for event in client.events(decode=True):
message = process_event(event)
if message:
await telegram_client.send_message(chat_id, thread_id, message)
def process_event(event: dict) -> str:
"""Process event
Args:
event (dict): The event
Returns:
str: A message to send to Telegram
"""
logger.debug("process_event: %s", event)
if "Type" in event and "Action" in event and event["Type"] == "container" \
and event["Action"] == "kill":
name = event["Actor"]["Attributes"]["name"]
return f"Container *{name}* was killed"
return None
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

View File

@ -1,84 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Lorenzo Carbonell <a.k.a. atareao>
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""A module for telegram
"""
import logging
from httpx import AsyncClient
logger = logging.getLogger(__name__)
class TelegramException(Exception):
"""Telegram exception
Args:
Exception (_type_): exception
"""
class Telegram:
"""A class to work with Telegram
"""
def __init__(self, url: str, token: str):
"""Init Telegram class
Args:
url (str): base url for telegram
token (str): token for bot
"""
logger.info("__info__")
self._url = f"https://{url}/bot{token}"
self._headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
async def send_message(self, chat_id: int, thread_id: int, message: str):
"""Send message to Telegram
Args:
chat_id (int): id for chat
thread_id (int): id for thread
message (str): message to send
Returns:
_type_: _description_
"""
logger.info("send_message")
url = f"{self._url}/sendMessage"
payload = {
"chat_id": chat_id,
"message_thread_id": thread_id,
"parse_mode": "markdown",
"text": message
}
logger.debug(payload)
async with AsyncClient() as client:
response = await client.post(
url, headers=self._headers, json=payload
)
logger.debug(response)
if response.status_code == 200:
return response.json()

View File

@ -1,32 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests
"""
import os
import pytest
from dotenv import load_dotenv
from src.telegram import Telegram
pytest_plugins = ("pytest_asyncio",)
@pytest.mark.asyncio
async def test_telegram():
"""Test telegram class
"""
load_dotenv()
url = os.getenv("TELEGRAM_URL")
token = os.getenv("TELEGRAM_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
thread_id = os.getenv("TELEGRAM_THREAD_ID")
if url and token and chat_id and thread_id:
telegram = Telegram(url, token)
chat_id = int(chat_id)
thread_id = int(thread_id)
message = "Esto es una prueba"
response = await telegram.send_message(chat_id, thread_id, message)
else:
response = None
assert response is not None
assert response != ""