generated from dagur/asm
Initial commit
This commit is contained in:
commit
667f1f6d35
|
@ -0,0 +1,145 @@
|
|||
from aiogram import executor, Dispatcher, Bot, types
|
||||
from aiogram.types import ParseMode
|
||||
from config import BOT_TOKEN, TG_ID, API_KEY
|
||||
from pathlib import Path
|
||||
|
||||
import re
|
||||
import requests
|
||||
import json
|
||||
import io
|
||||
import sqlite3
|
||||
|
||||
app = Bot(BOT_TOKEN)
|
||||
dp = Dispatcher(app)
|
||||
|
||||
current_dir = Path().resolve()
|
||||
users = {}
|
||||
ip_regex = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
|
||||
admin = TG_ID
|
||||
|
||||
|
||||
# Base de dados
|
||||
class DataBase:
|
||||
def __init__(self) -> None:
|
||||
self.db = sqlite3.connect(current_dir / "data.db")
|
||||
self.cursor = self.db.cursor()
|
||||
self.cursor.execute("CREATE TABLE IF NOT EXISTS users (user_id INT, available_uses INT)")
|
||||
self.db.commit()
|
||||
|
||||
def save_data(self, userid, uses) -> None:
|
||||
self.cursor.execute(f"INSERT INTO users ('user_id', 'available_uses') VALUES ('{int(userid)}', '{int(uses)}')")
|
||||
self.db.commit()
|
||||
|
||||
def update_data(self, userid, uses) -> None:
|
||||
self.cursor.execute(f"UPDATE users SET available_uses={uses} WHERE user_id={userid}")
|
||||
self.db.commit()
|
||||
|
||||
def read_data(self) -> dict:
|
||||
self.cursor.execute(f"SELECT * FROM users")
|
||||
for id, data in self.cursor.fetchall():
|
||||
if int(id) not in users.keys():
|
||||
users[int(id)] = int(data)
|
||||
|
||||
def exec(self, cmd) -> list:
|
||||
self.cursor.execute(cmd)
|
||||
return self.cursor.fetchall()
|
||||
|
||||
|
||||
async def get_user_id(usrname) -> int:
|
||||
usr = await app.get_chat(usrname)
|
||||
return usr.id
|
||||
|
||||
|
||||
@dp.message_handler(commands=['data'])
|
||||
async def data(message: types.Message):
|
||||
MSG = f"""
|
||||
Users: {len(users)}
|
||||
"""
|
||||
await message.reply(MSG)
|
||||
|
||||
|
||||
@dp.message_handler(commands=['execute_sql'])
|
||||
async def execsql(message: types.Message):
|
||||
if message.chat.id == admin:
|
||||
msg = message.text.replace('/execute_sql', '').strip()
|
||||
data = DataBase().exec(msg)
|
||||
await message.reply(data)
|
||||
|
||||
|
||||
@dp.message_handler(commands=['get_data'])
|
||||
async def get_data(message: types.Message):
|
||||
if message.chat.id == admin:
|
||||
user = await app.get_chat(chat_id=message.text.replace('/get_data', '').strip())
|
||||
await message.reply(user.id)
|
||||
|
||||
|
||||
@dp.message_handler(commands=['start'])
|
||||
async def start(message: types.Message):
|
||||
if message.chat.id not in users:
|
||||
users[message.chat.id] = 10
|
||||
HELLOMSG = f"Olá, {message.from_user.first_name}!\nEste bot foi criado para checar o local de origem de IP. Envie um IP para mim que retornarei com a consulta."
|
||||
await message.reply(HELLOMSG)
|
||||
|
||||
|
||||
@dp.message_handler(commands=['renew'])
|
||||
async def renew(message: types.Message):
|
||||
user = int(message.text.replace('/renew', '').strip())
|
||||
if message.chat.id == admin:
|
||||
if user not in users.keys():
|
||||
await message.reply('Usuário não está na lista de usuários.')
|
||||
else:
|
||||
users[user] += 10
|
||||
DataBase().update_data(user, users[user])
|
||||
await app.send_message(user, f'Suas consultas foram renovadas. Você tem direito a mais {users[user]}')
|
||||
|
||||
|
||||
@dp.message_handler()
|
||||
async def check_ip(message: types.Message):
|
||||
if message.chat.id not in users:
|
||||
users[message.chat.id] = 10
|
||||
DataBase().save_data(message.chat.id, users[message.chat.id])
|
||||
|
||||
match = re.match(ip_regex, message.text)
|
||||
if match:
|
||||
if users[message.chat.id] != 0:
|
||||
users[message.chat.id] = users[message.chat.id] - 1
|
||||
DataBase().update_data(message.chat.id, users[message.chat.id])
|
||||
r = requests.get(f'https://api.ipgeolocation.io/ipgeo?apiKey={API_KEY}&ip={match.group(0)}')
|
||||
bytesobj = r.content
|
||||
value = bytesobj.replace(b"'", b'"')
|
||||
fdata = json.load(io.BytesIO(value))
|
||||
# img = requests.get(fdata['country_flag'])
|
||||
link_gmaps = f'https://www.google.com/maps/search/?api=1&query={fdata["latitude"]}%2C{fdata["longitude"]}'
|
||||
lmsg = f'''
|
||||
Aqui estão os dados:
|
||||
|
||||
IP: {fdata['ip']}
|
||||
Código do País: {fdata['country_code3']}
|
||||
País: {fdata['country_name']}
|
||||
Capital: {fdata['country_capital']}
|
||||
Estado: {fdata['state_prov']}
|
||||
Cidade: {fdata['city']}
|
||||
Distrito: {fdata['district']}
|
||||
CEP/ZIP: {fdata['zipcode']}
|
||||
Operadora: {fdata['isp']}
|
||||
Tipo de Conexão: {fdata['connection_type']}
|
||||
Latitude: {fdata['latitude']}
|
||||
Longitude: {fdata['longitude']}
|
||||
Fuso: {fdata['time_zone']['name'], fdata['time_zone']['offset']}
|
||||
Moeda: {fdata['currency']['code']}
|
||||
Google Maps: <a href="{link_gmaps}">neste link</a>
|
||||
|
||||
Você ainda tem direito a {users[message.chat.id]} consultas gratuitas.
|
||||
'''
|
||||
await message.reply(lmsg, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
|
||||
|
||||
else:
|
||||
await message.reply("Você não tem mais créditos disponíveis para consulta. Entre em contato com @lolzaws(free).")
|
||||
|
||||
else:
|
||||
await message.reply("Não é um IP válido.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
DataBase().read_data()
|
||||
executor.start_polling(dp)
|
|
@ -0,0 +1,10 @@
|
|||
# Your Telegram bot token
|
||||
BOT_TOKEN = ''
|
||||
|
||||
# API key from any IP lookup service
|
||||
# You may need to change the code depending on
|
||||
# The service API data returned.
|
||||
API_KEY = ''
|
||||
|
||||
# Your Telegram user ID
|
||||
TG_ID = 0
|
|
@ -0,0 +1,2 @@
|
|||
aiogram
|
||||
requests
|
Loading…
Reference in New Issue