mirror of
https://github.com/bunkerity/bunkerized-nginx
synced 2023-12-13 21:30:18 +01:00
224 lines
11 KiB
Python
224 lines
11 KiB
Python
from traceback import format_exc
|
||
from kubernetes import client, config, watch
|
||
from kubernetes.client.exceptions import ApiException
|
||
from threading import Thread, Lock
|
||
from logger import log
|
||
from sys import exit
|
||
|
||
from Controller import Controller
|
||
from ConfigCaller import ConfigCaller
|
||
|
||
class IngressController(Controller, ConfigCaller) :
|
||
|
||
def __init__(self) :
|
||
Controller.__init__(self, "kubernetes")
|
||
ConfigCaller.__init__(self)
|
||
config.load_incluster_config()
|
||
self.__corev1 = client.CoreV1Api()
|
||
self.__networkingv1 = client.NetworkingV1Api()
|
||
self.__internal_lock = Lock()
|
||
|
||
def _get_controller_instances(self) :
|
||
controller_instances = []
|
||
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items :
|
||
if pod.metadata.annotations != None and "bunkerweb.io/AUTOCONF" in pod.metadata.annotations :
|
||
controller_instances.append(pod)
|
||
return controller_instances
|
||
|
||
def _to_instances(self, controller_instance) :
|
||
instance = {}
|
||
instance["name"] = controller_instance.metadata.name
|
||
instance["hostname"] = controller_instance.status.pod_ip
|
||
health = False
|
||
if controller_instance.status.conditions is not None :
|
||
for condition in controller_instance.status.conditions :
|
||
if condition.type == "Ready" and condition.status == "True" :
|
||
health = True
|
||
break
|
||
instance["health"] = health
|
||
instance["env"] = {}
|
||
for env in controller_instance.spec.containers[0].env :
|
||
if env.value is not None :
|
||
instance["env"][env.name] = env.value
|
||
else :
|
||
instance["env"][env.name] = ""
|
||
for controller_service in self._get_controller_services() :
|
||
if controller_service.metadata.annotations is not None :
|
||
for annotation, value in controller_service.metadata.annotations.items() :
|
||
if not annotation.startswith("bunkerweb.io/") :
|
||
continue
|
||
variable = annotation.replace("bunkerweb.io/", "", 1)
|
||
if self._is_setting(variable) :
|
||
instance["env"][variable] = value
|
||
return [instance]
|
||
|
||
def _get_controller_services(self) :
|
||
return self.__networkingv1.list_ingress_for_all_namespaces(watch=False).items
|
||
|
||
def _to_services(self, controller_service) :
|
||
if controller_service.spec is None or controller_service.spec.rules is None :
|
||
return []
|
||
services = []
|
||
# parse rules
|
||
for rule in controller_service.spec.rules :
|
||
if rule.host is None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported ingress rule without host.")
|
||
continue
|
||
service = {}
|
||
service["SERVER_NAME"] = rule.host
|
||
if rule.http is None :
|
||
services.append(service)
|
||
continue
|
||
location = 1
|
||
for path in rule.http.paths :
|
||
if path.path is None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported ingress rule without path.")
|
||
continue
|
||
if path.backend.service is None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported ingress rule without backend service.")
|
||
continue
|
||
if path.backend.service.port is None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported ingress rule without backend service port.")
|
||
continue
|
||
if path.backend.service.port.number is None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported ingress rule without backend service port number.")
|
||
continue
|
||
service_list = self.__corev1.list_service_for_all_namespaces(watch=False, field_selector="metadata.name=" + path.backend.service.name).items
|
||
if len(service_list) == 0 :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring ingress rule with service " + path.backend.service.name + " : service not found.")
|
||
continue
|
||
reverse_proxy_host = "http://" + path.backend.service.name + "." + service_list[0].metadata.namespace + ".svc.cluster.local:" + str(path.backend.service.port.number)
|
||
service["USE_REVERSE_PROXY"] = "yes"
|
||
service["REVERSE_PROXY_HOST_" + str(location)] = reverse_proxy_host
|
||
service["REVERSE_PROXY_URL_" + str(location)] = path.path
|
||
location += 1
|
||
services.append(service)
|
||
|
||
# parse tls
|
||
if controller_service.spec.tls is not None :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported tls.")
|
||
|
||
# parse annotations
|
||
if controller_service.metadata.annotations is not None :
|
||
for service in services :
|
||
for annotation, value in controller_service.metadata.annotations.items() :
|
||
if not annotation.startswith("bunkerweb.io/") :
|
||
continue
|
||
variable = annotation.replace("bunkerweb.io/", "", 1)
|
||
if not variable.startswith(service["SERVER_NAME"].split(" ")[0] + "_") :
|
||
continue
|
||
variable = variable.replace(service["SERVER_NAME"].split(" ")[0] + "_", "", 1)
|
||
if self._is_multisite_setting(variable) :
|
||
service[variable] = value
|
||
return services
|
||
|
||
def _get_static_services(self) :
|
||
services = []
|
||
variables = {}
|
||
for instance in self.__corev1.list_pod_for_all_namespaces(watch=False).items :
|
||
if instance.metadata.annotations is None or not "bunkerweb.io/AUTOCONF" in instance.metadata.annotations :
|
||
continue
|
||
for env in instance.spec.containers[0].env :
|
||
if env.value is None :
|
||
variables[env.name] = ""
|
||
else :
|
||
variables[env.name] = env.value
|
||
server_names = []
|
||
if "SERVER_NAME" in variables and variables["SERVER_NAME"] != "" :
|
||
server_names = variables["SERVER_NAME"].split(" ")
|
||
for server_name in server_names :
|
||
service = {}
|
||
service["SERVER_NAME"] = server_name
|
||
for variable, value in variables.items() :
|
||
prefix = variable.split("_")[0]
|
||
real_variable = variable.replace(prefix + "_", "", 1)
|
||
if prefix == server_name and self._is_multisite_setting(real_variable) :
|
||
service[real_variable] = value
|
||
services.append(service)
|
||
return services
|
||
|
||
def get_configs(self) :
|
||
configs = {}
|
||
supported_config_types = ["http", "stream", "server-http", "server-stream", "default-server-http", "modsec", "modsec-crs"]
|
||
for config_type in supported_config_types :
|
||
configs[config_type] = {}
|
||
for configmap in self.__corev1.list_config_map_for_all_namespaces(watch=False).items :
|
||
if configmap.metadata.annotations is None or "bunkerweb.io/CONFIG_TYPE" not in configmap.metadata.annotations :
|
||
continue
|
||
config_type = configmap.metadata.annotations["bunkerweb.io/CONFIG_TYPE"]
|
||
if config_type not in supported_config_types :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring unsupported CONFIG_TYPE " + config_type + " for ConfigMap " + configmap.metadata.name)
|
||
continue
|
||
if not configmap.data :
|
||
log("INGRESS-CONTROLLER", "⚠️", "Ignoring blank ConfigMap " + configmap.metadata.name)
|
||
continue
|
||
config_site = ""
|
||
if "bunkerweb.io/CONFIG_SITE" in configmap.metadata.annotations :
|
||
config_site = configmap.metadata.annotations["bunkerweb.io/CONFIG_SITE"] + "/"
|
||
for config_name, config_data in configmap.data.items() :
|
||
configs[config_type][config_site + config_name] = config_data
|
||
return configs
|
||
|
||
def __watch(self, watch_type) :
|
||
w = watch.Watch()
|
||
what = None
|
||
if watch_type == "pod" :
|
||
what = self.__corev1.list_pod_for_all_namespaces
|
||
elif watch_type == "ingress" :
|
||
what = self.__networkingv1.list_ingress_for_all_namespaces
|
||
elif watch_type == "configmap" :
|
||
what = self.__corev1.list_config_map_for_all_namespaces
|
||
else :
|
||
raise Exception("unsupported watch_type " + watch_type)
|
||
while True :
|
||
locked = False
|
||
try :
|
||
for event in w.stream(what) :
|
||
self.__internal_lock.acquire()
|
||
locked = True
|
||
self._instances = self.get_instances()
|
||
self._services = self.get_services()
|
||
self._configs = self.get_configs()
|
||
if not self._config.update_needed(self._instances, self._services, configs=self._configs) :
|
||
self.__internal_lock.release()
|
||
locked = False
|
||
continue
|
||
log("INGRESS-CONTROLLER", "ℹ️", "Catched kubernetes event, deploying new configuration ...")
|
||
try :
|
||
ret = self.apply_config()
|
||
if not ret :
|
||
log("INGRESS-CONTROLLER", "❌", "Error while deploying new configuration ...")
|
||
else :
|
||
log("INGRESS-CONTROLLER", "ℹ️", "Successfully deployed new configuration 🚀")
|
||
except :
|
||
log("INGRESS-CONTROLLER", "❌", "Exception while deploying new configuration :")
|
||
print(format_exc())
|
||
self.__internal_lock.release()
|
||
locked = False
|
||
except ApiException as e :
|
||
if e.status != 410 :
|
||
log("INGRESS-CONTROLLER", "❌", "Exception while reading k8s event (type = " + watch_type + ") : ")
|
||
print(format_exc())
|
||
exit(1)
|
||
if locked :
|
||
self.__internal_lock.release()
|
||
except :
|
||
log("INGRESS-CONTROLLER", "❌", "Unknown exception while reading k8s event (type = " + watch_type + ") : ")
|
||
print(format_exc())
|
||
exit(2)
|
||
|
||
def apply_config(self) :
|
||
self._config.stop_scheduler()
|
||
ret = self._config.apply(self._instances, self._services, configs=self._configs)
|
||
self._config.start_scheduler()
|
||
return ret
|
||
|
||
def process_events(self) :
|
||
watch_types = ["pod", "ingress", "configmap"]
|
||
threads = []
|
||
for watch_type in watch_types :
|
||
threads.append(Thread(target=self.__watch, args=(watch_type,)))
|
||
for thread in threads :
|
||
thread.start()
|
||
for thread in threads :
|
||
thread.join()
|