Fix error when multiple jobs are trying to write in db at the same time

This commit is contained in:
Théophile Diot 2023-03-09 10:05:35 +01:00
parent 8c67d08aee
commit 61b9517a87
No known key found for this signature in database
GPG Key ID: E752C80DB72BB014
16 changed files with 151 additions and 98 deletions

View File

@@ -6,6 +6,7 @@ from os import _exit, getenv
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Tuple
@@ -61,10 +62,10 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
# Check if at least a server has Blacklist activated
blacklist_activated = False
# Multisite case
@@ -191,13 +192,14 @@ try:
status = 2
else:
# Update db
err = db.update_job_cache(
"blacklist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
with lock:
err = db.update_job_cache(
"blacklist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")

View File

@@ -3,6 +3,7 @@
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -24,10 +25,10 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
# Check if at least a server has BunkerNet activated
bunkernet_activated = False
# Multisite case
@@ -123,13 +124,15 @@ try:
_exit(2)
# Update db
err = db.update_job_cache(
"bunkernet-data",
None,
"ip.list",
content,
checksum=new_hash,
)
with lock:
err = db.update_job_cache(
"bunkernet-data",
None,
"ip.list",
content,
checksum=new_hash,
)
if err:
logger.warning(f"Couldn't update db ip.list cache: {err}")

View File

@@ -3,6 +3,7 @@
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from time import sleep
from traceback import format_exc
@@ -24,10 +25,10 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
# Check if at least a server has BunkerNet activated
bunkernet_activated = False
# Multisite case
@@ -153,12 +154,14 @@ try:
Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id)
# Update db
err = db.update_job_cache(
"bunkernet-register",
None,
"instance.id",
bunkernet_id.encode("utf-8"),
)
with lock:
err = db.update_job_cache(
"bunkernet-register",
None,
"instance.id",
bunkernet_id.encode("utf-8"),
)
if err:
logger.warning(f"Couldn't update db cache: {err}")
else:

View File

