Fix error when multiple jobs are trying to write in db at the same time
This commit is contained in:
parent
8c67d08aee
commit
61b9517a87
|
@ -6,6 +6,7 @@ from os import _exit, getenv
|
|||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
|
@ -61,10 +62,10 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if at least a server has Blacklist activated
|
||||
blacklist_activated = False
|
||||
# Multisite case
|
||||
|
@ -191,13 +192,14 @@ try:
|
|||
status = 2
|
||||
else:
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"blacklist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"blacklist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -24,10 +25,10 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if at least a server has BunkerNet activated
|
||||
bunkernet_activated = False
|
||||
# Multisite case
|
||||
|
@ -123,13 +124,15 @@ try:
|
|||
_exit(2)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"bunkernet-data",
|
||||
None,
|
||||
"ip.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"bunkernet-data",
|
||||
None,
|
||||
"ip.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db ip.list cache: {err}")
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from time import sleep
|
||||
from traceback import format_exc
|
||||
|
||||
|
@ -24,10 +25,10 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if at least a server has BunkerNet activated
|
||||
bunkernet_activated = False
|
||||
# Multisite case
|
||||
|
@ -153,12 +154,14 @@ try:
|
|||
Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"bunkernet-register",
|
||||
None,
|
||||
"instance.id",
|
||||
bunkernet_id.encode("utf-8"),
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"bunkernet-register",
|
||||
None,
|
||||
"instance.id",
|
||||
bunkernet_id.encode("utf-8"),
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
else:
|
||||
|
|
|
@ -4,6 +4,7 @@ from os import getenv, makedirs
|
|||
from pathlib import Path
|
||||
from shutil import copy
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
from typing import Optional
|
||||
|
||||
|
@ -24,6 +25,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
|
||||
|
||||
def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
|
||||
|
@ -80,13 +82,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
|
|||
copy(key_path, key_cache_path.replace(".hash", ""))
|
||||
|
||||
with open(key_path, "r") as f:
|
||||
err = db.update_job_cache(
|
||||
"custom-cert",
|
||||
first_server,
|
||||
key_cache_path.replace(".hash", "").split("/")[-1],
|
||||
f.read().encode("utf-8"),
|
||||
checksum=key_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"custom-cert",
|
||||
first_server,
|
||||
key_cache_path.replace(".hash", "").split("/")[-1],
|
||||
f.read().encode("utf-8"),
|
||||
checksum=key_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(
|
||||
|
@ -94,13 +97,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
|
|||
)
|
||||
|
||||
with open(cert_path, "r") as f:
|
||||
err = db.update_job_cache(
|
||||
"custom-cert",
|
||||
first_server,
|
||||
cert_cache_path.replace(".hash", "").split("/")[-1],
|
||||
f.read().encode("utf-8"),
|
||||
checksum=cert_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"custom-cert",
|
||||
first_server,
|
||||
cert_cache_path.replace(".hash", "").split("/")[-1],
|
||||
f.read().encode("utf-8"),
|
||||
checksum=cert_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(
|
||||
|
@ -129,9 +133,7 @@ try:
|
|||
|
||||
for first_server in servers:
|
||||
if not first_server or (
|
||||
getenv(
|
||||
f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no")
|
||||
)
|
||||
getenv(f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no"))
|
||||
!= "yes"
|
||||
):
|
||||
continue
|
||||
|
|
|
@ -6,6 +6,7 @@ from os import _exit, getenv
|
|||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
|
@ -61,10 +62,10 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if at least a server has Greylist activated
|
||||
greylist_activated = False
|
||||
# Multisite case
|
||||
|
@ -175,13 +176,14 @@ try:
|
|||
status = 2
|
||||
else:
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"greylist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"greylist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
|
|
@ -6,6 +6,7 @@ from os.path import dirname, join
|
|||
from pathlib import Path
|
||||
from stat import S_IEXEC
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from uuid import uuid4
|
||||
from glob import glob
|
||||
from json import load, loads
|
||||
|
@ -32,6 +33,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
|
||||
|
@ -54,7 +56,6 @@ def install_plugin(plugin_dir):
|
|||
|
||||
|
||||
try:
|
||||
|
||||
# Check if we have plugins to download
|
||||
plugin_urls = getenv("EXTERNAL_PLUGIN_URLS", "")
|
||||
if not plugin_urls:
|
||||
|
@ -109,7 +110,9 @@ try:
|
|||
external_plugins.append(plugin_file)
|
||||
external_plugins_ids.append(plugin_file["id"])
|
||||
|
||||
db_plugins = db.get_plugins()
|
||||
with lock:
|
||||
db_plugins = db.get_plugins()
|
||||
|
||||
for plugin in db_plugins:
|
||||
if plugin["external"] is True and plugin["id"] not in external_plugins_ids:
|
||||
external_plugins.append(plugin)
|
||||
|
@ -121,7 +124,9 @@ try:
|
|||
chmod(join(root, name), 0o770)
|
||||
|
||||
if external_plugins:
|
||||
err = db.update_external_plugins(external_plugins)
|
||||
with lock:
|
||||
err = db.update_external_plugins(external_plugins)
|
||||
|
||||
if err:
|
||||
logger.error(
|
||||
f"Couldn't update external plugins to database: {err}",
|
||||
|
|
|
@ -5,6 +5,7 @@ from gzip import decompress
|
|||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -27,6 +28,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -68,9 +70,11 @@ try:
|
|||
_exit(2)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ from gzip import decompress
|
|||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -27,6 +28,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -70,9 +72,11 @@ try:
|
|||
_exit(2)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
from os import getenv, makedirs
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -23,6 +24,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -42,7 +44,10 @@ try:
|
|||
|
||||
# Cluster case
|
||||
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
for instance in db.get_instances():
|
||||
with lock:
|
||||
instances = db.get_instances()
|
||||
|
||||
for instance in instances:
|
||||
endpoint = f"http://{instance['hostname']}:{instance['port']}"
|
||||
host = instance["server_name"]
|
||||
api = API(endpoint, host=host)
|
||||
|
|
|
@ -4,6 +4,7 @@ from os import getenv
|
|||
from os.path import isfile
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -24,6 +25,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -42,7 +44,10 @@ try:
|
|||
|
||||
# Cluster case
|
||||
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
for instance in db.get_instances():
|
||||
with lock:
|
||||
instances = db.get_instances()
|
||||
|
||||
for instance in instances:
|
||||
endpoint = f"http://{instance['hostname']}:{instance['port']}"
|
||||
host = instance["server_name"]
|
||||
api = API(endpoint, host=host)
|
||||
|
|
|
@ -8,6 +8,7 @@ from shutil import chown
|
|||
from subprocess import run, DEVNULL, STDOUT
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from tarfile import open as tar_open
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -28,6 +29,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -62,7 +64,10 @@ try:
|
|||
tgz.seek(0, 0)
|
||||
files = {"archive.tar.gz": tgz}
|
||||
|
||||
for instance in db.get_instances():
|
||||
with lock:
|
||||
instances = db.get_instances()
|
||||
|
||||
for instance in instances:
|
||||
endpoint = f"http://{instance['hostname']}:{instance['port']}"
|
||||
host = instance["server_name"]
|
||||
api = API(endpoint, host=host)
|
||||
|
|
|
@ -4,6 +4,7 @@ from os import environ, getenv
|
|||
from pathlib import Path
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -22,6 +23,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
|
||||
|
@ -102,12 +104,14 @@ try:
|
|||
).read_bytes()
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"certbot-new",
|
||||
first_server,
|
||||
"cert.pem",
|
||||
cert,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"certbot-new",
|
||||
first_server,
|
||||
"cert.pem",
|
||||
cert,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
||||
|
@ -140,12 +144,14 @@ try:
|
|||
).read_bytes()
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"certbot-new",
|
||||
first_server,
|
||||
"cert.pem",
|
||||
cert,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"certbot-new",
|
||||
first_server,
|
||||
"cert.pem",
|
||||
cert,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
except:
|
||||
|
|
|
@ -19,7 +19,6 @@ logger = setup_logger("DEFAULT-SERVER-CERT", getenv("LOG_LEVEL", "INFO"))
|
|||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if we need to generate a self-signed default cert for non-SNI "clients"
|
||||
need_default_cert = False
|
||||
if getenv("MULTISITE", "no") == "yes":
|
||||
|
|
|
@ -5,6 +5,7 @@ from ipaddress import ip_address, ip_network
|
|||
from os import _exit, getenv
|
||||
from pathlib import Path
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -39,6 +40,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
@ -125,13 +127,15 @@ try:
|
|||
_exit(2)
|
||||
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"realip-download",
|
||||
None,
|
||||
"combined.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"realip-download",
|
||||
None,
|
||||
"combined.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ from os import getenv
|
|||
from pathlib import Path
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
|
||||
sys_path.extend(
|
||||
|
@ -22,6 +23,7 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
|
||||
|
||||
def generate_cert(first_server, days, subj):
|
||||
|
@ -40,22 +42,22 @@ def generate_cert(first_server, days, subj):
|
|||
return False, 2
|
||||
|
||||
# Update db
|
||||
with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.key", "r") as f:
|
||||
key_data = f.read().encode("utf-8")
|
||||
key_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.key").read_bytes()
|
||||
|
||||
err = db.update_job_cache(
|
||||
"self-signed", first_server, f"{first_server}.key", key_data
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"self-signed", first_server, f"{first_server}.key", key_data
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache for {first_server}.key file: {err}")
|
||||
|
||||
with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem", "r") as f:
|
||||
pem_data = f.read().encode("utf-8")
|
||||
pem_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem").read_bytes()
|
||||
|
||||
err = db.update_job_cache(
|
||||
"self-signed", first_server, f"{first_server}.pem", pem_data
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"self-signed", first_server, f"{first_server}.pem", pem_data
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache for {first_server}.pem file: {err}")
|
||||
|
|
|
@ -6,6 +6,7 @@ from os import _exit, getenv
|
|||
from pathlib import Path
|
||||
from re import IGNORECASE, compile as re_compile
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from threading import Lock
|
||||
from traceback import format_exc
|
||||
from typing import Tuple
|
||||
|
||||
|
@ -61,10 +62,10 @@ db = Database(
|
|||
logger,
|
||||
sqlalchemy_string=getenv("DATABASE_URI", None),
|
||||
)
|
||||
lock = Lock()
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
||||
# Check if at least a server has Whitelist activated
|
||||
whitelist_activated = False
|
||||
# Multisite case
|
||||
|
@ -175,13 +176,14 @@ try:
|
|||
status = 2
|
||||
else:
|
||||
# Update db
|
||||
err = db.update_job_cache(
|
||||
"whitelist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
with lock:
|
||||
err = db.update_job_cache(
|
||||
"whitelist-download",
|
||||
None,
|
||||
f"{kind}.list",
|
||||
content,
|
||||
checksum=new_hash,
|
||||
)
|
||||
|
||||
if err:
|
||||
logger.warning(f"Couldn't update db cache: {err}")
|
||||
|
|
Loading…
Reference in New Issue