Thread the jobs run_once

This commit is contained in:
TheophileDiot 2022-11-17 19:12:52 +01:00
parent 89e8839bbb
commit fa98003f22
1 changed files with 43 additions and 32 deletions

View File

@ -1,8 +1,10 @@
from copy import deepcopy
from glob import glob
from json import loads
from logging import Logger
from os import environ, getenv
from subprocess import DEVNULL, PIPE, STDOUT, run
from threading import Lock, Thread
from schedule import (
clear as schedule_clear,
every as schedule_every,
@ -36,6 +38,8 @@ class JobScheduler(ApiCaller):
self.__env.update(environ)
self.__jobs = self.__get_jobs()
self.__lock = lock
self.__thread_lock = Lock()
self.__job_success = True
def __get_jobs(self):
jobs = {}
@ -101,7 +105,6 @@ class JobScheduler(ApiCaller):
self.__logger.info(
f"Executing job {name} from plugin {plugin} ...",
)
success = True
try:
proc = run(
f"{path}/jobs/{file}",
@ -109,29 +112,31 @@ class JobScheduler(ApiCaller):
stderr=STDOUT,
env=self.__env,
)
except:
self.__logger.error(
f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}",
)
success = False
if success and proc.returncode >= 2:
self.__logger.error(
f"Error while executing job {name} from plugin {plugin}",
)
success = False
except BaseException:
with self.__thread_lock:
self.__logger.error(
f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}",
)
self.__job_success = False
err = self.__db.update_job(plugin, name, success)
if self.__job_success and proc.returncode >= 2:
with self.__thread_lock:
self.__logger.error(
f"Error while executing job {name} from plugin {plugin}",
)
self.__job_success = False
if not err:
self.__logger.info(
f"Successfully updated database for the job {name} from plugin {plugin}",
)
else:
self.__logger.warning(
f"Failed to update database for the job {name} from plugin {plugin}: {err}",
)
with self.__thread_lock:
err = self.__db.update_job(plugin, name, self.__job_success)
return success
if not err:
self.__logger.info(
f"Successfully updated database for the job {name} from plugin {plugin}",
)
else:
self.__logger.warning(
f"Failed to update database for the job {name} from plugin {plugin}: {err}",
)
def setup(self):
for plugin, jobs in self.__jobs.items():
@ -184,19 +189,25 @@ class JobScheduler(ApiCaller):
def run_once(self):
ret = True
threads = []
for plugin, jobs in self.__jobs.items():
for job in jobs:
try:
path = job["path"]
name = job["name"]
file = job["file"]
if self.__job_wrapper(path, plugin, name, file) >= 2:
ret = False
except BaseException:
ret = False
self.__logger.error(
f"Exception while running jobs once for plugin {plugin} : {format_exc()}",
)
path = job["path"]
name = job["name"]
file = job["file"]
thread = Thread(
target=self.__job_wrapper, args=(path, plugin, name, file)
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
ret = self.__job_success
self.__job_success = True
return ret