Refactor every .py file

This commit is contained in:
Théophile Diot 2022-12-14 17:09:57 +01:00
parent 7689dac76d
commit 146338de63
No known key found for this signature in database
GPG Key ID: E752C80DB72BB014
36 changed files with 295 additions and 246 deletions

View File

@ -6,9 +6,9 @@ from pytablewriter import MarkdownTableWriter
def print_md_table(settings):
values = []
for setting, data in settings.items():
values.append(
writer = MarkdownTableWriter(
headers=["Setting", "Default", "Context", "Multiple", "Description"],
value_matrix=[
[
f"`{setting}`",
"" if data["default"] == "" else f"`{data['default']}`",
@ -16,13 +16,11 @@ def print_md_table(settings):
"no" if not "multiple" in data else "yes",
data["help"],
]
)
writer = MarkdownTableWriter(
headers=["Setting", "Default", "Context", "Multiple", "Description"],
value_matrix=values,
for setting, data in settings.items()
],
)
writer.write_table()
print("")
print()
print("# Settings\n")

View File

@ -22,14 +22,14 @@ class IngressController(Controller, ConfigCaller):
self.__logger = setup_logger("Ingress-controller", getenv("LOG_LEVEL", "INFO"))
def _get_controller_instances(self):
controller_instances = []
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items:
return [
pod
for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items
if (
pod.metadata.annotations != None
and "bunkerweb.io/INSTANCE" in pod.metadata.annotations
):
controller_instances.append(pod)
return controller_instances
)
]
def _to_instances(self, controller_instance):
instance = {}
@ -289,9 +289,10 @@ class IngressController(Controller, ConfigCaller):
def process_events(self):
watch_types = ["pod", "ingress", "configmap"]
threads = []
for watch_type in watch_types:
threads.append(Thread(target=self.__watch, args=(watch_type,)))
threads = [
Thread(target=self.__watch, args=(watch_type,))
for watch_type in watch_types
]
for thread in threads:
thread.start()
for thread in threads:

View File

@ -143,9 +143,10 @@ class SwarmController(Controller, ConfigCaller):
def process_events(self):
event_types = ["service", "config"]
threads = []
for event_type in event_types:
threads.append(Thread(target=self.__event, args=(event_type,)))
threads = [
Thread(target=self.__event, args=(event_type,))
for event_type in event_types
]
for thread in threads:
thread.start()
for thread in threads:

View File

@ -5,10 +5,14 @@ from signal import SIGINT, SIGTERM, signal
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from logger import setup_logger
from SwarmController import SwarmController

View File

@ -42,7 +42,7 @@ class CLI(ApiCaller):
def __get_apis(self):
# Docker case
if self.__integration == "docker" or self.__integration == "linux":
if self.__integration in ("docker", "linux"):
return [
API(
f"http://127.0.0.1:{self.__variables['API_HTTP_PORT']}",

View File

@ -5,10 +5,14 @@ from os import _exit
from sys import exit as sys_exit, path
from traceback import format_exc
path.append("/usr/share/bunkerweb/deps/python")
path.append("/usr/share/bunkerweb/cli")
path.append("/usr/share/bunkerweb/utils")
path.append("/usr/share/bunkerweb/api")
path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/cli",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
)
)
from logger import setup_logger
from CLI import CLI

View File

@ -1,15 +1,21 @@
#!/usr/bin/python3
from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
try:
with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
except ValueError:
pass
else:
try:
with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
except ValueError:
pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@ -163,8 +165,7 @@ try:
content += data + b"\n"
i += 1
with open(f"/var/tmp/bunkerweb/blacklist/{kind}.list", "wb") as f:
f.write(content)
Path(f"/var/tmp/bunkerweb/blacklist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} bad {kind}")
# Check if file has changed

View File

@ -2,13 +2,18 @@
from os import _exit, getenv, makedirs
from os.path import isfile
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
"/usr/share/bunkerweb/core/bunkernet/jobs",
)
)
from bunkernet import data
from Database import Database
@ -83,8 +88,7 @@ try:
# Writing data to file
logger.info("Saving BunkerNet data ...")
content = "\n".join(data["data"]).encode("utf-8")
with open("/var/tmp/bunkerweb/bunkernet-ip.list", "wb") as f:
f.write(content)
Path("/var/tmp/bunkerweb/bunkernet-ip.list").write_bytes(content)
# Check if file has changed
new_hash = file_hash("/var/tmp/bunkerweb/bunkernet-ip.list")

