Centralize Database and optimize requests

This commit is contained in:
TheophileDiot 2022-10-28 12:01:05 +02:00
parent 7a03ed33f1
commit 66fb266f8e
10 changed files with 554 additions and 393 deletions
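
The "optimize requests" half of this commit is mostly the SQLAlchemy pattern that recurs throughout the Database diff below: full-entity lookups such as session.query(Model).get(1) are replaced by column-only with_entities() queries. A minimal, self-contained sketch of the before/after, assuming SQLAlchemy 1.4 (where the legacy Query.get() still works); the Metadata model here is a stand-in mirroring the names in the diff:

from sqlalchemy import Boolean, Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Metadata(Base):
    __tablename__ = "metadata"
    id = Column(Integer, primary_key=True)
    is_initialized = Column(Boolean, default=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(Metadata(id=1, is_initialized=True))
session.commit()

# Before: loads the whole row into the session
# (legacy Query.get(), removed in SQLAlchemy 2.0).
row = session.query(Metadata).get(1)

# After: selects only the needed column and returns a lightweight row.
row = (
    session.query(Metadata)
    .with_entities(Metadata.is_initialized)
    .filter_by(id=1)
    .first()
)
print(row is not None and row.is_initialized)  # True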

View File

@@ -1,22 +1,13 @@
from os import getenv
from time import sleep
from traceback import format_exc
from subprocess import run, DEVNULL, STDOUT
from glob import glob
from shutil import rmtree
from os import getenv, makedirs, remove, listdir
from os.path import dirname, isdir
from typing import Tuple
from API import API
from ApiCaller import ApiCaller
from ConfigCaller import ConfigCaller
from Database import Database
from logger import setup_logger
class Config(ApiCaller, ConfigCaller):
class Config(ConfigCaller):
def __init__(self, ctrl_type, lock=None):
ApiCaller.__init__(self)
ConfigCaller.__init__(self)
self.__ctrl_type = ctrl_type
self.__lock = lock
@@ -54,83 +45,10 @@ class Config(ApiCaller, ConfigCaller):
return True
return False
def __get_apis(self) -> list:
apis = []
for instance in self.__instances:
endpoint = f"http://{instance['hostname']}:{instance['env'].get('API_HTTP_PORT', '5000')}"
host = instance["env"].get("API_SERVER_NAME", "bwapi")
apis.append(API(endpoint, host=host))
return apis
def __write_configs(self) -> Tuple[bool, list]:
ret = True
custom_configs = []
for config_type in self.__configs:
for file, data in self.__configs[config_type].items():
path = f"/data/configs/{config_type}/{file}"
if not path.endswith(".conf"):
path += ".conf"
makedirs(dirname(path), exist_ok=True)
try:
mode = "w"
if type(data) is bytes:
mode = "wb"
with open(path, mode) as f:
f.write(data)
exploded = file.split("/")
custom_configs.append(
{
"value": data if mode == "w" else data.decode("utf-8"),
"exploded": [exploded[0], config_type, exploded[1]],
}
)
except:
print(format_exc())
self.__logger.error(f"Can't save file {path}")
ret = False
return ret, custom_configs
def __remove_configs(self) -> bool:
ret = True
for config_type in self.__configs:
for file, _ in self.__configs[config_type].items():
path = f"/data/configs/{config_type}/{file}"
if not path.endswith(".conf"):
path += ".conf"
try:
remove(path)
except:
print(format_exc())
self.__logger.error(f"Can't remove file {path}")
ret = False
check_empty_dirs = []
for _type in ["server-http", "modsec", "modsec-crs"]:
check_empty_dirs.extend(glob(f"/data/configs/{_type}/*"))
for check_empty_dir in check_empty_dirs:
if isdir(check_empty_dir) and len(listdir(check_empty_dir)) == 0:
try:
rmtree(check_empty_dir)
except:
print(format_exc())
self.__logger.error(f"Can't remove directory {check_empty_dir}")
ret = False
return ret
def apply(self, instances, services, configs=None) -> bool:
success = True
# remove old autoconf configs if they exist
if self.__configs:
ret = self.__remove_configs()
if not ret:
success = False
self.__logger.error(
"removing custom configs failed, configuration will not work as expected...",
)
# update values
self.__instances = instances
self.__services = services
@@ -139,7 +57,11 @@ class Config(ApiCaller, ConfigCaller):
if self.__db is None:
self.__db = Database(
self.__logger, sqlalchemy_string=self.__config.get("DATABASE_URI", None)
self.__logger,
sqlalchemy_string=self.__config.get("DATABASE_URI", None),
bw_integration="Kubernetes"
if self.__config.get("KUBERNETES_MODE", "no") == "yes"
else "Cluster",
)
while not self.__db.is_initialized():
@@ -148,84 +70,33 @@ class Config(ApiCaller, ConfigCaller):
)
sleep(5)
self._set_apis(self.__get_apis())
# write configs
if configs is not None:
ret = self.__db.save_config(self.__config, "autoconf")
if ret:
self.__logger.error(
f"Can't save autoconf config in database: {ret}",
)
ret, custom_configs = self.__write_configs()
if not ret:
success = False
self.__logger.error(
"saving custom configs failed, configuration will not work as expected...",
)
ret = self.__db.save_custom_configs(custom_configs, "autoconf")
if ret:
self.__logger.error(
f"Can't save autoconf custom configs in database: {ret}",
)
else:
ret = self.__db.save_config({}, "autoconf")
if ret:
self.__logger.error(
f"Can't remove autoconf config from the database: {ret}",
)
# get env
env = self.__get_full_env()
# run jobs once
i = 1
for instance in self.__instances:
endpoint = f"http://{instance['hostname']}:{instance['env'].get('API_HTTP_PORT', '5000')}"
host = instance["env"].get("API_SERVER_NAME", "bwapi")
env[f"CLUSTER_INSTANCE_{i}"] = f"{endpoint} {host}"
i += 1
# write config to /tmp/variables.env
with open("/tmp/variables.env", "w") as f:
for variable, value in self.__config.items():
f.write(f"{variable}={value}\n")
# run the generator
cmd = f"python /opt/bunkerweb/gen/main.py --settings /opt/bunkerweb/settings.json --templates /opt/bunkerweb/confs --output /etc/nginx --variables /tmp/variables.env --method autoconf"
proc = run(cmd.split(" "), stdin=DEVNULL, stderr=STDOUT)
if proc.returncode != 0:
success = False
# save config to database
ret = self.__db.save_config(self.__config, "autoconf")
if ret:
self.__logger.error(
"config generator failed, configuration will not work as expected...",
f"Can't save autoconf config in database: {ret}",
)
# cmd = "chown -R root:101 /etc/nginx"
# run(cmd.split(" "), stdin=DEVNULL, stdout=DEVNULL, stderr=STDOUT)
# cmd = "chmod -R 770 /etc/nginx"
# run(cmd.split(" "), stdin=DEVNULL, stdout=DEVNULL, stderr=STDOUT)
# send nginx configs
ret = self._send_files("/etc/nginx", "/confs")
if not ret:
success = False
custom_configs = []
for config_type in self.__configs:
for file, data in self.__configs[config_type].items():
exploded = file.split("/")
custom_configs.append(
{
"value": data,
"exploded": [
exploded[0],
config_type,
exploded[1].replace(".conf", ""),
],
}
)
# save custom configs to database
ret = self.__db.save_custom_configs(custom_configs, "autoconf")
if ret:
self.__logger.error(
"sending nginx configs failed, configuration will not work as expected...",
)
# send data/configs folder
ret = self._send_files("/data/configs", "/custom_configs")
if not ret:
success = False
self.__logger.error(
"sending custom configs failed, configuration will not work as expected...",
)
# reload nginx
ret = self._send_to_apis("POST", "/reload")
if not ret:
success = False
self.__logger.error(
"reload failed, configuration will not work as expected...",
f"Can't save autoconf custom configs in database: {ret}",
)
return success

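With this refactor, Config stops writing files and pushing them to instances itself; it persists the computed config and custom configs to the shared database, and the scheduler takes over rendering and distribution. A hedged sketch of the resulting apply flow (db stands in for the project's Database object; per the if ret: checks in the diff, the save methods return an error string on failure and an empty string on success):

def apply_to_db(db, config: dict, custom_configs: list, logger) -> bool:
    # Sketch only: persist everything and report success or failure.
    success = True
    err = db.save_config(config, "autoconf")
    if err:
        logger.error(f"Can't save autoconf config in database: {err}")
        success = False
    err = db.save_custom_configs(custom_configs, "autoconf")
    if err:
        logger.error(f"Can't save autoconf custom configs in database: {err}")
        success = False
    return success
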
View File

@@ -1,10 +1,9 @@
#!/usr/bin/python3
from os import _exit, environ, getenv
from os import _exit, getenv
from signal import SIGINT, SIGTERM, signal
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from time import sleep
from traceback import format_exc
sys_path.append("/opt/bunkerweb/deps/python")

View File

@@ -12,6 +12,7 @@ from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from time import sleep
from traceback import format_exc
from typing import Any
sys_path.append("/opt/bunkerweb/deps/python")
@@ -20,6 +21,7 @@ sys_path.append("/opt/bunkerweb/api")
sys_path.append("/opt/bunkerweb/db")
from docker import DockerClient
from kubernetes import client as kube_client
from logger import setup_logger
from Database import Database
@@ -161,6 +163,10 @@ if __name__ == "__main__":
for plugin in core_plugins[order]:
core_settings.update(plugin["settings"])
if exists("/opt/bunkerweb/INTEGRATION"):
with open("/opt/bunkerweb/INTEGRATION", "r") as f:
integration = f.read().strip()
if args.variables or args.init:
# Compute the config
logger.info("Computing config ...")
@@ -259,23 +265,25 @@ if __name__ == "__main__":
sys_exit(0)
config = config_files
elif args.method != "autoconf":
config = db.get_config()
elif integration == "Docker":
bw_integration = "Cluster"
docker_client = DockerClient(
base_url=getenv("DOCKER_HOST", "unix:///var/run/docker.sock")
)
tmp_config = {}
custom_confs = []
apis = []
db = None
for instance in docker_client.containers.list(
filters={"label": "bunkerweb.INSTANCE"}
):
api = None
def get_instance_configs_and_apis(instance: Any, db, _type="Docker"):
api_http_port = None
api_server_name = None
tmp_config = {}
custom_confs = []
apis = []
for var in instance.attrs["Config"]["Env"]:
for var in (
instance.attrs["Config"]["Env"]
if _type == "Docker"
else instance.attrs["Spec"]["TaskTemplate"]["ContainerSpec"]["Env"]
):
splitted = var.split("=", 1)
if custom_confs_rx.match(splitted[0]):
custom_confs.append(
@@ -289,20 +297,60 @@ if __name__ == "__main__":
else:
tmp_config[splitted[0]] = splitted[1]
if splitted[0] == "DATABASE_URI":
if db is None and splitted[0] == "DATABASE_URI":
db = Database(
logger,
sqlalchemy_string=splitted[1],
)
elif splitted[0] == "API_HTTP_PORT":
api = API(f"http://{instance.name}:{splitted[1]}")
api_http_port = splitted[1]
elif splitted[0] == "API_SERVER_NAME":
api_server_name = splitted[1]
if api:
apis.append(api)
else:
apis.append(
API(f"http://{instance.name}:{getenv('API_HTTP_PORT', '5000')}")
apis.append(
API(
f"http://{instance.name}:{api_http_port or getenv('API_HTTP_PORT', '5000')}",
host=api_server_name or getenv("API_SERVER_NAME", "bwapi"),
)
)
return tmp_config, custom_confs, apis, db
tmp_config = {}
custom_confs = []
apis = []
db = None
for instance in docker_client.containers.list(
filters={"label": "bunkerweb.INSTANCE"}
):
conf, cstm_confs, tmp_apis, tmp_db = get_instance_configs_and_apis(
instance, db
)
tmp_config.update(conf)
custom_confs.extend(cstm_confs)
apis.extend(tmp_apis)
if db is None:
db = tmp_db
is_swarm = True
try:
docker_client.swarm.version
except:
is_swarm = False
if is_swarm:
for instance in docker_client.services.list(
filters={"label": "bunkerweb.INSTANCE"}
):
conf, cstm_confs, tmp_apis, tmp_db = get_instance_configs_and_apis(
instance, db, "Swarm"
)
tmp_config.update(conf)
custom_confs.extend(cstm_confs)
apis.extend(tmp_apis)
if db is None:
db = tmp_db
if db is None:
db = Database(logger)
@@ -338,7 +386,125 @@ if __name__ == "__main__":
else:
logger.info("Config successfully saved to database")
config = config_files
config = db.get_config()
elif integration == "Kubernetes":
bw_integration = "Kubernetes"
corev1 = kube_client.CoreV1Api()
tmp_config = {}
apis = []
db = None
for pod in corev1.list_pod_for_all_namespaces(watch=False).items:
if (
pod.metadata.annotations is not None
and "bunkerweb.io/INSTANCE" in pod.metadata.annotations
):
api_http_port = None
api_server_name = None
for pod_env in pod.spec.containers[0].env:
tmp_config[pod_env.name] = pod_env.value
if db is None and pod_env.name == "DATABASE_URI":
db = Database(
logger,
sqlalchemy_string=pod_env.value,
)
elif pod_env.name == "API_HTTP_PORT":
api_http_port = pod_env.value
elif pod_env.name == "API_SERVER_NAME":
api_server_name = pod_env.value
apis.append(
API(
f"http://{pod.status.pod_ip}:{api_http_port or getenv('API_HTTP_PORT', '5000')}",
host=api_server_name or getenv("API_SERVER_NAME", "bwapi"),
)
)
if db is None:
db = Database(logger)
api_caller = ApiCaller(apis=apis)
# Compute the config
logger.info("Computing config ...")
config = Configurator(
args.settings, core_settings, args.plugins, tmp_config, logger
)
config_files = config.get_config()
if config_files.get("LOG_LEVEL", logger.level) != logger.level:
logger = setup_logger("Generator", config_files["LOG_LEVEL"])
err = db.save_config(config_files, args.method)
if not err:
supported_config_types = [
"http",
"stream",
"server-http",
"server-stream",
"default-server-http",
"modsec",
"modsec-crs",
]
custom_confs = []
for configmap in corev1.list_config_map_for_all_namespaces(
watch=False
).items:
if (
configmap.metadata.annotations is None
or "bunkerweb.io/CONFIG_TYPE"
not in configmap.metadata.annotations
):
continue
config_type = configmap.metadata.annotations[
"bunkerweb.io/CONFIG_TYPE"
]
if config_type not in supported_config_types:
logger.warning(
f"Ignoring unsupported CONFIG_TYPE {config_type} for ConfigMap {configmap.metadata.name}",
)
continue
elif not configmap.data:
logger.warning(
f"Ignoring blank ConfigMap {configmap.metadata.name}",
)
continue
config_site = ""
if "bunkerweb.io/CONFIG_SITE" in configmap.metadata.annotations:
config_site = f"{configmap.metadata.annotations['bunkerweb.io/CONFIG_SITE']}/"
for config_name, config_data in configmap.data.items():
custom_confs.append(
{
"value": config_data,
"exploded": (config_site, config_type, config_name),
}
)
err1 = db.save_custom_configs(custom_confs, args.method)
else:
err = None
err1 = None
with open("/opt/bunkerweb/VERSION", "r") as f:
bw_version = f.read().strip()
if err or err1:
logger.error(
f"Can't save config to database: {err or err1}",
)
sys_exit(1)
else:
logger.info("Config successfully saved to database")
config = db.get_config()
else:
db = Database(
logger,
@@ -360,7 +526,7 @@ if __name__ == "__main__":
logger = setup_logger("Generator", config.get("LOG_LEVEL", "INFO"))
if args.method != "autoconf" and bw_integration == "Cluster":
if integration == "Docker":
while not api_caller._send_to_apis("GET", "/ping"):
logger.warning(
"Waiting for BunkerWeb's temporary nginx to start, retrying in 5 seconds ...",
@@ -392,22 +558,6 @@ if __name__ == "__main__":
elif isdir(file):
rmtree(file, ignore_errors=False)
if args.method != "autoconf":
logger.info(
"Generating custom configs from Database ...",
)
custom_configs = db.get_custom_configs()
original_path = "/data/configs"
makedirs(original_path, exist_ok=True)
for custom_config in custom_configs:
tmp_path = f"{original_path}/{custom_config['type'].replace('_', '-')}"
if custom_config["service_id"]:
tmp_path += f"/{custom_config['service_id']}"
tmp_path += f"/{custom_config['name']}.conf"
makedirs(dirname(tmp_path), exist_ok=True)
with open(tmp_path, "w") as f:
f.write(custom_config["data"])
# Render the templates
logger.info("Rendering templates ...")
templator = Templator(
@@ -420,7 +570,7 @@ if __name__ == "__main__":
)
templator.render()
if args.method != "autoconf" and bw_integration == "Cluster":
if integration == "Docker":
ret = api_caller._send_to_apis("POST", "/reload")
if not ret:
logger.error(

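Several hunks above probe for Swarm mode by touching docker_client.swarm.version inside a bare except. A slightly more explicit sketch of that probe with docker-py (docker.from_env() and client.swarm are real SDK entry points; the exact exception raised on a non-swarm daemon varies, hence the broad catch):

import docker

def daemon_is_swarm(client: docker.DockerClient) -> bool:
    try:
        # Outside swarm mode the swarm attributes are unavailable and
        # this lookup raises; the diff relies on a bare except for this.
        client.swarm.version
        return True
    except Exception:
        return False

print(daemon_is_swarm(docker.from_env()))
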
View File

@@ -65,6 +65,26 @@ class Database:
if sqlalchemy_string:
break
is_swarm = True
try:
docker_client.swarm.version
except:
is_swarm = False
if not sqlalchemy_string and is_swarm:
for instance in docker_client.services.list(
filters={"label": "bunkerweb.INSTANCE"}
):
for var in instance.attrs["Spec"]["TaskTemplate"][
"ContainerSpec"
]["Env"]:
if var.startswith("DATABASE_URI="):
sqlalchemy_string = var.replace("DATABASE_URI=", "", 1)
break
if sqlalchemy_string:
break
if not sqlalchemy_string:
sqlalchemy_string = getenv("DATABASE_URI", "sqlite:////data/db.sqlite3")
@@ -131,7 +151,12 @@ class Database:
"""Check if the first configuration has been saved"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).get(1)
metadata = (
session.query(Metadata)
.with_entities(Metadata.first_config_saved)
.filter_by(id=1)
.first()
)
return metadata is not None and metadata.first_config_saved
except (ProgrammingError, OperationalError):
return False
@@ -140,7 +165,12 @@ class Database:
"""Check if the database is initialized"""
with self.__db_session() as session:
try:
metadata = session.query(Metadata).get(1)
metadata = (
session.query(Metadata)
.with_entities(Metadata.is_initialized)
.filter_by(id=1)
.first()
)
return metadata is not None and metadata.is_initialized
except (ProgrammingError, OperationalError):
return False
@@ -268,6 +298,14 @@ class Database:
if config["MULTISITE"] == "yes":
global_values = []
for server_name in config["SERVER_NAME"].split(" "):
if (
session.query(Services)
.with_entities(Services.id)
.filter_by(id=server_name)
.first()
):
continue
if server_name:
to_put.append(Services(id=server_name, method=method))
@@ -290,7 +328,10 @@
)
elif key not in global_values:
setting = (
session.query(Settings).filter_by(id=key).first()
session.query(Settings)
.with_entities(Settings.default)
.filter_by(id=key)
.first()
)
if setting and value != setting.default:
@@ -347,14 +388,15 @@
to_put = []
for custom_config in custom_configs:
config = {
"data": custom_config["value"]
.replace("\\\n", "\n")
.encode("utf-8"),
"data": custom_config["value"].replace("\\\n", "\n").encode("utf-8")
if isinstance(custom_config["value"], str)
else custom_config["value"].replace(b"\\\n", b"\n"),
"method": method,
}
if custom_config["exploded"][0]:
if (
not session.query(Services)
.with_entities(Services.id)
.filter_by(id=custom_config["exploded"][0])
.first()
):
@@ -388,78 +430,78 @@
return ""
def __get_setting_value(
self,
session: scoped_session,
service: Any,
setting: Any,
suffix: int,
) -> Optional[dict]:
tmp_config = {}
global_value = (
session.query(Global_values)
.filter_by(setting_id=setting.id, suffix=suffix)
.first()
)
if global_value is None:
if suffix:
if setting.context != "multisite":
return None
tmp_config[f"{setting.id}_{suffix}"] = setting.default
else:
tmp_config[setting.id] = setting.default
else:
tmp_config[
setting.id + (f"_{suffix}" if suffix else "")
] = global_value.value
if setting.context == "multisite":
try:
tmp_config[
f"{service.id}_{setting.id}" + (f"_{suffix}" if suffix else "")
] = next(
s
for s in service.settings
if s.setting_id == setting.id and s.suffix == suffix
).value
except StopIteration:
if global_value is None and suffix:
return None
elif suffix:
tmp_config[f"{service.id}_{setting.id}_{suffix}"] = tmp_config[
f"{setting.id}_{suffix}"
]
else:
tmp_config[f"{service.id}_{setting.id}"] = tmp_config[setting.id]
return tmp_config
def get_config(self) -> Dict[str, Any]:
"""Get the config from the database"""
with self.__db_session() as session:
config = {}
settings = session.query(Settings).all()
for service in session.query(Services).all():
for setting in settings:
if setting.multiple:
i = 0
while True:
tmp_config = self.__get_setting_value(
session, service, setting, i
)
settings = (
session.query(Settings)
.with_entities(
Settings.id, Settings.context, Settings.default, Settings.multiple
)
.all()
)
if tmp_config is None:
break
for setting in settings:
suffix = 0
while True:
global_value = (
session.query(Global_values)
.with_entities(Global_values.value)
.filter_by(setting_id=setting.id, suffix=suffix)
.first()
)
config.update(tmp_config)
i += 1
if global_value is None:
if suffix > 0:
break
else:
config[setting.id] = setting.default
else:
config.update(
self.__get_setting_value(session, service, setting, 0)
config[
setting.id + (f"_{suffix}" if suffix > 0 else "")
] = global_value.value
if not setting.multiple:
break
suffix += 1
for service in session.query(Services).with_entities(Services.id).all():
for setting in settings:
if setting.context != "multisite":
continue
suffix = 0
while True:
if suffix == 0:
config[f"{service.id}_{setting.id}"] = config[setting.id]
elif f"{setting.id}_{suffix}" in config:
config[f"{service.id}_{setting.id}_{suffix}"] = config[
f"{setting.id}_{suffix}"
]
service_setting = (
session.query(Services_settings)
.with_entities(Services_settings.value)
.filter_by(
service_id=service.id,
setting_id=setting.id,
suffix=suffix,
)
.first()
)
if service_setting is not None:
config[
f"{service.id}_{setting.id}"
+ (f"_{suffix}" if suffix > 0 else "")
] = service_setting.value
elif suffix > 0:
break
suffix += 1
return config
def get_custom_configs(self) -> List[Dict[str, Any]]:
@@ -471,10 +513,30 @@
"type": custom_config.type,
"name": custom_config.name,
"data": custom_config.data.decode("utf-8"),
"method": custom_config.method,
}
for custom_config in session.query(Custom_configs).all()
for custom_config in session.query(Custom_configs)
.with_entities(
Custom_configs.service_id,
Custom_configs.type,
Custom_configs.name,
Custom_configs.data,
Custom_configs.method,
)
.all()
]
def get_services(self) -> List[Dict[str, Any]]:
"""Get the services' configs from the database"""
services = []
with self.__db_session() as session:
for service in (
session.query(Services).with_entities(Services.settings).all()
):
services.append(service.settings)
return services
def update_job(self, plugin_id: str, job_name: str) -> str:
"""Update the job last_run in the database"""
with self.__db_session() as session:

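The rewritten get_config() resolves three layers into one flat dict: setting defaults, suffixed global overrides (SETTING, SETTING_1, ...), then per-service overrides keyed as {service}_{setting}[_{suffix}]. A pure-Python sketch of that resolution order, with plain dicts standing in for the database tables:

def flatten_config(settings, global_values, services, service_values):
    # settings: {id: {"default": str, "context": str, "multiple": bool}}
    # global_values: {(setting_id, suffix): value}
    # service_values: {(service_id, setting_id, suffix): value}
    config = {}
    for sid, meta in settings.items():
        suffix = 0
        while True:
            value = global_values.get((sid, suffix))
            if value is None:
                if suffix > 0:
                    break
                config[sid] = meta["default"]
            else:
                config[sid + (f"_{suffix}" if suffix > 0 else "")] = value
            if not meta["multiple"]:
                break
            suffix += 1
    for service in services:
        for sid, meta in settings.items():
            if meta["context"] != "multisite":
                continue
            suffix = 0
            while True:
                if suffix == 0:
                    config[f"{service}_{sid}"] = config[sid]
                elif f"{sid}_{suffix}" in config:
                    config[f"{service}_{sid}_{suffix}"] = config[f"{sid}_{suffix}"]
                value = service_values.get((service, sid, suffix))
                if value is not None:
                    config[
                        f"{service}_{sid}" + (f"_{suffix}" if suffix > 0 else "")
                    ] = value
                elif suffix > 0:
                    break
                suffix += 1
    return config

print(flatten_config(
    {"USE_GZIP": {"default": "no", "context": "multisite", "multiple": False}},
    {("USE_GZIP", 0): "yes"},
    ["www.example.com"],
    {("www.example.com", "USE_GZIP", 0): "no"},
))
# {'USE_GZIP': 'yes', 'www.example.com_USE_GZIP': 'no'}
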
View File

@@ -16,7 +16,7 @@ from sqlalchemy.schema import UniqueConstraint
Base = declarative_base()
CONTEXTS_ENUM = Enum("global", "multisite")
SETTINGS_TYPES_ENUM = Enum("text", "check", "select")
METHODS_ENUM = Enum("ui", "scheduler", "autoconf")
METHODS_ENUM = Enum("ui", "scheduler", "autoconf", "manual")
SCHEDULES_ENUM = Enum("once", "minute", "hour", "day", "week")
CUSTOM_CONFIGS_TYPES = Enum(
"http",

View File

@@ -2,7 +2,6 @@ from glob import glob
from json import loads
from logging import Logger
from os import environ
from os.path import isfile
from subprocess import DEVNULL, PIPE, STDOUT, run
from schedule import (
clear as schedule_clear,
@@ -27,24 +26,17 @@ class JobScheduler(ApiCaller):
lock=None,
apis=[],
logger: Logger = setup_logger("Scheduler", environ.get("LOG_LEVEL", "INFO")),
auto: bool = False,
bw_integration: str = "Local",
):
super().__init__(apis)
if auto is True:
self.auto_setup()
self.__logger = logger
self.__bw_integration = bw_integration
self.__db = Database(
self.__logger,
sqlalchemy_string=env.get("DATABASE_URI", None),
bw_integration=bw_integration,
bw_integration=self.__bw_integration,
)
self.__env = env
with open("/tmp/autoconf.env", "w") as f:
for k, v in self.__env.items():
f.write(f"{k}={v}\n")
self.__env.update(environ)
self.__jobs = self.__get_jobs()
self.__lock = lock
@@ -83,7 +75,7 @@ class JobScheduler(ApiCaller):
def __reload(self):
reload = True
if isfile("/usr/sbin/nginx") and isfile("/opt/bunkerweb/tmp/nginx.pid"):
if self.__bw_integration == "Local":
self.__logger.info("Reloading nginx ...")
proc = run(
["/usr/sbin/nginx", "-s", "reload"],
@@ -107,14 +99,6 @@ class JobScheduler(ApiCaller):
self.__logger.error("Error while reloading nginx")
return reload
def __gen_conf(self):
success = True
cmd = "/opt/bunkerweb/gen/main.py --settings /opt/bunkerweb/settings.json --templates /opt/bunkerweb/confs --output /etc/nginx --variables /tmp/autoconf.env --method autoconf"
proc = run(cmd.split(" "), stdin=DEVNULL, stderr=STDOUT)
if proc.returncode != 0:
success = False
return success
def __job_wrapper(self, path, plugin, name, file):
self.__logger.info(
f"Executing job {name} from plugin {plugin} ...",
@@ -215,24 +199,6 @@ class JobScheduler(ApiCaller):
f"Exception while running jobs once for plugin {plugin}: {format_exc()}",
)
if ret is False:
return False
try:
if len(self._get_apis()) > 0:
self.__logger.info("Sending /data/cache folder ...")
if not self._send_files("/data/cache", "/cache"):
ret = False
self.__logger.error("Error while sending /data/cache folder")
else:
self.__logger.info("Successfully sent /data/cache folder")
if not self.__reload():
self.__logger.error("Can't reload BunkerWeb")
except:
ret = False
self.__logger.error(
f"Exception while reloading after running jobs once: {format_exc()}",
)
return ret
def clear(self):
@@ -243,9 +209,6 @@ class JobScheduler(ApiCaller):
try:
self.__env = env
super().__init__(apis)
with open("/tmp/autoconf.env", "w") as f:
for k, v in self.__env.items():
f.write(f"{k}={v}\n")
self.clear()
self.__jobs = self.__get_jobs()
if not self.run_once():

View File

@@ -44,35 +44,18 @@ if [ "$?" -ne 0 ] ; then
exit 1
fi
generate=yes
if [ -v VARIABLES_PATH ] && [ -f "/etc/nginx/variables.env" ] && grep -q "^TEMP_NGINX=no$" /etc/nginx/variables.env ; then
log "ENTRYPOINT" "⚠️ " "Looks like BunkerWeb configuration is already generated, will not generate it again"
elif [ "$SWARM_MODE" != "yes" ] && [ "$KUBERNETES_MODE" != "yes" ] && [ "$AUTOCONF_MODE" != "yes" ] ; then
# Generate configuration and send config to bunkerweb
/opt/bunkerweb/gen/main.py --method scheduler
if [ "$?" -ne 0 ] ; then
log "ENTRYPOINT" "❌" "Scheduler generator failed"
exit 1
fi
generate=no
fi
# execute jobs
log "ENTRYPOINT" " " "Executing jobs ..."
log "ENTRYPOINT" " " "Executing scheduler ..."
if [ -v VARIABLES_PATH ] ; then
/opt/bunkerweb/scheduler/main.py --variables $VARIABLES_PATH --run
/opt/bunkerweb/scheduler/main.py --variables $VARIABLES_PATH --generate $generate
else
/opt/bunkerweb/scheduler/main.py --run
fi
if [ "$?" -ne 0 ] ; then
log "ENTRYPOINT" "❌" "Scheduler failed"
exit 1
fi
log "ENTRYPOINT" " " "Executing job scheduler ..."
if [ -v VARIABLES_PATH ] ; then
/opt/bunkerweb/scheduler/main.py --variables $VARIABLES_PATH
else
/opt/bunkerweb/scheduler/main.py
/opt/bunkerweb/scheduler/main.py --generate $generate
fi
log "ENTRYPOINT" " " "Scheduler stopped"

View File

@@ -2,9 +2,12 @@
from argparse import ArgumentParser
from copy import deepcopy
from os import _exit, environ, getenv, getpid, path, remove
from os.path import exists
from glob import glob
from os import _exit, getenv, getpid, makedirs, path, remove, unlink
from os.path import dirname, isdir, isfile, islink
from shutil import rmtree
from signal import SIGINT, SIGTERM, SIGUSR1, SIGUSR2, signal
from subprocess import PIPE, run as subprocess_run, DEVNULL, STDOUT
from sys import path as sys_path
from time import sleep
from traceback import format_exc
@@ -19,7 +22,7 @@ from dotenv import dotenv_values
from logger import setup_logger
from Database import Database
from JobScheduler import JobScheduler
from API import API
from ApiCaller import ApiCaller
run = True
scheduler = None
@@ -91,19 +94,27 @@ if __name__ == "__main__":
# Parse arguments
parser = ArgumentParser(description="Job scheduler for BunkerWeb")
parser.add_argument(
"--run", action="store_true", help="only run jobs one time in foreground"
)
parser.add_argument(
"--variables",
type=str,
help="path to the file containing environment variables",
)
parser.add_argument(
"--generate",
default="no",
type=str,
help="Specify whether the configuration needs to be generated directly or not",
)
args = parser.parse_args()
logger.info("Scheduler started ...")
bw_integration = "Local"
bw_integration = (
"Local"
if not isfile("/usr/sbin/nginx")
and not isfile("/opt/bunkerweb/tmp/nginx.pid")
else "Cluster"
)
if args.variables:
logger.info(f"Variables : {args.variables}")
@@ -116,6 +127,9 @@ if __name__ == "__main__":
"Kubernetes" if getenv("KUBERNETES_MODE", "no") == "yes" else "Cluster"
)
api_caller = ApiCaller()
api_caller.auto_setup(bw_integration=bw_integration)
db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
@@ -136,65 +150,157 @@ if __name__ == "__main__":
sleep(3)
env = db.get_config()
if args.run:
# write config to /tmp/variables.env
with open("/tmp/variables.env", "w") as f:
for variable, value in env.items():
f.write(f"{variable}={value}\n")
run_once = True
else:
# Check if config has changed since last run
run_once = dotenv_values("/tmp/variables.env") != env
custom_configs = db.get_custom_configs()
if run_once:
logger.info("Config changed since last run, reloading ...")
original_path = "/data/configs"
makedirs(original_path, exist_ok=True)
for custom_config in custom_configs:
tmp_path = f"{original_path}/{custom_config['type'].replace('_', '-')}"
if custom_config["service_id"]:
tmp_path += f"/{custom_config['service_id']}"
tmp_path += f"/{custom_config['name']}.conf"
makedirs(dirname(tmp_path), exist_ok=True)
with open(tmp_path, "w") as f:
f.write(custom_config["data"])
logger.info("Executing job scheduler ...")
if bw_integration != "Local":
logger.info("Sending custom configs to BunkerWeb")
ret = api_caller._send_files("/data/configs", "/custom_configs")
if not ret:
logger.error(
"Sending custom configs failed, configuration will not work as expected...",
)
logger.info("Executing scheduler ...")
while True:
# Instantiate scheduler
scheduler = JobScheduler(
env=deepcopy(env),
apis=[],
apis=api_caller._get_apis(),
logger=logger,
auto=not args.variables,
bw_integration=bw_integration,
)
# Only run jobs once
if run_once:
if not scheduler.run_once():
logger.error("At least one job in run_once() failed")
if args.run:
stop(1)
if not scheduler.run_once():
logger.error("At least one job in run_once() failed")
else:
logger.info("All jobs in run_once() were successful")
# run the generator
cmd = f"python /opt/bunkerweb/gen/main.py --settings /opt/bunkerweb/settings.json --templates /opt/bunkerweb/confs --output /etc/nginx{f' --variables {args.variables}' if args.variables else ''} --method scheduler"
proc = subprocess_run(cmd.split(" "), stdin=DEVNULL, stderr=STDOUT)
if proc.returncode != 0:
logger.error(
"Config generator failed, configuration will not work as expected...",
)
if len(api_caller._get_apis()) > 0:
# send nginx configs
logger.info("Sending /etc/nginx folder ...")
ret = api_caller._send_files("/etc/nginx", "/confs")
if not ret:
logger.error(
"Sending nginx configs failed, configuration will not work as expected...",
)
try:
if len(api_caller._get_apis()) > 0:
# send cache
logger.info("Sending /data/cache folder ...")
if not api_caller._send_files("/data/cache", "/cache"):
logger.error("Error while sending /data/cache folder")
else:
logger.info("Successfully sent /data/cache folder")
# reload nginx
if bw_integration == "Local":
logger.info("Reloading nginx ...")
proc = run(
["/usr/sbin/nginx", "-s", "reload"],
stdin=DEVNULL,
stderr=PIPE,
env=deepcopy(env),
)
if proc.returncode == 0:
logger.info("Successfully reloaded nginx")
else:
logger.error(
f"Error while reloading nginx - returncode: {proc.returncode} - error: {proc.stderr.decode('utf-8')}",
)
else:
logger.info("All jobs in run_once() were successful")
if args.run:
break
logger.info("Reloading nginx ...")
if api_caller._send_to_apis("POST", "/reload"):
logger.info("Successfully reloaded nginx")
else:
logger.error("Error while reloading nginx")
except:
logger.error(
f"Exception while reloading after running jobs once: {format_exc()}",
)
run_once = False
# Or infinite schedule
# infinite schedule for the jobs
scheduler.setup()
logger.info("Executing job scheduler ...")
while run:
scheduler.run_pending()
sleep(1)
# check if the custom configs have changed since last time
tmp_custom_configs = db.get_custom_configs()
if custom_configs != tmp_custom_configs:
logger.info("Custom configs changed, generating ...")
logger.debug(f"{tmp_custom_configs}")
logger.debug(f"{custom_configs}")
custom_configs = tmp_custom_configs
original_path = "/data/configs"
# Remove old custom configs files
logger.info("Removing old custom configs files ...")
files = glob(f"{original_path}/*")
for file in files:
if islink(file):
unlink(file)
elif isfile(file):
remove(file)
elif isdir(file):
rmtree(file, ignore_errors=False)
logger.info("Generating new custom configs ...")
makedirs(original_path, exist_ok=True)
for custom_config in custom_configs:
tmp_path = (
f"{original_path}/{custom_config['type'].replace('_', '-')}"
)
if custom_config["service_id"]:
tmp_path += f"/{custom_config['service_id']}"
tmp_path += f"/{custom_config['name']}.conf"
makedirs(dirname(tmp_path), exist_ok=True)
with open(tmp_path, "w") as f:
f.write(custom_config["data"])
if bw_integration != "Local":
logger.info("Sending custom configs to BunkerWeb")
ret = api_caller._send_files("/data/configs", "/custom_configs")
if not ret:
logger.error(
"Sending custom configs failed, configuration will not work as expected...",
)
# check if the config has changed since last time
tmp_env = (
dotenv_values(args.variables) if args.variables else db.get_config()
)
if env != tmp_env:
logger.info("Config changed, reloading ...")
logger.info("Config changed, generating ...")
logger.debug(f"{tmp_env=}")
logger.debug(f"{env=}")
env = tmp_env
run_once = True
env = deepcopy(tmp_env)
break
except:
logger.error(
f"Exception while executing scheduler: {format_exc()}",
)
stop(1)
logger.info("Job scheduler stopped")
stop(0)
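
The scheduler's new main loop reduces to: run the jobs, render and push the configuration, then poll the database and regenerate whenever the config or the custom configs differ from the last run. A stripped-down sketch of that polling skeleton (db and regenerate are stand-ins for the Database object and the generate/send/reload sequence above):

from time import sleep

def watch_for_changes(db, regenerate, interval: float = 1.0):
    env = db.get_config()
    custom_configs = db.get_custom_configs()
    regenerate(env, custom_configs)
    while True:
        sleep(interval)
        tmp_env = db.get_config()
        tmp_confs = db.get_custom_configs()
        if tmp_env != env or tmp_confs != custom_configs:
            # State drifted since the last render: rebuild, mirroring the
            # break-and-reinstantiate loop in the scheduler above.
            env, custom_configs = tmp_env, tmp_confs
            regenerate(env, custom_configs)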

View File

@@ -1,6 +1,5 @@
from io import BytesIO
from os import environ, getenv
from os.path import exists
from tarfile import open as taropen
from logger import setup_logger
@@ -25,22 +24,21 @@ class ApiCaller:
pod.metadata.annotations is not None
and "bunkerweb.io/INSTANCE" in pod.metadata.annotations
):
api = None
api_http_port = None
api_server_name = None
for pod_env in pod.spec.containers[0].env:
if pod_env.name == "API_HTTP_PORT":
api = API(
f"http://{pod.status.pod_ip}:{pod_env.value or '5000'}"
)
break
api_http_port = pod_env.value or "5000"
elif pod_env.name == "API_SERVER_NAME":
api_server_name = pod_env.value or "bwapi"
if api:
self.__apis.append(api)
else:
self.__apis.append(
API(
f"http://{pod.status.pod_ip}:{getenv('API_HTTP_PORT', '5000')}"
)
self.__apis.append(
API(
f"http://{pod.status.pod_ip}:{api_http_port or getenv('API_HTTP_PORT', '5000')}",
host=api_server_name or getenv("API_SERVER_NAME", "bwapi"),
)
)
else:
from docker import DockerClient
@@ -50,19 +48,48 @@ class ApiCaller:
for instance in docker_client.containers.list(
filters={"label": "bunkerweb.INSTANCE"}
):
api = None
api_http_port = None
api_server_name = None
for var in instance.attrs["Config"]["Env"]:
if var.startswith("API_HTTP_PORT="):
api = API(
f"http://{instance.name}:{var.replace('API_HTTP_PORT=', '', 1)}"
)
break
api_http_port = var.replace("API_HTTP_PORT=", "", 1)
elif var.startswith("API_SERVER_NAME="):
api_server_name = var.replace("API_SERVER_NAME=", "", 1)
self.__apis.append(
API(
f"http://{instance.name}:{api_http_port or getenv('API_HTTP_PORT', '5000')}",
host=api_server_name or getenv("API_SERVER_NAME", "bwapi"),
)
)
is_swarm = True
try:
docker_client.swarm.version
except:
is_swarm = False
if is_swarm:
for instance in docker_client.services.list(
filters={"label": "bunkerweb.INSTANCE"}
):
api_http_port = None
api_server_name = None
for var in instance.attrs["Spec"]["TaskTemplate"]["ContainerSpec"][
"Env"
]:
if var.startswith("API_HTTP_PORT="):
api_http_port = var.replace("API_HTTP_PORT=", "", 1)
elif var.startswith("API_SERVER_NAME="):
api_server_name = var.replace("API_SERVER_NAME=", "", 1)
if api:
self.__apis.append(api)
else:
self.__apis.append(
API(f"http://{instance.name}:{getenv('API_HTTP_PORT', '5000')}")
API(
f"http://{instance.name}:{api_http_port or getenv('API_HTTP_PORT', '5000')}",
host=api_server_name or getenv("API_SERVER_NAME", "bwapi"),
)
)
def _set_apis(self, apis):

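Discovery in ApiCaller now always records one API entry per instance, falling back to the caller's environment when an instance doesn't set its own port or server name. The fallback chain in isolation (API is the project's class; the instance names and port below are illustrative):

from os import getenv

def resolve_endpoint(name: str, api_http_port=None, api_server_name=None):
    endpoint = f"http://{name}:{api_http_port or getenv('API_HTTP_PORT', '5000')}"
    host = api_server_name or getenv("API_SERVER_NAME", "bwapi")
    return endpoint, host

print(resolve_endpoint("bunkerweb_1"))          # environment defaults apply
print(resolve_endpoint("bunkerweb_2", "7000"))  # per-instance port wins
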
View File

@@ -63,7 +63,7 @@ class ConfigCaller:
and env_instances.get("SERVER_NAME", "") != ""
):
for server_name in env_instances["SERVER_NAME"].split(" "):
full_env[f"{server_name}_" + k] = v
full_env[f"{server_name}_{k}"] = v
# Replace with services values
for k, v in env_services.items():
full_env[k] = v
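
The one-line ConfigCaller change above folds the multisite prefix into a single f-string. The naming convention it implements, in isolation (stand-in names; real keys come from the instance environment):

def prefix_multisite(env_instances: dict) -> dict:
    full_env = {}
    for k, v in env_instances.items():
        for server_name in env_instances.get("SERVER_NAME", "").split(" "):
            if server_name:
                full_env[f"{server_name}_{k}"] = v
    return full_env

print(prefix_multisite({"SERVER_NAME": "app.example.com", "USE_GZIP": "yes"}))
# {'app.example.com_SERVER_NAME': 'app.example.com',
#  'app.example.com_USE_GZIP': 'yes'}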