bunkerized-nginx/ui/main.py

1220 lines
44 KiB
Python
Executable File

import os
from shutil import rmtree, copytree, chown
from logging import getLogger, INFO, ERROR, StreamHandler, Formatter
from flask import (
Flask,
flash,
jsonify,
redirect,
render_template,
request,
send_file,
url_for,
)
from flask_login import LoginManager, login_required, login_user, logout_user
from flask_wtf.csrf import CSRFProtect, CSRFError
from json import JSONDecodeError, load as json_load
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from dateutil.parser import parse as dateutil_parse
from requests import get
from requests.utils import default_headers
from sys import path as sys_path, exit as sys_exit
from copy import deepcopy
from docker import DockerClient
from docker.errors import (
NotFound as docker_NotFound,
APIError as docker_APIError,
DockerException,
)
from uuid import uuid4
from time import time
import tarfile
import zipfile
from ui.src.ConfigFiles import ConfigFiles
from ui.src.Config import Config
from ui.src.ReverseProxied import ReverseProxied
from ui.src.User import User
from ui.utils import (
check_settings,
env_to_summary_class,
form_plugin_gen,
form_service_gen,
form_service_gen_multiple,
form_service_gen_multiple_values,
gen_folders_tree_html,
get_variables,
path_to_dict,
)
sys_path.append("/opt/bunkerweb/utils")
from ui.src.Instances import Instances
from api.API import API
from utils.ApiCaller import ApiCaller
# Set up logger
logger = getLogger("flask_app")
logger.setLevel(INFO)
# create console handler with a higher log level
ch = StreamHandler()
ch.setLevel(ERROR)
# create formatter and add it to the handlers
formatter = Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
# Flask app
app = Flask(
__name__,
static_url_path="/",
static_folder="static",
template_folder="templates",
)
app.wsgi_app = ReverseProxied(app.wsgi_app)
# Set variables and instantiate objects
vars = get_variables()
if not vars["FLASK_ENV"] == "development" and vars["ADMIN_PASSWORD"] == "changeme":
logger.error("Please change the default admin password.")
sys_exit(1)
if not vars["FLASK_ENV"] == "development" and (
vars["ABSOLUTE_URI"].endswith("/changeme/")
or vars["ABSOLUTE_URI"].endswith("/changeme")
):
logger.error("Please change the default URL.")
sys_exit(1)
with open("/opt/bunkerweb/tmp/ui.pid", "w") as f:
f.write(str(os.getpid()))
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
user = User(vars["ADMIN_USERNAME"], vars["ADMIN_PASSWORD"])
api_caller = ApiCaller()
PLUGIN_KEYS = [
"id",
"order",
"name",
"description",
"version",
"settings",
]
try:
docker_client: DockerClient = DockerClient(base_url=vars["DOCKER_HOST"])
except (docker_APIError, DockerException):
docker_client = None
if docker_client:
apis: list[API] = []
for container in docker_client.containers.list(filters={"label": "bunkerweb.UI"}):
env_variables = {
x[0]: x[1]
for x in [env.split("=") for env in container.attrs["Config"]["Env"]]
}
apis.append(
API(
f"http://{container.name}:{env_variables.get('API_HTTP_PORT', '5000')}",
env_variables.get("API_SERVER_NAME", "bwapi"),
)
)
api_caller._set_apis(apis)
try:
app.config.update(
DEBUG=True,
SECRET_KEY=vars["FLASK_SECRET"],
ABSOLUTE_URI=vars["ABSOLUTE_URI"],
INSTANCES=Instances(docker_client),
CONFIG=Config(),
CONFIGFILES=ConfigFiles(),
SESSION_COOKIE_DOMAIN=vars["ABSOLUTE_URI"]
.replace("http://", "")
.replace("https://", "")
.split("/")[0],
WTF_CSRF_SSL_STRICT=False,
USER=user,
SEND_FILE_MAX_AGE_DEFAULT=86400,
)
except FileNotFoundError as e:
logger.error(repr(e), e.filename)
sys_exit(1)
# Declare functions for jinja2
app.jinja_env.globals.update(env_to_summary_class=env_to_summary_class)
app.jinja_env.globals.update(form_plugin_gen=form_plugin_gen)
app.jinja_env.globals.update(form_service_gen=form_service_gen)
app.jinja_env.globals.update(form_service_gen_multiple=form_service_gen_multiple)
app.jinja_env.globals.update(
form_service_gen_multiple_values=form_service_gen_multiple_values
)
app.jinja_env.globals.update(gen_folders_tree_html=gen_folders_tree_html)
app.jinja_env.globals.update(check_settings=check_settings)
@login_manager.user_loader
def load_user(user_id):
return User(user_id, vars["ADMIN_PASSWORD"])
# CSRF protection
csrf = CSRFProtect()
csrf.init_app(app)
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
"""
It takes a CSRFError exception as an argument, and returns a Flask response
:param e: The exception object
:return: A template with the error message and a 401 status code.
"""
logout_user()
flash("Wrong CSRF token !", "error")
return render_template("login.html"), 403
@app.route("/")
def index():
return redirect(url_for("login"))
@app.route("/loading")
@login_required
def loading():
next_url = request.values.get("next")
return render_template("loading.html", next=next_url)
@app.route("/home")
@login_required
def home():
"""
It returns the home page
:return: The home.html template is being rendered with the following variables:
check_version: a boolean indicating whether the local version is the same as the remote version
remote_version: the remote version
version: the local version
instances_number: the number of instances
services_number: the number of services
posts: a list of posts
"""
r = get(
"https://raw.githubusercontent.com/bunkerity/bunkerweb/master/VERSION",
)
remote_version = None
if r.status_code == 200:
remote_version = r.text.strip()
with open("/opt/bunkerweb/VERSION", "r") as f:
version = f.read().strip()
headers = default_headers()
headers.update({"User-Agent": "bunkerweb-ui"})
r = get(
"https://www.bunkerity.com/wp-json/wp/v2/posts",
headers=headers,
)
formatted_posts = None
if r.status_code == 200:
posts = r.json()
formatted_posts = []
for post in posts[:5]:
formatted_posts.append(
{
"link": post["link"],
"title": post["title"]["rendered"],
"description": BeautifulSoup(
post["content"]["rendered"][
post["content"]["rendered"].index("<em>")
+ 4 : post["content"]["rendered"].index("</em>")
],
features="html.parser",
).get_text()[:256]
+ ("..." if len(post["content"]["rendered"]) > 256 else ""),
"date": dateutil_parse(post["date"]).strftime("%B %d, %Y"),
"image_url": post["yoast_head_json"]["og_image"][0]["url"].replace(
"wwwdev", "www"
),
"reading_time": post["yoast_head_json"]["twitter_misc"][
"Est. reading time"
],
}
)
instances_number = len(app.config["INSTANCES"].get_instances())
services_number = len(app.config["CONFIG"].get_services())
return render_template(
"home.html",
check_version=not remote_version or version == remote_version,
remote_version=remote_version,
version=version,
instances_number=instances_number,
services_number=services_number,
posts=formatted_posts,
)
@app.route("/instances", methods=["GET", "POST"])
@login_required
def instances():
# Manage instances
if request.method == "POST":
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
"reload",
"start",
"stop",
"restart",
]:
flash("Missing operation parameter on /instances.", "error")
return redirect(url_for("loading", next=url_for("instances")))
# Check that all fields are present
if not "INSTANCE_ID" in request.form:
flash("Missing INSTANCE_ID parameter.", "error")
return redirect(url_for("loading", next=url_for("instances")))
# Do the operation
if request.form["operation"] == "reload":
operation = app.config["INSTANCES"].reload_instance(
request.form["INSTANCE_ID"]
)
elif request.form["operation"] == "start":
operation = app.config["INSTANCES"].start_instance(
request.form["INSTANCE_ID"]
)
elif request.form["operation"] == "stop":
operation = app.config["INSTANCES"].stop_instance(
request.form["INSTANCE_ID"]
)
elif request.form["operation"] == "restart":
operation = app.config["INSTANCES"].restart_instance(
request.form["INSTANCE_ID"]
)
if operation.startswith("Can't"):
flash(operation, "error")
else:
flash(operation)
return redirect(url_for("loading", next=url_for("instances")))
# Display instances
instances = app.config["INSTANCES"].get_instances()
return render_template("instances.html", title="Instances", instances=instances)
@app.route("/services", methods=["GET", "POST"])
@login_required
def services():
if request.method == "POST":
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
"new",
"edit",
"delete",
]:
flash("Missing operation parameter on /services.", "error")
return redirect(url_for("loading", next=url_for("services")))
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
if (
not "OLD_SERVER_NAME" in request.form
and request.form["operation"] == "edit"
):
flash("Missing OLD_SERVER_NAME parameter.", "error")
return redirect(url_for("loading", next=url_for("services")))
if request.form["operation"] in ("new", "edit"):
del variables["operation"]
if request.form["operation"] == "edit":
del variables["OLD_SERVER_NAME"]
# Edit check fields and remove already existing ones
config = app.config["CONFIG"].get_config()
for variable in deepcopy(variables):
if variables[variable] == "on":
variables[variable] = "yes"
elif variables[variable] == "off":
variables[variable] = "no"
if (
request.form["operation"] == "edit"
and variable != "SERVER_NAME"
and variables[variable] == config.get(variable, None)
or not variables[variable].strip()
):
del variables[variable]
if not variables:
flash(
f"{variables['SERVER_NAME'].split(' ')[0]} was not edited because no values were changed."
)
return redirect(url_for("loading", next=url_for("services")))
error = app.config["CONFIG"].check_variables(variables)
if error:
return redirect(url_for("loading", next=url_for("services")))
# Delete
elif request.form["operation"] == "delete":
if not "SERVER_NAME" in request.form:
flash("Missing SERVER_NAME parameter.", "error")
return redirect(url_for("loading", next=url_for("services")))
error = app.config["CONFIG"].check_variables(
{"SERVER_NAME": request.form["SERVER_NAME"]}
)
if error:
return redirect(url_for("loading", next=url_for("services")))
error = 0
# Do the operation
if request.form["operation"] == "new":
operation, error = app.config["CONFIG"].new_service(variables)
elif request.form["operation"] == "edit":
operation = app.config["CONFIG"].edit_service(
request.form["OLD_SERVER_NAME"], variables
)
elif request.form["operation"] == "delete":
operation, error = app.config["CONFIG"].delete_service(
request.form["SERVER_NAME"]
)
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("services")))
flash(operation)
# Reload instances
_reloads = app.config["INSTANCES"].reload_instances()
if not _reloads:
for _reload in _reloads:
flash(f"Reload failed for the instance {_reload}", "error")
else:
flash("Successfully reloaded instances")
return redirect(url_for("loading", next=url_for("services")))
# Display services
services = app.config["CONFIG"].get_services()
return render_template("services.html", services=services)
@app.route("/global_config", methods=["GET", "POST"])
@login_required
def global_config():
if request.method == "POST":
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
# Edit check fields and remove already existing ones
config = app.config["CONFIG"].get_config()
for variable in deepcopy(variables):
if variables[variable] == "on":
variables[variable] = "yes"
elif variables[variable] == "off":
variables[variable] = "no"
if (
variables[variable] == config.get(variable, None)
or not variables[variable].strip()
):
del variables[variable]
if not variables:
flash(
f"The global configuration was not edited because no values were changed."
)
return redirect(url_for("loading", next=url_for("global_config")))
error = app.config["CONFIG"].check_variables(variables, True)
if error:
return redirect(url_for("loading", next=url_for("global_config")))
error = 0
# Do the operation
operation = app.config["CONFIG"].edit_global_conf(variables)
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("global_config")))
flash(operation)
# Reload instances
_reloads = app.config["INSTANCES"].reload_instances()
if not _reloads:
for _reload in _reloads:
flash(f"Reload failed for the instance {_reload}", "error")
else:
flash("Successfully reloaded instances")
return redirect(url_for("loading", next=url_for("global_config")))
# Display services
services = app.config["CONFIG"].get_services()
return render_template("global_config.html", services=services)
@app.route("/configs", methods=["GET", "POST"])
@login_required
def configs():
if request.method == "POST":
operation = ""
# Check operation
if not "operation" in request.form or not request.form["operation"] in [
"new",
"edit",
"delete",
]:
flash("Missing operation parameter on /configs.", "error")
return redirect(url_for("loading", next=url_for("configs")))
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
operation = app.config["CONFIGFILES"].check_path(variables["path"])
if operation:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs"))), 500
if request.form["operation"] in ("new", "edit"):
if not app.config["CONFIGFILES"].check_name(variables["name"]):
flash(
f"Invalid {variables['type']} name. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))",
"error",
)
return redirect(url_for("loading", next=url_for("configs")))
if variables["type"] == "file":
variables["name"] = f"{variables['name']}.conf"
variables["content"] = BeautifulSoup(
variables["content"], "html.parser"
).get_text()
if request.form["operation"] == "new":
if variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].create_folder(
variables["path"], variables["name"]
)
elif variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].create_file(
variables["path"], variables["name"], variables["content"]
)
elif request.form["operation"] == "edit":
if variables["type"] == "folder":
operation, error = app.config["CONFIGFILES"].edit_folder(
variables["path"], variables["name"]
)
elif variables["type"] == "file":
operation, error = app.config["CONFIGFILES"].edit_file(
variables["path"], variables["name"], variables["content"]
)
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs")))
else:
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("configs")))
flash(operation)
# Reload instances
_reloads = app.config["INSTANCES"].reload_instances()
if not _reloads:
for _reload in _reloads:
flash(f"Reload failed for the instance {_reload}", "error")
else:
flash("Successfully reloaded instances")
return redirect(url_for("loading", next=url_for("configs")))
return render_template(
"configs.html", folders=[path_to_dict("/opt/bunkerweb/configs")]
)
@app.route("/plugins", methods=["GET", "POST"])
@login_required
def plugins():
if request.method == "POST":
operation = ""
error = 0
if "operation" in request.form and request.form["operation"] == "delete":
# Check variables
variables = deepcopy(request.form.to_dict())
del variables["csrf_token"]
operation = app.config["CONFIGFILES"].check_path(
variables["path"], "/opt/bunkerweb/plugins/"
)
if operation:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("plugins"))), 500
operation, error = app.config["CONFIGFILES"].delete_path(variables["path"])
if error:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("plugins")))
else:
if not os.path.exists("/opt/bunkerweb/tmp/ui") or not os.listdir(
"/opt/bunkerweb/tmp/ui"
):
flash("Please upload new plugins to reload plugins", "error")
return redirect(url_for("loading", next=url_for("plugins")))
for file in os.listdir("/opt/bunkerweb/tmp/ui"):
if not os.path.isfile(f"/opt/bunkerweb/tmp/ui/{file}"):
continue
folder_name = ""
temp_folder_name = file.split(".")[0]
try:
if file.endswith(".zip"):
try:
with zipfile.ZipFile(
f"/opt/bunkerweb/tmp/ui/{file}"
) as zip_file:
try:
zip_file.getinfo("plugin.json")
zip_file.extractall(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}"
)
with open(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/plugin.json",
"r",
) as f:
plugin_file = json_load(f)
if not all(
key in plugin_file.keys() for key in PLUGIN_KEYS
):
raise ValueError
folder_name = plugin_file["id"]
if not app.config["CONFIGFILES"].check_name(
folder_name
):
error = 1
flash(
f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))",
"error",
)
raise Exception
if os.path.exists(
f"/opt/bunkerweb/plugins/{folder_name}"
):
raise FileExistsError
copytree(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}",
f"/opt/bunkerweb/plugins/{folder_name}",
)
except KeyError:
zip_file.extractall(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}"
)
dirs = [
d
for d in os.listdir(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}"
)
if os.path.isdir(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{d}"
)
]
if (
not dirs
or len(dirs) > 1
or not os.path.exists(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json"
)
):
raise KeyError
with open(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json",
"r",
) as f:
plugin_file = json_load(f)
if not all(
key in plugin_file.keys() for key in PLUGIN_KEYS
):
raise ValueError
folder_name = plugin_file["id"]
if not app.config["CONFIGFILES"].check_name(
folder_name
):
error = 1
flash(
f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))",
"error",
)
raise Exception
if os.path.exists(
f"/opt/bunkerweb/plugins/{folder_name}"
):
raise FileExistsError
copytree(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}",
f"/opt/bunkerweb/plugins/{folder_name}",
)
except zipfile.BadZipFile:
error = 1
flash(
f"{file} is not a valid zip file. ({folder_name if folder_name else temp_folder_name})",
"error",
)
else:
try:
with tarfile.open(
f"/opt/bunkerweb/tmp/ui/{file}",
errorlevel=2,
) as tar_file:
try:
tar_file.getmember("plugin.json")
tar_file.extractall(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}"
)
with open(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/plugin.json",
"r",
) as f:
plugin_file = json_load(f)
if not all(
key in plugin_file.keys() for key in PLUGIN_KEYS
):
raise ValueError
folder_name = plugin_file["id"]
if not app.config["CONFIGFILES"].check_name(
folder_name
):
error = 1
flash(
f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))",
"error",
)
raise Exception
if os.path.exists(
f"/opt/bunkerweb/plugins/{folder_name}"
):
raise FileExistsError
copytree(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}",
f"/opt/bunkerweb/plugins/{folder_name}",
)
except KeyError:
tar_file.extractall(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}",
)
dirs = [
d
for d in os.listdir(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}"
)
if os.path.isdir(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{d}"
)
]
if (
not dirs
or len(dirs) > 1
or not os.path.exists(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json"
)
):
raise KeyError
with open(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}/plugin.json",
"r",
) as f:
plugin_file = json_load(f)
if not all(
key in plugin_file.keys() for key in PLUGIN_KEYS
):
raise ValueError
folder_name = plugin_file["id"]
if not app.config["CONFIGFILES"].check_name(
folder_name
):
error = 1
flash(
f"Invalid plugin name for {temp_folder_name}. (Can only contain numbers, letters, underscores and hyphens (min 4 characters and max 32))",
"error",
)
raise Exception
if os.path.exists(
f"/opt/bunkerweb/plugins/{folder_name}"
):
raise FileExistsError
copytree(
f"/opt/bunkerweb/tmp/ui/{temp_folder_name}/{dirs[0]}",
f"/opt/bunkerweb/plugins/{folder_name}",
)
except tarfile.ReadError:
error = 1
flash(
f"Couldn't read file {file} ({folder_name if folder_name else temp_folder_name})",
"error",
)
except tarfile.CompressionError:
error = 1
flash(
f"{file} is not a valid tar file ({folder_name if folder_name else temp_folder_name})",
"error",
)
except tarfile.HeaderError:
error = 1
flash(
f"The file plugin.json in {file} is not valid ({folder_name if folder_name else temp_folder_name})",
"error",
)
except KeyError:
error = 1
flash(
f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name if folder_name else temp_folder_name})",
"error",
)
except JSONDecodeError as e:
error = 1
flash(
f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name if folder_name else temp_folder_name})",
"error",
)
except ValueError:
error = 1
flash(
f"The file plugin.json is missing one or more of the following keys: <i>{', '.join(PLUGIN_KEYS)}</i> ({folder_name if folder_name else temp_folder_name})",
"error",
)
except FileExistsError:
error = 1
flash(
f"A plugin named {folder_name} already exists",
"error",
)
except (tarfile.TarError, OSError) as e:
error = 1
flash(f"{e}", "error")
except Exception:
pass
finally:
if error != 1:
flash(
f"Successfully created plugin: <b><i>{folder_name}</i></b>"
)
error = 0
for root, dirs, files in os.walk("/opt/bunkerweb/plugins", topdown=False):
for name in files + dirs:
chown(os.path.join(root, name), "nginx", "nginx")
os.chmod(os.path.join(root, name), 0o770)
app.config["CONFIG"].reload_config()
if operation:
flash(operation)
# Reload instances
_reloads = app.config["INSTANCES"].reload_instances()
if not _reloads:
for _reload in _reloads:
flash(f"Reload failed for the instance {_reload}", "error")
else:
flash("Successfully reloaded instances")
if os.path.exists("/opt/bunkerweb/tmp/ui"):
try:
rmtree("/opt/bunkerweb/tmp/ui")
except OSError:
pass
app.config["CONFIG"].reload_plugins()
return redirect(url_for("loading", next=url_for("plugins")))
plugins = [
{
"name": "plugins",
"type": "folder",
"path": "/opt/bunkerweb/plugins",
"can_create_files": False,
"can_create_folders": False,
"can_edit": False,
"can_delete": False,
"children": [
{
"name": _dir,
"type": "folder",
"path": f"/opt/bunkerweb/plugins/{_dir}",
"can_create_files": False,
"can_create_folders": False,
"can_edit": False,
"can_delete": True,
}
for _dir in os.listdir("/opt/bunkerweb/plugins")
],
}
]
return render_template("plugins.html", folders=plugins)
@app.route("/plugins/upload", methods=["POST"])
@login_required
def upload_plugin():
if not request.files:
return {"status": "ko"}, 400
if not os.path.exists("/opt/bunkerweb/tmp/ui"):
os.mkdir("/opt/bunkerweb/tmp/ui")
for file in request.files.values():
if not file.filename.endswith((".zip", ".tar.gz", ".tar.xz")):
return {"status": "ko"}, 422
with open(
f"/opt/bunkerweb/tmp/ui/{uuid4()}{file.filename[file.filename.index('.'):]}",
"wb",
) as f:
f.write(file.read())
return {"status": "ok"}, 201
@app.route("/cache", methods=["GET"])
@login_required
def cache():
return render_template(
"cache.html", folders=[path_to_dict("/opt/bunkerweb/cache", is_cache=True)]
)
@app.route("/cache/download", methods=["GET"])
@login_required
def cache_download():
path = request.args.get("path")
if not path:
return redirect(url_for("loading", next=url_for("cache"))), 400
operation = app.config["CONFIGFILES"].check_path(path, "/opt/bunkerweb/cache/")
if operation:
flash(operation, "error")
return redirect(url_for("loading", next=url_for("plugins"))), 500
return send_file(path, as_attachment=True)
@app.route("/logs", methods=["GET"])
@login_required
def logs():
instances = app.config["INSTANCES"].get_instances()
first_instance = instances[0] if instances else None
return render_template(
"logs.html", first_instance=first_instance, instances=instances
)
@app.route("/logs/local", methods=["GET"])
@login_required
def logs_linux():
if not os.path.exists("/usr/sbin/nginx"):
return (
jsonify(
{
"status": "ko",
"message": "There are no linux instances running",
}
),
404,
)
last_update = request.args.get("last_update")
raw_logs_access = []
raw_logs_error = []
if last_update:
if os.path.exists("/var/log/nginx/error.log"):
with open("/var/log/nginx/error.log", "r") as f:
raw_logs_error = f.read().splitlines()[int(last_update.split(".")[0]) :]
if os.path.exists("/var/log/nginx/access.log"):
with open("/var/log/nginx/access.log", "r") as f:
raw_logs_access = f.read().splitlines()[
int(last_update.split(".")[1]) :
]
else:
if os.path.exists("/var/log/nginx/error.log"):
with open("/var/log/nginx/error.log", "r") as f:
raw_logs_error = f.read().splitlines()
if os.path.exists("/var/log/nginx/access.log"):
with open("/var/log/nginx/access.log", "r") as f:
raw_logs_access = f.read().splitlines()
logs_error = []
temp_multiple_lines = []
NGINX_LOG_LEVELS = [
"debug",
"notice",
"info",
"warn",
"error",
"crit",
"alert",
"emerg",
]
for line in raw_logs_error:
line_lower = line.lower()
if "[info]" in line.lower() and line.endswith(":") or "[error]" in line.lower():
if temp_multiple_lines:
logs_error.append("\n".join(temp_multiple_lines))
temp_multiple_lines = [
f"{datetime.strptime(' '.join(line.strip().split(' ')[0:2]), '%Y/%m/%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp()} {line}"
]
elif (
all(f"[{log_level}]" not in line_lower for log_level in NGINX_LOG_LEVELS)
and temp_multiple_lines
):
temp_multiple_lines.append(line)
else:
logs_error.append(
f"{datetime.strptime(' '.join(line.strip().split(' ')[0:2]), '%Y/%m/%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp()} {line}"
)
if temp_multiple_lines:
logs_error.append("\n".join(temp_multiple_lines))
logs_access = [
f"{datetime.strptime(line[line.find('[') + 1: line.find(']')], '%d/%b/%Y:%H:%M:%S %z').timestamp()} {line}"
for line in raw_logs_access
]
raw_logs = logs_error + logs_access
raw_logs.sort(
key=lambda x: float(x.split(" ")[0]) if x.split(" ")[0].isdigit() else 0
)
logs = []
for log in raw_logs:
log_lower = log.lower()
error_type = (
"error"
if "[error]" in log_lower
or "[crit]" in log_lower
or "[alert]" in log_lower
or "" in log_lower
else (
"warn"
if "[warn]" in log_lower
else ("info" if "[info]" in log_lower else "message")
)
)
if "\n" in log:
splitted_one_line = log.split("\n")
logs.append(
{
"content": " ".join(
splitted_one_line.pop(0).strip().split(" ")[1:]
),
"type": error_type,
"separator": True,
}
)
for splitted_log in splitted_one_line:
logs.append(
{
"content": splitted_log,
"type": error_type,
}
)
else:
logs.append(
{
"content": " ".join(log.strip().split(" ")[1:]),
"type": error_type,
}
)
count_error_logs = 0
for log in logs_error:
if "\n" in log:
for _ in log.split("\n"):
count_error_logs += 1
else:
count_error_logs += 1
return jsonify(
{
"logs": logs,
"last_update": f"{count_error_logs + int(last_update.split('.')[0])}.{len(logs_access) + int(last_update.split('.')[1])}"
if last_update
else f"{count_error_logs}.{len(logs_access)}",
}
)
@app.route("/logs/<container_id>", methods=["GET"])
@login_required
def logs_container(container_id):
last_update = request.args.get("last_update")
logs = []
if docker_client:
try:
if last_update:
if not last_update.isdigit():
return (
jsonify(
{
"status": "ko",
"message": "last_update must be an integer",
}
),
422,
)
docker_logs = docker_client.containers.get(container_id).logs(
stdout=True,
stderr=True,
since=datetime.fromtimestamp(int(last_update)),
)
else:
docker_logs = docker_client.containers.get(container_id).logs(
stdout=True,
stderr=True,
)
for log in docker_logs.decode("utf-8", errors="replace").split("\n")[0:-1]:
log_lower = log.lower()
logs.append(
{
"content": log,
"type": "error"
if "[error]" in log_lower
or "[crit]" in log_lower
or "[alert]" in log_lower
or "" in log_lower
else (
"warn"
if "[warn]" in log_lower
else ("info" if "[info]" in log_lower else "message")
),
}
)
except docker_NotFound:
return (
jsonify(
{
"status": "ko",
"message": f"Container with ID {container_id} not found!",
}
),
404,
)
return jsonify({"logs": logs, "last_update": int(time())})
@app.route("/login", methods=["GET", "POST"])
def login():
fail = False
if (
request.method == "POST"
and "username" in request.form
and "password" in request.form
):
if app.config["USER"].get_id() == request.form["username"] and app.config[
"USER"
].check_password(request.form["password"]):
# log the user in
next_url = request.form.get("next")
login_user(app.config["USER"])
# redirect him to the page he originally wanted or to the home page
return redirect(url_for("loading", next=next_url or url_for("home")))
else:
fail = True
if fail:
return (
render_template("login.html", error="Invalid username or password"),
401,
)
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("login"))