trytond-celery_flower/test_tasks.py

44 lines
1.4 KiB
Python

#This file is part of the celery_flower module for Tryton.
#The COPYRIGHT file at the top level of this repository contains
#the full copyright notices and license terms.
from celery_tryton import TrytonTask
from celery import Celery
from trytond.pool import Pool
from trytond.transaction import Transaction
import os
import time
# AMQP broker URL, assembled from the CELERY_* environment variables.
# NOTE(review): os.getenv returns None for a variable that is not set, and
# None is interpolated as the literal text "None" -- confirm that all five
# variables are always defined where this module is imported.
broker = 'amqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s' % dict(
    user=os.getenv('CELERY_USER'),
    password=os.getenv('CELERY_PASSWORD'),
    host=os.getenv('CELERY_HOST'),
    port=os.getenv('CELERY_PORT'),
    vhost=os.getenv('CELERY_VHOST'),
)

# Result backend URL handed to the Celery application below.
CELERY_RESULT_BACKEND = 'db+postgresql://tryton:tryton@localhost/try38celery'

# Application object the tasks of this module are registered on.  After it
# is created, further settings are loaded from the ``celeryconfig`` module.
celery = Celery(
    'tests_tasks',
    broker=broker,
    backend=CELERY_RESULT_BACKEND,
)
celery.config_from_object('celeryconfig')
@celery.task(base=TrytonTask)
def generate_requests(user_id=None):
    """Generate purchase requests on behalf of a Tryton user.

    When ``user_id`` is falsy, the user whose login is ``admin`` is searched
    for and its id is used instead.  ``PurchaseRequest.generate_requests``
    is then called with that user set on the transaction and with the
    context returned by ``User.get_preferences(context_only=True)``.
    """
    registry = Pool()
    User = registry.get('res.user')
    PurchaseRequest = registry.get('purchase.request')

    # Fixed five second pause before any work is done.
    # NOTE(review): presumably keeps the task observable while it is being
    # monitored/tested -- confirm before changing or removing it.
    time.sleep(5)

    if not user_id:
        # The single-element unpacking fails unless exactly one user matches.
        admin, = User.search([('login', '=', 'admin')])
        user_id = admin.id

    with Transaction().set_user(user_id):
        # The preferences are read only once the user has been set; they
        # become the context the generation runs with.
        preferences = User.get_preferences(context_only=True)
        with Transaction().set_context(preferences):
            PurchaseRequest.generate_requests()