Small refactor on the ApiCaller and the Scheduler

This commit is contained in:
Théophile Diot 2023-05-30 11:49:14 -04:00
parent 1bd40a877a
commit aca0d6da48
No known key found for this signature in database
GPG Key ID: E752C80DB72BB014
5 changed files with 283 additions and 271 deletions

View File

@ -121,7 +121,7 @@ class CLI(ApiCaller):
):
# Docker & Linux case
super().__init__(
apis=[
[
API(
f"http://127.0.0.1:{self.__variables.get('API_HTTP_PORT', '5000')}",
host=self.__variables.get("API_SERVER_NAME", "bwapi"),
@ -142,8 +142,10 @@ class CLI(ApiCaller):
elif self.__variables.get("AUTOCONF_MODE", "no").lower() == "yes":
return "autoconf"
elif integration_path.is_file():
return integration_path.read_text().strip().lower()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text():
return integration_path.read_text(encoding="utf-8").strip().lower()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(
encoding="utf-8"
):
return "docker"
return "linux"
@ -154,7 +156,7 @@ class CLI(ApiCaller):
if not ok:
self.__logger.error(f"Failed to delete ban for {ip} from redis")
if self._send_to_apis("POST", "/unban", data={"ip": ip}):
if self.send_to_apis("POST", "/unban", data={"ip": ip}):
return True, f"IP {ip} has been unbanned"
return False, "error"
@ -168,7 +170,7 @@ class CLI(ApiCaller):
if not ok:
self.__logger.error(f"Failed to ban {ip} in redis")
if self._send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp}):
if self.send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp}):
return (
True,
f"IP {ip} has been banned for {format_remaining_time(exp)}",
@ -178,7 +180,7 @@ class CLI(ApiCaller):
def bans(self) -> Tuple[bool, str]:
servers = {}
ret, resp = self._send_to_apis("GET", "/bans", response=True)
ret, resp = self.send_to_apis("GET", "/bans", response=True)
if not ret:
return False, "error"
@ -206,7 +208,6 @@ class CLI(ApiCaller):
for ban in bans:
cli_str += f"- {ban['ip']} for {format_remaining_time(ban['exp'])} : {ban.get('reason', 'no reason given')}\n"
else:
cli_str += "\n"
cli_str += "\n"
return True, cli_str

View File

@ -26,6 +26,14 @@ class ApiCaller:
self.__apis = apis or []
self.__logger = setup_logger("Api", getenv("LOG_LEVEL", "INFO"))
@property
def apis(self) -> List[API]:
return self.__apis
@apis.setter
def apis(self, apis: List[API]):
self.__apis = apis
def auto_setup(self, bw_integration: Optional[str] = None):
if bw_integration is None:
if getenv("KUBERNETES_MODE", "no") == "yes":
@ -105,13 +113,7 @@ class ApiCaller:
)
)
def _set_apis(self, apis: List[API]):
self.__apis = apis
def _get_apis(self):
return self.__apis
def _send_to_apis(
def send_to_apis(
self,
method: Union[Literal["POST"], Literal["GET"]],
url: str,
@ -155,7 +157,7 @@ class ApiCaller:
return ret, responses
return ret
def _send_files(self, path: str, url: str) -> bool:
def send_files(self, path: str, url: str) -> bool:
ret = True
with BytesIO() as tgz:
with tar_open(
@ -164,6 +166,6 @@ class ApiCaller:
tf.add(path, arcname=".")
tgz.seek(0, 0)
files = {"archive.tar.gz": tgz}
if not self._send_to_apis("POST", url, files=files):
if not self.send_to_apis("POST", url, files=files):
ret = False
return ret

View File

@ -36,10 +36,11 @@ class JobScheduler(ApiCaller):
def __init__(
self,
env: Optional[Dict[str, Any]] = None,
lock: Optional[Lock] = None,
apis: Optional[list] = None,
logger: Optional[Logger] = None,
integration: str = "Linux",
*,
lock: Optional[Lock] = None,
apis: Optional[list] = None,
):
super().__init__(apis or [])
self.__logger = logger or setup_logger("Scheduler", getenv("LOG_LEVEL", "INFO"))
@ -53,6 +54,14 @@ class JobScheduler(ApiCaller):
self.__job_success = True
self.__semaphore = Semaphore(cpu_count() or 1)
@property
def env(self) -> Dict[str, Any]:
return self.__env
@env.setter
def env(self, env: Dict[str, Any]):
self.__env = env
def __get_jobs(self):
jobs = {}
for plugin_file in glob(
@ -63,7 +72,7 @@ class JobScheduler(ApiCaller):
plugin_name = basename(dirname(plugin_file))
jobs[plugin_name] = []
try:
plugin_data = loads(Path(plugin_file).read_text())
plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
if not "jobs" in plugin_data:
continue
@ -130,7 +139,7 @@ class JobScheduler(ApiCaller):
return schedule_every().day
elif every == "week":
return schedule_every().week
raise Exception(f"can't convert string {every} to schedule")
raise ValueError(f"can't convert string {every} to schedule")
def __reload(self) -> bool:
reload = True
@ -141,6 +150,7 @@ class JobScheduler(ApiCaller):
stdin=DEVNULL,
stderr=PIPE,
env=self.__env,
check=False,
)
reload = proc.returncode == 0
if reload:
@ -151,7 +161,7 @@ class JobScheduler(ApiCaller):
)
else:
self.__logger.info("Reloading nginx ...")
reload = self._send_to_apis("POST", "/reload")
reload = self.send_to_apis("POST", "/reload")
if reload:
self.__logger.info("Successfully reloaded nginx")
else:
@ -166,7 +176,11 @@ class JobScheduler(ApiCaller):
ret = -1
try:
proc = run(
join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env
join(path, "jobs", file),
stdin=DEVNULL,
stderr=STDOUT,
env=self.__env,
check=False,
)
ret = proc.returncode
except BaseException:
@ -235,10 +249,10 @@ class JobScheduler(ApiCaller):
if reload:
try:
if self._get_apis():
if self.apis:
cache_path = join(sep, "var", "cache", "bunkerweb")
self.__logger.info(f"Sending {cache_path} folder ...")
if not self._send_files(cache_path, "/cache"):
if not self.send_files(cache_path, "/cache"):
success = False
self.__logger.error(f"Error while sending {cache_path} folder")
else:
@ -283,7 +297,7 @@ class JobScheduler(ApiCaller):
return ret
def __run_in_thread(self, jobs: list):
self.__semaphore.acquire()
self.__semaphore.acquire(timeout=60)
for job in jobs:
job()
self.__semaphore.release()

View File

@ -25,7 +25,7 @@ from sys import path as sys_path
from tarfile import open as tar_open
from time import sleep
from traceback import format_exc
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Union
for deps_path in [
join(sep, "usr", "share", "bunkerweb", *paths)
@ -41,17 +41,17 @@ from Database import Database # type: ignore
from JobScheduler import JobScheduler
from ApiCaller import ApiCaller # type: ignore
run = True
scheduler = None
reloading = False
RUN = True
SCHEDULER: Optional[JobScheduler] = None
GENERATE = False
INTEGRATION = "Linux"
CACHE_PATH = join(sep, "var", "cache", "bunkerweb")
logger = setup_logger("Scheduler", getenv("LOG_LEVEL", "INFO"))
def handle_stop(signum, frame):
global run, scheduler
run = False
if scheduler is not None:
scheduler.clear()
if SCHEDULER is not None:
SCHEDULER.clear()
stop(0)
@ -61,13 +61,11 @@ signal(SIGTERM, handle_stop)
# Function to catch SIGHUP and reload the scheduler
def handle_reload(signum, frame):
global reloading, run, scheduler
reloading = True
try:
if scheduler is not None and run:
if SCHEDULER is not None and RUN:
# Get the env by reading the .env file
env = dotenv_values(join(sep, "etc", "bunkerweb", "variables.env"))
if scheduler.reload(env):
tmp_env = dotenv_values(join(sep, "etc", "bunkerweb", "variables.env"))
if SCHEDULER.reload(tmp_env):
logger.info("Reload successful")
else:
logger.error("Reload failed")
@ -91,61 +89,84 @@ def stop(status):
def generate_custom_configs(
custom_configs: List[Dict[str, Any]],
integration: str,
api_caller: ApiCaller,
configs: List[Dict[str, Any]],
*,
original_path: str = join(sep, "etc", "bunkerweb", "configs"),
original_path: Union[Path, str] = join(sep, "etc", "bunkerweb", "configs"),
):
logger.info("Generating new custom configs ...")
Path(original_path).mkdir(parents=True, exist_ok=True)
for custom_config in custom_configs:
tmp_path = join(original_path, custom_config["type"].replace("_", "-"))
if custom_config["service_id"]:
tmp_path = join(tmp_path, custom_config["service_id"])
tmp_path = Path(tmp_path, f"{custom_config['name']}.conf")
tmp_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(custom_config["data"])
if not isinstance(original_path, Path):
original_path = Path(original_path)
if integration in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
logger.info("Sending custom configs to BunkerWeb")
ret = api_caller._send_files(original_path, "/custom_configs")
# Remove old custom configs files
logger.info("Removing old custom configs files ...")
for file in glob(str(original_path.joinpath("*", "*"))):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
if not ret:
logger.error(
"Sending custom configs failed, configuration will not work as expected...",
if configs:
logger.info("Generating new custom configs ...")
original_path.mkdir(parents=True, exist_ok=True)
for custom_config in configs:
tmp_path = original_path.joinpath(
custom_config["type"].replace("_", "-"),
custom_config["service_id"] or "",
f"{custom_config['name']}.conf",
)
tmp_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(custom_config["data"])
if SCHEDULER.apis:
logger.info("Sending custom configs to BunkerWeb")
ret = SCHEDULER.send_files(original_path, "/custom_configs")
if not ret:
logger.error(
"Sending custom configs failed, configuration will not work as expected...",
)
def generate_external_plugins(
plugins: List[Dict[str, Any]],
integration: str,
api_caller: ApiCaller,
*,
original_path: str = join(sep, "etc", "bunkerweb", "plugins"),
original_path: Union[Path, str] = join(sep, "etc", "bunkerweb", "plugins"),
):
logger.info("Generating new external plugins ...")
Path(original_path).mkdir(parents=True, exist_ok=True)
for plugin in plugins:
tmp_path = Path(original_path, plugin["id"], f"{plugin['name']}.tar.gz")
tmp_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(plugin["data"])
with tar_open(str(tmp_path), "r:gz") as tar:
tar.extractall(original_path)
tmp_path.unlink()
if not isinstance(original_path, Path):
original_path = Path(original_path)
for job_file in glob(join(str(tmp_path.parent), "jobs", "*")):
st = Path(job_file).stat()
chmod(job_file, st.st_mode | S_IEXEC)
# Remove old external plugins files
logger.info("Removing old external plugins files ...")
for file in glob(str(original_path.joinpath("*"))):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
if integration in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
logger.info("Sending plugins to BunkerWeb")
ret = api_caller._send_files(original_path, "/plugins")
if plugins:
logger.info("Generating new external plugins ...")
original_path.mkdir(parents=True, exist_ok=True)
for plugin in plugins:
tmp_path = original_path.joinpath(plugin["id"], f"{plugin['name']}.tar.gz")
tmp_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(plugin["data"])
with tar_open(str(tmp_path), "r:gz") as tar:
tar.extractall(original_path)
tmp_path.unlink()
if not ret:
logger.error(
"Sending plugins failed, configuration will not work as expected...",
)
for job_file in glob(join(str(tmp_path.parent), "jobs", "*")):
st = Path(job_file).stat()
chmod(job_file, st.st_mode | S_IEXEC)
if SCHEDULER.apis:
logger.info("Sending plugins to BunkerWeb")
ret = SCHEDULER.send_files(original_path, "/plugins")
if not ret:
logger.error(
"Sending plugins failed, configuration will not work as expected...",
)
if __name__ == "__main__":
@ -159,7 +180,7 @@ if __name__ == "__main__":
_exit(1)
# Write pid to file
pid_path.write_text(str(getpid()))
pid_path.write_text(str(getpid()), encoding="utf-8")
del pid_path
@ -171,10 +192,6 @@ if __name__ == "__main__":
help="path to the file containing environment variables",
)
args = parser.parse_args()
generate = False
integration = "Linux"
api_caller = ApiCaller()
db_configs = None
tmp_variables_path = Path(
normpath(args.variables) if args.variables else sep,
"var",
@ -182,6 +199,7 @@ if __name__ == "__main__":
"bunkerweb",
"variables.env",
)
dotenv_env = dotenv_values(str(tmp_variables_path))
logger.info("Scheduler started ...")
@ -190,7 +208,7 @@ if __name__ == "__main__":
logger.info(f"Variables : {tmp_variables_path}")
# Read env file
env = dotenv_values(str(tmp_variables_path))
env = dotenv_env.copy()
db = Database(
logger,
@ -206,23 +224,19 @@ if __name__ == "__main__":
db_configs = db.get_custom_configs()
else:
# Read from database
integration = "Docker"
INTEGRATION = "Docker"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
if integration_path.is_file():
integration = integration_path.read_text().strip()
INTEGRATION = integration_path.read_text(encoding="utf-8").strip()
del integration_path
api_caller.auto_setup(bw_integration=integration)
db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
if db.is_initialized():
db_configs = db.get_custom_configs()
if integration in (
if INTEGRATION in (
"Swarm",
"Kubernetes",
"Autoconf",
@ -232,9 +246,7 @@ if __name__ == "__main__":
"Autoconf is not loaded yet in the database, retrying in 5s ...",
)
sleep(5)
elif not tmp_variables_path.is_file() or db.get_config() != dotenv_values(
str(tmp_variables_path)
):
elif not tmp_variables_path.is_file() or db.get_config() != dotenv_env:
# run the config saver
proc = subprocess_run(
[
@ -245,6 +257,7 @@ if __name__ == "__main__":
],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
)
if proc.returncode != 0:
logger.error(
@ -257,9 +270,6 @@ if __name__ == "__main__":
)
sleep(5)
if not db_configs:
db_configs = db.get_custom_configs()
env = db.get_config()
while not db.is_first_config_saved() or not env:
logger.warning(
@ -272,13 +282,14 @@ if __name__ == "__main__":
# Checking if any custom config has been created by the user
custom_configs = []
configs_path = join(sep, "etc", "bunkerweb", "configs")
root_dirs = listdir(configs_path)
for root, dirs, files in walk(configs_path):
db_configs = db.get_custom_configs()
configs_path = Path(sep, "etc", "bunkerweb", "configs")
root_dirs = listdir(str(configs_path))
for root, dirs, files in walk(str(configs_path)):
if files or (dirs and basename(root) not in root_dirs):
path_exploded = root.split("/")
for file in files:
with open(join(root, file), "r") as f:
with open(join(root, file), "r", encoding="utf-8") as f:
custom_conf = {
"value": f.read(),
"exploded": (
@ -309,26 +320,13 @@ if __name__ == "__main__":
f"Couldn't save some manually created custom configs to database: {err}",
)
# Remove old custom configs files
logger.info("Removing old custom configs files ...")
for file in glob(join(configs_path, "*", "*")):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
db_configs = db.get_custom_configs()
if db_configs:
logger.info("Generating new custom configs ...")
generate_custom_configs(db_configs, integration, api_caller)
generate_custom_configs(db.get_custom_configs(), original_path=configs_path)
# Check if any external plugin has been added by the user
external_plugins = []
plugins_dir = join(sep, "etc", "bunkerweb", "plugins")
for filename in glob(join(plugins_dir, "*", "plugin.json")):
with open(filename, "r") as f:
plugins_dir = Path(sep, "etc", "bunkerweb", "plugins")
for filename in glob(str(plugins_dir.joinpath("*", "plugin.json"))):
with open(filename, "r", encoding="utf-8") as f:
_dir = dirname(filename)
plugin_content = BytesIO()
with tar_open(
@ -356,60 +354,88 @@ if __name__ == "__main__":
f"Couldn't save some manually added plugins to database: {err}",
)
external_plugins = db.get_plugins(external=True)
if external_plugins:
# Remove old external plugins files
logger.info("Removing old external plugins files ...")
for file in glob(join(plugins_dir, "*")):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
generate_external_plugins(
db.get_plugins(external=True, with_data=True),
integration,
api_caller,
original_path=plugins_dir,
)
generate_external_plugins(
db.get_plugins(external=True, with_data=True),
original_path=plugins_dir,
)
logger.info("Executing scheduler ...")
generate = not tmp_variables_path.exists() or env != dotenv_values(
str(tmp_variables_path)
)
GENERATE = not tmp_variables_path.exists() or env != dotenv_env
if not generate:
del dotenv_env
if not GENERATE:
logger.warning(
"Looks like BunkerWeb configuration is already generated, will not generate it again ..."
"Looks like BunkerWeb configuration is already generated, will not GENERATE it again ..."
)
first_run = True
# Instantiate scheduler
SCHEDULER = JobScheduler(env.copy() | environ.copy(), logger, INTEGRATION)
# Automatically setup the scheduler apis
SCHEDULER.auto_setup(bw_integration=INTEGRATION)
FIRST_RUN = True
while True:
ret = db.checked_changes()
if ret:
logger.error(
f"An error occurred when setting the changes to checked in the database : {changes}"
f"An error occurred when setting the changes to checked in the database : {ret}"
)
stop(1)
# Instantiate scheduler
scheduler = JobScheduler(
env=env.copy() | environ.copy(),
apis=api_caller._get_apis(),
logger=logger,
integration=integration,
)
# Update the environment variables of the scheduler
SCHEDULER.env = env.copy() | environ.copy()
# Only run jobs once
if not scheduler.run_once():
if not SCHEDULER.run_once():
logger.error("At least one job in run_once() failed")
else:
logger.info("All jobs in run_once() were successful")
if generate:
changes = db.check_changes()
if isinstance(changes, str):
logger.error(
f"An error occurred when checking for changes in the database : {changes}"
)
stop(1)
# check if the plugins have changed since last time
if changes["external_plugins_changed"]:
logger.info("External plugins changed, generating ...")
generate_external_plugins(
db.get_plugins(external=True, with_data=True),
original_path=plugins_dir,
)
# run the config saver to save potential plugins settings
proc = subprocess_run(
[
"python",
join(sep, "usr", "share", "bunkerweb", "gen", "save_config.py"),
"--settings",
join(sep, "usr", "share", "bunkerweb", "settings.json"),
],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
)
if proc.returncode != 0:
logger.error(
"Config saver failed, configuration will not work as expected...",
)
ret = db.checked_changes()
if ret:
logger.error(
f"An error occurred when setting the changes to checked in the database : {ret}"
)
stop(1)
if GENERATE:
# run the generator
proc = subprocess_run(
[
@ -424,11 +450,12 @@ if __name__ == "__main__":
]
+ (
["--variables", str(tmp_variables_path)]
if args.variables and first_run
if args.variables and FIRST_RUN
else []
),
stdin=DEVNULL,
stderr=STDOUT,
check=False,
)
if proc.returncode != 0:
@ -441,29 +468,29 @@ if __name__ == "__main__":
str(tmp_variables_path),
)
if api_caller._get_apis():
if SCHEDULER.apis:
# send nginx configs
logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")
ret = api_caller._send_files(
join(sep, "etc", "nginx"), "/confs"
)
ret = SCHEDULER.send_files(join(sep, "etc", "nginx"), "/confs")
if not ret:
logger.error(
"Sending nginx configs failed, configuration will not work as expected...",
)
try:
if api_caller._get_apis():
cache_path = join(sep, "var", "cache", "bunkerweb")
if SCHEDULER.apis:
# send cache
logger.info(f"Sending {cache_path} folder ...")
if not api_caller._send_files(cache_path, "/cache"):
logger.error(f"Error while sending {cache_path} folder")
logger.info(f"Sending {CACHE_PATH} folder ...")
if not SCHEDULER.send_files(CACHE_PATH, "/cache"):
logger.error(f"Error while sending {CACHE_PATH} folder")
else:
logger.info(f"Successfully sent {cache_path} folder")
logger.info(f"Successfully sent {CACHE_PATH} folder")
# restart nginx
if integration not in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
if SCHEDULER.send_to_apis("POST", "/reload"):
logger.info("Successfully reloaded nginx")
else:
logger.error("Error while reloading nginx")
else:
# Stop temp nginx
logger.info("Stopping temp nginx ...")
proc = subprocess_run(
@ -471,6 +498,7 @@ if __name__ == "__main__":
stdin=DEVNULL,
stderr=STDOUT,
env=env.copy(),
check=False,
)
if proc.returncode == 0:
logger.info("Successfully sent stop signal to temp nginx")
@ -495,6 +523,7 @@ if __name__ == "__main__":
stdin=DEVNULL,
stderr=STDOUT,
env=env.copy(),
check=False,
)
if proc.returncode == 0:
logger.info("Successfully started nginx")
@ -506,28 +535,25 @@ if __name__ == "__main__":
logger.error(
f"Error while sending stop signal to temp nginx - returncode: {proc.returncode} - error: {proc.stderr.decode('utf-8') if proc.stderr else 'Missing stderr'}",
)
else:
if api_caller._send_to_apis("POST", "/reload"):
logger.info("Successfully reloaded nginx")
else:
logger.error("Error while reloading nginx")
except:
logger.error(
f"Exception while reloading after running jobs once scheduling : {format_exc()}",
)
generate = True
scheduler.setup()
need_reload = False
configs_need_generation = False
plugins_need_generation = False
first_run = False
GENERATE = True
SCHEDULER.setup()
NEED_RELOAD = False
CONFIGS_NEED_GENERATION = False
PLUGINS_NEED_GENERATION = False
FIRST_RUN = False
# infinite schedule for the jobs
logger.info("Executing job scheduler ...")
Path(sep, "var", "tmp", "bunkerweb", "scheduler.healthy").write_text("ok")
while run and not need_reload:
scheduler.run_pending()
Path(sep, "var", "tmp", "bunkerweb", "scheduler.healthy").write_text(
"ok", encoding="utf-8"
)
while RUN and not NEED_RELOAD:
SCHEDULER.run_pending()
sleep(1)
changes = db.check_changes()
@ -541,58 +567,29 @@ if __name__ == "__main__":
# check if the custom configs have changed since last time
if changes["custom_configs_changed"]:
logger.info("Custom configs changed, generating ...")
configs_need_generation = True
need_reload = True
CONFIGS_NEED_GENERATION = True
NEED_RELOAD = True
# check if the plugins have changed since last time
if changes["external_plugins_changed"]:
logger.info("External plugins changed, generating ...")
plugins_need_generation = True
need_reload = True
PLUGINS_NEED_GENERATION = True
NEED_RELOAD = True
# check if the config have changed since last time
if changes["config_changed"]:
logger.info("Config changed, generating ...")
need_reload = True
if need_reload:
if configs_need_generation:
db_configs = db.get_custom_configs()
# Remove old custom configs files
logger.info("Removing old custom configs files ...")
for file in glob(join(configs_path, "*", "*")):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
NEED_RELOAD = True
if NEED_RELOAD:
if CONFIGS_NEED_GENERATION:
generate_custom_configs(
db_configs,
integration,
api_caller,
original_path=configs_path,
db.get_custom_configs(), original_path=configs_path
)
if plugins_need_generation:
external_plugins: List[Dict[str, Any]] = db.get_plugins(
external=True, with_data=True
)
# Remove old external plugins files
logger.info("Removing old external plugins files ...")
for file in glob(join(plugins_dir, "*")):
file = Path(file)
if file.is_symlink() or file.is_file():
file.unlink()
elif file.is_dir():
rmtree(str(file), ignore_errors=True)
if PLUGINS_NEED_GENERATION:
generate_external_plugins(
external_plugins,
integration,
api_caller,
db.get_plugins(external=True, with_data=True),
original_path=plugins_dir,
)

View File

@ -4,7 +4,6 @@ from os import sep
from os.path import join
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import path as sys_path
from typing import Any, Optional, Union
from API import API # type: ignore
@ -47,7 +46,8 @@ class Instance:
self.env = data
self.apiCaller = apiCaller or ApiCaller()
def get_id(self) -> str:
@property
def id(self) -> str:
return self._id
def reload(self) -> bool:
@ -57,11 +57,12 @@ class Instance:
["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "reload"],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
).returncode
== 0
)
return self.apiCaller._send_to_apis("POST", "/reload")
return self.apiCaller.send_to_apis("POST", "/reload")
def start(self) -> bool:
if self._type == "local":
@ -70,11 +71,12 @@ class Instance:
["sudo", join(sep, "usr", "sbin", "nginx")],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
).returncode
== 0
)
return self.apiCaller._send_to_apis("POST", "/start")
return self.apiCaller.send_to_apis("POST", "/start")
def stop(self) -> bool:
if self._type == "local":
@ -83,11 +85,12 @@ class Instance:
["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "stop"],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
).returncode
== 0
)
return self.apiCaller._send_to_apis("POST", "/stop")
return self.apiCaller.send_to_apis("POST", "/stop")
def restart(self) -> bool:
if self._type == "local":
@ -96,11 +99,12 @@ class Instance:
["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "restart"],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
).returncode
== 0
)
return self.apiCaller._send_to_apis("POST", "/restart")
return self.apiCaller.send_to_apis("POST", "/restart")
class Instances:
@ -112,10 +116,10 @@ class Instances:
def __instance_from_id(self, _id) -> Instance:
instances: list[Instance] = self.get_instances()
for instance in instances:
if instance._id == _id:
if instance.id == _id:
return instance
raise Exception(f"Can't find instance with id {_id}")
raise ValueError(f"Can't find instance with _id {_id}")
def get_instances(self) -> list[Instance]:
instances = []
@ -129,25 +133,22 @@ class Instances:
for x in [env.split("=") for env in instance.attrs["Config"]["Env"]]
}
apiCaller = ApiCaller()
apiCaller._set_apis(
[
API(
f"http://{instance.name}:{env_variables.get('API_HTTP_PORT', '5000')}",
env_variables.get("API_SERVER_NAME", "bwapi"),
)
]
)
instances.append(
Instance(
instance.id,
instance._id,
instance.name,
instance.name,
"container",
"up" if instance.status == "running" else "down",
instance,
apiCaller,
ApiCaller(
[
API(
f"http://{instance.name}:{env_variables.get('API_HTTP_PORT', '5000')}",
env_variables.get("API_SERVER_NAME", "bwapi"),
)
]
),
)
)
elif self.__integration == "Swarm":
@ -160,7 +161,7 @@ class Instances:
if desired_tasks > 0 and (desired_tasks == running_tasks):
status = "up"
apis = []
apiCaller = ApiCaller()
api_http_port = None
api_server_name = None
@ -173,17 +174,16 @@ class Instances:
api_server_name = var.replace("API_SERVER_NAME=", "", 1)
for task in instance.tasks():
apis.append(
apiCaller.append(
API(
f"http://{instance.name}.{task['NodeID']}.{task['ID']}:{api_http_port or '5000'}",
host=api_server_name or "bwapi",
)
)
apiCaller = ApiCaller(apis=apis)
instances.append(
Instance(
instance.id,
instance._id,
instance.name,
instance.name,
"service",
@ -204,15 +204,6 @@ class Instances:
env.name: env.value or "" for env in pod.spec.containers[0].env
}
apiCaller = ApiCaller(
apis=[
API(
f"http://{pod.status.pod_ip}:{env_variables.get('API_HTTP_PORT', '5000')}",
host=env_variables.get("API_SERVER_NAME", "bwapi"),
)
]
)
status = "up"
if pod.status.conditions is not None:
for condition in pod.status.conditions:
@ -228,7 +219,16 @@ class Instances:
"pod",
status,
pod,
apiCaller,
ApiCaller(
[
API(
f"http://{pod.status.pod_ip}:{env_variables.get('API_HTTP_PORT', '5000')}",
host=env_variables.get(
"API_SERVER_NAME", "bwapi"
),
)
]
),
)
)
@ -239,18 +239,9 @@ class Instances:
# Local instance
if Path(sep, "usr", "sbin", "nginx").exists():
apiCaller = ApiCaller()
env_variables = dotenv_values(
join(sep, "etc", "bunkerweb", "variables.env")
)
apiCaller._set_apis(
[
API(
f"http://127.0.0.1:{env_variables.get('API_HTTP_PORT', '5000')}",
env_variables.get("API_SERVER_NAME", "bwapi"),
)
]
)
instances.insert(
0,
@ -263,7 +254,14 @@ class Instances:
if Path(sep, "var", "tmp", "bunkerweb", "nginx.pid").exists()
else "down",
None,
apiCaller,
ApiCaller(
[
API(
f"http://127.0.0.1:{env_variables.get('API_HTTP_PORT', '5000')}",
env_variables.get("API_SERVER_NAME", "bwapi"),
)
]
),
),
)
@ -282,10 +280,10 @@ class Instances:
return not_reloaded or "Successfully reloaded instances"
def reload_instance(
self, id: Optional[int] = None, instance: Optional[Instance] = None
self, _id: Optional[int] = None, instance: Optional[Instance] = None
) -> str:
if instance is None:
instance = self.__instance_from_id(id)
instance = self.__instance_from_id(_id)
result = instance.reload()
@ -294,8 +292,8 @@ class Instances:
return f"Can't reload {instance.name}"
def start_instance(self, id) -> str:
instance = self.__instance_from_id(id)
def start_instance(self, _id) -> str:
instance = self.__instance_from_id(_id)
result = instance.start()
@ -304,8 +302,8 @@ class Instances:
return f"Can't start {instance.name}"
def stop_instance(self, id) -> str:
instance = self.__instance_from_id(id)
def stop_instance(self, _id) -> str:
instance = self.__instance_from_id(_id)
result = instance.stop()
@ -314,8 +312,8 @@ class Instances:
return f"Can't stop {instance.name}"
def restart_instance(self, id) -> str:
instance = self.__instance_from_id(id)
def restart_instance(self, _id) -> str:
instance = self.__instance_from_id(_id)
result = instance.restart()