kalenislims/lims_interface/data.py

1054 lines
37 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of lims_interface module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
import collections.abc
from sql import (Table as SqlTable, Column as SqlColumn, Literal,
Desc, Asc, NullsFirst, NullsLast)
from sql.aggregate import Count
import formulas
import schedula
import datetime
from decimal import Decimal
from itertools import chain
from collections import defaultdict
from trytond.model import ModelSQL, ModelView, fields
from trytond.pool import Pool, PoolMeta
from trytond.transaction import Transaction
from trytond.tools import cursor_dict
from trytond.pyson import PYSONEncoder, Eval
from trytond.rpc import RPC
from trytond.exceptions import UserError
from trytond.cache import LRUDictTransaction
from trytond.model.model import record as model_record
from trytond.model.modelstorage import cache_size as model_cache_size
from trytond.model.modelsql import convert_from
from .interface import FIELD_TYPE_TRYTON, FIELD_TYPE_CAST
ALLOWED_RESULT_TYPES = (str, int, float, Decimal, datetime.time,
datetime.date, datetime.timedelta, type(None))
def data_record(name, field_names):
res = model_record(name, field_names)
def _setitem(self, field, value):
try:
return setattr(self, field, value)
except AttributeError:
pass
res._setitem = _setitem
def _update(self, _other=None, **kwargs):
if isinstance(_other, collections.abc.Mapping):
_other = _other.items()
elif _other is None:
_other = []
chained = chain(_other, kwargs.items())
for key, value in chained:
try:
setattr(self, key, value)
except AttributeError:
pass
res._update = _update
return res
class Adapter:
def __getattr__(self, name):
fields = self.get_fields()
return getattr(fields, name)
def __contains__(self, key):
fields = self.get_fields()
return fields.__contains__(key)
def __iter__(self):
fields = self.get_fields()
return fields.__iter__()
def __getitem__(self, name):
fields = self.get_fields()
return fields.__getitem__(name)
def get_fields(self):
# TODO: Cache
Data = Pool().get('lims.interface.data')
table = Data.get_table()
if not table:
return Data._previous_fields
res = {}
groups = 0
for field in table.fields_:
if field.type == 'char':
obj = fields.Char(field.string)
elif field.type == 'multiline':
obj = fields.Text(field.string)
elif field.type == 'integer':
obj = fields.Integer(field.string)
elif field.type == 'float':
obj = fields.Float(field.string)
elif field.type == 'boolean':
obj = fields.Boolean(field.string)
elif field.type == 'numeric':
obj = fields.Numeric(field.string)
elif field.type == 'date':
obj = fields.Date(field.string)
elif field.type == 'datetime':
obj = fields.DateTime(field.string)
elif field.type == 'timestamp':
obj = fields.Timestamp(field.string)
elif field.type == 'time':
obj = fields.Time(field.string)
elif field.type == 'timedelta':
obj = fields.TimeDelta(field.string)
elif field.type == 'many2one':
obj = fields.Many2One(field.related_model.model, field.string)
elif field.type in ('binary', 'icon'):
obj = fields.Binary(field.string)
elif field.type == 'selection':
selection = [tuple(v.split(':', 1))
for v in field.selection.splitlines() if v]
obj = fields.Selection(selection, field.string)
obj.name = field.name
res[field.name] = obj
groups = max(groups, field.group or 0)
obj = fields.Integer('ID')
obj.name = 'id'
res['id'] = obj
obj = fields.Many2One('lims.interface.compilation', 'Compilation')
obj.name = 'compilation'
res['compilation'] = obj
obj = fields.Boolean('Annulled')
obj.name = 'annulled'
res['annulled'] = obj
obj = fields.Many2One('lims.notebook.line', 'Notebook Line')
obj.name = 'notebook_line'
obj.readonly = True
res['notebook_line'] = obj
for i in range(0, groups):
obj = fields.One2Many(
'lims.interface.grouped_data', 'data', 'Group %s' % (i + 1, ))
obj.name = 'group_%s' % (i + 1, )
res[obj.name] = obj
return res
class GroupedAdapter:
def __getattr__(self, name):
fields = self.get_fields()
return getattr(fields, name)
def __contains__(self, key):
fields = self.get_fields()
return fields.__contains__(key)
def __iter__(self):
fields = self.get_fields()
return fields.__iter__()
def __getitem__(self, name):
fields = self.get_fields()
return fields.__getitem__(name)
def get_fields(self):
GroupedData = Pool().get('lims.interface.grouped_data')
table = GroupedData.get_table()
if not table:
return GroupedData._previous_fields
res = {}
for field in table.grouped_fields_:
if field.type == 'char':
obj = fields.Char(field.string)
elif field.type == 'multiline':
obj = fields.Text(field.string)
elif field.type == 'integer':
obj = fields.Integer(field.string)
elif field.type == 'float':
obj = fields.Float(field.string)
elif field.type == 'boolean':
obj = fields.Boolean(field.string)
elif field.type == 'numeric':
obj = fields.Numeric(field.string)
elif field.type == 'date':
obj = fields.Date(field.string)
elif field.type == 'datetime':
obj = fields.DateTime(field.string)
elif field.type == 'timestamp':
obj = fields.Timestamp(field.string)
elif field.type == 'time':
obj = fields.Time(field.string)
elif field.type == 'timedelta':
obj = fields.TimeDelta(field.string)
elif field.type == 'many2one':
obj = fields.Many2One(field.related_model.model, field.string)
elif field.type in ('binary', 'icon'):
obj = fields.Binary(field.string)
elif field.type == 'selection':
selection = [tuple(v.split(':', 1))
for v in field.selection.splitlines() if v]
obj = fields.Selection(selection, field.string)
obj.name = field.name
res[field.name] = obj
obj = fields.Integer('ID')
obj.name = 'id'
res['id'] = obj
obj = fields.Many2One('lims.notebook.line', 'Notebook Line')
obj.name = 'notebook_line'
obj.readonly = True
res['notebook_line'] = obj
obj = fields.Many2One('lims.interface.data', 'Data')
obj.name = 'data'
obj.readonly = True
res['data'] = obj
obj = fields.Integer('Iteration')
obj.name = 'iteration'
obj.readonly = True
res['iteration'] = obj
return res
class ModelAccess(metaclass=PoolMeta):
__name__ = 'ir.model.access'
@classmethod
def check_relation(cls, model_name, field_name, mode='read'):
'''
We must override check_relation and ensure that super() does not
execute:
getattr(Model, fieldname)
because the fields do not exist in the Model. If super() used
Model._fields[fieldname] we would not be forced to override the method.
'''
if model_name in ('lims.interface.data',
'lims.interface.grouped_data'):
return True
return super().check_relation(model_name, field_name, mode)
class Data(ModelSQL, ModelView):
'Lims Interface Data'
__name__ = 'lims.interface.data'
compilation = fields.Many2One('lims.interface.compilation', 'Compilation',
required=True, ondelete='CASCADE')
annulled = fields.Boolean('Annulled')
notebook_line = fields.Many2One('lims.notebook.line', 'Notebook Line',
readonly=True)
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('notebook_line.notebook', 'ASC'))
cls._order.insert(1, ('notebook_line.analysis.order', 'ASC'))
cls._order.insert(2, ('notebook_line.analysis.code', 'ASC'))
cls.__rpc__['fields_view_get'].cache = None
cls.__rpc__['default_get'].cache = None
@classmethod
def __post_setup__(cls):
super().__post_setup__()
cls._previous_fields = cls._fields
cls._fields = Adapter()
cls._record = data_record('lims.interface.data._record',
cls._fields.keys())
@classmethod
def __table__(cls):
# TODO: Check if we can drop create(), read(), write(), delete() &
# search()
return cls.get_sql_table()
def __init__(self, id=None, **kwargs):
_ids = kwargs.pop('_ids', None)
_local_cache = kwargs.pop('_local_cache', None)
_transaction_cache = kwargs.pop('_transaction_cache', None)
transaction = kwargs.pop('_transaction', None)
kwargs_copy = kwargs.copy()
for kw in kwargs_copy:
kwargs.pop(kw, None)
kwargs['_ids'] = _ids
kwargs['_local_cache'] = LRUDictTransaction(model_cache_size(),
data_record('lims.interface.data._record', self._fields.keys()))
kwargs['_transaction_cache'] = _transaction_cache
kwargs['_transaction'] = transaction
super().__init__(id, **kwargs)
self._values = {}
for kw in kwargs_copy:
self._values[kw] = kwargs_copy[kw]
def __getattr__(self, name):
try:
return super().__getattr__(name)
except AttributeError:
pass
def on_change_with(self, fieldnames=[], single=False):
table = self.get_table()
res = {}
grouped_fields = defaultdict(list)
for field in table.fields_:
if field.group:
grouped_fields[field.group].append(field.name)
for field in table.fields_:
if field.name not in fieldnames:
continue
ast = field.get_ast()
inputs = []
for input_ in field.inputs.split():
found = False
for group, repetition_fields in grouped_fields.items():
if input_ in repetition_fields:
group_values = getattr(self, 'group_%s' % group)
if not group_values:
continue
iter_found = False
for line in group_values:
if line['iteration'] == int(
input_.split('_')[-1:][0]):
inputs.append(line[
'_'.join(input_.split('_')[:-1])])
iter_found = True
if not iter_found:
inputs.append(None)
found = True
if not found:
inputs.append(getattr(self, input_))
try:
value = ast(*inputs)
except schedula.utils.exc.DispatcherError as e:
raise UserError(e.args[0] % e.args[1:])
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x, formulas.tokens.operand.XlError):
value = None
res[field.name] = value
if single and len(fieldnames) == 1:
return res[fieldnames[0]]
return res
@classmethod
def add_on_change_with_method(cls, field_name):
"""
Dynamically add 'on_change_with_<field_name>' methods.
"""
fn_name = 'on_change_with_' + field_name
def fn(self, name=None):
return self.on_change_with([field_name], True)
setattr(cls, fn_name, fn)
@classmethod
def _get_readonly_notebook_lines(cls):
readonly_ids = []
compilation_id = Transaction().context.get(
'lims_interface_compilation', None)
if not compilation_id:
return readonly_ids
for line in cls.search([('compilation', '=', compilation_id)]):
if not line.notebook_line:
continue
if line.notebook_line.end_date or line.notebook_line.annulled:
readonly_ids.append(line.notebook_line.id)
return readonly_ids
def set_field(self, value, field):
cursor = Transaction().connection.cursor()
try:
table = self.get_sql_table()
query = table.update([SqlColumn(table, field)], [value],
where=(table.id == self.id))
cursor.execute(*query)
except Exception:
pass
@classmethod
def fields_get(cls, fields_names=None, level=0):
pool = Pool()
Model = pool.get('ir.model')
GroupedData = pool.get('lims.interface.grouped_data')
res = super().fields_get(fields_names)
table = cls.get_table()
interface = cls.get_interface()
readonly_ids = []
readonly = Transaction().context.get('lims_interface_readonly', False)
if not readonly:
readonly_ids = cls._get_readonly_notebook_lines()
encoder = PYSONEncoder()
groups = 0
grouped_fields = defaultdict(list)
for field in table.fields_:
if field.group:
grouped_fields[field.group].append(field.name)
for field in table.fields_:
groups = max(groups, field.group or 0)
if field.group:
continue
states = {
'readonly': (bool(readonly or field.formula or field.readonly)
or Eval('notebook_line').in_(readonly_ids)),
}
res[field.name] = {
'context': encoder.encode(Transaction().context),
'name': field.name,
'string': field.string,
'type': FIELD_TYPE_TRYTON[field.type],
'help': field.help,
'domain': field.domain,
'states': encoder.encode(states),
'sortable': True,
}
if field.type == 'many2one':
res[field.name]['relation'] = (field.related_model.model if
field.related_model else None)
if field.type == 'selection':
selection = [tuple(v.split(':', 1))
for v in field.selection.splitlines() if v]
res[field.name]['selection'] = selection
res[field.name]['selection_change_with'] = []
res[field.name]['sort'] = False
if field.type == 'reference':
selection = []
for model in Model.search([]):
selection.append((model.model, model.name))
res[field.name]['selection'] = selection
if field.type in ['date', 'time', 'datetime', 'timestamp']:
res[field.name]['format'] = PYSONEncoder().encode(
'%H:%M:%S.%f')
if field.type in ['float', 'numeric']:
res[field.name]['digits'] = encoder.encode((16, field.digits))
if field.inputs:
inputs = []
for input_ in field.inputs.split():
found = False
for group, repetition_fields in grouped_fields.items():
if input_ in repetition_fields:
inputs.append('group_%s' % group)
found = True
if not found:
inputs.append(input_)
res[field.name]['on_change_with'] = list(set(inputs))
cls.add_on_change_with_method(field.name)
func_name = '%s_%s' % ('on_change_with', field.name)
cls.__rpc__.setdefault(func_name, RPC(instantiate=0))
for i in range(0, groups):
field_description = None
for rep in interface.grouped_repetitions:
if rep.group == i + 1:
field_description = rep.description
field_name = 'group_%s' % (i + 1)
res[field_name] = {
'name': field_name,
'string': field_description or field_name,
'type': 'one2many',
'help': '',
'relation': 'lims.interface.grouped_data',
'relation_field': 'data',
}
res[field_name]['views'] = {
'tree': GroupedData.fields_view_get(
view_type='tree', level=i + 1)}
#func_name = '%s_%s' % ('on_change_with', field_name)
#cls.__rpc__.setdefault(func_name, RPC(instantiate=0))
return res
@classmethod
def fields_view_get(cls, view_id=None, view_type='form', level=None):
if Pool().test:
return
table = cls.get_table()
for view in table.views:
if view.type == view_type:
break
assert(view.id)
fields_names = [
'compilation',
'annulled',
'notebook_line',
]
groups = 0
for field in table.fields_:
groups = max(groups, field.group or 0)
if field.group and view.type == 'form':
continue
fields_names.append(field.name)
for i in range(0, groups):
fields_names.append('group_%s' % (i + 1))
res = {
'type': view.type,
'view_id': view_id,
'field_childs': None,
'arch': view.arch,
'fields': cls.fields_get(fields_names),
'model': cls.__name__,
}
return res
@classmethod
def search(cls, domain, offset=0, limit=None, order=None, count=False,
query=False):
cursor = Transaction().connection.cursor()
# Clean transaction cache
for cache in Transaction().cache.values():
if cls.__name__ in cache:
del cache[cls.__name__]
if not cls.get_table():
return super().search(domain, offset, limit, order, count, query)
# Get domain clauses
sql_table = cls.get_sql_table()
tables, expression = cls.search_domain(domain,
tables={None: (sql_table, None)})
# Get order by
order_by = []
order_types = {
'DESC': Desc,
'ASC': Asc,
}
null_ordering_types = {
'NULLS FIRST': NullsFirst,
'NULLS LAST': NullsLast,
None: lambda _: _
}
if order is None or order is False:
order = cls._order
for oexpr, otype in order:
fname, _, extra_expr = oexpr.partition('.')
field = cls._fields[fname]
if not otype:
otype, null_ordering = 'ASC', None
else:
otype = otype.upper()
try:
otype, null_ordering = otype.split(' ', 1)
except ValueError:
null_ordering = None
Order = order_types[otype]
NullOrdering = null_ordering_types[null_ordering]
forder = field.convert_order(oexpr, tables, cls)
order_by.extend((NullOrdering(Order(o)) for o in forder))
main_table, _ = tables[None]
table = convert_from(None, tables)
if count:
cursor.execute(*table.select(Count(Literal('*')),
where=expression, limit=limit, offset=offset))
return cursor.fetchone()[0]
columns = [main_table.id]
select = table.select(*columns,
where=expression, order_by=order_by, limit=limit, offset=offset)
if query:
return select
cursor.execute(*select)
res = [x[0] for x in cursor.fetchall()]
return cls.browse(res)
@classmethod
def read(cls, ids, fields_names=None):
sql_table = cls.get_sql_table()
table = cls.get_table()
if not ids:
return []
def read_related(field_name, Target, rows, fields):
target_ids = []
for row in rows:
value = row[field_name]
if value is not None:
target_ids.append(value)
return Target.read(target_ids, fields)
def add_related(field_name, rows, targets):
'''
Adds 'id' and 'rec_name' of many2one/related_model fields
Also adds 'rec_name' for the rows
'''
key = field_name + '.'
for row in rows:
value = row[field_name]
if isinstance(value, str):
value = int(value.split(',', 1)[1])
if value is not None and value >= 0:
row[key] = targets[value]
if 'rec_name' in targets[value]:
row['rec_name'] = targets[value]['rec_name']
else:
row[key] = None
if 'rec_name' not in row:
row['rec_name'] = str(row['id'])
cursor = Transaction().connection.cursor()
cursor.execute(*sql_table.select(where=sql_table.id.in_(ids)))
fetchall = list(cursor_dict(cursor))
fields_related = {
'compilation': 'lims.interface.compilation',
'notebook_line': 'lims.notebook.line'
}
for f in table.fields_:
if f.related_model is not None:
fields_related[f.name] = f.related_model.model
for field in fields_related:
Target = Pool().get(fields_related[field])
if Target:
targets = read_related(
field, Target, fetchall, ['id', 'rec_name'])
targets = {t['id']: t for t in targets}
else:
targets = {}
add_related(field, fetchall, targets)
to_cast = {}
for field in table.fields_:
if fields_names and field.name not in fields_names:
continue
cast = FIELD_TYPE_CAST[field.type]
if cast:
to_cast[field.name] = cast
if to_cast:
for record in fetchall:
for field, cast in to_cast.items():
record[field] = cast(record[field])
return fetchall
@classmethod
def create(cls, vlist):
sql_table = cls.get_sql_table()
cursor = Transaction().connection.cursor()
ids = []
for record in vlist:
fields = []
values = []
for key, value in record.items():
fields.append(SqlColumn(sql_table, key))
values.append(value)
query = sql_table.insert(fields, values=[values],
returning=[sql_table.id])
cursor.execute(*query)
ids.append(cursor.fetchone()[0])
records = cls.browse(ids)
cls.update_formulas(records)
return records
@classmethod
def write(cls, *args):
sql_table = cls.get_sql_table()
cursor = Transaction().connection.cursor()
all_records = []
actions = iter(args)
for records, vals in zip(actions, actions):
all_records += records
fields = []
values = []
for key, value in vals.items():
fields.append(SqlColumn(sql_table, key))
values.append(value)
query = sql_table.update(fields, values,
where=sql_table.id.in_([x.id for x in records]))
cursor.execute(*query)
cls.update_formulas(all_records)
@classmethod
def update_formulas(cls, records=None):
cursor = Transaction().connection.cursor()
pool = Pool()
Compilation = Pool().get('lims.interface.compilation')
TableField = pool.get('lims.interface.table.field')
Column = pool.get('lims.interface.column')
compilation_id = Transaction().context.get(
'lims_interface_compilation')
if compilation_id:
compilation = Compilation(compilation_id)
table = compilation.table
interface = compilation.interface
sql_table = SqlTable(table.name)
else:
table = cls.get_table()
sql_table = cls.get_sql_table()
interface = cls.get_interface()
formula_fields = []
fields = TableField.search([
('table', '=', table),
('formula', 'not in', [None, '']),
])
for field in fields:
col = Column.search([
('interface', '=', interface),
('alias', '=', field.name),
])
order = col and col[0].evaluation_order or 0
formula_fields.append({
'order': order,
'field': field,
})
if not formula_fields:
return
formula_fields = sorted(formula_fields, key=lambda x: x['order'])
if not records:
records = cls.search([])
for record in records:
vals = {}
fields = []
values = []
for field in formula_fields:
for x in (field['field'].inputs or '').split():
if x not in vals:
vals[x] = getattr(record, x)
field_name = field['field'].name
value = record.get_formula_value(field['field'], vals)
if value is None:
continue
fields.append(SqlColumn(sql_table, field_name))
values.append(value)
vals[field_name] = value
if not values:
continue
query = sql_table.update(fields, values,
where=(sql_table.id == record.id))
cursor.execute(*query)
def get_formula_value(self, field, vals={}):
ast = field.get_ast()
inputs = []
for x in (field.inputs or '').split():
inputs.append(vals.get(x))
try:
value = ast(*inputs)
except schedula.utils.exc.DispatcherError as e:
raise UserError(e.args[0] % e.args[1:])
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x, formulas.tokens.operand.XlError):
value = None
return value
@classmethod
def delete(cls, records):
sql_table = cls.get_sql_table()
cursor = Transaction().connection.cursor()
ids = [x.id for x in records if x.id > 0]
if ids:
query = sql_table.delete(where=sql_table.id.in_(ids))
cursor.execute(*query)
@classmethod
def copy(cls, records, default=None):
records = cls.read([x.id for x in records if x.id])
for record in records:
del record['id']
del record['notebook_line']
return cls.create(records)
@classmethod
def get_compilation(cls):
Compilation = Pool().get('lims.interface.compilation')
compilation_id = Transaction().context.get(
'lims_interface_compilation')
if compilation_id:
return Compilation(compilation_id)
@classmethod
def get_table(cls):
pool = Pool()
Interface = pool.get('lims.interface')
Table = pool.get('lims.interface.table')
if Pool().test:
# Tryton default tests try to get data using '1' as active_id
# We prevent the tests from failing by returning no table
return
table = Transaction().context.get('lims_interface_table')
if not table:
compilation = cls.get_compilation()
if compilation:
table = compilation.table
if (not table and
Transaction().context.get('active_model') == 'lims.interface'):
interface_id = Transaction().context.get('active_id', None)
if interface_id:
interface = Interface(interface_id)
table = interface.table and interface.table.id or None
if table:
return Table(table)
@classmethod
def get_sql_table(cls):
table = cls.get_table()
if table:
return SqlTable(table.name)
return super().__table__()
@classmethod
def get_interface(cls):
pool = Pool()
Compilation = pool.get('lims.interface.compilation')
Interface = pool.get('lims.interface')
compilation_id = Transaction().context.get(
'lims_interface_compilation')
if compilation_id:
return Compilation(compilation_id).interface
interface_id = (Transaction().context.get(
'active_model') == 'lims.interface' and
Transaction().context.get('active_id', None) or None)
if interface_id:
return Interface(interface_id)
return None
class GroupedData(ModelView):
'Grouped Data'
__name__ = 'lims.interface.grouped_data'
notebook_line = fields.Many2One('lims.notebook.line', 'Notebook Line',
readonly=True)
data = fields.Many2One('lims.inteface.data', 'Data',
readonly=True)
iteration = fields.Integer('Iteration', readonly=True)
@classmethod
def __setup__(cls):
super().__setup__()
cls.__rpc__['fields_view_get'].cache = None
cls.__rpc__['default_get'].cache = None
@classmethod
def __post_setup__(cls):
super().__post_setup__()
cls._previous_fields = cls._fields
cls._fields = GroupedAdapter()
def __init__(self, id=None, **kwargs):
kwargs_copy = kwargs.copy()
for kw in kwargs_copy:
kwargs.pop(kw, None)
super().__init__(id, **kwargs)
self._values = {}
for kw in kwargs_copy:
self._values[kw] = kwargs_copy[kw]
def __getattr__(self, name):
try:
return super().__getattr__(name)
except AttributeError:
pass
def on_change_with(self, fieldnames=[], single=False):
table = self.get_table()
res = {}
for field in table.grouped_fields_:
if field.name not in fieldnames:
continue
ast = field.get_ast()
inputs = field.get_inputs().split()
inputs = [self.data.get(x) if x in self.data.keys()
else getattr(self, x) for x in inputs]
try:
value = ast(*inputs)
except schedula.utils.exc.DispatcherError as e:
raise UserError(e.args[0] % e.args[1:])
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x, formulas.tokens.operand.XlError):
value = None
res[field.name] = value
if single and len(fieldnames) == 1:
return res[fieldnames[0]]
return res
@classmethod
def add_on_change_with_method(cls, field_name):
"""
Dynamically add 'on_change_with_<field_name>' methods.
"""
fn_name = 'on_change_with_' + field_name
def fn(self, name=None):
return self.on_change_with([field_name], True)
setattr(cls, fn_name, fn)
@classmethod
def fields_get(cls, fields_names=None, group=0, level=0):
pool = Pool()
Model = pool.get('ir.model')
Data = pool.get('lims.interface.data')
res = super().fields_get(fields_names)
table = cls.get_table()
readonly = Transaction().context.get('lims_interface_readonly', False)
encoder = PYSONEncoder()
for field in table.grouped_fields_:
if field.group != group:
continue
res[field.name] = {
'context': encoder.encode(Transaction().context),
'name': field.name,
'string': field.string,
'type': FIELD_TYPE_TRYTON[field.type],
'readonly': bool(readonly or field.formula or field.readonly),
'help': field.help,
'domain': field.domain,
'states': '{}',
}
if field.type == 'many2one':
res[field.name]['relation'] = (field.related_model.model if
field.related_model else None)
if field.type == 'selection':
selection = [tuple(v.split(':', 1))
for v in field.selection.splitlines() if v]
res[field.name]['selection'] = selection
res[field.name]['selection_change_with'] = []
res[field.name]['sort'] = False
if field.type == 'reference':
selection = []
for model in Model.search([]):
selection.append((model.model, model.name))
res[field.name]['selection'] = selection
if field.type in ['date', 'time', 'datetime', 'timestamp']:
res[field.name]['format'] = PYSONEncoder().encode(
'%H:%M:%S.%f')
if field.type in ['float', 'numeric']:
res[field.name]['digits'] = encoder.encode((16, field.digits))
if field.inputs:
res[field.name]['on_change_with'] = field.inputs.split() + [
'data']
cls.add_on_change_with_method(field.name)
func_name = '%s_%s' % ('on_change_with', field.name)
cls.__rpc__.setdefault(func_name, RPC(instantiate=0))
res['data'] = {
'name': 'data',
'string': 'Data',
'type': 'many2one',
'readonly': True,
'help': '',
'states': '{}',
'relation': 'lims.interface.data',
'relation_field': 'group_%s' % group,
'relation_fields': (Data.fields_get(level=level - 1)
if level > 0 else []),
}
return res
@classmethod
def fields_view_get(cls, view_id=None, view_type='form', level=0):
if Pool().test:
return
table = cls.get_table()
for view in table.grouped_views:
if view.type == view_type and view.group == level:
break
assert(view.id)
fields_names = [
'notebook_line',
'data',
'iteration',
]
for field in table.grouped_fields_:
if field.group != level:
continue
fields_names.append(field.name)
res = {
'type': view.type,
'view_id': view_id,
'field_childs': None,
'arch': view.arch,
'fields': cls.fields_get(fields_names, level),
'model': cls.__name__,
}
return res
@classmethod
def get_compilation(cls):
Compilation = Pool().get('lims.interface.compilation')
compilation_id = Transaction().context.get(
'lims_interface_compilation')
if compilation_id:
return Compilation(compilation_id)
@classmethod
def get_table(cls):
pool = Pool()
Interface = pool.get('lims.interface')
Table = pool.get('lims.interface.table')
if Pool().test:
# Tryton default tests try to get data using '1' as active_id
# We prevent the tests from failing by returning no table
return
table = Transaction().context.get('lims_interface_table')
if not table:
compilation = cls.get_compilation()
if compilation:
table = compilation.table
if (not table and
Transaction().context.get('active_model') == 'lims.interface'):
interface_id = Transaction().context.get('active_id', None)
if interface_id:
interface = Interface(interface_id)
table = interface.table and interface.table.id or None
if table:
return Table(table)