Thread the jobs run_once
This commit is contained in:
parent
89e8839bbb
commit
fa98003f22
|
@ -1,8 +1,10 @@
|
|||
from copy import deepcopy
|
||||
from glob import glob
|
||||
from json import loads
|
||||
from logging import Logger
|
||||
from os import environ, getenv
|
||||
from subprocess import DEVNULL, PIPE, STDOUT, run
|
||||
from threading import Lock, Thread
|
||||
from schedule import (
|
||||
clear as schedule_clear,
|
||||
every as schedule_every,
|
||||
|
@ -36,6 +38,8 @@ class JobScheduler(ApiCaller):
|
|||
self.__env.update(environ)
|
||||
self.__jobs = self.__get_jobs()
|
||||
self.__lock = lock
|
||||
self.__thread_lock = Lock()
|
||||
self.__job_success = True
|
||||
|
||||
def __get_jobs(self):
|
||||
jobs = {}
|
||||
|
@ -101,7 +105,6 @@ class JobScheduler(ApiCaller):
|
|||
self.__logger.info(
|
||||
f"Executing job {name} from plugin {plugin} ...",
|
||||
)
|
||||
success = True
|
||||
try:
|
||||
proc = run(
|
||||
f"{path}/jobs/{file}",
|
||||
|
@ -109,29 +112,31 @@ class JobScheduler(ApiCaller):
|
|||
stderr=STDOUT,
|
||||
env=self.__env,
|
||||
)
|
||||
except:
|
||||
self.__logger.error(
|
||||
f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}",
|
||||
)
|
||||
success = False
|
||||
if success and proc.returncode >= 2:
|
||||
self.__logger.error(
|
||||
f"Error while executing job {name} from plugin {plugin}",
|
||||
)
|
||||
success = False
|
||||
except BaseException:
|
||||
with self.__thread_lock:
|
||||
self.__logger.error(
|
||||
f"Exception while executing job {name} from plugin {plugin} :\n{format_exc()}",
|
||||
)
|
||||
self.__job_success = False
|
||||
|
||||
err = self.__db.update_job(plugin, name, success)
|
||||
if self.__job_success and proc.returncode >= 2:
|
||||
with self.__thread_lock:
|
||||
self.__logger.error(
|
||||
f"Error while executing job {name} from plugin {plugin}",
|
||||
)
|
||||
self.__job_success = False
|
||||
|
||||
if not err:
|
||||
self.__logger.info(
|
||||
f"Successfully updated database for the job {name} from plugin {plugin}",
|
||||
)
|
||||
else:
|
||||
self.__logger.warning(
|
||||
f"Failed to update database for the job {name} from plugin {plugin}: {err}",
|
||||
)
|
||||
with self.__thread_lock:
|
||||
err = self.__db.update_job(plugin, name, self.__job_success)
|
||||
|
||||
return success
|
||||
if not err:
|
||||
self.__logger.info(
|
||||
f"Successfully updated database for the job {name} from plugin {plugin}",
|
||||
)
|
||||
else:
|
||||
self.__logger.warning(
|
||||
f"Failed to update database for the job {name} from plugin {plugin}: {err}",
|
||||
)
|
||||
|
||||
def setup(self):
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
|
@ -184,19 +189,25 @@ class JobScheduler(ApiCaller):
|
|||
|
||||
def run_once(self):
|
||||
ret = True
|
||||
threads = []
|
||||
for plugin, jobs in self.__jobs.items():
|
||||
for job in jobs:
|
||||
try:
|
||||
path = job["path"]
|
||||
name = job["name"]
|
||||
file = job["file"]
|
||||
if self.__job_wrapper(path, plugin, name, file) >= 2:
|
||||
ret = False
|
||||
except BaseException:
|
||||
ret = False
|
||||
self.__logger.error(
|
||||
f"Exception while running jobs once for plugin {plugin} : {format_exc()}",
|
||||
)
|
||||
path = job["path"]
|
||||
name = job["name"]
|
||||
file = job["file"]
|
||||
thread = Thread(
|
||||
target=self.__job_wrapper, args=(path, plugin, name, file)
|
||||
)
|
||||
threads.append(thread)
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
ret = self.__job_success
|
||||
self.__job_success = True
|
||||
|
||||
return ret
|
||||
|
||||
|
|
Loading…
Reference in New Issue