kalenislims/lims_analysis_sheet/notebook.py

1932 lines
71 KiB
Python

# This file is part of lims_analysis_sheet module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
import formulas
import schedula
from datetime import datetime
from itertools import chain
from collections import defaultdict
#from dateutil.relativedelta import relativedelta
from trytond.model import ModelSQL, ModelView, fields
from trytond.wizard import Wizard, StateTransition, StateView, Button
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval, Bool, Or, And
from trytond.transaction import Transaction
from trytond.rpc import RPC
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.modules.lims.formula_parser import FormulaParser
from trytond.modules.lims_interface.data import ALLOWED_RESULT_TYPES
class NotebookLine(metaclass=PoolMeta):
__name__ = 'lims.notebook.line'
analysis_sheet = fields.Many2One('lims.analysis_sheet', 'Analysis Sheet',
readonly=True)
def get_analysis_sheet_template(self):
cursor = Transaction().connection.cursor()
pool = Pool()
Template = pool.get('lims.template.analysis_sheet')
TemplateAnalysis = pool.get('lims.template.analysis_sheet.analysis')
# Analysis + Method + Product type + Matrix
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method = %s '
'AND ta.product_type = %s '
'AND ta.matrix = %s',
(self.analysis.id, self.method.id, self.product_type.id,
self.matrix.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis + Method + Product type
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method = %s '
'AND ta.product_type = %s '
'AND ta.matrix IS NULL',
(self.analysis.id, self.method.id, self.product_type.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis + Method + Matrix
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method = %s '
'AND ta.product_type IS NULL '
'AND ta.matrix = %s',
(self.analysis.id, self.method.id, self.matrix.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis + Product type
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method IS NULL '
'AND ta.product_type = %s '
'AND ta.matrix IS NULL',
(self.analysis.id, self.product_type.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis + Matrix
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method IS NULL '
'AND ta.product_type IS NULL '
'AND ta.matrix = %s',
(self.analysis.id, self.matrix.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis + Method
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method = %s '
'AND ta.product_type IS NULL '
'AND ta.matrix IS NULL',
(self.analysis.id, self.method.id))
template = cursor.fetchone()
if template:
return template[0]
# Analysis
cursor.execute('SELECT t.id '
'FROM "' + Template._table + '" t '
'INNER JOIN "' + TemplateAnalysis._table + '" ta '
'ON t.id = ta.template '
'WHERE t.active IS TRUE '
'AND ta.analysis = %s '
'AND ta.method IS NULL '
'AND ta.product_type IS NULL '
'AND ta.matrix IS NULL',
(self.analysis.id,))
template = cursor.fetchone()
if template:
return template[0]
return None
@classmethod
def write(cls, *args):
super().write(*args)
actions = iter(args)
for lines, vals in zip(actions, actions):
if 'annulled' in vals:
cls.update_analysis_sheet_line(lines, vals['annulled'])
@staticmethod
def update_analysis_sheet_line(nb_lines, annulled):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
for nb_line in nb_lines:
template_id = nb_line.get_analysis_sheet_template()
if not template_id:
continue
sheets = AnalysisSheet.search([
('template', '=', template_id),
('state', 'in', ['draft', 'active', 'validated'])
], order=[('id', 'DESC')])
for s in sheets:
with Transaction().set_context(
lims_interface_table=s.compilation.table.id):
lines = Data.search([
('compilation', '=', s.compilation.id),
('notebook_line', '=', nb_line.id),
], limit=1)
if not lines:
continue
Data.write(lines, {'annulled': annulled})
break
@fields.depends('end_date', 'analysis', 'method')
def on_change_end_date(self):
if not self.end_date:
return
if self.analysis_sheet:
return
template_id = self.get_analysis_sheet_template()
if not template_id:
return
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheets = AnalysisSheet.search([
('template', '=', template_id),
('state', 'in', ['draft', 'active', 'validated'])
], order=[('id', 'DESC')])
for s in sheets:
with Transaction().set_context(
lims_interface_table=s.compilation.table.id):
lines = Data.search([
('compilation', '=', s.compilation.id),
('notebook_line', '=', self.id),
], limit=1)
if lines:
self.end_date = None
return
class AddControlStart(ModelView):
'Add Controls'
__name__ = 'lims.analysis_sheet.add_control.start'
analysis_sheet = fields.Many2One('lims.analysis_sheet', 'Analysis Sheet')
type = fields.Selection([
('con', 'CON'),
('rm', 'RM'),
('bmz', 'BMZ'),
], 'Control type', sort=False, required=True)
con_type = fields.Selection([
('exist', 'Existing'),
('coi', 'COI'),
('mrc', 'MRC'),
('sla', 'SLA'),
('itc', 'ITC'),
('itl', 'ITL'),
], 'Type', sort=False,
depends=['type'], states={
'required': Eval('type') == 'con',
'invisible': Eval('type') != 'con',
})
rm_bmz_type = fields.Selection([
('exist', 'Existing'),
('sla', 'SLA'),
], 'Type', sort=False,
depends=['type'], states={
'required': Eval('type').in_(['rm', 'bmz']),
'invisible': ~Eval('type').in_(['rm', 'bmz']),
})
original_fraction = fields.Many2One('lims.fraction',
'Original/Reference Fraction',
required=True, domain=[('id', 'in', Eval('fraction_domain'))],
depends=['fraction_domain'])
fraction_domain = fields.Function(fields.One2Many('lims.fraction',
None, 'Fraction domain'), 'on_change_with_fraction_domain')
label = fields.Char('Label', depends=['type', 'con_type', 'rm_bmz_type'],
states={'readonly': Or(
And(Eval('type') == 'con',
Eval('con_type') == 'exist'),
And(Eval('type').in_(['rm', 'bmz']),
Eval('rm_bmz_type') == 'exist')),
})
concentration_level = fields.Many2One('lims.concentration.level',
'Concentration level',
states={'invisible': Bool(Eval('concentration_level_invisible'))},
depends=['concentration_level_invisible'])
concentration_level_invisible = fields.Boolean(
'Concentration level invisible')
quantity = fields.Integer('Quantity', required=True,
depends=['type', 'con_type', 'rm_bmz_type'],
states={'readonly': Or(
And(Eval('type') == 'con',
Eval('con_type') == 'exist'),
And(Eval('type').in_(['rm', 'bmz']),
Eval('rm_bmz_type') == 'exist')),
})
@fields.depends('type')
def on_change_with_concentration_level_invisible(self, name=None):
Config = Pool().get('lims.configuration')
config = Config(1)
if self.type == 'con':
if (config.con_fraction_type and
config.con_fraction_type.control_charts):
return False
elif self.type == 'rm':
if (config.rm_fraction_type and
config.rm_fraction_type.control_charts):
return False
elif self.type == 'bmz':
if (config.bmz_fraction_type and
config.bmz_fraction_type.control_charts):
return False
return True
@fields.depends('type', 'con_type', 'rm_bmz_type', 'analysis_sheet',
'_parent_analysis_sheet.template')
def on_change_with_fraction_domain(self, name=None):
cursor = Transaction().connection.cursor()
pool = Pool()
Fraction = pool.get('lims.fraction')
NotebookLine = pool.get('lims.notebook.line')
Notebook = pool.get('lims.notebook')
if not self.type:
return []
special_type = ''
existing = False
if self.type == 'con':
if not self.con_type:
return []
special_type = 'con' if self.con_type == 'exist' else self.con_type
existing = (self.con_type == 'exist')
elif self.type == 'rm':
if not self.rm_bmz_type or self.rm_bmz_type == 'noref':
return []
special_type = 'sla' if self.rm_bmz_type == 'sla' else self.type
existing = (self.rm_bmz_type == 'exist')
elif self.type == 'bmz':
if not self.rm_bmz_type or self.rm_bmz_type == 'noref':
return []
special_type = 'sla' if self.rm_bmz_type == 'sla' else self.type
existing = (self.rm_bmz_type == 'exist')
controls_allowed = (self.analysis_sheet.template.controls_allowed or
['0'])
if special_type not in controls_allowed:
return []
t_analysis_ids = [ta.analysis.id
for ta in self.analysis_sheet.template.analysis]
stored_fractions_ids = Fraction.get_stored_fractions()
clause = [
('notebook.fraction.special_type', '=', special_type),
('notebook.fraction.id', 'in', stored_fractions_ids),
('analysis', 'in', t_analysis_ids),
]
if existing:
#deadline = datetime.now() - relativedelta(days=5)
clause.extend([
('result', 'in', (None, '')),
('end_date', '=', None),
('annulment_date', '=', None),
#('notebook.fraction.sample.date2', '>=', deadline),
])
notebook_lines = NotebookLine.search(clause)
if not notebook_lines:
return []
notebook_lines_ids = ', '.join(str(nl.id) for nl in notebook_lines)
cursor.execute('SELECT DISTINCT(n.fraction) '
'FROM "' + Notebook._table + '" n '
'INNER JOIN "' + NotebookLine._table + '" nl '
'ON nl.notebook = n.id '
'WHERE nl.id IN (' + notebook_lines_ids + ')')
return [x[0] for x in cursor.fetchall()]
@fields.depends('type', 'con_type', 'rm_bmz_type', 'original_fraction',
'concentration_level', '_parent_original_fraction.label',
'_parent_concentration_level.description')
def on_change_with_label(self, name=None):
Date = Pool().get('ir.date')
if self.type == 'con':
if self.con_type == 'exist':
return ''
label = ''
if self.original_fraction:
label += '%s' % self.original_fraction.label
if self.concentration_level:
label += ' (%s)' % self.concentration_level.description
elif self.type == 'rm':
if self.rm_bmz_type == 'exist':
return ''
label = 'RM'
if self.concentration_level:
label += ' (%s)' % self.concentration_level.description
if self.rm_bmz_type == 'sla':
if self.original_fraction:
label += ' %s' % self.original_fraction.label
elif self.type == 'bmz':
if self.rm_bmz_type == 'exist':
return ''
label = 'BMZ'
if self.rm_bmz_type == 'sla':
if self.original_fraction:
label += ' %s' % self.original_fraction.label
else:
return ''
label += ' %s' % str(Date.today())
return label
@fields.depends('type', 'con_type', 'rm_bmz_type', 'quantity')
def on_change_with_quantity(self, name=None):
if ((self.type == 'con' and self.con_type == 'exist') or
(self.type in ('rm', 'bmz') and self.rm_bmz_type == 'exist')):
return 1
return self.quantity
class AddControl(Wizard):
'Add Controls'
__name__ = 'lims.analysis_sheet.add_control'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.add_control.start',
'lims_analysis_sheet.analysis_sheet_add_control_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Add', 'add', 'tryton-ok', default=True),
])
add = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'start'
return 'end'
def default_start(self, fields):
defaults = {
'analysis_sheet': self._get_analysis_sheet_id(),
'concentration_level_invisible': True,
'quantity': 1,
}
return defaults
def transition_add(self):
fractions = [self.start.original_fraction]
if ((self.start.type == 'con' and
self.start.con_type != 'exist') or
(self.start.type in ('rm', 'bmz') and
self.start.rm_bmz_type != 'exist')):
fractions = self.create_control()
self.add_to_analysis_sheet(fractions)
return 'end'
def create_control(self):
pool = Pool()
Config = pool.get('lims.configuration')
LabWorkYear = pool.get('lims.lab.workyear')
Entry = pool.get('lims.entry')
Sample = pool.get('lims.sample')
Fraction = pool.get('lims.fraction')
Service = pool.get('lims.service')
Analysis = pool.get('lims.analysis')
NotebookLine = pool.get('lims.notebook.line')
EntryDetailAnalysis = pool.get('lims.entry.detail.analysis')
config = Config(1)
if self.start.type == 'con':
if not config.con_fraction_type:
raise UserError(gettext('lims.msg_no_con_fraction_type'))
fraction_type = config.con_fraction_type
elif self.start.type == 'rm':
if not config.rm_fraction_type:
raise UserError(gettext('lims.msg_no_rm_fraction_type'))
fraction_type = config.rm_fraction_type
elif self.start.type == 'bmz':
if not config.bmz_fraction_type:
raise UserError(gettext('lims.msg_no_bmz_fraction_type'))
fraction_type = config.bmz_fraction_type
if (fraction_type.control_charts and not
self.start.concentration_level):
raise UserError(gettext('lims.msg_no_concentration_level'))
workyear_id = LabWorkYear.find()
workyear = LabWorkYear(workyear_id)
if not workyear.default_entry_control:
raise UserError(gettext('lims.msg_no_entry_control'))
entry = Entry(workyear.default_entry_control.id)
original_fraction = self.start.original_fraction
original_sample = Sample(original_fraction.sample.id)
obj_description = self._get_obj_description(original_sample)
sample_default = {
'entry': entry.id,
'date': datetime.now(),
'label': self.start.label,
'obj_description': obj_description,
'fractions': [],
}
fraction_default = {
'type': fraction_type.id,
'services': [],
}
if self.start.type == 'con':
fraction_default['con_type'] = self.start.con_type
fraction_default['con_original_fraction'] = original_fraction.id
elif self.start.type == 'rm':
fraction_default['rm_type'] = 'sla'
fraction_default['rm_product_type'] = (
original_sample.product_type.id)
fraction_default['rm_matrix'] = original_sample.matrix.id
fraction_default['rm_original_fraction'] = original_fraction.id
elif self.start.type == 'bmz':
fraction_default['bmz_type'] = 'sla'
fraction_default['bmz_product_type'] = (
original_sample.product_type.id)
fraction_default['bmz_matrix'] = original_sample.matrix.id
fraction_default['bmz_original_fraction'] = original_fraction.id
t_analysis_ids = [ta.analysis.id
for ta in self.start.analysis_sheet.template.analysis]
original_services = []
services = Service.search([
('fraction', '=', original_fraction),
('annulled', '=', False),
])
for service in services:
if Analysis.is_typified(service.analysis,
original_sample.product_type, original_sample.matrix):
original_services.append(service)
res = []
for i in range(0, self.start.quantity):
# new sample
new_sample, = Sample.copy([original_sample],
default=sample_default)
# new fraction
new_fraction, = Fraction.copy([original_fraction],
default={**fraction_default, 'sample': new_sample.id})
# new services
for service in original_services:
method_id = service.method and service.method.id or None
device_id = service.device and service.device.id or None
if service.analysis.type == 'analysis':
original_lines = NotebookLine.search([
('notebook.fraction', '=', original_fraction.id),
('analysis', '=', service.analysis.id),
('repetition', '=', 0),
], limit=1)
original_line = (original_lines and
original_lines[0] or None)
if original_line:
method_id = original_line.method.id
if original_line.device:
device_id = original_line.device.id
service_default = {
'fraction': new_fraction.id,
'method': method_id,
'device': device_id,
}
new_service, = Service.copy([service], default=service_default)
# delete services/details not related to template
to_delete = EntryDetailAnalysis.search([
('service', '=', new_service.id),
('analysis', 'not in', t_analysis_ids),
])
if to_delete:
with Transaction().set_user(0, set_context=True):
EntryDetailAnalysis.delete(to_delete)
if EntryDetailAnalysis.search_count([
('service', '=', new_service.id),
]) == 0:
with Transaction().set_user(0, set_context=True):
Service.delete([new_service])
# confirm fraction: new notebook and stock move
Fraction.confirm([new_fraction])
# Edit notebook lines
if fraction_type.control_charts:
notebook_lines = NotebookLine.search([
('notebook.fraction', '=', new_fraction.id),
])
if notebook_lines:
NotebookLine.write(notebook_lines, {
'concentration_level': (
self.start.concentration_level.id),
})
if self.start.type == 'rm':
notebook_lines = NotebookLine.search([
('notebook.fraction', '=', new_fraction.id),
])
if notebook_lines:
defaults = {
'final_concentration': None,
'final_unit': None,
'detection_limit': None,
'quantification_limit': None,
'lower_limit': None,
'upper_limit': None,
}
if config.rm_start_uom:
defaults['initial_unit'] = config.rm_start_uom.id
NotebookLine.write(notebook_lines, defaults)
res.append(new_fraction)
return res
def _get_obj_description(self, sample):
cursor = Transaction().connection.cursor()
ObjectiveDescription = Pool().get('lims.objective_description')
if not sample.product_type or not sample.matrix:
return None
cursor.execute('SELECT id '
'FROM "' + ObjectiveDescription._table + '" '
'WHERE product_type = %s '
'AND matrix = %s',
(sample.product_type.id, sample.matrix.id))
res = cursor.fetchone()
return res and res[0] or None
def add_to_analysis_sheet(self, fractions):
NotebookLine = Pool().get('lims.notebook.line')
sheet = self.start.analysis_sheet
t_analysis_ids = [ta.analysis.id for ta in sheet.template.analysis]
clause = [
('notebook.fraction', 'in', [f.id for f in fractions]),
('analysis', 'in', t_analysis_ids),
]
if ((self.start.type == 'con' and
self.start.con_type == 'exist') or
(self.start.type in ('rm', 'bmz') and
self.start.rm_bmz_type == 'exist')):
clause.extend([
('result', 'in', (None, '')),
('end_date', '=', None),
('annulment_date', '=', None),
])
notebook_lines = NotebookLine.search(clause)
if notebook_lines:
sheet.create_lines(notebook_lines)
return 'end'
def end(self):
return 'reload'
class RepeatAnalysisStart(ModelView):
'Repeat Analysis'
__name__ = 'lims.analysis_sheet.repeat_analysis.start'
lines = fields.Many2Many(
'lims.analysis_sheet.repeat_analysis.start.line', None, None,
'Lines', required=True, domain=[('id', 'in', Eval('lines_domain'))],
depends=['lines_domain'])
lines_domain = fields.One2Many(
'lims.analysis_sheet.repeat_analysis.start.line', None,
'Lines domain')
annul = fields.Boolean('Annul current lines')
urgent = fields.Boolean('Urgent repetition')
class RepeatAnalysisStartLine(ModelSQL, ModelView):
'Analysis Sheet Line'
__name__ = 'lims.analysis_sheet.repeat_analysis.start.line'
line = fields.Many2One('lims.notebook.line', 'Line')
fraction = fields.Function(fields.Many2One('lims.fraction', 'Fraction'),
'get_line_field', searcher='search_line_field')
analysis = fields.Function(fields.Many2One('lims.analysis', 'Analysis'),
'get_line_field', searcher='search_line_field')
repetition = fields.Function(fields.Integer('Repetition'),
'get_line_field', searcher='search_line_field')
session_id = fields.Integer('Session ID')
@classmethod
def __register__(cls, module_name):
super().__register__(module_name)
cursor = Transaction().connection.cursor()
cursor.execute('DELETE FROM "' + cls._table + '"')
@classmethod
def __setup__(cls):
super().__setup__()
@classmethod
def get_line_field(cls, lines, names):
result = {}
for name in names:
result[name] = {}
if name in ('fraction', 'analysis'):
for l in lines:
field = getattr(l.line, name, None)
result[name][l.id] = field.id if field else None
else:
for l in lines:
result[name][l.id] = getattr(l.line, name, None)
return result
@classmethod
def search_line_field(cls, name, clause):
return [('line.' + name,) + tuple(clause[1:])]
class RepeatAnalysis(Wizard):
'Repeat Analysis'
__name__ = 'lims.analysis_sheet.repeat_analysis'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.repeat_analysis.start',
'lims_analysis_sheet.analysis_sheet_repeat_analysis_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Repeat', 'repeat', 'tryton-ok', default=True),
])
repeat = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'start'
return 'end'
def default_start(self, fields):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
RepeatAnalysisStartLine = pool.get(
'lims.analysis_sheet.repeat_analysis.start.line')
defaults = {
'annul': False,
'urgent': False,
}
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
to_create = []
selected_lines = []
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([('compilation', '=', sheet.compilation.id)])
for line in lines:
nl = line.notebook_line
if not nl:
continue
to_create.append({
'session_id': self._session_id,
'line': nl,
})
if line.id in Transaction().context['active_ids']:
selected_lines.append(line.notebook_line)
lines = RepeatAnalysisStartLine.create(to_create)
defaults['lines_domain'] = [l.id for l in lines]
defaults['lines'] = [l.id for l in lines if l.line in selected_lines]
return defaults
def transition_repeat(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Date = pool.get('ir.date')
NotebookLine = pool.get('lims.notebook.line')
Data = pool.get('lims.interface.data')
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
to_create = []
to_update = []
to_annul = []
date = Date.today()
for sheet_line in self.start.lines:
nline_to_repeat = sheet_line.line
detail_id = nline_to_repeat.analysis_detail.id
defaults = {
'notebook': nline_to_repeat.notebook.id,
'analysis_detail': detail_id,
'service': nline_to_repeat.service.id,
'analysis': nline_to_repeat.analysis.id,
'analysis_origin': nline_to_repeat.analysis_origin,
'urgent': self.start.urgent,
'repetition': nline_to_repeat.repetition + 1,
'laboratory': nline_to_repeat.laboratory.id,
'method': nline_to_repeat.method.id,
'device': (nline_to_repeat.device.id if nline_to_repeat.device
else None),
'initial_concentration': nline_to_repeat.initial_concentration,
'decimals': nline_to_repeat.decimals,
'significant_digits': nline_to_repeat.significant_digits,
'scientific_notation': nline_to_repeat.scientific_notation,
'report': nline_to_repeat.report,
'concentration_level': (nline_to_repeat.concentration_level.id
if nline_to_repeat.concentration_level else None),
'results_estimated_waiting': (
nline_to_repeat.results_estimated_waiting),
'department': nline_to_repeat.department,
'final_concentration': nline_to_repeat.final_concentration,
'initial_unit': (nline_to_repeat.initial_unit.id if
nline_to_repeat.initial_unit else None),
'final_unit': (nline_to_repeat.final_unit.id if
nline_to_repeat.final_unit else None),
'detection_limit': nline_to_repeat.detection_limit,
'quantification_limit': nline_to_repeat.quantification_limit,
'lower_limit': nline_to_repeat.lower_limit,
'upper_limit': nline_to_repeat.upper_limit,
'start_date': date,
}
to_create.append(defaults)
to_update.append(nline_to_repeat)
if self.start.annul:
to_annul.append(nline_to_repeat.id)
notebook_lines = NotebookLine.create(to_create)
if notebook_lines:
sheet.create_lines(notebook_lines)
NotebookLine.write(to_update, {
'report': False,
})
if to_annul:
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
('notebook_line', 'in', to_annul),
])
Data.write(lines, {'annulled': True})
return 'end'
def end(self):
return 'reload'
class CalculateExpressions(Wizard):
'Calculate Expressions'
__name__ = 'lims.analysis_sheet.calculate_expressions'
start_state = 'check'
check = StateTransition()
calcuate = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'calcuate'
return 'end'
def transition_calcuate(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
expressions = {}
for t_analysis in sheet.template.analysis:
if not t_analysis.expressions:
continue
if t_analysis.analysis.id not in expressions:
expressions[t_analysis.analysis.id] = {}
for expression in t_analysis.expressions:
expressions[t_analysis.analysis.id][
expression.column.alias] = expression.expression
if not expressions:
return 'end'
parser = formulas.Parser()
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([('compilation', '=', sheet.compilation.id)])
for line in lines:
nl = line.notebook_line
if not nl:
continue
if nl.analysis.id not in expressions:
continue
with Transaction().set_context(
lims_analysis_notebook=nl.notebook.id):
for alias, formula in expressions[nl.analysis.id].items():
if not formula:
continue
ast = parser.ast(formula)[1].compile()
inputs = (' '.join([x for x in ast.inputs])).lower()
inputs = [getattr(line, x) for x in inputs.split()]
try:
value = ast(*inputs)
except schedula.utils.exc.DispatcherError as e:
raise UserError(e.args[0] % e.args[1:])
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x,
formulas.tokens.operand.XlError):
value = None
Data.write([line], {alias: value})
return 'end'
def end(self):
return 'reload'
class ResultsVerificationStart(ModelView):
'Results Verification'
__name__ = 'lims.analysis_sheet.results_verification.start'
range_type = fields.Many2One('lims.range.type', 'Origin', required=True,
domain=[('use', '=', 'results_verification')])
class ResultsVerification(Wizard):
'Results Verification'
__name__ = 'lims.analysis_sheet.results_verification'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.results_verification.start',
'lims_analysis_sheet.analysis_sheet_results_verification_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Ok', 'verify', 'tryton-ok', default=True),
])
verify = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'start'
return 'end'
def default_start(self, fields):
RangeType = Pool().get('lims.range.type')
default = {}
default_range_type = RangeType.search([
('use', '=', 'results_verification'),
('by_default', '=', True),
])
if default_range_type:
default['range_type'] = default_range_type[0].id
return default
def transition_verify(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
table_id = sheet.compilation.table.id
result_column = self._get_template_column(
'result', table_id)
if not result_column:
raise UserError(gettext('lims_analysis_sheet.'
'msg_template_not_result_field'))
result_field = result_column.name
verification_column = self._get_template_column(
'verification', table_id)
if not verification_column:
raise UserError(gettext('lims_analysis_sheet.'
'msg_template_not_verification_field'))
verification_field = verification_column.name
notebook_lines = {}
with Transaction().set_context(lims_interface_table=table_id):
lines = Data.search([('compilation', '=', sheet.compilation.id)])
for line in lines:
nl = line.notebook_line
if not nl:
continue
notebook_lines[line] = nl
if not notebook_lines:
return 'end'
for s_line, n_line in notebook_lines.items():
verification = self._get_result_verification(
getattr(s_line, result_field), n_line)
if verification is not None:
Data.write([s_line],
{verification_field: str(verification)})
return 'end'
def _get_template_column(self, field_name, table_id):
pool = Pool()
Field = pool.get('lims.interface.table.field')
table_column = Field.search([
('table', '=', table_id),
('transfer_field', '=', True),
('related_line_field.name', '=', field_name),
])
return table_column and table_column[0] or None
def _get_result_verification(self, result, notebook_line):
pool = Pool()
Range = pool.get('lims.range')
UomConversion = pool.get('lims.uom.conversion')
VolumeConversion = pool.get('lims.volume.conversion')
try:
result = float(result)
except (TypeError, ValueError):
return None
iu = notebook_line.initial_unit
if not iu:
return None
try:
ic = float(notebook_line.initial_concentration)
except (TypeError, ValueError):
return None
ranges = Range.search([
('range_type', '=', self.start.range_type),
('analysis', '=', notebook_line.analysis.id),
('product_type', '=', notebook_line.product_type.id),
('matrix', '=', notebook_line.matrix.id),
])
if not ranges:
return None
fu = ranges[0].uom
try:
fc = float(ranges[0].concentration)
except (TypeError, ValueError):
return None
if fu and fu.rec_name != '-':
converted_result = None
if (iu == fu and ic == fc):
converted_result = result
elif (iu != fu and ic == fc):
formula = UomConversion.get_conversion_formula(iu,
fu)
if not formula:
return None
variables = self._get_variables(formula, notebook_line)
parser = FormulaParser(formula, variables)
formula_result = parser.getValue()
converted_result = result * formula_result
elif (iu == fu and ic != fc):
converted_result = result * (fc / ic)
else:
formula = None
conversions = UomConversion.search([
('initial_uom', '=', iu),
('final_uom', '=', fu),
])
if conversions:
formula = conversions[0].conversion_formula
if not formula:
return None
variables = self._get_variables(formula, notebook_line)
parser = FormulaParser(formula, variables)
formula_result = parser.getValue()
if (conversions[0].initial_uom_volume and
conversions[0].final_uom_volume):
d_ic = VolumeConversion.brixToDensity(ic)
d_fc = VolumeConversion.brixToDensity(fc)
converted_result = (result * (fc / ic) *
(d_fc / d_ic) * formula_result)
else:
converted_result = (result * (fc / ic) *
formula_result)
result = float(converted_result)
return self._verificate_result(result, ranges[0])
def _get_variables(self, formula, notebook_line):
pool = Pool()
VolumeConversion = pool.get('lims.volume.conversion')
variables = {}
for var in ('DI',):
while True:
idx = formula.find(var)
if idx >= 0:
variables[var] = 0
formula = formula.replace(var, '_')
else:
break
for var in variables.keys():
if var == 'DI':
ic = float(notebook_line.final_concentration)
result = VolumeConversion.brixToDensity(ic)
if result:
variables[var] = result
return variables
def _verificate_result(self, result, range_):
if range_.min95 and range_.max95:
if result < range_.min:
return gettext('lims.msg_out')
elif result < range_.min95:
return gettext('lims.msg_ok*')
elif result <= range_.max95:
return gettext('lims.msg_ok')
elif result <= range_.max:
return gettext('lims.msg_ok*')
else:
return gettext('lims.msg_out')
else:
if (range_.min and result < range_.min):
return gettext('lims.msg_out')
elif (range_.max and result <= range_.max):
return gettext('lims.msg_ok')
else:
return gettext('lims.msg_out')
def end(self):
return 'reload'
class LimitsValidation(Wizard):
'Limits Validation'
__name__ = 'lims.analysis_sheet.limits_validation'
start_state = 'check'
check = StateTransition()
validate_limits = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'validate_limits'
return 'end'
def transition_validate_limits(self):
pool = Pool()
ModelData = pool.get('ir.model.data')
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
NotebookLine = pool.get('lims.notebook.line')
unattended = Transaction().context.get('unattended', False)
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
table_id = sheet.compilation.table.id
result_modifier_low = ModelData.get_id('lims', 'result_modifier_low')
result_modifier_nd = ModelData.get_id('lims', 'result_modifier_nd')
result_column = self._get_template_column(
'result', table_id)
if not result_column:
if unattended:
return 'end'
raise UserError(gettext('lims_analysis_sheet.'
'msg_template_not_result_field'))
result_field = result_column.name
result_modifier_column = self._get_template_column(
'result_modifier', table_id)
if not result_modifier_column:
if unattended:
return 'end'
raise UserError(gettext('lims_analysis_sheet.'
'msg_template_not_result_modifier_field'))
result_modifier_field = result_modifier_column.name
with Transaction().set_context(lims_interface_table=table_id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
('notebook_line', '!=', None),
])
for line in lines:
nl = line.notebook_line
result = getattr(line, result_field)
if result is None:
continue
result_modifier = getattr(line, result_modifier_field)
if result_modifier:
continue
try:
value = float(result)
except ValueError:
continue
try:
dl = float(nl.detection_limit)
ql = float(nl.quantification_limit)
except (TypeError, ValueError):
continue
ll = nl.lower_limit and float(nl.lower_limit) or None
ul = nl.upper_limit and float(nl.upper_limit) or None
data = {}
if (ll and value < ll) or (ul and value > ul):
raise UserError(gettext(
'lims.msg_error_limits_allowed',
line=nl.rec_name))
if dl < value and value < ql:
data[result_field] = str(ql)
data[result_modifier_field] = result_modifier_low
elif value < dl:
data[result_field] = None
data[result_modifier_field] = result_modifier_nd
elif value == dl:
data[result_field] = str(ql)
data[result_modifier_field] = result_modifier_low
else:
data[result_modifier_field] = None
if data:
Data.write([line], data)
if data[result_modifier_field]:
NotebookLine.write([nl], {'backup': str(value)})
return 'end'
def _get_template_column(self, field_name, table_id):
pool = Pool()
Field = pool.get('lims.interface.table.field')
table_column = Field.search([
('table', '=', table_id),
('transfer_field', '=', True),
('related_line_field.name', '=', field_name),
])
return table_column and table_column[0] or None
def end(self):
return 'reload'
class EvaluateRules(Wizard):
'Evaluate Rules'
__name__ = 'lims.analysis_sheet.evaluate_rules'
start_state = 'check'
check = StateTransition()
evaluate = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated'):
return 'evaluate'
return 'end'
def transition_evaluate(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
NotebookRule = pool.get('lims.rule')
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id,
lims_interface_compilation=sheet.compilation.id,
lims_analysis_sheet=sheet.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
('notebook_line', '!=', None),
])
for line in lines:
rules = NotebookRule.search([
('analysis', '=', line.notebook_line.analysis),
])
for rule in rules:
if rule.eval_sheet_condition(line):
rule.exec_sheet_action(line)
return 'end'
def end(self):
return 'reload'
class EditGroupedDataStart(ModelView):
'Edit Grouped Data'
__name__ = 'lims.analysis_sheet.edit_grouped_data.start'
data = fields.One2Many('lims.interface.data', None, 'Data')
class EditGroupedData(Wizard):
'Edit Grouped Data'
__name__ = 'lims.analysis_sheet.edit_grouped_data'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.edit_grouped_data.start',
'lims_analysis_sheet.analysis_sheet_edit_grouped_data_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Save', 'save', 'tryton-save', default=True),
])
save = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
line_id = Transaction().context.get('active_id', None)
sheet_id = self._get_analysis_sheet_id()
if line_id and sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state in ('active', 'validated', 'done'):
return 'start'
return 'end'
def default_start(self, fields):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
line_id = Transaction().context.get('active_id', None)
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
fields = sheet.compilation.table.fields_
data = []
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
line = Data.search([
('compilation', '=', sheet.compilation.id),
('id', '=', line_id)])[0]
record = {
'notebook_line': line.notebook_line and line.notebook_line.id,
}
for field in fields:
if field.group:
continue
val = getattr(line, field.name, None)
if val is None:
continue
if field.type == 'many2one':
record[field.name] = val and val.id or None
else:
record[field.name] = val
grouped_fields = defaultdict(list)
for field in sheet.compilation.table.grouped_fields_:
grouped_fields[field.group].append(field)
for group, repetition_fields in grouped_fields.items():
for rep in sheet.template.interface.grouped_repetitions:
if rep.group == group:
reps = (rep.repetitions or 1) + 1
break
group_fields = []
for rep in range(1, reps):
grouped_record = {
'notebook_line': (line.notebook_line and
line.notebook_line.id),
'data': line.id,
'iteration': rep,
}
for field in repetition_fields:
val = getattr(line, '%s_%s' % (field.name, str(rep)))
if field.type == 'many2one':
grouped_record[field.name] = val and val.id or None
else:
grouped_record[field.name] = val
group_fields.append(grouped_record)
record['group_%s' % group] = group_fields
data.append(record)
defaults = {
'data': data,
}
return defaults
def transition_save(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
line_id = Transaction().context.get('active_id', None)
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
if sheet.state not in ('active', 'validated'):
return 'end'
fields = sheet.compilation.table.fields_
grouped_fields = sheet.compilation.table.grouped_fields_
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
line = Data.search([
('compilation', '=', sheet.compilation.id),
('id', '=', line_id)])[0]
res = {}
for data in self.start.data:
groups = 0
for field in fields:
groups = max(groups, field.group or 0)
value = getattr(data, field.name)
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x, formulas.tokens.operand.XlError):
value = None
res[field.name] = value
for group in range(1, groups + 1):
for group_data in getattr(
data, 'group_%s' % group):
for field in grouped_fields:
if field.group != group:
continue
field_name = '%s_%s' % (
field.name, str(group_data['iteration']))
value = group_data[field.name]
if value != 0.0 and not value:
continue
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(
value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(
x, formulas.tokens.operand.XlError):
value = None
res[field_name] = value
Data.write([line], res)
return 'end'
def end(self):
return 'reload'
class EditMultiSampleDataStart(ModelView):
'Edit Multi Sample Data'
__name__ = 'lims.analysis_sheet.edit_multi_sample_data.start'
data = fields.One2Many('lims.interface.multi_sample_data', None, 'Data')
@classmethod
def __setup__(cls):
super().__setup__()
cls.__rpc__['fields_view_get'].cache = None
cls.__rpc__['default_get'].cache = None
@classmethod
def fields_view_get(cls, view_id=None, view_type='form', level=None):
result = super().fields_view_get(view_id, view_type, level)
key = (cls.__name__, view_id, view_type, level)
cls._fields_view_get_cache.set(key, False)
return result
class MultiSampleData(ModelView):
'Multi Sample Data'
__name__ = 'lims.interface.multi_sample_data'
fraction = fields.Char('Sample', readonly=True)
@classmethod
def __setup__(cls):
super().__setup__()
cls.__rpc__['fields_view_get'].cache = None
cls.__rpc__['default_get'].cache = None
def __init__(self, id=None, **kwargs):
kwargs_copy = kwargs.copy()
for kw in kwargs_copy:
kwargs.pop(kw, None)
super().__init__(id, **kwargs)
self._values = {}
for kw in kwargs_copy:
self._values[kw] = kwargs_copy[kw]
def __getattr__(self, name):
try:
return super().__getattr__(name)
except AttributeError:
pass
@classmethod
def fields_view_get(cls, view_id=None, view_type='form', level=None):
if Pool().test:
return
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet = AnalysisSheet(Transaction().context.get('active_id'))
fields = {
'fraction': {'default_width': 100, 'order': 0},
}
add_analysis_columns = False
order = 1
for view_column in sheet.view.columns:
if not view_column.analysis_specific:
fields[view_column.column.alias] = {
'default_width': view_column.column.default_width or 100,
'order': order,
}
order += 1
else:
add_analysis_columns = True
if add_analysis_columns:
analysis_codes = set()
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
])
for line in lines:
analysis_codes.add((line.notebook_line.analysis.code,
line.notebook_line.analysis.order))
for analysis, order in list(analysis_codes):
fields[analysis] = {
'default_width': 60,
'order': order,
}
res = {
'type': 'tree',
'view_id': view_id,
'field_childs': None,
'arch': cls.get_tree_multi_sample_view(fields),
'fields': cls.fields_get(),
'model': cls.__name__,
}
return res
@classmethod
def fields_get(cls, fields=None):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet = AnalysisSheet(Transaction().context.get('active_id'))
readonly = (sheet.state not in ('active', 'validated'))
res = {
'fraction': {
'name': 'fraction',
'string': 'Sample',
'type': 'char',
'help': '',
'readonly': True,
},
}
add_analysis_columns = False
analysis_column_field = None
analysis_column_type = 'float'
for view_column in sheet.view.columns:
if not view_column.analysis_specific:
name = view_column.column.alias
res[name] = {
'name': name,
'string': view_column.column.name,
'type': view_column.column.type_,
'help': '',
'readonly': bool(view_column.column.expression or
view_column.column.readonly or readonly),
}
if view_column.column.expression:
parser = formulas.Parser()
ast = parser.ast(
view_column.column.expression)[1].compile()
inputs = (' '.join([x for x in ast.inputs])).lower()
if inputs:
inputs = list(set(inputs.split()))
res[name]['on_change_with'] = inputs
cls.add_on_change_with_method(view_column.column)
func_name = '%s_%s' % ('on_change_with', name)
cls.__rpc__.setdefault(func_name, RPC(instantiate=0))
else:
add_analysis_columns = True
analysis_column_field = view_column.analysis_field
analysis_column_type = view_column.column.type_
if add_analysis_columns:
analysis_codes = set()
analysis_strings = dict()
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
])
for line in lines:
code = line.notebook_line.analysis.code
analysis_codes.add(code)
analysis_strings[code] = getattr(
line.notebook_line.analysis, analysis_column_field)
for analysis in list(analysis_codes):
res[analysis] = {
'name': analysis,
'string': analysis_strings[analysis],
'type': analysis_column_type,
'help': '',
'readonly': readonly,
}
return res
@classmethod
def get_tree_multi_sample_view(cls, fields):
fields = cls._get_fields_tree_multi_sample_view(fields)
return ('<?xml version="1.0"?>\n'
'<tree editable="1">\n'
'%s\n'
'</tree>') % ('\n'.join(fields))
@classmethod
def _get_fields_tree_multi_sample_view(cls, fields):
view_fields = []
for field, values in sorted(
fields.items(), key=lambda x: x[1]['order']):
view_fields.append('<field name="%s" width="%s"/>' % (
field, values['default_width']))
return view_fields
@classmethod
def add_on_change_with_method(cls, column):
fn_name = 'on_change_with_' + column.alias
def fn(self):
parser = formulas.Parser()
ast = parser.ast(column.expression)[1].compile()
inputs = (' '.join([x for x in ast.inputs])).lower().split()
inputs = [getattr(self, x) for x in inputs]
try:
value = ast(*inputs)
except schedula.utils.exc.DispatcherError as e:
raise UserError(e.args[0] % e.args[1:])
if isinstance(value, list):
value = str(value)
elif not isinstance(value, ALLOWED_RESULT_TYPES):
value = value.tolist()
if isinstance(value, formulas.tokens.operand.XlError):
value = None
elif isinstance(value, list):
for x in chain(*value):
if isinstance(x, formulas.tokens.operand.XlError):
value = None
return value
setattr(cls, fn_name, fn)
class EditMultiSampleData(Wizard):
'Edit Multi Sample Data'
__name__ = 'lims.analysis_sheet.edit_multi_sample_data'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.edit_multi_sample_data.start',
'lims_analysis_sheet.analysis_sheet_edit_multi_sample_data_start'
'_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Save', 'save', 'tryton-save', default=True),
])
save = StateTransition()
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = Transaction().context.get('active_id', None)
if sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.view and sheet.state in ('active', 'validated', 'done'):
return 'start'
return 'end'
def default_start(self, fields):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet_id = Transaction().context.get('active_id', None)
sheet = AnalysisSheet(sheet_id)
records = {}
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([('compilation', '=', sheet.compilation.id)])
for line in lines:
fraction = line.notebook_line.fraction.number
if fraction not in records.keys():
records[fraction] = {'fraction': fraction}
for view_column in sheet.view.columns:
if view_column.analysis_specific:
alias = line.notebook_line.analysis.code
else:
alias = view_column.column.alias
records[fraction][alias] = getattr(
line, view_column.column.alias)
data = list(records.values())
defaults = {
'data': data,
}
return defaults
def transition_save(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
sheet_id = Transaction().context.get('active_id', None)
sheet = AnalysisSheet(sheet_id)
if sheet.state not in ('active', 'validated'):
return 'end'
columns = {}
for view_column in sheet.view.columns:
if view_column.column.expression:
continue
if view_column.analysis_specific:
analysis_codes = set()
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
])
for line in lines:
analysis_codes.add(line.notebook_line.analysis.code)
for alias in list(analysis_codes):
columns[alias] = {
'field': view_column.column.alias,
'is_analysis': True,
}
else:
alias = view_column.column.alias
columns[alias] = {
'field': view_column.column.alias,
'is_analysis': False,
}
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
for data in self.start.data:
fraction_clause = [
('compilation', '=', sheet.compilation.id),
('notebook_line.fraction.number', '=', data.fraction),
]
for alias, field in columns.items():
value = getattr(data, alias)
if field['is_analysis']:
analysis_clause = [
('notebook_line.analysis.code', '=', alias),
]
else:
analysis_clause = []
lines = Data.search(fraction_clause + analysis_clause)
if not lines:
continue
Data.write(lines, {field['field']: value})
return 'end'
def end(self):
return 'reload'
def _save(self):
pass
class MoveDataStart(ModelView):
'Move Data'
__name__ = 'lims.analysis_sheet.move_data.start'
move_to = fields.Selection([
('new', 'New sheet'),
('exist', 'Existing sheet'),
], 'Move to', required=True)
analysis_sheet = fields.Many2One('lims.analysis_sheet',
'Analysis Sheet', required=True,
domain=[
('compilation.table', '=', Eval('table')),
('state', 'in', ['draft', 'active']),
],
states={
'invisible': Eval('move_to') != 'exist',
'required': Eval('move_to') == 'exist',
},
depends=['move_to', 'table'])
table = fields.Many2One('lims.interface.table', 'Table')
class MoveData(Wizard):
'Move Data'
__name__ = 'lims.analysis_sheet.move_data'
start_state = 'check'
check = StateTransition()
start = StateView('lims.analysis_sheet.move_data.start',
'lims_analysis_sheet.analysis_sheet_move_data_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Move', 'move', 'tryton-ok', default=True),
])
move = StateTransition()
def _get_analysis_sheet_id(self):
return Transaction().context.get('lims_analysis_sheet', None)
def transition_check(self):
AnalysisSheet = Pool().get('lims.analysis_sheet')
line_ids = Transaction().context.get('active_ids', None)
sheet_id = self._get_analysis_sheet_id()
if line_ids and sheet_id:
sheet = AnalysisSheet(sheet_id)
if sheet.state == 'active':
return 'start'
return 'end'
def default_start(self, fields):
AnalysisSheet = Pool().get('lims.analysis_sheet')
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
defaults = {
'move_to': 'new',
'table': sheet.compilation.table.id,
}
return defaults
def transition_move(self):
pool = Pool()
AnalysisSheet = pool.get('lims.analysis_sheet')
Data = pool.get('lims.interface.data')
line_ids = Transaction().context.get('active_ids', None)
sheet_id = self._get_analysis_sheet_id()
sheet = AnalysisSheet(sheet_id)
if self.start.move_to == 'new':
with Transaction().set_user(0):
target = AnalysisSheet()
target.template = sheet.template
target.compilation = sheet.get_new_compilation({
'table': sheet.compilation.table.id,
'revision': sheet.compilation.revision,
})
target.professional = sheet.professional
target.laboratory = sheet.laboratory
target.save()
else:
target = self.start.analysis_sheet
with Transaction().set_context(
lims_interface_table=sheet.compilation.table.id):
lines = Data.search([
('compilation', '=', sheet.compilation.id),
('id', 'in', line_ids),
])
Data.write(lines, {
'compilation': target.compilation.id,
})
return 'end'
def end(self):
return 'reload'