400 lines
10 KiB
Python
400 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
# This file is part of lims_interface module for Tryton.
|
|
# The COPYRIGHT file at the top level of this repository contains
|
|
# the full copyright notices and license terms.
|
|
import datetime
|
|
import formulas
|
|
import numpy as np
|
|
from dateutil.relativedelta import relativedelta
|
|
from decimal import Decimal
|
|
|
|
from trytond.model import ModelView, ModelSQL, fields
|
|
from trytond.pool import Pool
|
|
from trytond.transaction import Transaction
|
|
|
|
custom_functions = {}
|
|
|
|
|
|
def dummy_iter(alias, iteration=None):
|
|
return alias
|
|
|
|
|
|
custom_functions['ITER'] = dummy_iter
|
|
|
|
|
|
def to_str(value):
|
|
return value and str(value) or ''
|
|
|
|
|
|
custom_functions['STR'] = to_str
|
|
|
|
|
|
def concat(*args):
|
|
return ''.join([a if isinstance(a, str) else '' for a in args])
|
|
|
|
|
|
custom_functions['CONCAT'] = concat
|
|
|
|
|
|
def trunc_float(value, digits=2):
|
|
if not isinstance(value, (float, int)) or not isinstance(digits, int):
|
|
return value
|
|
pow_10 = 10 ** digits
|
|
return (float(int(value * pow_10))) / pow_10
|
|
|
|
|
|
custom_functions['TRUNC_FLOAT'] = trunc_float
|
|
|
|
|
|
def get_variable(notebook_line, variable):
|
|
pool = Pool()
|
|
NotebookLine = pool.get('lims.notebook.line')
|
|
VariableValue = pool.get('lims.interface.variable.value')
|
|
|
|
if not notebook_line or not variable:
|
|
return None
|
|
|
|
if isinstance(notebook_line, int):
|
|
notebook_line = NotebookLine(notebook_line)
|
|
|
|
analysis = notebook_line.analysis
|
|
product_type = notebook_line.product_type
|
|
matrix = notebook_line.matrix
|
|
method = notebook_line.method
|
|
|
|
res = VariableValue.get_value(variable, analysis, product_type, matrix,
|
|
method)
|
|
if res:
|
|
return res
|
|
res = VariableValue.get_value(variable, analysis, product_type, matrix)
|
|
if res:
|
|
return res
|
|
res = VariableValue.get_value(variable, analysis, product_type)
|
|
if res:
|
|
return res
|
|
res = VariableValue.get_value(variable, analysis, method)
|
|
if res:
|
|
return res
|
|
res = VariableValue.get_value(variable, analysis)
|
|
if res:
|
|
return res
|
|
return None
|
|
|
|
|
|
custom_functions['VAR'] = get_variable
|
|
|
|
|
|
def get_constant(name, parameter1=None, parameter2=None, parameter3=None,
|
|
value=None):
|
|
pool = Pool()
|
|
Constant = pool.get('lims.interface.constant')
|
|
if not name:
|
|
return None
|
|
if not value:
|
|
value = 'value1'
|
|
if str(parameter1) == '#VALUE!':
|
|
parameter1 = None
|
|
if str(parameter2) == '#VALUE!':
|
|
parameter2 = None
|
|
if str(parameter3) == '#VALUE!':
|
|
parameter3 = None
|
|
return Constant.get_constant(name, parameter1, parameter2, parameter3,
|
|
value)
|
|
|
|
|
|
custom_functions['CONSTANT'] = get_constant
|
|
|
|
|
|
def _get_column_name(alias, iteration=None):
|
|
if not iteration:
|
|
return alias
|
|
parser = formulas.Parser()
|
|
ast = parser.ast('=%s' % str(iteration))[1].compile()
|
|
iteration = str(ast())
|
|
return '%s_%s' % (alias, iteration)
|
|
|
|
|
|
def get_column_value(notebook_line, alias, iteration=None):
|
|
pool = Pool()
|
|
Data = pool.get('lims.interface.data')
|
|
|
|
if not notebook_line or not alias:
|
|
return None
|
|
|
|
if not isinstance(notebook_line, int):
|
|
notebook_line = notebook_line.id
|
|
|
|
compilation_id = Transaction().context.get('lims_interface_compilation')
|
|
if not compilation_id:
|
|
return None
|
|
|
|
lines = Data.search([
|
|
('compilation', '=', compilation_id),
|
|
('notebook_line', '=', notebook_line),
|
|
], limit=1)
|
|
target_line = lines and lines[0] or None
|
|
if not target_line:
|
|
return None
|
|
|
|
target_field = _get_column_name(alias, iteration)
|
|
if not hasattr(target_line, target_field):
|
|
return None
|
|
|
|
return getattr(target_line, target_field)
|
|
|
|
|
|
custom_functions['V'] = get_column_value
|
|
|
|
|
|
def _td_to_time(td_value):
|
|
hours, remainder = divmod(td_value.seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
return datetime.time(hours, minutes, seconds)
|
|
|
|
|
|
def time_diff(time_1, time_2, uom=False, return_delta=False):
|
|
uoms = {
|
|
'H': lambda x: x.seconds / 3600,
|
|
'M': lambda x: x.seconds / 60,
|
|
'S': lambda x: x.seconds,
|
|
'MS': lambda x: x.seconds * 1000,
|
|
}
|
|
if not time_1 or not time_2 or time_1 < time_2:
|
|
return None
|
|
datetime1 = datetime.datetime.combine(datetime.date.today(), time_1)
|
|
datetime2 = datetime.datetime.combine(datetime.date.today(), time_2)
|
|
delta_difference = datetime1 - datetime2
|
|
if uom:
|
|
if uom not in uoms:
|
|
return None
|
|
return uoms[uom](delta_difference)
|
|
if return_delta:
|
|
return delta_difference
|
|
return _td_to_time(delta_difference)
|
|
|
|
|
|
custom_functions['TIMEDIF'] = time_diff
|
|
|
|
|
|
def to_time(value, uom):
|
|
uoms = {
|
|
'H': lambda x: datetime.timedelta(hours=x),
|
|
'M': lambda x: datetime.timedelta(minutes=x),
|
|
'S': lambda x: datetime.timedelta(seconds=x),
|
|
'MS': lambda x: datetime.timedelta(seconds=x/1000),
|
|
}
|
|
res = None
|
|
try:
|
|
value = float(value)
|
|
except (ValueError, TypeError):
|
|
value = False
|
|
|
|
if type(value) is float and value == 0:
|
|
res = datetime.timedelta(hours=0)
|
|
elif not value or not type(value) is float or not uom or uom not in uoms:
|
|
res = None
|
|
else:
|
|
res = _td_to_time(uoms[uom](value))
|
|
|
|
return res
|
|
|
|
|
|
custom_functions['TOTIME'] = to_time
|
|
|
|
|
|
def float_to_delta(value, uom):
|
|
uoms = {
|
|
'MO': lambda x: relativedelta(months=x),
|
|
'W': lambda x: datetime.timedelta(days=x*7),
|
|
'D': lambda x: datetime.timedelta(days=x),
|
|
'H': lambda x: datetime.timedelta(hours=x),
|
|
'M': lambda x: datetime.timedelta(minutes=x),
|
|
'S': lambda x: datetime.timedelta(seconds=x),
|
|
}
|
|
return uoms.get(uom, lambda x: False)(value)
|
|
|
|
|
|
def date_add(base_date, value, uom):
|
|
res = None
|
|
try:
|
|
value = float(value)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
if not type(base_date) in [datetime.date, datetime.datetime]:
|
|
return None
|
|
if type(base_date) is datetime.date and uom in ['H', 'M', 'S']:
|
|
return base_date
|
|
# Float is not allowed for month values, because its ambiguos
|
|
if uom == 'MO':
|
|
value = int(value)
|
|
delta = float_to_delta(value, uom)
|
|
res = base_date + delta if delta else None
|
|
return res
|
|
|
|
|
|
custom_functions['DATEADD'] = date_add
|
|
|
|
|
|
def date_diff(date_1, date_2, uom, return_delta=False):
|
|
uoms = {
|
|
'D': lambda x: x.days,
|
|
'W': lambda x: round((x.days / 7), 2),
|
|
'MO': lambda x: round((x.days / 30), 2),
|
|
'Y': lambda x: round((x.days / 365), 2),
|
|
}
|
|
if not date_1 or not date_2 or date_1 < date_2:
|
|
return None
|
|
if not type(date_1) is datetime.date or not type(date_2) is datetime.date:
|
|
return None
|
|
delta_difference = date_1 - date_2
|
|
if uom:
|
|
if uom not in uoms:
|
|
return None
|
|
res = uoms[uom](delta_difference)
|
|
if return_delta:
|
|
res = delta_difference
|
|
return res
|
|
|
|
|
|
custom_functions['DATEDIFF'] = date_diff
|
|
|
|
|
|
def get_date_list(dates):
|
|
date_list = []
|
|
|
|
def isValid(date):
|
|
return isinstance(date, datetime.date) or isinstance(date, datetime.datetime)
|
|
try:
|
|
date_list = [date for date in dates[0] if isValid(date)]
|
|
except IndexError:
|
|
return date_list
|
|
return date_list
|
|
|
|
|
|
def max_date(dates):
|
|
res = None
|
|
date_list = get_date_list(dates)
|
|
if len(date_list):
|
|
res = max(d for d in date_list)
|
|
|
|
return res
|
|
|
|
|
|
custom_functions['MAXDATE'] = max_date
|
|
|
|
|
|
def min_date(dates):
|
|
res = None
|
|
date_list = get_date_list(dates)
|
|
if len(date_list):
|
|
res = min(d for d in date_list)
|
|
|
|
return res
|
|
|
|
|
|
custom_functions['MINDATE'] = min_date
|
|
|
|
|
|
def slope(yp, xp):
|
|
items_to_delete, i = [], 0
|
|
for y1 in yp:
|
|
for val in y1:
|
|
if not isinstance(val, (int, float, Decimal)):
|
|
items_to_delete.append(i)
|
|
i += 1
|
|
if items_to_delete:
|
|
yp = np.delete(yp, items_to_delete, axis=1)
|
|
xp = np.delete(xp, items_to_delete, axis=1)
|
|
if any(val is None for x1 in xp for val in x1):
|
|
return None
|
|
|
|
return formulas.functions.wrap_func(formulas.functions.stat.xslope)(yp, xp)
|
|
|
|
|
|
custom_functions['SLOPE'] = slope
|
|
|
|
|
|
def intercept(y, x):
|
|
items_to_delete, i = [], 0
|
|
for y1 in y:
|
|
for val in y1:
|
|
if not isinstance(val, (int, float, Decimal)):
|
|
items_to_delete.append(i)
|
|
i += 1
|
|
if items_to_delete:
|
|
y = np.delete(y, items_to_delete, axis=1)
|
|
x = np.delete(x, items_to_delete, axis=1)
|
|
if any(val is None for x1 in x for val in x1):
|
|
return None
|
|
|
|
y, x = y[0].astype(float), x[0].astype(float)
|
|
if len(y) < 2 or len(x) < 2:
|
|
return None
|
|
|
|
def _mean(l):
|
|
return sum(l) / len(l)
|
|
|
|
def _multiply(l1, l2):
|
|
return [a * b for a, b in zip(l1, l2)]
|
|
|
|
m = ((_mean(x) * _mean(y) - _mean(_multiply(x, y))) /
|
|
(_mean(x) ** 2 - _mean(_multiply(x, x))))
|
|
b = _mean(y) - m * _mean(x)
|
|
return b
|
|
|
|
|
|
custom_functions['INTERCEPT'] = intercept
|
|
|
|
|
|
def rsq(y, x):
|
|
items_to_delete, i = [], 0
|
|
for y1 in y:
|
|
for val in y1:
|
|
if not isinstance(val, (int, float, Decimal)):
|
|
items_to_delete.append(i)
|
|
i += 1
|
|
if items_to_delete:
|
|
y = np.delete(y, items_to_delete, axis=1)
|
|
x = np.delete(x, items_to_delete, axis=1)
|
|
if any(val is None for x1 in x for val in x1):
|
|
return None
|
|
|
|
y, x = y[0].astype(float), x[0].astype(float)
|
|
if len(y) < 2 or len(x) < 2:
|
|
return None
|
|
|
|
zx = (x - np.mean(x)) / np.std(x, ddof=1)
|
|
zy = (y - np.mean(y)) / np.std(y, ddof=1)
|
|
r = np.sum(zx * zy) / (len(x) - 1)
|
|
return r ** 2
|
|
|
|
|
|
custom_functions['RSQ'] = rsq
|
|
|
|
|
|
def scientific_notation(value, decimals=2):
|
|
res = ''
|
|
if not value:
|
|
return res
|
|
try:
|
|
res = ("{0:.%ie}" % (decimals)).format(float(value))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
return res
|
|
|
|
|
|
custom_functions['SCIENTIFIC_NOTATION'] = scientific_notation
|
|
|
|
|
|
class Function(ModelSQL, ModelView):
|
|
'Interface Function'
|
|
__name__ = 'lims.interface.function'
|
|
|
|
name = fields.Char('Name', required=True)
|
|
parameters = fields.Char('Parameters')
|
|
help = fields.Text('Help')
|
|
|
|
def get_rec_name(self, name):
|
|
return '%s(%s)' % (self.name, self.parameters)
|