View File

@ -2,14 +2,19 @@
from os import _exit, getenv, makedirs, remove
from os.path import isfile
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from time import sleep
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
"/usr/share/bunkerweb/core/bunkernet/jobs",
)
)
from bunkernet import register, ping, get_id
from Database import Database
@ -76,8 +81,7 @@ try:
f"Successfully registered on BunkerNet API with instance id {data['data']}"
)
else:
with open("/var/cache/bunkerweb/bunkernet/instance.id", "r") as f:
bunkernet_id = f.read()
bunkernet_id = Path("/var/cache/bunkerweb/bunkernet/instance.id").read_text()
logger.info(f"Already registered on BunkerNet API with instance id {get_id()}")
# Ping
@ -115,8 +119,7 @@ try:
logger.info("Connectivity with BunkerWeb is successful !")
status = 1
if not isfile("/var/cache/bunkerweb/bunkernet/instance.id"):
with open("/var/cache/bunkerweb/bunkernet/instance.id", "w") as f:
f.write(bunkernet_id)
Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id)
# Update db
err = db.update_job_cache(

View File

@ -2,13 +2,18 @@
from os import getenv, makedirs, remove
from os.path import isfile
from pathlib import Path
from shutil import copy
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from jobs import file_hash
@ -45,16 +50,13 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
cert_hash = file_hash(cert_path)
if not isfile(cert_cache_path):
with open(cert_cache_path, "w") as f:
f.write(cert_hash)
Path(cert_cache_path).write_text(cert_hash)
old_hash = file_hash(cert_cache_path)
if old_hash == cert_hash:
return False
with open(cert_cache_path, "w") as f:
f.write(cert_hash)
Path(cert_cache_path).write_text(cert_hash)
copy(cert_path, cert_cache_path.replace(".hash", ""))
if not isfile(key_path):
@ -71,8 +73,7 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
key_hash = file_hash(key_path)
if not isfile(key_cache_path):
with open(key_cache_path, "w") as f:
f.write(key_hash)
Path(key_cache_path).write_text(key_hash)
old_hash = file_hash(key_cache_path)
if old_hash != key_hash:

View File

@ -1,15 +1,21 @@
#!/usr/bin/python3
from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
try:
with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
except ValueError:
pass
else:
try:
with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
except ValueError:
pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@ -147,8 +149,7 @@ try:
content += data + b"\n"
i += 1
with open(f"/var/tmp/bunkerweb/greylist/{kind}.list", "wb") as f:
f.write(content)
Path(f"/var/tmp/bunkerweb/greylist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} grey {kind}")
# Check if file has changed

View File

@ -12,9 +12,13 @@ from shutil import copytree, rmtree
from traceback import format_exc
from zipfile import ZipFile
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
@ -32,7 +36,6 @@ status = 0
def install_plugin(plugin_dir):
# Load plugin.json
metadata = {}
with open(f"{plugin_dir}plugin.json", "rb") as f:
metadata = loads(f.read())
# Don't go further if plugin is already installed

View File

@ -3,12 +3,17 @@
from datetime import date
from gzip import decompress
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from maxminddb import open_database
from requests import get
@ -39,8 +44,7 @@ try:
# Save it to temp
logger.info("Saving mmdb file to tmp ...")
with open("/var/tmp/bunkerweb/asn.mmdb", "wb") as f:
f.write(decompress(resp.content))
Path(f"/var/tmp/bunkerweb/asn.mmdb").write_bytes(decompress(resp.content))
# Try to load it
logger.info("Checking if mmdb file is valid ...")

View File

@ -3,12 +3,17 @@
from datetime import date
from gzip import decompress
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
from maxminddb import open_database
@ -39,8 +44,7 @@ try:
# Save it to temp
logger.info("Saving mmdb file to tmp ...")
with open("/var/tmp/bunkerweb/country.mmdb", "wb") as f:
f.write(decompress(resp.content))
Path(f"/var/tmp/bunkerweb/country.mmdb").write_bytes(decompress(resp.content))
# Try to load it
logger.info("Checking if mmdb file is valid ...")

View File

@ -2,13 +2,18 @@
from os import getenv, makedirs
from os.path import exists
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from logger import setup_logger
@ -67,8 +72,7 @@ try:
else:
root_dir = "/var/tmp/bunkerweb/lets-encrypt/.well-known/acme-challenge/"
makedirs(root_dir, exist_ok=True)
with open(f"{root_dir}{token}", "w") as f:
f.write(validation)
Path(f"{root_dir}{token}").write_text(validation)
except:
status = 1
logger.error(f"Exception while running certbot-auth.py :\n{format_exc()}")

View File

@ -5,10 +5,14 @@ from os.path import exists, isfile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from logger import setup_logger

View File

@ -9,10 +9,14 @@ from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from logger import setup_logger

View File

@ -2,13 +2,18 @@
from os import environ, getcwd, getenv
from os.path import exists
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from logger import setup_logger
@ -93,10 +98,9 @@ try:
)
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
with open(
f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
) as f:
cert = f.read()
cert = Path(
f"/etc/letsencrypt/live/{first_server}/cert.pem"
).read_bytes()
# Update db
err = db.update_job_cache(
@ -132,10 +136,9 @@ try:
)
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
with open(
f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
) as f:
cert = f.read()
cert = Path(
f"/etc/letsencrypt/live/{first_server}/cert.pem"
).read_bytes()
# Update db
err = db.update_job_cache(

View File

@ -6,8 +6,12 @@ from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
)
)
from logger import setup_logger
@ -21,7 +25,7 @@ def renew(domain):
"--cert-name",
domain,
"--deploy-hook",
f"{getcwd()}/certbot-deploy.py",
"/usr/share/bunkerweb/core/letsencrypt/jobs/certbot-deploy.py",
],
stdin=DEVNULL,
stderr=STDOUT,

