Optimize the scheduler and gen even more (we love threads)

Théophile Diot 2023-06-21 15:33:23 -04:00
parent 0661916ffc
commit b007916d6f
No known key found for this signature in database
GPG Key ID: E752C80DB72BB014
4 changed files with 161 additions and 112 deletions


@@ -11,7 +11,7 @@ from os.path import basename, dirname, join
from pathlib import Path
from re import compile as re_compile
from sys import _getframe, path as sys_path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from time import sleep
from traceback import format_exc
@@ -647,7 +647,7 @@ class Database:
)
)
config.pop("SERVER_NAME")
config.pop("SERVER_NAME", None)
for key, value in config.items():
suffix = 0
@@ -705,7 +705,7 @@ class Database:
if metadata is not None:
if not metadata.first_config_saved:
metadata.first_config_saved = True
metadata.config_changed = bool(to_put)
metadata.config_changed = True
try:
session.add_all(to_put)
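The Database hunks above are small but behaviour-relevant: config.pop("SERVER_NAME", None) no longer raises when the key is absent, and config_changed is now set unconditionally once the first config has been saved. A standalone illustration of the pop difference (the dict below is made up):

    config = {"HTTP_PORT": "8080"}            # SERVER_NAME deliberately missing

    # old form: no default supplied, so a missing key raises KeyError
    try:
        config.pop("SERVER_NAME")
    except KeyError:
        print("old form raises KeyError")

    # new form: returns the default and leaves the dict unchanged
    print(config.pop("SERVER_NAME", None))    # -> None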


@@ -5,12 +5,13 @@ from hashlib import sha256
from io import BytesIO
from json import loads
from logging import Logger
from os import listdir, sep
from os import cpu_count, listdir, sep
from os.path import basename, dirname, join
from pathlib import Path
from re import compile as re_compile, search as re_search
from sys import path as sys_path
from tarfile import open as tar_open
from threading import Lock, Semaphore, Thread
from traceback import format_exc
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
@@ -28,16 +29,20 @@ class Configurator:
logger: Logger,
):
self.__logger = logger
self.__thread_lock = Lock()
self.__semaphore = Semaphore(cpu_count() or 1)
self.__plugin_id_rx = re_compile(r"^[\w.-]{1,64}$")
self.__plugin_version_rx = re_compile(r"^\d+\.\d+(\.\d+)?$")
self.__setting_id_rx = re_compile(r"^[A-Z0-9_]{1,256}$")
self.__name_rx = re_compile(r"^[\w.-]{1,128}$")
self.__job_file_rx = re_compile(r"^[\w./-]{1,256}$")
self.__settings = self.__load_settings(settings)
self.__core_plugins = self.__load_plugins(core)
self.__core_plugins = []
self.__load_plugins(core)
if isinstance(external_plugins, str):
self.__external_plugins = self.__load_plugins(external_plugins, "external")
self.__external_plugins = []
self.__load_plugins(external_plugins, "external")
else:
self.__external_plugins = external_plugins
@@ -103,50 +108,61 @@ class Configurator:
def __load_settings(self, path: str) -> Dict[str, Any]:
return loads(Path(path).read_text())
def __load_plugins(self, path: str, _type: str = "core") -> List[Dict[str, Any]]:
plugins = []
files = glob(join(path, "*", "plugin.json"))
for file in files:
try:
data = self.__load_settings(file)
def __load_plugins(self, path: str, _type: str = "core"):
threads = []
for file in glob(join(path, "*", "plugin.json")):
thread = Thread(target=self.__load_plugin, args=(file, _type))
thread.start()
threads.append(thread)
resp, msg = self.__validate_plugin(data)
if not resp:
self.__logger.warning(
f"Ignoring plugin {file} : {msg}",
for thread in threads:
thread.join()
def __load_plugin(self, file: str, _type: str = "core"):
self.__semaphore.acquire(timeout=60)
try:
data = self.__load_settings(file)
resp, msg = self.__validate_plugin(data)
if not resp:
self.__logger.warning(
f"Ignoring plugin {file} : {msg}",
)
return
if _type == "external":
plugin_content = BytesIO()
with tar_open(
fileobj=plugin_content, mode="w:gz", compresslevel=9
) as tar:
tar.add(
dirname(file),
arcname=basename(dirname(file)),
recursive=True,
)
continue
plugin_content.seek(0, 0)
value = plugin_content.getvalue()
if _type == "external":
plugin_content = BytesIO()
with tar_open(
fileobj=plugin_content, mode="w:gz", compresslevel=9
) as tar:
tar.add(
dirname(file),
arcname=basename(dirname(file)),
recursive=True,
)
plugin_content.seek(0, 0)
value = plugin_content.getvalue()
data.update(
{
"external": True,
"page": "ui" in listdir(dirname(file)),
"method": "manual",
"data": value,
"checksum": sha256(value).hexdigest(),
}
)
plugins.append(data)
except:
self.__logger.error(
f"Exception while loading JSON from {file} : {format_exc()}",
data.update(
{
"external": True,
"page": "ui" in listdir(dirname(file)),
"method": "manual",
"data": value,
"checksum": sha256(value).hexdigest(),
}
)
return plugins
with self.__thread_lock:
self.__external_plugins.append(data)
else:
with self.__thread_lock:
self.__core_plugins.append(data)
except:
self.__logger.error(
f"Exception while loading JSON from {file} : {format_exc()}",
)
self.__semaphore.release()
def __load_variables(self, path: str) -> Dict[str, Any]:
variables = {}
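The Configurator change above is the core of the commit: each plugin.json is now parsed by its own thread, throttled by a semaphore sized to the CPU count, with a lock serialising appends to the shared core/external plugin lists. A minimal, self-contained sketch of that pattern (load_one, parse and the sample items are illustrative names, not from the codebase):

    from os import cpu_count
    from threading import Lock, Semaphore, Thread

    results = []                               # shared list, guarded by the lock
    lock = Lock()
    semaphore = Semaphore(cpu_count() or 1)    # at most one worker per CPU core

    def parse(item):
        return {"name": item}                  # stand-in for the real JSON parsing

    def load_one(item):
        if not semaphore.acquire(timeout=60):  # bound concurrency, give up after 60s
            return
        try:
            data = parse(item)
            with lock:                         # only the append is serialised
                results.append(data)
        finally:
            semaphore.release()

    threads = [Thread(target=load_one, args=(i,)) for i in ("a", "b", "c")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)

The sketch only releases the semaphore when the acquire actually succeeded, which is the safer variant of the pattern.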


@@ -90,12 +90,12 @@ class JobScheduler(ApiCaller):
for x, job in enumerate(deepcopy(plugin_jobs)):
if not all(
key in job.keys()
for key in [
for key in (
"name",
"file",
"every",
"reload",
]
)
):
self.__logger.warning(
f"missing keys for job {job['name']} in plugin {plugin_name}, must have name, file, every and reload, ignoring job"
@@ -115,7 +115,7 @@
)
plugin_jobs.pop(x)
continue
elif job["every"] not in ["once", "minute", "hour", "day", "week"]:
elif job["every"] not in ("once", "minute", "hour", "day", "week"):
self.__logger.warning(
f"Invalid every for job {job['name']} in plugin {plugin_name} (Must be once, minute, hour, day or week), ignoring job"
)
@@ -208,6 +208,11 @@
with self.__thread_lock:
self.__job_success = False
Thread(target=self.__update_job, args=(plugin, name, success)).start()
return ret
def __update_job(self, plugin: str, name: str, success: bool):
with self.__thread_lock:
err = self.__db.update_job(plugin, name, success)
@@ -219,7 +224,6 @@
self.__logger.warning(
f"Failed to update database for the job {name} from plugin {plugin}: {err}",
)
return ret
def setup(self):
for plugin, jobs in self.__jobs.items():
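In the JobScheduler hunks above, the per-job database update is handed to a background thread so the job wrapper can return its result immediately, while the existing thread lock keeps updates serialised; the trade-off is that a failed update is only logged as a warning, never reported to the caller. A reduced sketch of that fire-and-forget pattern (update_job and run_job are illustrative stand-ins, not the scheduler's real API):

    from threading import Lock, Thread

    lock = Lock()

    def update_job(plugin: str, name: str, success: bool):
        with lock:                  # serialise access to the shared store
            print(f"recorded {plugin}/{name}: {'success' if success else 'failure'}")

    def run_job(plugin: str, name: str) -> int:
        ret = 0                     # pretend the job itself returned 0
        # record the outcome without blocking the caller on the database
        Thread(target=update_job, args=(plugin, name, ret == 0)).start()
        return ret

    print(run_job("misc", "example-job"))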


@@ -23,6 +23,7 @@ from stat import S_IEXEC
from subprocess import run as subprocess_run, DEVNULL, STDOUT
from sys import path as sys_path
from tarfile import open as tar_open
from threading import Thread
from time import sleep
from traceback import format_exc
from typing import Any, Dict, List, Optional, Union
@@ -172,8 +173,7 @@
def dict_to_frozenset(d):
if isinstance(d, dict):
return frozenset((k, dict_to_frozenset(v)) for k, v in d.items())
else:
return d
return d
if __name__ == "__main__":
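dict_to_frozenset above recursively turns nested dicts into frozensets of (key, value) pairs, which makes them hashable and order-insensitive to compare (as long as the leaf values are themselves hashable); the edit only drops the redundant else. A quick usage example with made-up values:

    def dict_to_frozenset(d):
        if isinstance(d, dict):
            return frozenset((k, dict_to_frozenset(v)) for k, v in d.items())
        return d

    a = dict_to_frozenset({"SERVER_NAME": "example.com", "opts": {"x": 1}})
    b = dict_to_frozenset({"opts": {"x": 1}, "SERVER_NAME": "example.com"})
    assert a == b                   # key order no longer matters
    assert hash(a) == hash(b)       # hashable, so usable in sets and as dict keys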
@@ -351,7 +351,11 @@ if __name__ == "__main__":
)
if (scheduler_first_start and db_configs) or changes:
generate_custom_configs(db.get_custom_configs(), original_path=configs_path)
Thread(
target=generate_custom_configs,
args=(db.get_custom_configs(),),
kwargs={"original_path": configs_path},
).start()
del custom_configs, db_configs
@@ -404,6 +408,7 @@ if __name__ == "__main__":
db.get_plugins(external=True, with_data=True),
original_path=plugins_dir,
)
SCHEDULER.update_jobs()
del tmp_external_plugins, external_plugins, db_plugins
@@ -437,7 +442,33 @@
FIRST_RUN = True
CHANGES = []
threads = []
def send_nginx_configs():
logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")
ret = SCHEDULER.send_files(join(sep, "etc", "nginx"), "/confs")
if not ret:
logger.error(
"Sending nginx configs failed, configuration will not work as expected...",
)
def send_nginx_cache():
logger.info(f"Sending {CACHE_PATH} folder ...")
if not SCHEDULER.send_files(CACHE_PATH, "/cache"):
logger.error(f"Error while sending {CACHE_PATH} folder")
else:
logger.info(f"Successfully sent {CACHE_PATH} folder")
while True:
threads.clear()
ret = db.checked_changes(CHANGES)
if ret:
logger.error(
f"An error occurred when setting the changes to checked in the database : {ret}"
)
stop(1)
# Update the environment variables of the scheduler
SCHEDULER.env = env.copy() | environ.copy()
SCHEDULER.setup()
@@ -448,37 +479,6 @@ if __name__ == "__main__":
else:
logger.info("All jobs in run_once() were successful")
changes = db.check_changes()
if isinstance(changes, str):
logger.error(
f"An error occurred when checking for changes in the database : {changes}"
)
stop(1)
# check if the plugins have changed since last time
if changes["external_plugins_changed"]:
# run the config saver to save potential plugins settings
proc = subprocess_run(
[
"python",
join(sep, "usr", "share", "bunkerweb", "gen", "save_config.py"),
"--settings",
join(sep, "usr", "share", "bunkerweb", "settings.json"),
],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
)
if proc.returncode != 0:
logger.error(
"Config saver failed, configuration will not work as expected...",
)
CHANGES = CHANGES or ["config", "custom_configs"]
if "external_plugins_changed" in CHANGES:
CHANGES.remove("external_plugins_changed")
if GENERATE:
# run the generator
proc = subprocess_run(
@@ -511,21 +511,19 @@ if __name__ == "__main__":
if SCHEDULER.apis:
# send nginx configs
logger.info(f"Sending {join(sep, 'etc', 'nginx')} folder ...")
ret = SCHEDULER.send_files(join(sep, "etc", "nginx"), "/confs")
if not ret:
logger.error(
"Sending nginx configs failed, configuration will not work as expected...",
)
thread = Thread(target=send_nginx_configs)
thread.start()
threads.append(thread)
try:
if SCHEDULER.apis:
# send cache
logger.info(f"Sending {CACHE_PATH} folder ...")
if not SCHEDULER.send_files(CACHE_PATH, "/cache"):
logger.error(f"Error while sending {CACHE_PATH} folder")
else:
logger.info(f"Successfully sent {CACHE_PATH} folder")
thread = Thread(target=send_nginx_cache)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if SCHEDULER.send_to_apis("POST", "/reload"):
logger.info("Successfully reloaded nginx")
@@ -586,15 +584,6 @@ if __name__ == "__main__":
CONFIG_NEED_GENERATION = False
CONFIGS_NEED_GENERATION = False
PLUGINS_NEED_GENERATION = False
FIRST_RUN = False
ret = db.checked_changes(CHANGES)
if ret:
logger.error(
f"An error occurred when setting the changes to checked in the database : {ret}"
)
stop(1)
# infinite schedule for the jobs
logger.info("Executing job scheduler ...")
@@ -613,32 +602,72 @@ if __name__ == "__main__":
)
stop(1)
# check if the plugins have changed since last time
if changes["external_plugins_changed"]:
logger.info("External plugins changed, generating ...")
if FIRST_RUN:
# run the config saver to save potential ignored external plugins settings
logger.info(
"Running config saver to save potential ignored external plugins settings ..."
)
proc = subprocess_run(
[
"python",
join(
sep,
"usr",
"share",
"bunkerweb",
"gen",
"save_config.py",
),
"--settings",
join(sep, "usr", "share", "bunkerweb", "settings.json"),
],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
)
if proc.returncode != 0:
logger.error(
"Config saver failed, configuration will not work as expected...",
)
changes.update(
{
"custom_configs_changed": True,
"config_changed": True,
}
)
PLUGINS_NEED_GENERATION = True
NEED_RELOAD = True
# check if the custom configs have changed since last time
if changes["custom_configs_changed"]:
logger.info("Custom configs changed, generating ...")
CONFIGS_NEED_GENERATION = True
NEED_RELOAD = True
# check if the plugins have changed since last time
if changes["external_plugins_changed"]:
logger.info("External plugins changed, generating ...")
PLUGINS_NEED_GENERATION = True
NEED_RELOAD = True
# check if the config have changed since last time
if changes["config_changed"]:
logger.info("Config changed, generating ...")
CONFIG_NEED_GENERATION = True
NEED_RELOAD = True
FIRST_RUN = False
if NEED_RELOAD:
CHANGES.clear()
if CONFIGS_NEED_GENERATION:
CHANGES.append("custom_configs")
generate_custom_configs(
db.get_custom_configs(), original_path=configs_path
)
Thread(
target=generate_custom_configs,
args=(db.get_custom_configs(),),
kwargs={"original_path": configs_path},
).start()
if PLUGINS_NEED_GENERATION:
CHANGES.append("external_plugins")