Remove celery support and use Tryton queues instead.

This commit is contained in:
Albert Cervera i Areny 2021-10-14 18:05:21 +02:00
parent 8837c6a552
commit 2e302268c5
5 changed files with 42 additions and 136 deletions

97
babi.py
View File

@ -62,30 +62,9 @@ AGGREGATE_TYPES = [
SRC_CHARS = """ .'"()/*-+?¿!&$[]{}@#`'^:;<>=~%,|\\"""
DST_CHARS = """__________________________________"""
BABI_CELERY = config_.getboolean('babi', 'celery', default=True)
BABI_RETENTION_DAYS = config_.getint('babi', 'retention_days', default=30)
BABI_CELERY_TASK = config_.get('babi', 'celery_task',
default='trytond.modules.babi.tasks.calculate_execution')
BABI_MAX_BD_COLUMN = config_.getint('babi', 'max_db_column', default=60)
try:
import celery
except ImportError:
celery = None
except AttributeError:
# If run from within frepple we will get
# AttributeError: 'module' object has no attribute 'argv'
pass
CELERY_CONFIG = config_.get('celery', 'config',
default='trytond.modules.babi.celeryconfig')
CELERY_BROKER = 'amqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s' % {
'user': config_.get('celery', 'user', default='guest'),
'password': config_.get('celery', 'password', default='guest'),
'host': config_.get('celery', 'host', default='localhost'),
'port': config_.getint('celery', 'port', default=5672),
'vhost': config_.get('celery', 'vhost', default='/'),
}
logger = logging.getLogger(__name__)
@ -903,8 +882,28 @@ class Report(ModelSQL, ModelView):
@classmethod
def calculate_babi_report(cls, reports):
"""Calculate reports and send email (from cron)"""
HTMLReport = Pool().get('babi.report.html_report', type='report')
'''
Creates an execution, calculates it and sends e-mail if necessary.
Better call this method with a single report so transaction does not
last for so long.
'''
pool = Pool()
Execution = pool.get('babi.report.execution')
HTMLReport = pool.get('babi.report.html_report', type='report')
executions = []
for report in reports:
if not report.measures:
raise UserError(gettext('babi.no_measures',
report=report.rec_name))
if not report.dimensions:
raise UserError(gettext('babi.no_dimensions',
report=report.rec_name))
execution, = Execution.create([report.get_execution_data()])
Transaction().commit()
Execution.calculate([execution])
executions.append(execution)
executions = cls.calculate(reports)
for execution in executions:
@ -932,59 +931,25 @@ class Report(ModelSQL, ModelView):
part = MIMEBase('application', 'octet-stream')
part.set_payload(report[1])
encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename=report.pdf')
part.add_header('Content-Disposition',
'attachment; filename=report.pdf')
msg.attach(part)
try:
server = execution.report.smtp
to_addrs = [a for _, a in getaddresses([execution.report.to])]
to_addrs = [a for _, a in
getaddresses([execution.report.to])]
server.send_mail(msg['From'], to_addrs, msg.as_string())
logger.info('Send email report: %s' % (execution.report.rec_name))
logger.info('Send email report: %s'
% execution.report.rec_name)
except Exception as exception:
logger.error('Unable to delivery email report: %s:\n %s' % (
execution.report.rec_name, exception))
def execute(self, execution):
Execution = Pool().get('babi.report.execution')
transaction = Transaction()
user = transaction.user
database_name = transaction.database.name
logger.info('Babi execution %s (report "%s")' % (
execution.id, self.rec_name))
if celery and BABI_CELERY:
os.system(
'%s/celery call %s '
'--broker=%s --args=[%d,%d] --config="%s" --queue=%s' % (
os.path.dirname(sys.executable),
BABI_CELERY_TASK,
CELERY_BROKER,
execution.id,
user,
CELERY_CONFIG,
database_name))
else:
# Fallback to synchronous mode if celery is not available
Execution.calculate([execution])
@classmethod
@ModelView.button
def calculate(cls, reports):
Execution = Pool().get('babi.report.execution')
executions = []
for report in reports:
if not report.measures:
raise UserError(gettext('babi.no_measures',
report=report.rec_name))
if not report.dimensions:
raise UserError(gettext('babi.no_dimensions',
report=report.rec_name))
execution, = Execution.create([report.get_execution_data()])
Transaction().commit()
executions.append(execution)
report.execute(execution)
with Transaction().set_context(queue_name='babi'):
for report in reports:
cls.__queue__.calculate_reports([report])
class ReportExecution(ModelSQL, ModelView):

View File

@ -1,14 +0,0 @@
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import os
TRYTON_DATABASE = os.environ.get('TRYTON_DATABASE')
TRYTON_CONFIG = os.environ.get('TRYTON_CONFIG')
# Enable this options to debug. More info in celery page.
CELERY_ALWAYS_EAGER = False
CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
CELERY_TASK_TIME_LIMIT = 7200
CELERYD_TASK_SOFT_TIME_LIMIT = 7200
CELERY_MAX_TASKS_PER_CHILD = 2
CELERY_ACCEPT_CONTENT = ['json']

View File

@ -38,6 +38,7 @@ class Cron(metaclass=PoolMeta):
for cron in babi_crons:
# babi execution require company. Run calculate when has a company
for company in cron.companies:
with Transaction().set_context(company=company.id):
with Transaction().set_context(company=company.id,
queue_name='babi'):
BabiReport.calculate_babi_report([cron.babi_report])
return super(Cron, cls).run_once(list(set(crons) - set(babi_crons)))

View File

@ -1,46 +0,0 @@
Introduction
============
Babi is fully integrated with Celery_. With this integration you can execute
report calculations in background (on a Celery worker).
Configuration
=============
This is an example of a simple configuration and only recomended for testing
envirorments. For more information, please refer to the `Celery Documentation`_
Installing Celery
-----------------
You must have the celery package installed on your system (or virtualenv). It
can be installed with the following command::
pip install celery
Celery needs a message queue in order to work. The default one is RabbitMQ. On
Debian (and derivates) you can install it with the following command::
apt-get install rabbitmq-server
Launching workers
-----------------
The trytond server will launch workers for each database when opening the pool.
This workers use the config defined in celeryconfig.py file from babi directory.
In order to add more workers on a database you must execute the following
command from the modules/babi directory::
celery worker --app=tasks --queue=database --config=celeryconfig
The default config file uses TRYTON_DATABASE and TRYTON_CONFIG
environment variables, so you must define it otherwise the report executions
will fail.
To be able to have multiple workers on the same host with different database,
the database name is used as queue name.
.. _Celery: http://www.celeryproject.org
.. _Celery Documentation: http://docs.celeryproject.org/en/latest/index.html

View File

@ -125,7 +125,7 @@ class BaBITestCase(ModuleTestCase):
'timeout': 30,
}])
self.assertEqual(len(report.order), 0)
self.assertRaises(UserError, Report.calculate, [report])
self.assertRaises(UserError, Report.calculate_reports, [report])
category, = Expression.search([('name', '=', 'Category')])
category, = Dimension.create([{
@ -134,7 +134,7 @@ class BaBITestCase(ModuleTestCase):
'expression': category.id,
}])
self.assertRaises(UserError, Report.calculate, [report])
self.assertRaises(UserError, Report.calculate_reports, [report])
amount, = Expression.search([('name', '=', 'Amount')])
amount, = Measure.create([{
@ -162,7 +162,7 @@ class BaBITestCase(ModuleTestCase):
self.assertIsNone(amount_this_month_order.dimension)
self.assertIsNotNone(amount_this_month_order.measure)
Report.calculate([report])
Report.calculate_reports([report])
report, = Report.search([])
execution, = report.executions
@ -221,7 +221,7 @@ class BaBITestCase(ModuleTestCase):
'expression': month.id,
}])
Report.calculate([report])
Report.calculate_reports([report])
report, = Report.search([])
self.assertEqual(len(report.executions), 2)
@ -320,7 +320,7 @@ class BaBITestCase(ModuleTestCase):
'aggregate': 'count',
}])
Report.calculate([report])
Report.calculate_reports([report])
report = Report(report.id)
execution, = report.executions
@ -387,7 +387,7 @@ class BaBITestCase(ModuleTestCase):
'name': 'Amount',
'aggregate': 'avg',
}])
Report.calculate([report])
Report.calculate_reports([report])
report = Report(report.id)
execution, = report.executions
@ -462,7 +462,7 @@ class BaBITestCase(ModuleTestCase):
'aggregate': 'sum',
}])
Report.calculate([report])
Report.calculate_reports([report])
report = Report(report.id)
execution, = report.executions
@ -513,7 +513,7 @@ class BaBITestCase(ModuleTestCase):
'aggregate': 'sum',
}])
Report.calculate([report])
Report.calculate_reports([report])
report = Report(report.id)
execution, = report.executions
@ -575,7 +575,7 @@ class BaBITestCase(ModuleTestCase):
'aggregate': 'sum',
}])
Report.calculate([report])
Report.calculate_reports([report])
report = Report(report.id)
execution, = report.executions