View File

@ -1,13 +1,19 @@
#!/usr/bin/python3
from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
@ -18,17 +24,13 @@ from jobs import cache_file, cache_hash, file_hash, is_cached_file
def check_line(line):
if "/" in line:
try:
with suppress(ValueError):
ip_network(line)
return True, line
except ValueError:
pass
else:
try:
with suppress(ValueError):
ip_address(line)
return True, line
except ValueError:
pass
return False, ""
@ -69,10 +71,7 @@ try:
_exit(0)
# Get URLs
urls = []
for url in getenv("REALIP_FROM_URLS", "").split(" "):
if url != "" and url not in urls:
urls.append(url)
urls = [url for url in getenv("REALIP_FROM_URLS", "").split(" ") if url]
# Download and write data to temp file
i = 0
@ -101,8 +100,7 @@ try:
f"Exception while getting RealIP list from {url} :\n{format_exc()}"
)
with open("/var/tmp/bunkerweb/realip-combined.list", "wb") as f:
f.write(content)
Path("/var/tmp/bunkerweb/realip-combined.list").write_bytes(content)
# Check if file has changed
new_hash = file_hash("/var/tmp/bunkerweb/realip-combined.list")

View File

@ -6,9 +6,13 @@ from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from Database import Database
from logger import setup_logger

View File

@ -1,15 +1,21 @@
#!/usr/bin/python3
from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/db",
)
)
from requests import get
@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
try:
with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
except ValueError:
pass
else:
try:
with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
except ValueError:
pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@ -147,8 +149,7 @@ try:
content += data + b"\n"
i += 1
with open(f"/var/tmp/bunkerweb/whitelist/{kind}.list", "wb") as f:
f.write(content)
Path(f"/var/tmp/bunkerweb/whitelist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} good {kind}")
# Check if file has changed

View File

