Finish updating bwcli

This commit is contained in:
Théophile Diot 2023-04-25 15:17:37 +02:00
parent 94b97a6bb9
commit 9d829ebca1
No known key found for this signature in database
GPG Key ID: E752C80DB72BB014
2 changed files with 42 additions and 25 deletions

View File

@ -154,14 +154,21 @@ class CLI(ApiCaller):
return False, "error"
def bans(self) -> Tuple[bool, str]:
bans = {}
servers = {}
ret, resp = self._send_to_apis("GET", "/bans", response=True)
if not ret:
return False, "error"
for k, v in resp.items():
servers[k] = v.get("data", [])
if self.__redis:
bans["redis"] = []
servers["redis"] = []
for key in self.__redis.scan_iter("bans_ip_*"):
ip = key.decode("utf-8").replace("bans_ip_", "")
exp = self.__redis.ttl(key)
bans["redis"].append(
servers["redis"].append(
{
"ip": ip,
"exp": exp,
@ -169,18 +176,15 @@ class CLI(ApiCaller):
}
)
ret, resp = self._send_to_apis("GET", "/bans", response=True) # TODO: fix this
if not ret:
return False, "error"
cli_str = ""
for server, bans in servers.items():
cli_str += f"List of bans for {server}:\n"
if not bans:
cli_str += "No ban found\n"
print(resp, flush=True) # TODO: handle response
for ban in bans:
cli_str += f"- {ban['ip']} for {format_remaining_time(ban['exp'])} : {ban.get('reason', 'no reason given')}\n"
else:
cli_str += "\n"
# bans.extend(resp.get("data", []))
# if len(bans) == 0:
# return True, "No ban found"
# cli_str = "List of bans :\n"
# for ban in bans:
# cli_str += f"- {ban['ip']} for {format_remaining_time(ban['exp'])} : {ban.get('reason', 'no reason given')}\n"
# return True, cli_str
return True, cli_str

View File

@ -2,7 +2,7 @@ from io import BytesIO
from os import getenv
from sys import path as sys_path
from tarfile import open as taropen
from typing import Optional
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
if "/usr/share/bunkerweb/utils" not in sys_path:
sys_path.append("/usr/share/bunkerweb/utils")
@ -18,8 +18,8 @@ from docker import DockerClient
class ApiCaller:
def __init__(self, apis=[]):
self.__apis = apis
def __init__(self, apis: List[API] = None):
self.__apis = apis or []
self.__logger = setup_logger("Api", getenv("LOG_LEVEL", "INFO"))
def auto_setup(self, bw_integration: Optional[str] = None):
@ -101,14 +101,22 @@ class ApiCaller:
)
)
def _set_apis(self, apis):
def _set_apis(self, apis: List[API]):
self.__apis = apis
def _get_apis(self):
return self.__apis
def _send_to_apis(self, method, url, files=None, data=None, response=False):
def _send_to_apis(
self,
method: Union[Literal["POST"], Literal["GET"]],
url: str,
files: Optional[Dict[str, BytesIO]] = None,
data: Optional[Dict[str, Any]] = None,
response: bool = False,
) -> Tuple[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
ret = True
responses = {}
for api in self.__apis:
if files is not None:
for buffer in files.values():
@ -130,13 +138,18 @@ class ApiCaller:
f"Successfully sent API request to {api.get_endpoint()}{url}",
)
if response:
instance = api.get_endpoint().replace("http://", "").split(":")[0]
if isinstance(resp, dict):
responses[instance] = resp
else:
responses[instance] = resp.json()
if response:
if isinstance(resp, dict):
return ret, resp
return ret, resp.json()
return ret, responses
return ret
def _send_files(self, path, url):
def _send_files(self, path: str, url: str) -> bool:
ret = True
with BytesIO() as tgz:
with taropen(