Refactor every .py file
This commit is contained in:
parent
7689dac76d
commit
146338de63
|
@ -6,9 +6,9 @@ from pytablewriter import MarkdownTableWriter
|
|||
|
||||
|
||||
def print_md_table(settings):
|
||||
values = []
|
||||
for setting, data in settings.items():
|
||||
values.append(
|
||||
writer = MarkdownTableWriter(
|
||||
headers=["Setting", "Default", "Context", "Multiple", "Description"],
|
||||
value_matrix=[
|
||||
[
|
||||
f"`{setting}`",
|
||||
"" if data["default"] == "" else f"`{data['default']}`",
|
||||
|
@ -16,13 +16,11 @@ def print_md_table(settings):
|
|||
"no" if not "multiple" in data else "yes",
|
||||
data["help"],
|
||||
]
|
||||
)
|
||||
writer = MarkdownTableWriter(
|
||||
headers=["Setting", "Default", "Context", "Multiple", "Description"],
|
||||
value_matrix=values,
|
||||
for setting, data in settings.items()
|
||||
],
|
||||
)
|
||||
writer.write_table()
|
||||
print("")
|
||||
print()
|
||||
|
||||
|
||||
print("# Settings\n")
|
||||
|
|
|
@ -22,14 +22,14 @@ class IngressController(Controller, ConfigCaller):
|
|||
self.__logger = setup_logger("Ingress-controller", getenv("LOG_LEVEL", "INFO"))
|
||||
|
||||
def _get_controller_instances(self):
|
||||
controller_instances = []
|
||||
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items:
|
||||
return [
|
||||
pod
|
||||
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items
|
||||
if (
|
||||
pod.metadata.annotations != None
|
||||
and "bunkerweb.io/INSTANCE" in pod.metadata.annotations
|
||||
):
|
||||
controller_instances.append(pod)
|
||||
return controller_instances
|
||||
)
|
||||
]
|
||||
|
||||
def _to_instances(self, controller_instance):
|
||||
instance = {}
|
||||
|
@ -289,9 +289,10 @@ class IngressController(Controller, ConfigCaller):
|
|||
|
||||
def process_events(self):
|
||||
watch_types = ["pod", "ingress", "configmap"]
|
||||
threads = []
|
||||
for watch_type in watch_types:
|
||||
threads.append(Thread(target=self.__watch, args=(watch_type,)))
|
||||
threads = [
|
||||
Thread(target=self.__watch, args=(watch_type,))
|
||||
for watch_type in watch_types
|
||||
]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
|
|
|
@ -143,9 +143,10 @@ class SwarmController(Controller, ConfigCaller):
|
|||
|
||||
def process_events(self):
|
||||
event_types = ["service", "config"]
|
||||
threads = []
|
||||
for event_type in event_types:
|
||||
threads.append(Thread(target=self.__event, args=(event_type,)))
|
||||
threads = [
|
||||
Thread(target=self.__event, args=(event_type,))
|
||||
for event_type in event_types
|
||||
]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
|
|
|
@ -5,10 +5,14 @@ from signal import SIGINT, SIGTERM, signal
|
|||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from logger import setup_logger
|
||||
from SwarmController import SwarmController
|
||||
|
|
|
@ -42,7 +42,7 @@ class CLI(ApiCaller):
|
|||
|
||||
def __get_apis(self):
|
||||
# Docker case
|
||||
if self.__integration == "docker" or self.__integration == "linux":
|
||||
if self.__integration in ("docker", "linux"):
|
||||
return [
|
||||
API(
|
||||
f"http://127.0.0.1:{self.__variables['API_HTTP_PORT']}",
|
||||
|
|
|
@ -5,10 +5,14 @@ from os import _exit
|
|||
from sys import exit as sys_exit, path
|
||||
from traceback import format_exc
|
||||
|
||||
path.append("/usr/share/bunkerweb/deps/python")
|
||||
path.append("/usr/share/bunkerweb/cli")
|
||||
path.append("/usr/share/bunkerweb/utils")
|
||||
path.append("/usr/share/bunkerweb/api")
|
||||
path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/cli",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
)
|
||||
)
|
||||
|
||||
from logger import setup_logger
|
||||
from CLI import CLI
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
from contextlib import suppress
|
||||
from ipaddress import ip_address, ip_network
|
||||
from os import _exit, getenv, makedirs
|
||||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
|
||||
|
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
|
|||
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
|
||||
if kind == "IP":
|
||||
if b"/" in line:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_network(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_address(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
elif kind == "RDNS":
|
||||
if rdns_rx.match(line):
|
||||
return True, line.lower()
|
||||
|
@ -163,8 +165,7 @@ try:
|
|||
content += data + b"\n"
|
||||
i += 1
|
||||
|
||||
with open(f"/var/tmp/bunkerweb/blacklist/{kind}.list", "wb") as f:
|
||||
f.write(content)
|
||||
Path(f"/var/tmp/bunkerweb/blacklist/{kind}.list").write_bytes(content)
|
||||
|
||||
logger.info(f"Downloaded {i} bad {kind}")
|
||||
# Check if file has changed
|
||||
|
|
|
@ -2,13 +2,18 @@
|
|||
|
||||
from os import _exit, getenv, makedirs
|
||||
from os.path import isfile
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
"/usr/share/bunkerweb/core/bunkernet/jobs",
|
||||
)
|
||||
)
|
||||
|
||||
from bunkernet import data
|
||||
from Database import Database
|
||||
|
@ -83,8 +88,7 @@ try:
|
|||
# Writing data to file
|
||||
logger.info("Saving BunkerNet data ...")
|
||||
content = "\n".join(data["data"]).encode("utf-8")
|
||||
with open("/var/tmp/bunkerweb/bunkernet-ip.list", "wb") as f:
|
||||
f.write(content)
|
||||
Path("/var/tmp/bunkerweb/bunkernet-ip.list").write_bytes(content)
|
||||
|
||||
# Check if file has changed
|
||||
new_hash = file_hash("/var/tmp/bunkerweb/bunkernet-ip.list")
|
||||
|
|
|
@ -2,14 +2,19 @@
|
|||
|
||||
from os import _exit, getenv, makedirs, remove
|
||||
from os.path import isfile
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from time import sleep
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
"/usr/share/bunkerweb/core/bunkernet/jobs",
|
||||
)
|
||||
)
|
||||
|
||||
from bunkernet import register, ping, get_id
|
||||
from Database import Database
|
||||
|
@ -76,8 +81,7 @@ try:
|
|||
f"Successfully registered on BunkerNet API with instance id {data['data']}"
|
||||
)
|
||||
else:
|
||||
with open("/var/cache/bunkerweb/bunkernet/instance.id", "r") as f:
|
||||
bunkernet_id = f.read()
|
||||
bunkernet_id = Path("/var/cache/bunkerweb/bunkernet/instance.id").read_text()
|
||||
logger.info(f"Already registered on BunkerNet API with instance id {get_id()}")
|
||||
|
||||
# Ping
|
||||
|
@ -115,8 +119,7 @@ try:
|
|||
logger.info("Connectivity with BunkerWeb is successful !")
|
||||
status = 1
|
||||
if not isfile("/var/cache/bunkerweb/bunkernet/instance.id"):
|
||||
with open("/var/cache/bunkerweb/bunkernet/instance.id", "w") as f:
|
||||
f.write(bunkernet_id)
|
||||
Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
|
|
|
@ -2,13 +2,18 @@
|
|||
|
||||
from os import getenv, makedirs, remove
|
||||
from os.path import isfile
|
||||
from pathlib import Path
|
||||
from shutil import copy
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from jobs import file_hash
|
||||
|
@ -45,16 +50,13 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
|
|||
cert_hash = file_hash(cert_path)
|
||||
|
||||
if not isfile(cert_cache_path):
|
||||
with open(cert_cache_path, "w") as f:
|
||||
f.write(cert_hash)
|
||||
Path(cert_cache_path).write_text(cert_hash)
|
||||
|
||||
old_hash = file_hash(cert_cache_path)
|
||||
if old_hash == cert_hash:
|
||||
return False
|
||||
|
||||
with open(cert_cache_path, "w") as f:
|
||||
f.write(cert_hash)
|
||||
|
||||
Path(cert_cache_path).write_text(cert_hash)
|
||||
copy(cert_path, cert_cache_path.replace(".hash", ""))
|
||||
|
||||
if not isfile(key_path):
|
||||
|
@ -71,8 +73,7 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
|
|||
key_hash = file_hash(key_path)
|
||||
|
||||
if not isfile(key_cache_path):
|
||||
with open(key_cache_path, "w") as f:
|
||||
f.write(key_hash)
|
||||
Path(key_cache_path).write_text(key_hash)
|
||||
|
||||
old_hash = file_hash(key_cache_path)
|
||||
if old_hash != key_hash:
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
from contextlib import suppress
|
||||
from ipaddress import ip_address, ip_network
|
||||
from os import _exit, getenv, makedirs
|
||||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
|
||||
|
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
|
|||
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
|
||||
if kind == "IP":
|
||||
if b"/" in line:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_network(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_address(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
elif kind == "RDNS":
|
||||
if rdns_rx.match(line):
|
||||
return True, line.lower()
|
||||
|
@ -147,8 +149,7 @@ try:
|
|||
content += data + b"\n"
|
||||
i += 1
|
||||
|
||||
with open(f"/var/tmp/bunkerweb/greylist/{kind}.list", "wb") as f:
|
||||
f.write(content)
|
||||
Path(f"/var/tmp/bunkerweb/greylist/{kind}.list").write_bytes(content)
|
||||
|
||||
logger.info(f"Downloaded {i} grey {kind}")
|
||||
# Check if file has changed
|
||||
|
|
|
@ -12,9 +12,13 @@ from shutil import copytree, rmtree
|
|||
from traceback import format_exc
|
||||
from zipfile import ZipFile
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
|
||||
|
@ -32,7 +36,6 @@ status = 0
|
|||
|
||||
def install_plugin(plugin_dir):
|
||||
# Load plugin.json
|
||||
metadata = {}
|
||||
with open(f"{plugin_dir}plugin.json", "rb") as f:
|
||||
metadata = loads(f.read())
|
||||
# Don't go further if plugin is already installed
|
||||
|
|
|
@ -3,12 +3,17 @@
|
|||
from datetime import date
|
||||
from gzip import decompress
|
||||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from maxminddb import open_database
|
||||
from requests import get
|
||||
|
@ -39,8 +44,7 @@ try:
|
|||
|
||||
# Save it to temp
|
||||
logger.info("Saving mmdb file to tmp ...")
|
||||
with open("/var/tmp/bunkerweb/asn.mmdb", "wb") as f:
|
||||
f.write(decompress(resp.content))
|
||||
Path(f"/var/tmp/bunkerweb/asn.mmdb").write_bytes(decompress(resp.content))
|
||||
|
||||
# Try to load it
|
||||
logger.info("Checking if mmdb file is valid ...")
|
||||
|
|
|
@ -3,12 +3,17 @@
|
|||
from datetime import date
|
||||
from gzip import decompress
|
||||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
from maxminddb import open_database
|
||||
|
@ -39,8 +44,7 @@ try:
|
|||
|
||||
# Save it to temp
|
||||
logger.info("Saving mmdb file to tmp ...")
|
||||
with open("/var/tmp/bunkerweb/country.mmdb", "wb") as f:
|
||||
f.write(decompress(resp.content))
|
||||
Path(f"/var/tmp/bunkerweb/country.mmdb").write_bytes(decompress(resp.content))
|
||||
|
||||
# Try to load it
|
||||
logger.info("Checking if mmdb file is valid ...")
|
||||
|
|
|
@ -2,13 +2,18 @@
|
|||
|
||||
from os import getenv, makedirs
|
||||
from os.path import exists
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
@ -67,8 +72,7 @@ try:
|
|||
else:
|
||||
root_dir = "/var/tmp/bunkerweb/lets-encrypt/.well-known/acme-challenge/"
|
||||
makedirs(root_dir, exist_ok=True)
|
||||
with open(f"{root_dir}{token}", "w") as f:
|
||||
f.write(validation)
|
||||
Path(f"{root_dir}{token}").write_text(validation)
|
||||
except:
|
||||
status = 1
|
||||
logger.error(f"Exception while running certbot-auth.py :\n{format_exc()}")
|
||||
|
|
|
@ -5,10 +5,14 @@ from os.path import exists, isfile
|
|||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
|
|
@ -9,10 +9,14 @@ from sys import exit as sys_exit, path as sys_path
|
|||
from tarfile import open as tar_open
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
|
|
@ -2,13 +2,18 @@
|
|||
|
||||
from os import environ, getcwd, getenv
|
||||
from os.path import exists
|
||||
from pathlib import Path
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
@ -93,10 +98,9 @@ try:
|
|||
)
|
||||
|
||||
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
|
||||
with open(
|
||||
f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
|
||||
) as f:
|
||||
cert = f.read()
|
||||
cert = Path(
|
||||
f"/etc/letsencrypt/live/{first_server}/cert.pem"
|
||||
).read_bytes()
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
|
@ -132,10 +136,9 @@ try:
|
|||
)
|
||||
|
||||
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
|
||||
with open(
|
||||
f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
|
||||
) as f:
|
||||
cert = f.read()
|
||||
cert = Path(
|
||||
f"/etc/letsencrypt/live/{first_server}/cert.pem"
|
||||
).read_bytes()
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
|
|
|
@ -6,8 +6,12 @@ from subprocess import DEVNULL, STDOUT, run
|
|||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
)
|
||||
)
|
||||
|
||||
from logger import setup_logger
|
||||
|
||||
|
@ -21,7 +25,7 @@ def renew(domain):
|
|||
"--cert-name",
|
||||
domain,
|
||||
"--deploy-hook",
|
||||
f"{getcwd()}/certbot-deploy.py",
|
||||
"/usr/share/bunkerweb/core/letsencrypt/jobs/certbot-deploy.py",
|
||||
],
|
||||
stdin=DEVNULL,
|
||||
stderr=STDOUT,
|
||||
|
|
|
@ -1,13 +1,19 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
from contextlib import suppress
|
||||
from ipaddress import ip_address, ip_network
|
||||
from os import _exit, getenv, makedirs
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
|
||||
|
@ -18,17 +24,13 @@ from jobs import cache_file, cache_hash, file_hash, is_cached_file
|
|||
|
||||
def check_line(line):
|
||||
if "/" in line:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_network(line)
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_address(line)
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
return False, ""
|
||||
|
||||
|
||||
|
@ -69,10 +71,7 @@ try:
|
|||
_exit(0)
|
||||
|
||||
# Get URLs
|
||||
urls = []
|
||||
for url in getenv("REALIP_FROM_URLS", "").split(" "):
|
||||
if url != "" and url not in urls:
|
||||
urls.append(url)
|
||||
urls = [url for url in getenv("REALIP_FROM_URLS", "").split(" ") if url]
|
||||
|
||||
# Download and write data to temp file
|
||||
i = 0
|
||||
|
@ -101,8 +100,7 @@ try:
|
|||
f"Exception while getting RealIP list from {url} :\n{format_exc()}"
|
||||
)
|
||||
|
||||
with open("/var/tmp/bunkerweb/realip-combined.list", "wb") as f:
|
||||
f.write(content)
|
||||
Path("/var/tmp/bunkerweb/realip-combined.list").write_bytes(content)
|
||||
|
||||
# Check if file has changed
|
||||
new_hash = file_hash("/var/tmp/bunkerweb/realip-combined.list")
|
||||
|
|
|
@ -6,9 +6,13 @@ from subprocess import DEVNULL, STDOUT, run
|
|||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
from contextlib import suppress
|
||||
from ipaddress import ip_address, ip_network
|
||||
from os import _exit, getenv, makedirs
|
||||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from requests import get
|
||||
|
||||
|
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
|
|||
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
|
||||
if kind == "IP":
|
||||
if b"/" in line:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_network(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
with suppress(ValueError):
|
||||
ip_address(line.decode("utf-8"))
|
||||
return True, line
|
||||
except ValueError:
|
||||
pass
|
||||
elif kind == "RDNS":
|
||||
if rdns_rx.match(line):
|
||||
return True, line.lower()
|
||||
|
@ -147,8 +149,7 @@ try:
|
|||
content += data + b"\n"
|
||||
i += 1
|
||||
|
||||
with open(f"/var/tmp/bunkerweb/whitelist/{kind}.list", "wb") as f:
|
||||
f.write(content)
|
||||
Path(f"/var/tmp/bunkerweb/whitelist/{kind}.list").write_bytes(content)
|
||||
|
||||
logger.info(f"Downloaded {i} good {kind}")
|
||||
# Check if file has changed
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from contextlib import contextmanager
|
||||
from contextlib import contextmanager, suppress
|
||||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
|
@ -68,10 +68,8 @@ class Database:
|
|||
)
|
||||
|
||||
if sqlalchemy_string.startswith("sqlite"):
|
||||
try:
|
||||
with suppress(FileExistsError):
|
||||
makedirs(dirname(sqlalchemy_string.split("///")[1]), exist_ok=True)
|
||||
except FileExistsError:
|
||||
pass
|
||||
elif "+" in sqlalchemy_string and "+pymysql" not in sqlalchemy_string:
|
||||
splitted = sqlalchemy_string.split("+")
|
||||
sqlalchemy_string = f"{splitted[0]}:{':'.join(splitted[1].split(':')[1:])}"
|
||||
|
@ -567,12 +565,10 @@ class Database:
|
|||
Global_values.suffix == suffix,
|
||||
).update({Global_values.value: value})
|
||||
|
||||
try:
|
||||
with suppress(ProgrammingError, OperationalError):
|
||||
metadata = session.query(Metadata).get(1)
|
||||
if metadata is not None and not metadata.first_config_saved:
|
||||
metadata.first_config_saved = True
|
||||
except (ProgrammingError, OperationalError):
|
||||
pass
|
||||
|
||||
try:
|
||||
session.add_all(to_put)
|
||||
|
@ -645,8 +641,9 @@ class Database:
|
|||
|
||||
if custom_conf is None:
|
||||
to_put.append(Custom_configs(**config))
|
||||
elif config["checksum"] != custom_conf.checksum and (
|
||||
method == custom_conf.method or method == "autoconf"
|
||||
elif config["checksum"] != custom_conf.checksum and method in (
|
||||
custom_conf.method,
|
||||
"autoconf",
|
||||
):
|
||||
session.query(Custom_configs).filter(
|
||||
Custom_configs.service_id == config.get("service_id", None),
|
||||
|
@ -810,10 +807,9 @@ class Database:
|
|||
|
||||
for key, value in deepcopy(tmp_config).items():
|
||||
if key.startswith(f"{service}_"):
|
||||
tmp_config[key.replace(f"{service}_", "")] = value
|
||||
del tmp_config[key]
|
||||
tmp_config[key.replace(f"{service}_", "")] = tmp_config.pop(key)
|
||||
elif any(key.startswith(f"{s}_") for s in service_names):
|
||||
del tmp_config[key]
|
||||
tmp_config.pop(key)
|
||||
else:
|
||||
tmp_config[key] = (
|
||||
{"value": value["value"], "method": "default"}
|
||||
|
@ -1367,7 +1363,7 @@ class Database:
|
|||
return "An instance with the same hostname already exists."
|
||||
|
||||
session.add(
|
||||
Instances(hostname=hostname, port=int(port), server_name=server_name)
|
||||
Instances(hostname=hostname, port=port, server_name=server_name)
|
||||
)
|
||||
|
||||
try:
|
||||
|
|
|
@ -62,9 +62,9 @@ class Templator:
|
|||
if config != None:
|
||||
real_config = config
|
||||
Path(dirname(real_path)).mkdir(parents=True, exist_ok=True)
|
||||
with open(real_path, "w") as f:
|
||||
for k, v in real_config.items():
|
||||
f.write(f"{k}={v}\n")
|
||||
Path(real_path).write_text(
|
||||
"\n".join(f"{k}={v}" for k, v in real_config.items())
|
||||
)
|
||||
|
||||
def __render_global(self):
|
||||
self.__write_config()
|
||||
|
@ -129,8 +129,7 @@ class Templator:
|
|||
real_name = name
|
||||
jinja_template = self.__jinja_env.get_template(template)
|
||||
Path(dirname(f"{real_output}{real_name}")).mkdir(parents=True, exist_ok=True)
|
||||
with open(f"{real_output}{real_name}", "w") as f:
|
||||
f.write(jinja_template.render(real_config))
|
||||
Path(f"{real_output}{real_name}").write_text(jinja_template.render(real_config))
|
||||
|
||||
def is_custom_conf(path):
|
||||
return glob(f"{path}/*.conf")
|
||||
|
|
|
@ -11,9 +11,13 @@ from time import sleep
|
|||
from traceback import format_exc
|
||||
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
)
|
||||
)
|
||||
|
||||
from logger import setup_logger
|
||||
from Configurator import Configurator
|
||||
|
|
|
@ -13,10 +13,14 @@ from traceback import format_exc
|
|||
from typing import Any
|
||||
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from docker import DockerClient
|
||||
from kubernetes import client as kube_client
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from hashlib import sha512
|
||||
from json import dumps, loads
|
||||
|
@ -21,7 +22,6 @@ def is_cached_file(file, expire):
|
|||
return False
|
||||
if not path.isfile(f"{file}.md"):
|
||||
return False
|
||||
cached_time = 0
|
||||
with open(f"{file}.md", "r") as f:
|
||||
cached_time = loads(f.read())["date"]
|
||||
current_time = datetime.timestamp(datetime.now())
|
||||
|
@ -51,11 +51,9 @@ def file_hash(file):
|
|||
|
||||
|
||||
def cache_hash(cache):
|
||||
try:
|
||||
with suppress(BaseException):
|
||||
with open(f"{cache}.md", "r") as f:
|
||||
return loads(f.read())["checksum"]
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
@ -13,8 +13,7 @@ from schedule import (
|
|||
from sys import path as sys_path
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(("/usr/share/bunkerweb/utils", "/usr/share/bunkerweb/db"))
|
||||
|
||||
from Database import Database
|
||||
from logger import setup_logger
|
||||
|
|
|
@ -16,6 +16,7 @@ from os import (
|
|||
walk,
|
||||
)
|
||||
from os.path import dirname, exists, isdir, isfile, islink, join
|
||||
from pathlib import Path
|
||||
from shutil import chown, copy, rmtree
|
||||
from signal import SIGINT, SIGTERM, signal, SIGHUP
|
||||
from subprocess import run as subprocess_run, DEVNULL, STDOUT
|
||||
|
@ -24,10 +25,14 @@ from time import sleep
|
|||
from traceback import format_exc
|
||||
from typing import Any, Dict, List
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/deps/python")
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/deps/python",
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from dotenv import dotenv_values
|
||||
|
||||
|
@ -97,9 +102,8 @@ def generate_custom_configs(
|
|||
if custom_config["service_id"]:
|
||||
tmp_path += f"/{custom_config['service_id']}"
|
||||
tmp_path += f"/{custom_config['name']}.conf"
|
||||
makedirs(dirname(tmp_path), exist_ok=True)
|
||||
with open(tmp_path, "wb") as f:
|
||||
f.write(custom_config["data"])
|
||||
Path(dirname(tmp_path)).mkdir(parents=True, exist_ok=True)
|
||||
Path(tmp_path).write_bytes(custom_config["data"])
|
||||
|
||||
# Fix permissions for the custom configs folder
|
||||
for root, dirs, files in walk("/data/configs", topdown=False):
|
||||
|
@ -127,8 +131,7 @@ if __name__ == "__main__":
|
|||
_exit(1)
|
||||
|
||||
# Write pid to file
|
||||
with open("/var/tmp/bunkerweb/scheduler.pid", "w") as f:
|
||||
f.write(str(getpid()))
|
||||
Path("/var/tmp/bunkerweb/scheduler.pid").write_text(str(getpid()))
|
||||
|
||||
# Parse arguments
|
||||
parser = ArgumentParser(description="Job scheduler for BunkerWeb")
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
from contextlib import suppress
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from signal import SIGINT, signal, SIGTERM
|
||||
from bs4 import BeautifulSoup
|
||||
from copy import deepcopy
|
||||
|
@ -41,9 +43,13 @@ from typing import Optional
|
|||
from uuid import uuid4
|
||||
from zipfile import BadZipFile, ZipFile
|
||||
|
||||
sys_path.append("/usr/share/bunkerweb/utils")
|
||||
sys_path.append("/usr/share/bunkerweb/api")
|
||||
sys_path.append("/usr/share/bunkerweb/db")
|
||||
sys_path.extend(
|
||||
(
|
||||
"/usr/share/bunkerweb/utils",
|
||||
"/usr/share/bunkerweb/api",
|
||||
"/usr/share/bunkerweb/db",
|
||||
)
|
||||
)
|
||||
|
||||
from src.Instances import Instances
|
||||
from src.ConfigFiles import ConfigFiles
|
||||
|
@ -112,8 +118,7 @@ if not vars["FLASK_ENV"] == "development" and vars["ABSOLUTE_URI"].endswith(
|
|||
logger.error("Please change the default URL.")
|
||||
sys_exit(1)
|
||||
|
||||
with open("/var/tmp/bunkerweb/ui.pid", "w") as f:
|
||||
f.write(str(getpid()))
|
||||
Path("/var/tmp/bunkerweb/ui.pid").write_text(str(getpid()))
|
||||
|
||||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
|
@ -334,12 +339,12 @@ def instances():
|
|||
# Manage instances
|
||||
if request.method == "POST":
|
||||
# Check operation
|
||||
if not "operation" in request.form or not request.form["operation"] in [
|
||||
if not "operation" in request.form or not request.form["operation"] in (
|
||||
"reload",
|
||||
"start",
|
||||
"stop",
|
||||
"restart",
|
||||
]:
|
||||
):
|
||||
flash("Missing operation parameter on /instances.", "error")
|
||||
return redirect(url_for("loading", next=url_for("instances")))
|
||||
|
||||
|
@ -384,11 +389,11 @@ def services():
|
|||
if request.method == "POST":
|
||||
|
||||
# Check operation
|
||||
if not "operation" in request.form or not request.form["operation"] in [
|
||||
if not "operation" in request.form or not request.form["operation"] in (
|
||||
"new",
|
||||
"edit",
|
||||
"delete",
|
||||
]:
|
||||
):
|
||||
flash("Missing operation parameter on /services.", "error")
|
||||
return redirect(url_for("loading", next=url_for("services")))
|
||||
|
||||
|
@ -574,11 +579,11 @@ def configs():
|
|||
operation = ""
|
||||
|
||||
# Check operation
|
||||
if not "operation" in request.form or not request.form["operation"] in [
|
||||
if not "operation" in request.form or not request.form["operation"] in (
|
||||
"new",
|
||||
"edit",
|
||||
"delete",
|
||||
]:
|
||||
):
|
||||
flash("Missing operation parameter on /configs.", "error")
|
||||
return redirect(url_for("loading", next=url_for("configs")))
|
||||
|
||||
|
@ -811,7 +816,7 @@ def plugins():
|
|||
except BadZipFile:
|
||||
error = 1
|
||||
flash(
|
||||
f"{file} is not a valid zip file. ({folder_name if folder_name else temp_folder_name})",
|
||||
f"{file} is not a valid zip file. ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
else:
|
||||
|
@ -911,37 +916,37 @@ def plugins():
|
|||
except ReadError:
|
||||
error = 1
|
||||
flash(
|
||||
f"Couldn't read file {file} ({folder_name if folder_name else temp_folder_name})",
|
||||
f"Couldn't read file {file} ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except CompressionError:
|
||||
error = 1
|
||||
flash(
|
||||
f"{file} is not a valid tar file ({folder_name if folder_name else temp_folder_name})",
|
||||
f"{file} is not a valid tar file ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except HeaderError:
|
||||
error = 1
|
||||
flash(
|
||||
f"The file plugin.json in {file} is not valid ({folder_name if folder_name else temp_folder_name})",
|
||||
f"The file plugin.json in {file} is not valid ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except KeyError:
|
||||
error = 1
|
||||
flash(
|
||||
f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name if folder_name else temp_folder_name})",
|
||||
f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except JSONDecodeError as e:
|
||||
error = 1
|
||||
flash(
|
||||
f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name if folder_name else temp_folder_name})",
|
||||
f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except ValueError:
|
||||
error = 1
|
||||
flash(
|
||||
f"The file plugin.json is missing one or more of the following keys: <i>{', '.join(PLUGIN_KEYS)}</i> ({folder_name if folder_name else temp_folder_name})",
|
||||
f"The file plugin.json is missing one or more of the following keys: <i>{', '.join(PLUGIN_KEYS)}</i> ({folder_name or temp_folder_name})",
|
||||
"error",
|
||||
)
|
||||
except FileExistsError:
|
||||
|
@ -985,10 +990,8 @@ def plugins():
|
|||
|
||||
# Remove tmp folder
|
||||
if exists("/var/tmp/bunkerweb/ui"):
|
||||
try:
|
||||
with suppress(OSError):
|
||||
rmtree("/var/tmp/bunkerweb/ui")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return redirect(
|
||||
url_for("loading", next=url_for("plugins"), message="Reloading plugins")
|
||||
|
@ -1057,11 +1060,9 @@ def upload_plugin():
|
|||
if not file.filename.endswith((".zip", ".tar.gz", ".tar.xz")):
|
||||
return {"status": "ko"}, 422
|
||||
|
||||
with open(
|
||||
f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}",
|
||||
"wb",
|
||||
) as f:
|
||||
f.write(file.read())
|
||||
Path(
|
||||
f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}"
|
||||
).write_bytes(file.read())
|
||||
|
||||
return {"status": "ok"}, 201
|
||||
|
||||
|
@ -1127,7 +1128,7 @@ def custom_plugin(plugin):
|
|||
finally:
|
||||
# Remove the custom plugin from the shared library
|
||||
sys_path.pop()
|
||||
del sys_modules["actions"]
|
||||
sys_modules.pop("actions")
|
||||
del actions
|
||||
|
||||
if (
|
||||
|
@ -1304,7 +1305,7 @@ def logs_container(container_id):
|
|||
if from_date is not None:
|
||||
last_update = from_date
|
||||
|
||||
if any(arg and not arg.isdigit() for arg in [last_update, from_date, to_date]):
|
||||
if any(arg and not arg.isdigit() for arg in (last_update, from_date, to_date)):
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from copy import deepcopy
|
||||
from os import listdir, remove
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
from flask import flash
|
||||
from os.path import exists, isfile
|
||||
|
@ -98,11 +99,8 @@ class Config:
|
|||
if not isfile(filename):
|
||||
return {}
|
||||
|
||||
with open(filename, "r") as f:
|
||||
env = f.read()
|
||||
|
||||
data = {}
|
||||
for line in env.split("\n"):
|
||||
for line in Path(filename).read_text().split("\n"):
|
||||
if not "=" in line:
|
||||
continue
|
||||
var = line.split("=")[0]
|
||||
|
@ -121,8 +119,9 @@ class Config:
|
|||
variables : dict
|
||||
The dict to convert to env file
|
||||
"""
|
||||
with open(filename, "w") as f:
|
||||
f.write("\n".join(f"{k}={variables[k]}" for k in sorted(variables)))
|
||||
Path(filename).write_text(
|
||||
"\n".join(f"{k}={variables[k]}" for k in sorted(variables))
|
||||
)
|
||||
|
||||
def __gen_conf(self, global_conf: dict, services_conf: list[dict]) -> None:
|
||||
"""Generates the nginx configuration file from the given configuration
|
||||
|
@ -306,7 +305,7 @@ class Config:
|
|||
if edit is False:
|
||||
return f"Service {service['SERVER_NAME']} already exists.", 1
|
||||
|
||||
del services[i]
|
||||
services.pop(i)
|
||||
|
||||
services.append(variables)
|
||||
self.__gen_conf(self.get_config(methods=False), services)
|
||||
|
@ -397,11 +396,11 @@ class Config:
|
|||
|
||||
for k in full_env:
|
||||
if k.startswith(service_name):
|
||||
del new_env[k]
|
||||
new_env.pop(k)
|
||||
|
||||
for service in new_services:
|
||||
if k in service:
|
||||
del service[k]
|
||||
service.pop(k)
|
||||
|
||||
self.__gen_conf(new_env, new_services)
|
||||
return f"Configuration for {service_name} has been deleted.", 0
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from os import listdir, mkdir, remove, replace, walk
|
||||
from os.path import dirname, exists, join, isfile
|
||||
from pathlib import Path
|
||||
from re import compile as re_compile
|
||||
from shutil import rmtree, move as shutil_move
|
||||
from typing import Tuple
|
||||
|
@ -95,7 +96,7 @@ class ConfigFiles:
|
|||
def create_folder(self, path: str, name: str) -> Tuple[str, int]:
|
||||
folder_path = join(path, name) if not path.endswith(name) else path
|
||||
try:
|
||||
mkdir(folder_path)
|
||||
Path(folder_path).mkdir()
|
||||
except OSError:
|
||||
return f"Could not create {folder_path}", 1
|
||||
|
||||
|
@ -103,11 +104,8 @@ class ConfigFiles:
|
|||
|
||||
def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]:
|
||||
file_path = join(path, name)
|
||||
mkdir(path, exist_ok=True)
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write(content)
|
||||
|
||||
Path(path).mkdir(exist_ok=True)
|
||||
Path(file_path).write_text(content)
|
||||
return f"The file {file_path} was successfully created", 0
|
||||
|
||||
def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]:
|
||||
|
@ -137,8 +135,7 @@ class ConfigFiles:
|
|||
old_path = dirname(join(path, old_name))
|
||||
|
||||
try:
|
||||
with open(old_path, "r") as f:
|
||||
file_content = f.read()
|
||||
file_content = Path(old_path).read_text()
|
||||
except FileNotFoundError:
|
||||
return f"Could not find {old_path}", 1
|
||||
|
||||
|
@ -161,7 +158,6 @@ class ConfigFiles:
|
|||
except OSError:
|
||||
return f"Could not remove {old_path}", 1
|
||||
|
||||
with open(new_path, "w") as f:
|
||||
f.write(content)
|
||||
Path(new_path).write_text(content)
|
||||
|
||||
return f"The file {old_path} was successfully edited", 0
|
||||
|
|
|
@ -18,7 +18,7 @@ class LinuxTest(Test):
|
|||
r"app2\.example\.com": getenv("TEST_DOMAIN1_2"),
|
||||
r"app3\.example\.com": getenv("TEST_DOMAIN1_3"),
|
||||
}
|
||||
if not distro in ["ubuntu", "debian", "fedora", "centos"]:
|
||||
if not distro in ("ubuntu", "debian", "fedora", "centos"):
|
||||
raise Exception(f"unknown distro {distro}")
|
||||
self.__distro = distro
|
||||
self.__logger = setup_logger("Linux_test", getenv("LOGLEVEL", "INFO"))
|
||||
|
@ -40,9 +40,9 @@ class LinuxTest(Test):
|
|||
proc = run(cmd, shell=True)
|
||||
if proc.returncode != 0:
|
||||
raise Exception("docker run failed (linux stack)")
|
||||
if distro in ["ubuntu", "debian"]:
|
||||
if distro in ("ubuntu", "debian"):
|
||||
cmd = "apt install -y /opt/\$(ls /opt | grep deb)"
|
||||
elif distro in ["centos", "fedora"]:
|
||||
elif distro in ("centos", "fedora"):
|
||||
cmd = "dnf install -y /opt/\$(ls /opt | grep rpm)"
|
||||
proc = LinuxTest.docker_exec(distro, cmd)
|
||||
if proc.returncode != 0:
|
||||
|
@ -64,7 +64,7 @@ class LinuxTest(Test):
|
|||
f"docker exec failed for directory {src} (linux stack)"
|
||||
)
|
||||
|
||||
if distro in ["ubuntu", "debian"]:
|
||||
if distro in ("ubuntu", "debian"):
|
||||
LinuxTest.docker_exec(
|
||||
distro,
|
||||
"DEBIAN_FRONTEND=noninteractive apt-get install -y php-fpm unzip",
|
||||
|
@ -87,7 +87,7 @@ class LinuxTest(Test):
|
|||
LinuxTest.docker_exec(
|
||||
distro, "systemctl stop php7.4-fpm ; systemctl start php7.4-fpm"
|
||||
)
|
||||
elif distro in ["centos", "fedora"]:
|
||||
elif distro in ("centos", "fedora"):
|
||||
LinuxTest.docker_exec(distro, "dnf install -y php-fpm unzip")
|
||||
LinuxTest.docker_cp(
|
||||
distro, "./tests/www-rpm.conf", "/etc/php-fpm.d/www.conf"
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from abc import ABC
|
||||
from pathlib import Path
|
||||
from time import time, sleep
|
||||
from requests import get
|
||||
from traceback import format_exc
|
||||
|
@ -134,11 +135,9 @@ class Test(ABC):
|
|||
@staticmethod
|
||||
def replace_in_file(path, old, new):
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
content = f.read()
|
||||
content = sub(old, new, content, flags=MULTILINE)
|
||||
with open(path, "w") as f:
|
||||
f.write(content)
|
||||
Path(path).write_text(
|
||||
sub(old, new, Path(path).read_text(), flags=MULTILINE)
|
||||
)
|
||||
except:
|
||||
setup_logger("Test", getenv("LOG_LEVEL", "INFO")).warning(
|
||||
f"Can't replace file {path}"
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from pathlib import Path
|
||||
from sys import path, argv, exit
|
||||
from glob import glob
|
||||
from os import getcwd, getenv, _exit
|
||||
from os import getenv, _exit
|
||||
from os.path import isfile
|
||||
from traceback import format_exc
|
||||
from json import loads
|
||||
from subprocess import run
|
||||
|
||||
path.append(f"{getcwd()}/utils")
|
||||
path.append(f"{getcwd()}/tests")
|
||||
path.extend((f"{Path.cwd()}/utils", f"{Path.cwd()}/tests"))
|
||||
|
||||
from Test import Test
|
||||
from DockerTest import DockerTest
|
||||
|
@ -26,7 +26,7 @@ if len(argv) <= 1:
|
|||
exit(1)
|
||||
|
||||
test_type = argv[1]
|
||||
if not test_type in ["linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"]:
|
||||
if not test_type in ("linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"):
|
||||
logger.error(f"Wrong type argument {test_type}")
|
||||
exit(1)
|
||||
|
||||
|
|
Loading…
Reference in New Issue