# encoding: utf-8
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import datetime as mdatetime
import pytz
import logging
import os
import sys
import time
import unicodedata
import json
from sql import Null, Column
from sql.operators import Or
from datetime import datetime, timedelta
from collections import defaultdict
from io import BytesIO

from trytond.wizard import Wizard, StateView, StateAction, StateTransition, \
    StateReport, Button
from trytond.model import ModelSQL, ModelView, fields, Unique, Check, \
    sequence_ordered
from trytond.model.fields import depends
from trytond.pyson import If, Eval, Bool, PYSONEncoder, Id, In, Not, \
    PYSONDecoder
from trytond.pool import Pool, PoolMeta
from trytond.transaction import Transaction
from trytond.tools import grouped_slice
from trytond.config import config as config_
from trytond import backend
from trytond.protocols.jsonrpc import JSONDecoder, JSONEncoder
from trytond.modules.html_report.html_report import HTMLReport
from .babi_eval import babi_eval

__all__ = ['Filter', 'Expression', 'Report', 'ReportGroup', 'Dimension',
    'DimensionColumn', 'Measure', 'InternalMeasure', 'Order', 'ActWindow',
    'Menu', 'Keyword', 'Model', 'OpenChartStart', 'OpenChart',
    'ReportExecution', 'OpenExecutionSelect', 'OpenExecution',
    'UpdateDataWizardStart', 'UpdateDataWizardUpdated', 'FilterParameter',
    'CleanExecutionsStart', 'CleanExecutions', 'BabiHTMLReport']

# (value, label) pairs, in the shape used for selection lists.
FIELD_TYPES = [
    ('char', 'Char'),
    ('integer', 'Integer'),
    ('float', 'Float'),
    ('numeric', 'Numeric'),
    ('boolean', 'Boolean'),
    ('many2one', 'Many To One'),
    ]

# (value, label) pairs naming the available aggregate functions.
AGGREGATE_TYPES = [
    ('avg', 'Average'),
    ('sum', 'Sum'),
    ('count', 'Count'),
    ('max', 'Max'),
    ('min', 'Min'),
    ]

# Position-wise character mapping: the i-th character of SRC_CHARS is
# replaced by the i-th character of DST_CHARS (always an underscore).
SRC_CHARS = u""" .'"()/*-+?¿!&$[]{}@#`'^:;<>=~%,|\\"""
DST_CHARS = u"""__________________________________"""

# Settings read from the [babi] section of the trytond configuration.
BABI_CELERY = config_.getboolean('babi', 'celery', default=True)
BABI_RETENTION_DAYS = config_.getint('babi', 'retention_days', default=30)
BABI_CELERY_TASK = config_.get('babi', 'celery_task', default='trytond.modules.babi.tasks.calculate_execution') BABI_MAX_BD_COLUMN = config_.getint('babi', 'max_db_column', default=60) try: import celery except ImportError: celery = None except AttributeError: # If run from within frepple we will get # AttributeError: 'module' object has no attribute 'argv' pass CELERY_CONFIG = config_.get('celery', 'config', default='trytond.modules.babi.celeryconfig') CELERY_BROKER = 'amqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s' % { 'user': config_.get('celery', 'user', default='guest'), 'password': config_.get('celery', 'password', default='guest'), 'host': config_.get('celery', 'host', default='localhost'), 'port': config_.getint('celery', 'port', default=5672), 'vhost': config_.get('celery', 'vhost', default='/'), } logger = logging.getLogger(__name__) def unaccent(text): if not (isinstance(text, str) or isinstance(text, unicode)): return str(text) if isinstance(text, str) and bytes == str: text = unicode(text, 'utf-8') text = text.lower() for c in xrange(len(SRC_CHARS)): if c >= len(DST_CHARS): break text = text.replace(SRC_CHARS[c], DST_CHARS[c]) text = unicodedata.normalize('NFKD', text) if bytes == str: text = text.encode('ASCII', 'ignore') return text def _replace(x): return x.replace("'", '') class DynamicModel(ModelSQL, ModelView): @classmethod def __setup__(cls): super(DynamicModel, cls).__setup__() cls._error_messages.update({ 'report_not_exists': ('Report "%s" no longer exists or you do ' 'not have the rights to access it.'), }) pool = Pool() Execution = pool.get('babi.report.execution') executions = Execution.search([ ('babi_model.model', '=', cls.__name__), ]) if not executions or len(executions) > 1: return execution, = executions try: cls._order = execution.get_orders() except AssertionError: # An exception error is raisen on tests where Execution is not # properly loaded in the pool pass @classmethod def fields_view_get(cls, view_id=None, 
view_type='form'): pool = Pool() Execution = pool.get('babi.report.execution') Dimension = pool.get('babi.dimension') InternalMeasure = pool.get('babi.internal.measure') model_name = '_'.join(cls.__name__.split('_')[0:2]) executions = Execution.search([ ('babi_model.model', '=', cls.__name__), ], limit=1) if not executions: cls.raise_user_error('report_not_exists', cls.__name__) context = Transaction().context execution, = executions with Transaction().set_context(_datetime=execution.create_date): report = execution.report view_type = context.get('view_type', view_type) result = {} result['type'] = view_type result['view_id'] = view_id result['field_childs'] = None fields = [] if view_type == 'tree' or view_type == 'form': keyword = '' if view_type == 'tree': keyword = 'keyword_open="1"' fields.append('children') xml = '<%s string="%s" %s>\n' % (view_type, report.model.name, keyword) for field in report.dimensions + execution.internal_measures: # Avoid duplicated fields if field.internal_name in fields: continue if view_type == 'form': xml += '