Small refactor on the ApiCaller and the Scheduler
commit aca0d6da48 (parent 1bd40a877a)
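The gist of the refactor, as the hunks below show, is that `ApiCaller` now exposes its API list through an `apis` property and public `send_to_apis()` / `send_files()` helpers (replacing `_get_apis()`, `_set_apis()`, `_send_to_apis()` and `_send_files()`), and the scheduler keeps a single `JobScheduler` instance (`SCHEDULER`) that carries its own APIs instead of a separate `api_caller` object. Below is a minimal sketch of the new calling pattern; `ApiCallerSketch` and its placeholder request logic are illustrative stand-ins written for this note, not code taken from the repository.

```python
from typing import List, Optional


class API:
    """Stand-in for the project's API helper, just enough for the sketch."""

    def __init__(self, endpoint: str, host: str = "bwapi"):
        self.endpoint = endpoint
        self.host = host


class ApiCallerSketch:
    def __init__(self, apis: Optional[List[API]] = None):
        self.__apis = apis or []

    # The refactor replaces _get_apis()/_set_apis() with a property pair.
    @property
    def apis(self) -> List[API]:
        return self.__apis

    @apis.setter
    def apis(self, apis: List[API]):
        self.__apis = apis

    # _send_to_apis() becomes the public send_to_apis(); the real method
    # performs HTTP requests, which this placeholder only pretends to do.
    def send_to_apis(self, method: str, url: str) -> bool:
        return bool(self.__apis)


caller = ApiCallerSketch([API("http://127.0.0.1:5000")])
if caller.apis:  # call sites switch from _get_apis() to the property
    caller.send_to_apis("POST", "/reload")
```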
@@ -121,7 +121,7 @@ class CLI(ApiCaller):
         ):
             # Docker & Linux case
             super().__init__(
-                apis=[
+                [
                     API(
                         f"http://127.0.0.1:{self.__variables.get('API_HTTP_PORT', '5000')}",
                         host=self.__variables.get("API_SERVER_NAME", "bwapi"),
@@ -142,8 +142,10 @@ class CLI(ApiCaller):
         elif self.__variables.get("AUTOCONF_MODE", "no").lower() == "yes":
             return "autoconf"
         elif integration_path.is_file():
-            return integration_path.read_text().strip().lower()
-        elif os_release_path.is_file() and "Alpine" in os_release_path.read_text():
+            return integration_path.read_text(encoding="utf-8").strip().lower()
+        elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(
+            encoding="utf-8"
+        ):
             return "docker"

         return "linux"
@@ -154,7 +156,7 @@ class CLI(ApiCaller):
             if not ok:
                 self.__logger.error(f"Failed to delete ban for {ip} from redis")

-        if self._send_to_apis("POST", "/unban", data={"ip": ip}):
+        if self.send_to_apis("POST", "/unban", data={"ip": ip}):
             return True, f"IP {ip} has been unbanned"
         return False, "error"

@@ -168,7 +170,7 @@ class CLI(ApiCaller):
             if not ok:
                 self.__logger.error(f"Failed to ban {ip} in redis")

-        if self._send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp}):
+        if self.send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp}):
             return (
                 True,
                 f"IP {ip} has been banned for {format_remaining_time(exp)}",
@@ -178,7 +180,7 @@ class CLI(ApiCaller):
     def bans(self) -> Tuple[bool, str]:
         servers = {}

-        ret, resp = self._send_to_apis("GET", "/bans", response=True)
+        ret, resp = self.send_to_apis("GET", "/bans", response=True)
         if not ret:
             return False, "error"

@@ -206,7 +208,6 @@ class CLI(ApiCaller):

             for ban in bans:
                 cli_str += f"- {ban['ip']} for {format_remaining_time(ban['exp'])} : {ban.get('reason', 'no reason given')}\n"
-            else:
-                cli_str += "\n"
+            cli_str += "\n"

         return True, cli_str
@@ -26,6 +26,14 @@ class ApiCaller:
         self.__apis = apis or []
         self.__logger = setup_logger("Api", getenv("LOG_LEVEL", "INFO"))

+    @property
+    def apis(self) -> List[API]:
+        return self.__apis
+
+    @apis.setter
+    def apis(self, apis: List[API]):
+        self.__apis = apis
+
     def auto_setup(self, bw_integration: Optional[str] = None):
         if bw_integration is None:
             if getenv("KUBERNETES_MODE", "no") == "yes":
@@ -105,13 +113,7 @@ class ApiCaller:
             )
         )

-    def _set_apis(self, apis: List[API]):
-        self.__apis = apis
-
-    def _get_apis(self):
-        return self.__apis
-
-    def _send_to_apis(
+    def send_to_apis(
         self,
         method: Union[Literal["POST"], Literal["GET"]],
         url: str,
@@ -155,7 +157,7 @@ class ApiCaller:
             return ret, responses
         return ret

-    def _send_files(self, path: str, url: str) -> bool:
+    def send_files(self, path: str, url: str) -> bool:
         ret = True
         with BytesIO() as tgz:
             with tar_open(
@@ -164,6 +166,6 @@ class ApiCaller:
                 tf.add(path, arcname=".")
             tgz.seek(0, 0)
             files = {"archive.tar.gz": tgz}
-            if not self._send_to_apis("POST", url, files=files):
+            if not self.send_to_apis("POST", url, files=files):
                 ret = False
         return ret
@@ -36,10 +36,11 @@ class JobScheduler(ApiCaller):
     def __init__(
         self,
         env: Optional[Dict[str, Any]] = None,
-        lock: Optional[Lock] = None,
-        apis: Optional[list] = None,
         logger: Optional[Logger] = None,
         integration: str = "Linux",
+        *,
+        lock: Optional[Lock] = None,
+        apis: Optional[list] = None,
     ):
         super().__init__(apis or [])
         self.__logger = logger or setup_logger("Scheduler", getenv("LOG_LEVEL", "INFO"))
@@ -53,6 +54,14 @@ class JobScheduler(ApiCaller):
         self.__job_success = True
         self.__semaphore = Semaphore(cpu_count() or 1)

+    @property
+    def env(self) -> Dict[str, Any]:
+        return self.__env
+
+    @env.setter
+    def env(self, env: Dict[str, Any]):
+        self.__env = env
+
     def __get_jobs(self):
         jobs = {}
         for plugin_file in glob(
@@ -63,7 +72,7 @@ class JobScheduler(ApiCaller):
             plugin_name = basename(dirname(plugin_file))
             jobs[plugin_name] = []
             try:
-                plugin_data = loads(Path(plugin_file).read_text())
+                plugin_data = loads(Path(plugin_file).read_text(encoding="utf-8"))
                 if not "jobs" in plugin_data:
                     continue

@@ -130,7 +139,7 @@ class JobScheduler(ApiCaller):
             return schedule_every().day
         elif every == "week":
             return schedule_every().week
-        raise Exception(f"can't convert string {every} to schedule")
+        raise ValueError(f"can't convert string {every} to schedule")

     def __reload(self) -> bool:
         reload = True
@@ -141,6 +150,7 @@ class JobScheduler(ApiCaller):
                 stdin=DEVNULL,
                 stderr=PIPE,
                 env=self.__env,
+                check=False,
             )
             reload = proc.returncode == 0
             if reload:
@@ -151,7 +161,7 @@ class JobScheduler(ApiCaller):
                 )
         else:
             self.__logger.info("Reloading nginx ...")
-            reload = self._send_to_apis("POST", "/reload")
+            reload = self.send_to_apis("POST", "/reload")
             if reload:
                 self.__logger.info("Successfully reloaded nginx")
             else:
@@ -166,7 +176,11 @@ class JobScheduler(ApiCaller):
         ret = -1
         try:
             proc = run(
-                join(path, "jobs", file), stdin=DEVNULL, stderr=STDOUT, env=self.__env
+                join(path, "jobs", file),
+                stdin=DEVNULL,
+                stderr=STDOUT,
+                env=self.__env,
+                check=False,
             )
             ret = proc.returncode
         except BaseException:
@@ -235,10 +249,10 @@ class JobScheduler(ApiCaller):

         if reload:
             try:
-                if self._get_apis():
+                if self.apis:
                     cache_path = join(sep, "var", "cache", "bunkerweb")
                     self.__logger.info(f"Sending {cache_path} folder ...")
-                    if not self._send_files(cache_path, "/cache"):
+                    if not self.send_files(cache_path, "/cache"):
                         success = False
                         self.__logger.error(f"Error while sending {cache_path} folder")
                 else:
@@ -283,7 +297,7 @@ class JobScheduler(ApiCaller):
         return ret

     def __run_in_thread(self, jobs: list):
-        self.__semaphore.acquire()
+        self.__semaphore.acquire(timeout=60)
         for job in jobs:
             job()
         self.__semaphore.release()
@@ -25,7 +25,7 @@ from sys import path as sys_path
 from tarfile import open as tar_open
 from time import sleep
 from traceback import format_exc
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional, Union

 for deps_path in [
     join(sep, "usr", "share", "bunkerweb", *paths)
@@ -41,17 +41,17 @@ from Database import Database  # type: ignore
 from JobScheduler import JobScheduler
 from ApiCaller import ApiCaller  # type: ignore

-run = True
-scheduler = None
-reloading = False
+RUN = True
+SCHEDULER: Optional[JobScheduler] = None
+GENERATE = False
+INTEGRATION = "Linux"
+CACHE_PATH = join(sep, "var", "cache", "bunkerweb")
 logger = setup_logger("Scheduler", getenv("LOG_LEVEL", "INFO"))


 def handle_stop(signum, frame):
-    global run, scheduler
-    run = False
-    if scheduler is not None:
-        scheduler.clear()
+    if SCHEDULER is not None:
+        SCHEDULER.clear()
     stop(0)


@@ -61,13 +61,11 @@ signal(SIGTERM, handle_stop)

 # Function to catch SIGHUP and reload the scheduler
 def handle_reload(signum, frame):
-    global reloading, run, scheduler
-    reloading = True
     try:
-        if scheduler is not None and run:
+        if SCHEDULER is not None and RUN:
             # Get the env by reading the .env file
-            env = dotenv_values(join(sep, "etc", "bunkerweb", "variables.env"))
-            if scheduler.reload(env):
+            tmp_env = dotenv_values(join(sep, "etc", "bunkerweb", "variables.env"))
+            if SCHEDULER.reload(tmp_env):
                 logger.info("Reload successful")
             else:
                 logger.error("Reload failed")
@@ -91,61 +89,84 @@ def stop(status):


 def generate_custom_configs(
-    custom_configs: List[Dict[str, Any]],
-    integration: str,
-    api_caller: ApiCaller,
+    configs: List[Dict[str, Any]],
     *,
-    original_path: str = join(sep, "etc", "bunkerweb", "configs"),
+    original_path: Union[Path, str] = join(sep, "etc", "bunkerweb", "configs"),
 ):
-    logger.info("Generating new custom configs ...")
-    Path(original_path).mkdir(parents=True, exist_ok=True)
-    for custom_config in custom_configs:
-        tmp_path = join(original_path, custom_config["type"].replace("_", "-"))
-        if custom_config["service_id"]:
-            tmp_path = join(tmp_path, custom_config["service_id"])
-        tmp_path = Path(tmp_path, f"{custom_config['name']}.conf")
-        tmp_path.parent.mkdir(parents=True, exist_ok=True)
-        tmp_path.write_bytes(custom_config["data"])
+    if not isinstance(original_path, Path):
+        original_path = Path(original_path)

-    if integration in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
-        logger.info("Sending custom configs to BunkerWeb")
-        ret = api_caller._send_files(original_path, "/custom_configs")
+    # Remove old custom configs files
+    logger.info("Removing old custom configs files ...")
+    for file in glob(str(original_path.joinpath("*", "*"))):
+        file = Path(file)
+        if file.is_symlink() or file.is_file():
+            file.unlink()
+        elif file.is_dir():
+            rmtree(str(file), ignore_errors=True)

-        if not ret:
-            logger.error(
-                "Sending custom configs failed, configuration will not work as expected...",
+    if configs:
+        logger.info("Generating new custom configs ...")
+        original_path.mkdir(parents=True, exist_ok=True)
+        for custom_config in configs:
+            tmp_path = original_path.joinpath(
+                custom_config["type"].replace("_", "-"),
+                custom_config["service_id"] or "",
+                f"{custom_config['name']}.conf",
+            )
+            tmp_path.parent.mkdir(parents=True, exist_ok=True)
+            tmp_path.write_bytes(custom_config["data"])
+
+        if SCHEDULER.apis:
+            logger.info("Sending custom configs to BunkerWeb")
+            ret = SCHEDULER.send_files(original_path, "/custom_configs")
+
+            if not ret:
+                logger.error(
+                    "Sending custom configs failed, configuration will not work as expected...",
                 )


 def generate_external_plugins(
     plugins: List[Dict[str, Any]],
-    integration: str,
-    api_caller: ApiCaller,
     *,
-    original_path: str = join(sep, "etc", "bunkerweb", "plugins"),
+    original_path: Union[Path, str] = join(sep, "etc", "bunkerweb", "plugins"),
 ):
-    logger.info("Generating new external plugins ...")
-    Path(original_path).mkdir(parents=True, exist_ok=True)
-    for plugin in plugins:
-        tmp_path = Path(original_path, plugin["id"], f"{plugin['name']}.tar.gz")
-        tmp_path.parent.mkdir(parents=True, exist_ok=True)
-        tmp_path.write_bytes(plugin["data"])
-        with tar_open(str(tmp_path), "r:gz") as tar:
-            tar.extractall(original_path)
-        tmp_path.unlink()
+    if not isinstance(original_path, Path):
+        original_path = Path(original_path)

-        for job_file in glob(join(str(tmp_path.parent), "jobs", "*")):
-            st = Path(job_file).stat()
-            chmod(job_file, st.st_mode | S_IEXEC)
+    # Remove old external plugins files
+    logger.info("Removing old external plugins files ...")
+    for file in glob(str(original_path.joinpath("*"))):
+        file = Path(file)
+        if file.is_symlink() or file.is_file():
+            file.unlink()
+        elif file.is_dir():
+            rmtree(str(file), ignore_errors=True)

-    if integration in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
-        logger.info("Sending plugins to BunkerWeb")
-        ret = api_caller._send_files(original_path, "/plugins")
+    if plugins:
+        logger.info("Generating new external plugins ...")
+        original_path.mkdir(parents=True, exist_ok=True)
+        for plugin in plugins:
+            tmp_path = original_path.joinpath(plugin["id"], f"{plugin['name']}.tar.gz")
+            tmp_path.parent.mkdir(parents=True, exist_ok=True)
+            tmp_path.write_bytes(plugin["data"])
+            with tar_open(str(tmp_path), "r:gz") as tar:
+                tar.extractall(original_path)
+            tmp_path.unlink()

-        if not ret:
-            logger.error(
-                "Sending plugins failed, configuration will not work as expected...",
-            )
+            for job_file in glob(join(str(tmp_path.parent), "jobs", "*")):
+                st = Path(job_file).stat()
+                chmod(job_file, st.st_mode | S_IEXEC)
+
+        if SCHEDULER.apis:
+            logger.info("Sending plugins to BunkerWeb")
+            ret = SCHEDULER.send_files(original_path, "/plugins")
+
+            if not ret:
+                logger.error(
+                    "Sending plugins failed, configuration will not work as expected...",
+                )


 if __name__ == "__main__":
@@ -159,7 +180,7 @@ if __name__ == "__main__":
            _exit(1)

        # Write pid to file
-        pid_path.write_text(str(getpid()))
+        pid_path.write_text(str(getpid()), encoding="utf-8")

        del pid_path

@@ -171,10 +192,6 @@ if __name__ == "__main__":
        help="path to the file containing environment variables",
    )
    args = parser.parse_args()
-    generate = False
-    integration = "Linux"
-    api_caller = ApiCaller()
-    db_configs = None
    tmp_variables_path = Path(
        normpath(args.variables) if args.variables else sep,
        "var",
@@ -182,6 +199,7 @@ if __name__ == "__main__":
        "bunkerweb",
        "variables.env",
    )
+    dotenv_env = dotenv_values(str(tmp_variables_path))

    logger.info("Scheduler started ...")

@@ -190,7 +208,7 @@ if __name__ == "__main__":
        logger.info(f"Variables : {tmp_variables_path}")

        # Read env file
-        env = dotenv_values(str(tmp_variables_path))
+        env = dotenv_env.copy()

        db = Database(
            logger,
@@ -206,23 +224,19 @@ if __name__ == "__main__":
            db_configs = db.get_custom_configs()
    else:
        # Read from database
-        integration = "Docker"
+        INTEGRATION = "Docker"
        integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
        if integration_path.is_file():
-            integration = integration_path.read_text().strip()
+            INTEGRATION = integration_path.read_text(encoding="utf-8").strip()

        del integration_path

-        api_caller.auto_setup(bw_integration=integration)
        db = Database(
            logger,
            sqlalchemy_string=getenv("DATABASE_URI", None),
        )

-        if db.is_initialized():
-            db_configs = db.get_custom_configs()
-
-        if integration in (
+        if INTEGRATION in (
            "Swarm",
            "Kubernetes",
            "Autoconf",
@@ -232,9 +246,7 @@ if __name__ == "__main__":
                    "Autoconf is not loaded yet in the database, retrying in 5s ...",
                )
                sleep(5)
-        elif not tmp_variables_path.is_file() or db.get_config() != dotenv_values(
-            str(tmp_variables_path)
-        ):
+        elif not tmp_variables_path.is_file() or db.get_config() != dotenv_env:
            # run the config saver
            proc = subprocess_run(
                [
@@ -245,6 +257,7 @@ if __name__ == "__main__":
                ],
                stdin=DEVNULL,
                stderr=STDOUT,
+                check=False,
            )
            if proc.returncode != 0:
                logger.error(
@@ -257,9 +270,6 @@ if __name__ == "__main__":
            )
            sleep(5)

-        if not db_configs:
-            db_configs = db.get_custom_configs()
-
        env = db.get_config()
        while not db.is_first_config_saved() or not env:
            logger.warning(
@@ -272,13 +282,14 @@ if __name__ == "__main__":

    # Checking if any custom config has been created by the user
    custom_configs = []
-    configs_path = join(sep, "etc", "bunkerweb", "configs")
-    root_dirs = listdir(configs_path)
-    for root, dirs, files in walk(configs_path):
+    db_configs = db.get_custom_configs()
+    configs_path = Path(sep, "etc", "bunkerweb", "configs")
+    root_dirs = listdir(str(configs_path))
+    for root, dirs, files in walk(str(configs_path)):
        if files or (dirs and basename(root) not in root_dirs):
            path_exploded = root.split("/")
            for file in files:
-                with open(join(root, file), "r") as f:
+                with open(join(root, file), "r", encoding="utf-8") as f:
                    custom_conf = {
                        "value": f.read(),
                        "exploded": (
@@ -309,26 +320,13 @@ if __name__ == "__main__":
                f"Couldn't save some manually created custom configs to database: {err}",
            )

-    # Remove old custom configs files
-    logger.info("Removing old custom configs files ...")
-    for file in glob(join(configs_path, "*", "*")):
-        file = Path(file)
-        if file.is_symlink() or file.is_file():
-            file.unlink()
-        elif file.is_dir():
-            rmtree(str(file), ignore_errors=True)
-
-    db_configs = db.get_custom_configs()
-
-    if db_configs:
-        logger.info("Generating new custom configs ...")
-        generate_custom_configs(db_configs, integration, api_caller)
+    generate_custom_configs(db.get_custom_configs(), original_path=configs_path)

    # Check if any external plugin has been added by the user
    external_plugins = []
-    plugins_dir = join(sep, "etc", "bunkerweb", "plugins")
-    for filename in glob(join(plugins_dir, "*", "plugin.json")):
-        with open(filename, "r") as f:
+    plugins_dir = Path(sep, "etc", "bunkerweb", "plugins")
+    for filename in glob(str(plugins_dir.joinpath("*", "plugin.json"))):
+        with open(filename, "r", encoding="utf-8") as f:
            _dir = dirname(filename)
            plugin_content = BytesIO()
            with tar_open(
@@ -356,60 +354,88 @@ if __name__ == "__main__":
                    f"Couldn't save some manually added plugins to database: {err}",
                )

-    external_plugins = db.get_plugins(external=True)
-    if external_plugins:
-        # Remove old external plugins files
-        logger.info("Removing old external plugins files ...")
-        for file in glob(join(plugins_dir, "*")):
-            file = Path(file)
-            if file.is_symlink() or file.is_file():
-                file.unlink()
-            elif file.is_dir():
-                rmtree(str(file), ignore_errors=True)
-
-        generate_external_plugins(
-            db.get_plugins(external=True, with_data=True),
-            integration,
-            api_caller,
-            original_path=plugins_dir,
-        )
+    generate_external_plugins(
+        db.get_plugins(external=True, with_data=True),
+        original_path=plugins_dir,
+    )

    logger.info("Executing scheduler ...")

-    generate = not tmp_variables_path.exists() or env != dotenv_values(
-        str(tmp_variables_path)
-    )
+    GENERATE = not tmp_variables_path.exists() or env != dotenv_env

-    if not generate:
+    del dotenv_env
+
+    if not GENERATE:
        logger.warning(
-            "Looks like BunkerWeb configuration is already generated, will not generate it again ..."
+            "Looks like BunkerWeb configuration is already generated, will not GENERATE it again ..."
        )

-    first_run = True
+    # Instantiate scheduler
+    SCHEDULER = JobScheduler(env.copy() | environ.copy(), logger, INTEGRATION)
+    # Automatically setup the scheduler apis
+    SCHEDULER.auto_setup(bw_integration=INTEGRATION)
+
+    FIRST_RUN = True
    while True:
        ret = db.checked_changes()

        if ret:
            logger.error(
-                f"An error occurred when setting the changes to checked in the database : {changes}"
+                f"An error occurred when setting the changes to checked in the database : {ret}"
            )
            stop(1)

-        # Instantiate scheduler
-        scheduler = JobScheduler(
-            env=env.copy() | environ.copy(),
-            apis=api_caller._get_apis(),
-            logger=logger,
-            integration=integration,
-        )
+        # Update the environment variables of the scheduler
+        SCHEDULER.env = env.copy() | environ.copy()

        # Only run jobs once
-        if not scheduler.run_once():
+        if not SCHEDULER.run_once():
            logger.error("At least one job in run_once() failed")
        else:
            logger.info("All jobs in run_once() were successful")

-        if generate:
+        changes = db.check_changes()
+
+        if isinstance(changes, str):
+            logger.error(
+                f"An error occurred when checking for changes in the database : {changes}"
+            )
+            stop(1)
+
+        # check if the plugins have changed since last time
+        if changes["external_plugins_changed"]:
+            logger.info("External plugins changed, generating ...")
+            generate_external_plugins(
+                db.get_plugins(external=True, with_data=True),
+                original_path=plugins_dir,
+            )
+
+            # run the config saver to save potential plugins settings
+            proc = subprocess_run(
+                [
+                    "python",
+                    join(sep, "usr", "share", "bunkerweb", "gen", "save_config.py"),
+                    "--settings",
+                    join(sep, "usr", "share", "bunkerweb", "settings.json"),
+                ],
+                stdin=DEVNULL,
+                stderr=STDOUT,
+                check=False,
+            )
+            if proc.returncode != 0:
+                logger.error(
+                    "Config saver failed, configuration will not work as expected...",
+                )
+
+            ret = db.checked_changes()
+
+            if ret:
+                logger.error(
+                    f"An error occurred when setting the changes to checked in the database : {ret}"
+                )
+                stop(1)
+
+        if GENERATE:
            # run the generator
            proc = subprocess_run(
                [
@@ -424,11 +450,12 @@ if __name__ == "__main__":
                ]
                + (
                    ["--variables", str(tmp_variables_path)]
-                    if args.variables and first_run
+                    if args.variables and FIRST_RUN
                    else []
                ),
                stdin=DEVNULL,
                stderr=STDOUT,
+                check=False,
            )

            if proc.returncode != 0:
@@ -441,29 +468,29 @@ if __name__ == "__main__":
                    str(tmp_variables_path),
                )

-        if api_caller._get_apis():
+        if SCHEDULER.apis:
            # send nginx configs
            logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")
-            ret = api_caller._send_files(
-                join(sep, "etc", "nginx"), "/confs"
-            )
+            ret = SCHEDULER.send_files(join(sep, "etc", "nginx"), "/confs")
            if not ret:
                logger.error(
                    "Sending nginx configs failed, configuration will not work as expected...",
                )

        try:
-            if api_caller._get_apis():
-                cache_path = join(sep, "var", "cache", "bunkerweb")
+            if SCHEDULER.apis:
                # send cache
-                logger.info(f"Sending {cache_path} folder ...")
-                if not api_caller._send_files(cache_path, "/cache"):
-                    logger.error(f"Error while sending {cache_path} folder")
+                logger.info(f"Sending {CACHE_PATH} folder ...")
+                if not SCHEDULER.send_files(CACHE_PATH, "/cache"):
+                    logger.error(f"Error while sending {CACHE_PATH} folder")
                else:
-                    logger.info(f"Successfully sent {cache_path} folder")
+                    logger.info(f"Successfully sent {CACHE_PATH} folder")

-            # restart nginx
-            if integration not in ("Autoconf", "Swarm", "Kubernetes", "Docker"):
+                if SCHEDULER.send_to_apis("POST", "/reload"):
+                    logger.info("Successfully reloaded nginx")
+                else:
+                    logger.error("Error while reloading nginx")
+            else:
                # Stop temp nginx
                logger.info("Stopping temp nginx ...")
                proc = subprocess_run(
@@ -471,6 +498,7 @@ if __name__ == "__main__":
                    stdin=DEVNULL,
                    stderr=STDOUT,
                    env=env.copy(),
+                    check=False,
                )
                if proc.returncode == 0:
                    logger.info("Successfully sent stop signal to temp nginx")
@@ -495,6 +523,7 @@ if __name__ == "__main__":
                    stdin=DEVNULL,
                    stderr=STDOUT,
                    env=env.copy(),
+                    check=False,
                )
                if proc.returncode == 0:
                    logger.info("Successfully started nginx")
@@ -506,28 +535,25 @@ if __name__ == "__main__":
                    logger.error(
                        f"Error while sending stop signal to temp nginx - returncode: {proc.returncode} - error: {proc.stderr.decode('utf-8') if proc.stderr else 'Missing stderr'}",
                    )
-            else:
-                if api_caller._send_to_apis("POST", "/reload"):
-                    logger.info("Successfully reloaded nginx")
-                else:
-                    logger.error("Error while reloading nginx")
        except:
            logger.error(
                f"Exception while reloading after running jobs once scheduling : {format_exc()}",
            )

-        generate = True
-        scheduler.setup()
-        need_reload = False
-        configs_need_generation = False
-        plugins_need_generation = False
-        first_run = False
+        GENERATE = True
+        SCHEDULER.setup()
+        NEED_RELOAD = False
+        CONFIGS_NEED_GENERATION = False
+        PLUGINS_NEED_GENERATION = False
+        FIRST_RUN = False

        # infinite schedule for the jobs
        logger.info("Executing job scheduler ...")
-        Path(sep, "var", "tmp", "bunkerweb", "scheduler.healthy").write_text("ok")
-        while run and not need_reload:
-            scheduler.run_pending()
+        Path(sep, "var", "tmp", "bunkerweb", "scheduler.healthy").write_text(
+            "ok", encoding="utf-8"
+        )
+        while RUN and not NEED_RELOAD:
+            SCHEDULER.run_pending()
            sleep(1)

        changes = db.check_changes()
@@ -541,58 +567,29 @@ if __name__ == "__main__":
        # check if the custom configs have changed since last time
        if changes["custom_configs_changed"]:
            logger.info("Custom configs changed, generating ...")
-            configs_need_generation = True
-            need_reload = True
+            CONFIGS_NEED_GENERATION = True
+            NEED_RELOAD = True

        # check if the plugins have changed since last time
        if changes["external_plugins_changed"]:
            logger.info("External plugins changed, generating ...")
-            plugins_need_generation = True
-            need_reload = True
+            PLUGINS_NEED_GENERATION = True
+            NEED_RELOAD = True

        # check if the config have changed since last time
        if changes["config_changed"]:
            logger.info("Config changed, generating ...")
-            need_reload = True
-
-        if need_reload:
-            if configs_need_generation:
-                db_configs = db.get_custom_configs()
-
-                # Remove old custom configs files
-                logger.info("Removing old custom configs files ...")
-                for file in glob(join(configs_path, "*", "*")):
-                    file = Path(file)
-                    if file.is_symlink() or file.is_file():
-                        file.unlink()
-                    elif file.is_dir():
-                        rmtree(str(file), ignore_errors=True)
+            NEED_RELOAD = True

-                generate_custom_configs(
-                    db_configs,
-                    integration,
-                    api_caller,
-                    original_path=configs_path,
+        if NEED_RELOAD:
+            if CONFIGS_NEED_GENERATION:
+                generate_custom_configs(
+                    db.get_custom_configs(), original_path=configs_path
                )

-            if plugins_need_generation:
-                external_plugins: List[Dict[str, Any]] = db.get_plugins(
-                    external=True, with_data=True
-                )
-
-                # Remove old external plugins files
-                logger.info("Removing old external plugins files ...")
-                for file in glob(join(plugins_dir, "*")):
-                    file = Path(file)
-                    if file.is_symlink() or file.is_file():
-                        file.unlink()
-                    elif file.is_dir():
-                        rmtree(str(file), ignore_errors=True)
-
+            if PLUGINS_NEED_GENERATION:
                generate_external_plugins(
-                    external_plugins,
-                    integration,
-                    api_caller,
+                    db.get_plugins(external=True, with_data=True),
                    original_path=plugins_dir,
                )

@@ -4,7 +4,6 @@ from os import sep
 from os.path import join
 from pathlib import Path
 from subprocess import DEVNULL, STDOUT, run
-from sys import path as sys_path
 from typing import Any, Optional, Union

 from API import API  # type: ignore
@@ -47,7 +46,8 @@ class Instance:
        self.env = data
        self.apiCaller = apiCaller or ApiCaller()

-    def get_id(self) -> str:
+    @property
+    def id(self) -> str:
        return self._id

    def reload(self) -> bool:
@@ -57,11 +57,12 @@ class Instance:
                    ["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "reload"],
                    stdin=DEVNULL,
                    stderr=STDOUT,
+                    check=False,
                ).returncode
                == 0
            )

-        return self.apiCaller._send_to_apis("POST", "/reload")
+        return self.apiCaller.send_to_apis("POST", "/reload")

    def start(self) -> bool:
        if self._type == "local":
@@ -70,11 +71,12 @@ class Instance:
                    ["sudo", join(sep, "usr", "sbin", "nginx")],
                    stdin=DEVNULL,
                    stderr=STDOUT,
+                    check=False,
                ).returncode
                == 0
            )

-        return self.apiCaller._send_to_apis("POST", "/start")
+        return self.apiCaller.send_to_apis("POST", "/start")

    def stop(self) -> bool:
        if self._type == "local":
@@ -83,11 +85,12 @@ class Instance:
                    ["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "stop"],
                    stdin=DEVNULL,
                    stderr=STDOUT,
+                    check=False,
                ).returncode
                == 0
            )

-        return self.apiCaller._send_to_apis("POST", "/stop")
+        return self.apiCaller.send_to_apis("POST", "/stop")

    def restart(self) -> bool:
        if self._type == "local":
@@ -96,11 +99,12 @@ class Instance:
                    ["sudo", join(sep, "usr", "sbin", "nginx"), "-s", "restart"],
                    stdin=DEVNULL,
                    stderr=STDOUT,
+                    check=False,
                ).returncode
                == 0
            )

-        return self.apiCaller._send_to_apis("POST", "/restart")
+        return self.apiCaller.send_to_apis("POST", "/restart")


class Instances:
@@ -112,10 +116,10 @@ class Instances:
    def __instance_from_id(self, _id) -> Instance:
        instances: list[Instance] = self.get_instances()
        for instance in instances:
-            if instance._id == _id:
+            if instance.id == _id:
                return instance

-        raise Exception(f"Can't find instance with id {_id}")
+        raise ValueError(f"Can't find instance with _id {_id}")

    def get_instances(self) -> list[Instance]:
        instances = []
@@ -129,25 +133,22 @@ class Instances:
                    for x in [env.split("=") for env in instance.attrs["Config"]["Env"]]
                }

-                apiCaller = ApiCaller()
-                apiCaller._set_apis(
-                    [
-                        API(
-                            f"http://{instance.name}:{env_variables.get('API_HTTP_PORT', '5000')}",
-                            env_variables.get("API_SERVER_NAME", "bwapi"),
-                        )
-                    ]
-                )
-
                instances.append(
                    Instance(
-                        instance.id,
+                        instance._id,
                        instance.name,
                        instance.name,
                        "container",
                        "up" if instance.status == "running" else "down",
                        instance,
-                        apiCaller,
+                        ApiCaller(
+                            [
+                                API(
+                                    f"http://{instance.name}:{env_variables.get('API_HTTP_PORT', '5000')}",
+                                    env_variables.get("API_SERVER_NAME", "bwapi"),
+                                )
+                            ]
+                        ),
                    )
                )
        elif self.__integration == "Swarm":
@@ -160,7 +161,7 @@ class Instances:
                if desired_tasks > 0 and (desired_tasks == running_tasks):
                    status = "up"

-                apis = []
+                apiCaller = ApiCaller()
                api_http_port = None
                api_server_name = None

@@ -173,17 +174,16 @@ class Instances:
                        api_server_name = var.replace("API_SERVER_NAME=", "", 1)

                for task in instance.tasks():
-                    apis.append(
+                    apiCaller.append(
                        API(
                            f"http://{instance.name}.{task['NodeID']}.{task['ID']}:{api_http_port or '5000'}",
                            host=api_server_name or "bwapi",
                        )
                    )
-                apiCaller = ApiCaller(apis=apis)

                instances.append(
                    Instance(
-                        instance.id,
+                        instance._id,
                        instance.name,
                        instance.name,
                        "service",
@@ -204,15 +204,6 @@ class Instances:
                    env.name: env.value or "" for env in pod.spec.containers[0].env
                }

-                apiCaller = ApiCaller(
-                    apis=[
-                        API(
-                            f"http://{pod.status.pod_ip}:{env_variables.get('API_HTTP_PORT', '5000')}",
-                            host=env_variables.get("API_SERVER_NAME", "bwapi"),
-                        )
-                    ]
-                )
-
                status = "up"
                if pod.status.conditions is not None:
                    for condition in pod.status.conditions:
@@ -228,7 +219,16 @@ class Instances:
                        "pod",
                        status,
                        pod,
-                        apiCaller,
+                        ApiCaller(
+                            [
+                                API(
+                                    f"http://{pod.status.pod_ip}:{env_variables.get('API_HTTP_PORT', '5000')}",
+                                    host=env_variables.get(
+                                        "API_SERVER_NAME", "bwapi"
+                                    ),
+                                )
+                            ]
+                        ),
                    )
                )

@@ -239,18 +239,9 @@ class Instances:

        # Local instance
        if Path(sep, "usr", "sbin", "nginx").exists():
-            apiCaller = ApiCaller()
            env_variables = dotenv_values(
                join(sep, "etc", "bunkerweb", "variables.env")
            )
-            apiCaller._set_apis(
-                [
-                    API(
-                        f"http://127.0.0.1:{env_variables.get('API_HTTP_PORT', '5000')}",
-                        env_variables.get("API_SERVER_NAME", "bwapi"),
-                    )
-                ]
-            )

            instances.insert(
                0,
@@ -263,7 +254,14 @@ class Instances:
                    if Path(sep, "var", "tmp", "bunkerweb", "nginx.pid").exists()
                    else "down",
                    None,
-                    apiCaller,
+                    ApiCaller(
+                        [
+                            API(
+                                f"http://127.0.0.1:{env_variables.get('API_HTTP_PORT', '5000')}",
+                                env_variables.get("API_SERVER_NAME", "bwapi"),
+                            )
+                        ]
+                    ),
                ),
            )

@@ -282,10 +280,10 @@ class Instances:
        return not_reloaded or "Successfully reloaded instances"

    def reload_instance(
-        self, id: Optional[int] = None, instance: Optional[Instance] = None
+        self, _id: Optional[int] = None, instance: Optional[Instance] = None
    ) -> str:
        if instance is None:
-            instance = self.__instance_from_id(id)
+            instance = self.__instance_from_id(_id)

        result = instance.reload()

@@ -294,8 +292,8 @@ class Instances:

        return f"Can't reload {instance.name}"

-    def start_instance(self, id) -> str:
-        instance = self.__instance_from_id(id)
+    def start_instance(self, _id) -> str:
+        instance = self.__instance_from_id(_id)

        result = instance.start()

@@ -304,8 +302,8 @@ class Instances:

        return f"Can't start {instance.name}"

-    def stop_instance(self, id) -> str:
-        instance = self.__instance_from_id(id)
+    def stop_instance(self, _id) -> str:
+        instance = self.__instance_from_id(_id)

        result = instance.stop()

@@ -314,8 +312,8 @@ class Instances:

        return f"Can't stop {instance.name}"

-    def restart_instance(self, id) -> str:
-        instance = self.__instance_from_id(id)
+    def restart_instance(self, _id) -> str:
+        instance = self.__instance_from_id(_id)

        result = instance.restart()