@@ -4,6 +4,7 @@ from os import getenv, makedirs
from pathlib import Path
from shutil import copy
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Optional
@@ -24,6 +25,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
@@ -80,13 +82,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
copy(key_path, key_cache_path.replace(".hash", ""))
with open(key_path, "r") as f:
err = db.update_job_cache(
"custom-cert",
first_server,
key_cache_path.replace(".hash", "").split("/")[-1],
f.read().encode("utf-8"),
checksum=key_hash,
)
with lock:
err = db.update_job_cache(
"custom-cert",
first_server,
key_cache_path.replace(".hash", "").split("/")[-1],
f.read().encode("utf-8"),
checksum=key_hash,
)
if err:
logger.warning(
@@ -94,13 +97,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool:
)
with open(cert_path, "r") as f:
err = db.update_job_cache(
"custom-cert",
first_server,
cert_cache_path.replace(".hash", "").split("/")[-1],
f.read().encode("utf-8"),
checksum=cert_hash,
)
with lock:
err = db.update_job_cache(
"custom-cert",
first_server,
cert_cache_path.replace(".hash", "").split("/")[-1],
f.read().encode("utf-8"),
checksum=cert_hash,
)
if err:
logger.warning(
@@ -129,9 +133,7 @@ try:
for first_server in servers:
if not first_server or (
getenv(
f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no")
)
getenv(f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no"))
!= "yes"
):
continue

View File

@@ -6,6 +6,7 @@ from os import _exit, getenv
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Tuple
@@ -61,10 +62,10 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
# Check if at least a server has Greylist activated
greylist_activated = False
# Multisite case
@@ -175,13 +176,14 @@ try:
status = 2
else:
# Update db
err = db.update_job_cache(
"greylist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
with lock:
err = db.update_job_cache(
"greylist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")

View File

@@ -6,6 +6,7 @@ from os.path import dirname, join
from pathlib import Path
from stat import S_IEXEC
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from uuid import uuid4
from glob import glob
from json import load, loads
@@ -32,6 +33,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
@@ -54,7 +56,6 @@ def install_plugin(plugin_dir):
try:
# Check if we have plugins to download
plugin_urls = getenv("EXTERNAL_PLUGIN_URLS", "")
if not plugin_urls:
@@ -109,7 +110,9 @@ try:
external_plugins.append(plugin_file)
external_plugins_ids.append(plugin_file["id"])
db_plugins = db.get_plugins()
with lock:
db_plugins = db.get_plugins()
for plugin in db_plugins:
if plugin["external"] is True and plugin["id"] not in external_plugins_ids:
external_plugins.append(plugin)
@@ -121,7 +124,9 @@ try:
chmod(join(root, name), 0o770)
if external_plugins:
err = db.update_external_plugins(external_plugins)
with lock:
err = db.update_external_plugins(external_plugins)
if err:
logger.error(
f"Couldn't update external plugins to database: {err}",

View File

@@ -5,6 +5,7 @@ from gzip import decompress
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -27,6 +28,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -68,9 +70,11 @@ try:
_exit(2)
# Update db
err = db.update_job_cache(
"mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash
)
with lock:
err = db.update_job_cache(
"mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash
)
if err:
logger.warning(f"Couldn't update db cache: {err}")

View File

@@ -5,6 +5,7 @@ from gzip import decompress
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -27,6 +28,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -70,9 +72,11 @@ try:
_exit(2)
# Update db
err = db.update_job_cache(
"mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash
)
with lock:
err = db.update_job_cache(
"mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash
)
if err:
logger.warning(f"Couldn't update db cache: {err}")

View File

@@ -3,6 +3,7 @@
from os import getenv, makedirs
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -23,6 +24,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -42,7 +44,10 @@ try:
# Cluster case
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
for instance in db.get_instances():
with lock:
instances = db.get_instances()
for instance in instances:
endpoint = f"http://{instance['hostname']}:{instance['port']}"
host = instance["server_name"]
api = API(endpoint, host=host)

View File

@@ -4,6 +4,7 @@ from os import getenv
from os.path import isfile
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -24,6 +25,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -42,7 +44,10 @@ try:
# Cluster case
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
for instance in db.get_instances():
with lock:
instances = db.get_instances()
for instance in instances:
endpoint = f"http://{instance['hostname']}:{instance['port']}"
host = instance["server_name"]
api = API(endpoint, host=host)

View File

@@ -8,6 +8,7 @@ from shutil import chown
from subprocess import run, DEVNULL, STDOUT
from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -28,6 +29,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -62,7 +64,10 @@ try:
tgz.seek(0, 0)
files = {"archive.tar.gz": tgz}
for instance in db.get_instances():
with lock:
instances = db.get_instances()
for instance in instances:
endpoint = f"http://{instance['hostname']}:{instance['port']}"
host = instance["server_name"]
api = API(endpoint, host=host)

View File

@@ -4,6 +4,7 @@ from os import environ, getenv
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -22,6 +23,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
@@ -102,12 +104,14 @@ try:
).read_bytes()
# Update db
err = db.update_job_cache(
"certbot-new",
first_server,
"cert.pem",
cert,
)
with lock:
err = db.update_job_cache(
"certbot-new",
first_server,
"cert.pem",
cert,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")
@@ -140,12 +144,14 @@ try:
).read_bytes()
# Update db
err = db.update_job_cache(
"certbot-new",
first_server,
"cert.pem",
cert,
)
with lock:
err = db.update_job_cache(
"certbot-new",
first_server,
"cert.pem",
cert,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")
except:

View File

@@ -19,7 +19,6 @@ logger = setup_logger("DEFAULT-SERVER-CERT", getenv("LOG_LEVEL", "INFO"))
status = 0
try:
# Check if we need to generate a self-signed default cert for non-SNI "clients"
need_default_cert = False
if getenv("MULTISITE", "no") == "yes":

View File

@@ -5,6 +5,7 @@ from ipaddress import ip_address, ip_network
from os import _exit, getenv
from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -39,6 +40,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
@@ -125,13 +127,15 @@ try:
_exit(2)
# Update db
err = db.update_job_cache(
"realip-download",
None,
"combined.list",
content,
checksum=new_hash,
)
with lock:
err = db.update_job_cache(
"realip-download",
None,
"combined.list",
content,
checksum=new_hash,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")

View File

@@ -4,6 +4,7 @@ from os import getenv
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
sys_path.extend(
@@ -22,6 +23,7 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
def generate_cert(first_server, days, subj):
@@ -40,22 +42,22 @@ def generate_cert(first_server, days, subj):
return False, 2
# Update db
with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.key", "r") as f:
key_data = f.read().encode("utf-8")
key_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.key").read_bytes()
err = db.update_job_cache(
"self-signed", first_server, f"{first_server}.key", key_data
)
with lock:
err = db.update_job_cache(
"self-signed", first_server, f"{first_server}.key", key_data
)
if err:
logger.warning(f"Couldn't update db cache for {first_server}.key file: {err}")
with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem", "r") as f:
pem_data = f.read().encode("utf-8")
pem_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem").read_bytes()
err = db.update_job_cache(
"self-signed", first_server, f"{first_server}.pem", pem_data
)
with lock:
err = db.update_job_cache(
"self-signed", first_server, f"{first_server}.pem", pem_data
)
if err:
logger.warning(f"Couldn't update db cache for {first_server}.pem file: {err}")

View File

@@ -6,6 +6,7 @@ from os import _exit, getenv
from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from threading import Lock
from traceback import format_exc
from typing import Tuple
@@ -61,10 +62,10 @@ db = Database(
logger,
sqlalchemy_string=getenv("DATABASE_URI", None),
)
lock = Lock()
status = 0
try:
# Check if at least a server has Whitelist activated
whitelist_activated = False
# Multisite case
@@ -175,13 +176,14 @@ try:
status = 2
else:
# Update db
err = db.update_job_cache(
"whitelist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
with lock:
err = db.update_job_cache(
"whitelist-download",
None,
f"{kind}.list",
content,
checksum=new_hash,
)
if err:
logger.warning(f"Couldn't update db cache: {err}")