@ -1,4 +1,4 @@
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from copy import deepcopy
from datetime import datetime
from hashlib import sha256
@ -68,10 +68,8 @@ class Database:
)
if sqlalchemy_string.startswith("sqlite"):
try:
with suppress(FileExistsError):
makedirs(dirname(sqlalchemy_string.split("///")[1]), exist_ok=True)
except FileExistsError:
pass
elif "+" in sqlalchemy_string and "+pymysql" not in sqlalchemy_string:
splitted = sqlalchemy_string.split("+")
sqlalchemy_string = f"{splitted[0]}:{':'.join(splitted[1].split(':')[1:])}"
@ -567,12 +565,10 @@ class Database:
Global_values.suffix == suffix,
).update({Global_values.value: value})
try:
with suppress(ProgrammingError, OperationalError):
metadata = session.query(Metadata).get(1)
if metadata is not None and not metadata.first_config_saved:
metadata.first_config_saved = True
except (ProgrammingError, OperationalError):
pass
try:
session.add_all(to_put)
@ -645,8 +641,9 @@ class Database:
if custom_conf is None:
to_put.append(Custom_configs(**config))
elif config["checksum"] != custom_conf.checksum and (
method == custom_conf.method or method == "autoconf"
elif config["checksum"] != custom_conf.checksum and method in (
custom_conf.method,
"autoconf",
):
session.query(Custom_configs).filter(
Custom_configs.service_id == config.get("service_id", None),
@ -810,10 +807,9 @@ class Database:
for key, value in deepcopy(tmp_config).items():
if key.startswith(f"{service}_"):
tmp_config[key.replace(f"{service}_", "")] = value
del tmp_config[key]
tmp_config[key.replace(f"{service}_", "")] = tmp_config.pop(key)
elif any(key.startswith(f"{s}_") for s in service_names):
del tmp_config[key]
tmp_config.pop(key)
else:
tmp_config[key] = (
{"value": value["value"], "method": "default"}
@ -1367,7 +1363,7 @@ class Database:
return "An instance with the same hostname already exists."
session.add(
Instances(hostname=hostname, port=int(port), server_name=server_name)
Instances(hostname=hostname, port=port, server_name=server_name)
)
try:

View File

@ -62,9 +62,9 @@ class Templator:
if config != None:
real_config = config
Path(dirname(real_path)).mkdir(parents=True, exist_ok=True)
with open(real_path, "w") as f:
for k, v in real_config.items():
f.write(f"{k}={v}\n")
Path(real_path).write_text(
"\n".join(f"{k}={v}" for k, v in real_config.items())
)
def __render_global(self):
self.__write_config()
@ -129,8 +129,7 @@ class Templator:
real_name = name
jinja_template = self.__jinja_env.get_template(template)
Path(dirname(f"{real_output}{real_name}")).mkdir(parents=True, exist_ok=True)
with open(f"{real_output}{real_name}", "w") as f:
f.write(jinja_template.render(real_config))
Path(f"{real_output}{real_name}").write_text(jinja_template.render(real_config))
def is_custom_conf(path):
return glob(f"{path}/*.conf")

View File

@ -11,9 +11,13 @@ from time import sleep
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
)
)
from logger import setup_logger
from Configurator import Configurator

View File

@ -13,10 +13,14 @@ from traceback import format_exc
from typing import Any
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from docker import DockerClient
from kubernetes import client as kube_client

View File

@ -1,3 +1,4 @@
from contextlib import suppress
from datetime import datetime
from hashlib import sha512
from json import dumps, loads
@ -21,7 +22,6 @@ def is_cached_file(file, expire):
return False
if not path.isfile(f"{file}.md"):
return False
cached_time = 0
with open(f"{file}.md", "r") as f:
cached_time = loads(f.read())["date"]
current_time = datetime.timestamp(datetime.now())
@ -51,11 +51,9 @@ def file_hash(file):
def cache_hash(cache):
try:
with suppress(BaseException):
with open(f"{cache}.md", "r") as f:
return loads(f.read())["checksum"]
except:
pass
return None

View File

@ -13,8 +13,7 @@ from schedule import (
from sys import path as sys_path
from traceback import format_exc
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(("/usr/share/bunkerweb/utils", "/usr/share/bunkerweb/db"))
from Database import Database
from logger import setup_logger

View File

@ -16,6 +16,7 @@ from os import (
walk,
)
from os.path import dirname, exists, isdir, isfile, islink, join
from pathlib import Path
from shutil import chown, copy, rmtree
from signal import SIGINT, SIGTERM, signal, SIGHUP
from subprocess import run as subprocess_run, DEVNULL, STDOUT
@ -24,10 +25,14 @@ from time import sleep
from traceback import format_exc
from typing import Any, Dict, List
sys_path.append("/usr/share/bunkerweb/deps/python")
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/deps/python",
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from dotenv import dotenv_values
@ -97,9 +102,8 @@ def generate_custom_configs(
if custom_config["service_id"]:
tmp_path += f"/{custom_config['service_id']}"
tmp_path += f"/{custom_config['name']}.conf"
makedirs(dirname(tmp_path), exist_ok=True)
with open(tmp_path, "wb") as f:
f.write(custom_config["data"])
Path(dirname(tmp_path)).mkdir(parents=True, exist_ok=True)
Path(tmp_path).write_bytes(custom_config["data"])
# Fix permissions for the custom configs folder
for root, dirs, files in walk("/data/configs", topdown=False):
@ -127,8 +131,7 @@ if __name__ == "__main__":
_exit(1)
# Write pid to file
with open("/var/tmp/bunkerweb/scheduler.pid", "w") as f:
f.write(str(getpid()))
Path("/var/tmp/bunkerweb/scheduler.pid").write_text(str(getpid()))
# Parse arguments
parser = ArgumentParser(description="Job scheduler for BunkerWeb")

View File

@ -1,4 +1,6 @@
from contextlib import suppress
from io import BytesIO
from pathlib import Path
from signal import SIGINT, signal, SIGTERM
from bs4 import BeautifulSoup
from copy import deepcopy
@ -41,9 +43,13 @@ from typing import Optional
from uuid import uuid4
from zipfile import BadZipFile, ZipFile
sys_path.append("/usr/share/bunkerweb/utils")
sys_path.append("/usr/share/bunkerweb/api")
sys_path.append("/usr/share/bunkerweb/db")
sys_path.extend(
(
"/usr/share/bunkerweb/utils",
"/usr/share/bunkerweb/api",
"/usr/share/bunkerweb/db",
)
)
from src.Instances import Instances
from src.ConfigFiles import ConfigFiles
@ -112,8 +118,7 @@ if not vars["FLASK_ENV"] == "development" and vars["ABSOLUTE_URI"].endswith(
logger.error("Please change the default URL.")
sys_exit(1)
with open("/var/tmp/bunkerweb/ui.pid", "w") as f:
f.write(str(getpid()))
Path("/var/tmp/bunkerweb/ui.pid").write_text(str(getpid()))
login_manager = LoginManager()
login_manager.init_app(app)
@ -334,12 +339,12 @@ def instances():
# Manage instances
if request.method == "POST":
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
if not "operation" in request.form or not request.form["operation"] in (
"reload",
"start",
"stop",
"restart",
]:
):
flash("Missing operation parameter on /instances.", "error")
return redirect(url_for("loading", next=url_for("instances")))
@ -384,11 +389,11 @@ def services():
if request.method == "POST":
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
if not "operation" in request.form or not request.form["operation"] in (
"new",
"edit",
"delete",
]:
):
flash("Missing operation parameter on /services.", "error")
return redirect(url_for("loading", next=url_for("services")))
@ -574,11 +579,11 @@ def configs():
operation = ""
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
if not "operation" in request.form or not request.form["operation"] in (
"new",
"edit",
"delete",
]:
):
flash("Missing operation parameter on /configs.", "error")
return redirect(url_for("loading", next=url_for("configs")))
@ -811,7 +816,7 @@ def plugins():
except BadZipFile:
error = 1
flash(
f"{file} is not a valid zip file. ({folder_name if folder_name else temp_folder_name})",
f"{file} is not a valid zip file. ({folder_name or temp_folder_name})",
"error",
)
else:
@ -911,37 +916,37 @@ def plugins():
except ReadError:
error = 1
flash(
f"Couldn't read file {file} ({folder_name if folder_name else temp_folder_name})",
f"Couldn't read file {file} ({folder_name or temp_folder_name})",
"error",
)
except CompressionError:
error = 1
flash(
f"{file} is not a valid tar file ({folder_name if folder_name else temp_folder_name})",
f"{file} is not a valid tar file ({folder_name or temp_folder_name})",
"error",
)
except HeaderError:
error = 1
flash(
f"The file plugin.json in {file} is not valid ({folder_name if folder_name else temp_folder_name})",
f"The file plugin.json in {file} is not valid ({folder_name or temp_folder_name})",
"error",
)
except KeyError:
error = 1
flash(
f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name if folder_name else temp_folder_name})",
f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name or temp_folder_name})",
"error",
)
except JSONDecodeError as e:
error = 1
flash(
f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name if folder_name else temp_folder_name})",
f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name or temp_folder_name})",
"error",
)
except ValueError:
error = 1
flash(
f"The file plugin.json is missing one or more of the following keys: <i>{', '.join(PLUGIN_KEYS)}</i> ({folder_name if folder_name else temp_folder_name})",
f"The file plugin.json is missing one or more of the following keys: <i>{', '.join(PLUGIN_KEYS)}</i> ({folder_name or temp_folder_name})",
"error",
)
except FileExistsError:
@ -985,10 +990,8 @@ def plugins():
# Remove tmp folder
if exists("/var/tmp/bunkerweb/ui"):
try:
with suppress(OSError):
rmtree("/var/tmp/bunkerweb/ui")
except OSError:
pass
return redirect(
url_for("loading", next=url_for("plugins"), message="Reloading plugins")
@ -1057,11 +1060,9 @@ def upload_plugin():
if not file.filename.endswith((".zip", ".tar.gz", ".tar.xz")):
return {"status": "ko"}, 422
with open(
f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}",
"wb",
) as f:
f.write(file.read())
Path(
f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}"
).write_bytes(file.read())
return {"status": "ok"}, 201
@ -1127,7 +1128,7 @@ def custom_plugin(plugin):
finally:
# Remove the custom plugin from the shared library
sys_path.pop()
del sys_modules["actions"]
sys_modules.pop("actions")
del actions
if (
@ -1304,7 +1305,7 @@ def logs_container(container_id):
if from_date is not None:
last_update = from_date
if any(arg and not arg.isdigit() for arg in [last_update, from_date, to_date]):
if any(arg and not arg.isdigit() for arg in (last_update, from_date, to_date)):
return (
jsonify(
{

View File

@ -1,5 +1,6 @@
from copy import deepcopy
from os import listdir, remove
from pathlib import Path
from time import sleep
from flask import flash
from os.path import exists, isfile
@ -98,11 +99,8 @@ class Config:
if not isfile(filename):
return {}
with open(filename, "r") as f:
env = f.read()
data = {}
for line in env.split("\n"):
for line in Path(filename).read_text().split("\n"):
if not "=" in line:
continue
var = line.split("=")[0]
@ -121,8 +119,9 @@ class Config:
variables : dict
The dict to convert to env file
"""
with open(filename, "w") as f:
f.write("\n".join(f"{k}={variables[k]}" for k in sorted(variables)))
Path(filename).write_text(
"\n".join(f"{k}={variables[k]}" for k in sorted(variables))
)
def __gen_conf(self, global_conf: dict, services_conf: list[dict]) -> None:
"""Generates the nginx configuration file from the given configuration
@ -306,7 +305,7 @@ class Config:
if edit is False:
return f"Service {service['SERVER_NAME']} already exists.", 1
del services[i]
services.pop(i)
services.append(variables)
self.__gen_conf(self.get_config(methods=False), services)
@ -397,11 +396,11 @@ class Config:
for k in full_env:
if k.startswith(service_name):
del new_env[k]
new_env.pop(k)
for service in new_services:
if k in service:
del service[k]
service.pop(k)
self.__gen_conf(new_env, new_services)
return f"Configuration for {service_name} has been deleted.", 0

View File

@ -1,5 +1,6 @@
from os import listdir, mkdir, remove, replace, walk
from os.path import dirname, exists, join, isfile
from pathlib import Path
from re import compile as re_compile
from shutil import rmtree, move as shutil_move
from typing import Tuple
@ -95,7 +96,7 @@ class ConfigFiles:
def create_folder(self, path: str, name: str) -> Tuple[str, int]:
folder_path = join(path, name) if not path.endswith(name) else path
try:
mkdir(folder_path)
Path(folder_path).mkdir()
except OSError:
return f"Could not create {folder_path}", 1
@ -103,11 +104,8 @@ class ConfigFiles:
def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]:
file_path = join(path, name)
mkdir(path, exist_ok=True)
with open(file_path, "w") as f:
f.write(content)
Path(path).mkdir(exist_ok=True)
Path(file_path).write_text(content)
return f"The file {file_path} was successfully created", 0
def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]:
@ -137,8 +135,7 @@ class ConfigFiles:
old_path = dirname(join(path, old_name))
try:
with open(old_path, "r") as f:
file_content = f.read()
file_content = Path(old_path).read_text()
except FileNotFoundError:
return f"Could not find {old_path}", 1
@ -161,7 +158,6 @@ class ConfigFiles:
except OSError:
return f"Could not remove {old_path}", 1
with open(new_path, "w") as f:
f.write(content)
Path(new_path).write_text(content)
return f"The file {old_path} was successfully edited", 0

0
src/ui/src/__init__.py Normal file
View File

View File

@ -18,7 +18,7 @@ class LinuxTest(Test):
r"app2\.example\.com": getenv("TEST_DOMAIN1_2"),
r"app3\.example\.com": getenv("TEST_DOMAIN1_3"),
}
if not distro in ["ubuntu", "debian", "fedora", "centos"]:
if not distro in ("ubuntu", "debian", "fedora", "centos"):
raise Exception(f"unknown distro {distro}")
self.__distro = distro
self.__logger = setup_logger("Linux_test", getenv("LOGLEVEL", "INFO"))
@ -40,9 +40,9 @@ class LinuxTest(Test):
proc = run(cmd, shell=True)
if proc.returncode != 0:
raise Exception("docker run failed (linux stack)")
if distro in ["ubuntu", "debian"]:
if distro in ("ubuntu", "debian"):
cmd = "apt install -y /opt/\$(ls /opt | grep deb)"
elif distro in ["centos", "fedora"]:
elif distro in ("centos", "fedora"):
cmd = "dnf install -y /opt/\$(ls /opt | grep rpm)"
proc = LinuxTest.docker_exec(distro, cmd)
if proc.returncode != 0:
@ -64,7 +64,7 @@ class LinuxTest(Test):
f"docker exec failed for directory {src} (linux stack)"
)
if distro in ["ubuntu", "debian"]:
if distro in ("ubuntu", "debian"):
LinuxTest.docker_exec(
distro,
"DEBIAN_FRONTEND=noninteractive apt-get install -y php-fpm unzip",
@ -87,7 +87,7 @@ class LinuxTest(Test):
LinuxTest.docker_exec(
distro, "systemctl stop php7.4-fpm ; systemctl start php7.4-fpm"
)
elif distro in ["centos", "fedora"]:
elif distro in ("centos", "fedora"):
LinuxTest.docker_exec(distro, "dnf install -y php-fpm unzip")
LinuxTest.docker_cp(
distro, "./tests/www-rpm.conf", "/etc/php-fpm.d/www.conf"

View File

@ -1,4 +1,5 @@
from abc import ABC
from pathlib import Path
from time import time, sleep
from requests import get
from traceback import format_exc
@ -134,11 +135,9 @@ class Test(ABC):
@staticmethod
def replace_in_file(path, old, new):
try:
with open(path, "r") as f:
content = f.read()
content = sub(old, new, content, flags=MULTILINE)
with open(path, "w") as f:
f.write(content)
Path(path).write_text(
sub(old, new, Path(path).read_text(), flags=MULTILINE)
)
except:
setup_logger("Test", getenv("LOG_LEVEL", "INFO")).warning(
f"Can't replace file {path}"

View File

@ -1,15 +1,15 @@
#!/usr/bin/env python3
from pathlib import Path
from sys import path, argv, exit
from glob import glob
from os import getcwd, getenv, _exit
from os import getenv, _exit
from os.path import isfile
from traceback import format_exc
from json import loads
from subprocess import run
path.append(f"{getcwd()}/utils")
path.append(f"{getcwd()}/tests")
path.extend((f"{Path.cwd()}/utils", f"{Path.cwd()}/tests"))
from Test import Test
from DockerTest import DockerTest
@ -26,7 +26,7 @@ if len(argv) <= 1:
exit(1)
test_type = argv[1]
if not test_type in ["linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"]:
if not test_type in ("linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"):
logger.error(f"Wrong type argument {test_type}")
exit(1)