# -*- coding: utf-8 -*-
# This file is part of lims_instrument module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
import io
import traceback
import xlrd
from xlutils.copy import copy
from datetime import datetime
from sql import Null, Literal
from trytond.model import ModelView, ModelSQL, fields, Unique
from trytond.wizard import Wizard, StateView, StateTransition, Button
from trytond.pool import Pool, PoolMeta
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext


class NotebookLine(metaclass=PoolMeta):
    __name__ = 'lims.notebook.line'

    imported_result = fields.Char('Imported Result')
    imported_literal_result = fields.Char('Imported Literal result')
    imported_end_date = fields.Date('Imported End date')
    imported_professionals = fields.Char('Imported Professionals')
    imported_chromatogram = fields.Char('Imported Chromatogram')
    imported_device = fields.Many2One('lims.lab.device', 'Imported Device')
    imported_dilution_factor = fields.Float('Imported Dilution factor')
    imported_rm_correction_formula = fields.Char(
        'Imported RM Correction Formula')
    imported_inj_date = fields.Date('Imported Inject date')
    imported_trace_report = fields.Boolean('Imported Trace report')
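
# The imported_* columns above act as a staging area: the wizard below writes
# parsed values into them for review, then either promotes them to the real
# result fields on confirm or clears them on cancel.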


class BaseImport(object):
    controller = None
    infile = None
    rawresults = {}
    mimetype = None
    numline = 0
    analysis_code = None
    formula = None
    header = []

    def getInputFile(self):
        return self.infile

    def setInputFile(self, infile):
        self.infile = infile

    def loadController(self):
        self.controller = None

    def parse(self, infile):
        self.rawresults = {}
        if not self.controller:
            self.loadController()
        try:
            return self.controller.parse(self, infile)
        except AttributeError:
            traceback.print_exc()
            raise UserError(gettext('lims_instrument.msg_not_implemented',
                function='parse'))

    def exportResults(self):
        '''
        This function defines whether the importer exports results
        to a file at the end of the process.
        Default is False.
        '''
        if not self.controller:
            self.loadController()
        try:
            return self.controller.exportResults(self)
        except AttributeError:
            return False
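

# ResultsImport is the configurable entry point for instrument parsers.
# Its 'name' Selection starts out empty and is expected to be extended by
# instrument-specific modules, which also override loadController() to
# provide their parser; the base loadController() below only raises
# msg_not_module when no such module is installed for the selected name.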
class ResultsImport(BaseImport, ModelSQL, ModelView):
    'Results Import'
    __name__ = 'lims.resultsimport'
    _rec_name = 'description'

    name = fields.Selection([], 'Name', required=True, sort=False,
        depends=['description'])
    description = fields.Char('Description', required=True)

    @classmethod
    def __setup__(cls):
        super().__setup__()
        t = cls.__table__()
        cls._sql_constraints += [
            ('name_uniq', Unique(t, t.name),
                'lims_instrument.msg_results_importer_unique_id'),
            ]

    @fields.depends('name')
    def on_change_with_description(self, name=None):
        description = None
        if self.name:
            self.loadController()
        if self.controller:
            try:
                description = self.controller.getControllerName()
            except AttributeError:
                raise UserError(gettext('lims_instrument.msg_not_implemented',
                    function='getControllerName'))
        return description

    def loadController(self):
        raise UserError(gettext('lims_instrument.msg_not_module',
            module=self.name))
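

# The wizard start view below exposes sixty paired Binary/Char fields
# (infile_NN / name_NN); transition_collect() iterates them in order and
# simply skips the slots that were left empty.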
class NotebookLoadResultsFileStart(ModelView):
    'Load Results from File'
    __name__ = 'lims.notebook.load_results_file.start'

    results_importer = fields.Many2One('lims.resultsimport',
        'Results importer', required=True)
    infile_01 = fields.Binary('File 01', filename='name_01')
    name_01 = fields.Char('Name 01', readonly=True)
    infile_02 = fields.Binary('File 02', filename='name_02')
    name_02 = fields.Char('Name 02', readonly=True)
    infile_03 = fields.Binary('File 03', filename='name_03')
    name_03 = fields.Char('Name 03', readonly=True)
    infile_04 = fields.Binary('File 04', filename='name_04')
    name_04 = fields.Char('Name 04', readonly=True)
    infile_05 = fields.Binary('File 05', filename='name_05')
    name_05 = fields.Char('Name 05', readonly=True)
    infile_06 = fields.Binary('File 06', filename='name_06')
    name_06 = fields.Char('Name 06', readonly=True)
    infile_07 = fields.Binary('File 07', filename='name_07')
    name_07 = fields.Char('Name 07', readonly=True)
    infile_08 = fields.Binary('File 08', filename='name_08')
    name_08 = fields.Char('Name 08', readonly=True)
    infile_09 = fields.Binary('File 09', filename='name_09')
    name_09 = fields.Char('Name 09', readonly=True)
    infile_10 = fields.Binary('File 10', filename='name_10')
    name_10 = fields.Char('Name 10', readonly=True)
    infile_11 = fields.Binary('File 11', filename='name_11')
    name_11 = fields.Char('Name 11', readonly=True)
    infile_12 = fields.Binary('File 12', filename='name_12')
    name_12 = fields.Char('Name 12', readonly=True)
    infile_13 = fields.Binary('File 13', filename='name_13')
    name_13 = fields.Char('Name 13', readonly=True)
    infile_14 = fields.Binary('File 14', filename='name_14')
    name_14 = fields.Char('Name 14', readonly=True)
    infile_15 = fields.Binary('File 15', filename='name_15')
    name_15 = fields.Char('Name 15', readonly=True)
    infile_16 = fields.Binary('File 16', filename='name_16')
    name_16 = fields.Char('Name 16', readonly=True)
    infile_17 = fields.Binary('File 17', filename='name_17')
    name_17 = fields.Char('Name 17', readonly=True)
    infile_18 = fields.Binary('File 18', filename='name_18')
    name_18 = fields.Char('Name 18', readonly=True)
    infile_19 = fields.Binary('File 19', filename='name_19')
    name_19 = fields.Char('Name 19', readonly=True)
    infile_20 = fields.Binary('File 20', filename='name_20')
    name_20 = fields.Char('Name 20', readonly=True)
    infile_21 = fields.Binary('File 21', filename='name_21')
    name_21 = fields.Char('Name 21', readonly=True)
    infile_22 = fields.Binary('File 22', filename='name_22')
    name_22 = fields.Char('Name 22', readonly=True)
    infile_23 = fields.Binary('File 23', filename='name_23')
    name_23 = fields.Char('Name 23', readonly=True)
    infile_24 = fields.Binary('File 24', filename='name_24')
    name_24 = fields.Char('Name 24', readonly=True)
    infile_25 = fields.Binary('File 25', filename='name_25')
    name_25 = fields.Char('Name 25', readonly=True)
    infile_26 = fields.Binary('File 26', filename='name_26')
    name_26 = fields.Char('Name 26', readonly=True)
    infile_27 = fields.Binary('File 27', filename='name_27')
    name_27 = fields.Char('Name 27', readonly=True)
    infile_28 = fields.Binary('File 28', filename='name_28')
    name_28 = fields.Char('Name 28', readonly=True)
    infile_29 = fields.Binary('File 29', filename='name_29')
    name_29 = fields.Char('Name 29', readonly=True)
    infile_30 = fields.Binary('File 30', filename='name_30')
    name_30 = fields.Char('Name 30', readonly=True)
    infile_31 = fields.Binary('File 31', filename='name_31')
    name_31 = fields.Char('Name 31', readonly=True)
    infile_32 = fields.Binary('File 32', filename='name_32')
    name_32 = fields.Char('Name 32', readonly=True)
    infile_33 = fields.Binary('File 33', filename='name_33')
    name_33 = fields.Char('Name 33', readonly=True)
    infile_34 = fields.Binary('File 34', filename='name_34')
    name_34 = fields.Char('Name 34', readonly=True)
    infile_35 = fields.Binary('File 35', filename='name_35')
    name_35 = fields.Char('Name 35', readonly=True)
    infile_36 = fields.Binary('File 36', filename='name_36')
    name_36 = fields.Char('Name 36', readonly=True)
    infile_37 = fields.Binary('File 37', filename='name_37')
    name_37 = fields.Char('Name 37', readonly=True)
    infile_38 = fields.Binary('File 38', filename='name_38')
    name_38 = fields.Char('Name 38', readonly=True)
    infile_39 = fields.Binary('File 39', filename='name_39')
    name_39 = fields.Char('Name 39', readonly=True)
    infile_40 = fields.Binary('File 40', filename='name_40')
    name_40 = fields.Char('Name 40', readonly=True)
    infile_41 = fields.Binary('File 41', filename='name_41')
    name_41 = fields.Char('Name 41', readonly=True)
    infile_42 = fields.Binary('File 42', filename='name_42')
    name_42 = fields.Char('Name 42', readonly=True)
    infile_43 = fields.Binary('File 43', filename='name_43')
    name_43 = fields.Char('Name 43', readonly=True)
    infile_44 = fields.Binary('File 44', filename='name_44')
    name_44 = fields.Char('Name 44', readonly=True)
    infile_45 = fields.Binary('File 45', filename='name_45')
    name_45 = fields.Char('Name 45', readonly=True)
    infile_46 = fields.Binary('File 46', filename='name_46')
    name_46 = fields.Char('Name 46', readonly=True)
    infile_47 = fields.Binary('File 47', filename='name_47')
    name_47 = fields.Char('Name 47', readonly=True)
    infile_48 = fields.Binary('File 48', filename='name_48')
    name_48 = fields.Char('Name 48', readonly=True)
    infile_49 = fields.Binary('File 49', filename='name_49')
    name_49 = fields.Char('Name 49', readonly=True)
    infile_50 = fields.Binary('File 50', filename='name_50')
    name_50 = fields.Char('Name 50', readonly=True)
    infile_51 = fields.Binary('File 51', filename='name_51')
    name_51 = fields.Char('Name 51', readonly=True)
    infile_52 = fields.Binary('File 52', filename='name_52')
    name_52 = fields.Char('Name 52', readonly=True)
    infile_53 = fields.Binary('File 53', filename='name_53')
    name_53 = fields.Char('Name 53', readonly=True)
    infile_54 = fields.Binary('File 54', filename='name_54')
    name_54 = fields.Char('Name 54', readonly=True)
    infile_55 = fields.Binary('File 55', filename='name_55')
    name_55 = fields.Char('Name 55', readonly=True)
    infile_56 = fields.Binary('File 56', filename='name_56')
    name_56 = fields.Char('Name 56', readonly=True)
    infile_57 = fields.Binary('File 57', filename='name_57')
    name_57 = fields.Char('Name 57', readonly=True)
    infile_58 = fields.Binary('File 58', filename='name_58')
    name_58 = fields.Char('Name 58', readonly=True)
    infile_59 = fields.Binary('File 59', filename='name_59')
    name_59 = fields.Char('Name 59', readonly=True)
    infile_60 = fields.Binary('File 60', filename='name_60')
    name_60 = fields.Char('Name 60', readonly=True)


class NotebookLoadResultsFileEmpty(ModelView):
    'Load Results from File Empty'
    __name__ = 'lims.notebook.load_results_file.empty'


class NotebookLoadResultsFileResult(ModelView):
    'Process Results from File'
    __name__ = 'lims.notebook.load_results_file.result'

    result_lines = fields.One2Many('lims.notebook.line', None, 'Lines')


class NotebookLoadResultsFileWarning(ModelView):
    'Load Results from File Warning'
    __name__ = 'lims.notebook.load_results_file.warning'

    msg = fields.Text('Message')


class NotebookLoadResultsFileExport(ModelView):
    "Export Results from File"
    __name__ = 'lims.notebook.load_results_file.export'

    file = fields.Binary('File', readonly=True)


class NotebookLoadResultsFile(Wizard):
    'Load Results from File'
    __name__ = 'lims.notebook.load_results_file'

    start = StateView('lims.notebook.load_results_file.start',
        'lims_instrument.lims_load_results_file_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Collect', 'collect', 'tryton-forward', default=True),
            ])
    collect = StateTransition()
    empty = StateView('lims.notebook.load_results_file.empty',
        'lims_instrument.lims_load_results_file_empty_view_form', [
            Button('Try again', 'start', 'tryton-forward'),
            Button('Close', 'end', 'tryton-close', default=True),
            ])
    result = StateView('lims.notebook.load_results_file.result',
        'lims_instrument.lims_load_results_file_result_view_form', [
            Button('Cancel', 'cancel', 'tryton-cancel'),
            Button('Confirm', 'confirm', 'tryton-ok', default=True),
            ])
    confirm = StateTransition()
    cancel = StateTransition()
    warning = StateView('lims.notebook.load_results_file.warning',
        'lims_instrument.lims_load_results_file_warning_view_form', [
            Button('Ok', 'close', 'tryton-ok'),
            ])
    close = StateTransition()
    export = StateView('lims.notebook.load_results_file.export',
        'lims_instrument.lims_load_results_file_export_view_form', [
            Button('Done', 'end', 'tryton-close', default=True),
            ])

    def transition_collect(self):
        cursor = Transaction().connection.cursor()
        pool = Pool()
        Fraction = pool.get('lims.fraction')
        Notebook = pool.get('lims.notebook')
        NotebookLine = pool.get('lims.notebook.line')
        Analysis = pool.get('lims.analysis')

        lines = []
        for fline in [str(item).zfill(2) for item in range(1, 61)]:
            file_ = getattr(self.start, 'infile_%s' % fline)
            if not file_:
                continue
            self.start.results_importer.parse(file_)
            raw_results = self.start.results_importer.rawresults
            fractions_numbers = list(raw_results.keys())
            if not fractions_numbers:
                continue
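
            # rawresults is populated by the parser as a nested mapping:
            # fraction number -> analysis code -> repetition -> data dict.
            # Match the parsed fraction numbers against existing fractions
            # and walk that structure to locate the notebook lines to fill.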
            numbers = '\', \''.join(str(n) for n in fractions_numbers)
            cursor.execute('SELECT id, number '
                'FROM "' + Fraction._table + '" '
                'WHERE number IN (\'' + numbers + '\') '
                'ORDER BY number ASC')
            for f in cursor.fetchall():
                cursor.execute('SELECT id '
                    'FROM "' + Notebook._table + '" '
                    'WHERE fraction = %s '
                    'LIMIT 1', (f[0],))
                notebook = cursor.fetchone()
                if not notebook:
                    continue

                for analysis in list(raw_results[f[1]].keys()):
                    cursor.execute('SELECT id '
                        'FROM "' + Analysis._table + '" '
                        'WHERE code = %s '
                        'AND automatic_acquisition = TRUE '
                        'LIMIT 1', (analysis,))
                    if not cursor.fetchone():
                        continue

                    for rep in list(raw_results[f[1]][analysis].keys()):
                        clause = [
                            ('notebook', '=', notebook[0]),
                            ('analysis', '=', analysis),
                            ('repetition', '=', rep),
                            ('start_date', '!=', None),
                            ('result', 'in', [None, '']),
                            ('converted_result', 'in', [None, '']),
                            ('literal_result', 'in', [None, '']),
                            ['OR', ('result_modifier', '=', None),
                                ('result_modifier.code', 'not in',
                                    ['d', 'nd', 'pos', 'neg', 'ni', 'abs',
                                        'pre', 'na'])],
                            ['OR', ('converted_result_modifier', '=', None),
                                ('converted_result_modifier.code', 'not in',
                                    ['d', 'nd', 'pos', 'neg', 'ni', 'abs',
                                        'pre'])],
                            ]
                        line = NotebookLine.search(clause, limit=1)
                        if not line:
                            continue
                        data = raw_results[f[1]][analysis][rep]
                        res = self.get_results(line[0], data)
                        if res:
                            NotebookLine.write([line[0]], res)
                            lines.append(line[0])

        if lines:
            self.result.result_lines = [l.id for l in lines]
            return 'result'
        return 'empty'
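
    # get_results() maps the parser's per-repetition data dict onto the
    # imported_* staging fields defined on lims.notebook.line above.
    # Recognized keys: result, literal_result, end_date, injection_date,
    # professionals, chromatogram, device (a device code), dilution_factor,
    # rm_correction_formula and trace_report.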
    def get_results(self, line, data):
        pool = Pool()
        Device = pool.get('lims.lab.device')

        res = {}
        if 'result' in data or 'literal_result' in data:
            if 'result' in data:
                res['imported_result'] = str(float(data['result']))
            if 'literal_result' in data:
                res['imported_literal_result'] = data['literal_result']
            res['imported_end_date'] = (data['end_date'] if 'end_date' in data
                else line.end_date)
            res['imported_inj_date'] = (data['injection_date']
                if 'injection_date' in data else None)
            if 'professionals' in data:
                res['imported_professionals'] = data['professionals']
            if 'chromatogram' in data:
                res['imported_chromatogram'] = data['chromatogram']
            device = data['device'] if 'device' in data else None
            if device:
                dev = Device.search([('code', '=', device)])
                if dev:
                    res['imported_device'] = dev[0].id
            if 'dilution_factor' in data:
                res['imported_dilution_factor'] = data['dilution_factor']
            if 'rm_correction_formula' in data:
                res['imported_rm_correction_formula'] = (
                    data['rm_correction_formula'])
            if 'trace_report' in data:
                res['imported_trace_report'] = data['trace_report']
        return res

    def default_result(self, fields):
        default = {}
        default['result_lines'] = [l.id for l in self.result.result_lines]
        return default

    def get_professionals(self, professionals_codes):
        '''
        This function receives a string with one or more professional codes
        separated by commas, like 'ABC' or 'JLB, ABC'.
        It returns a list of (id, code) tuples, or an empty list if any of
        the codes is unknown.
        '''
        cursor = Transaction().connection.cursor()
        pool = Pool()
        Professional = pool.get('lims.laboratory.professional')

        res = []
        professionals = ''.join(professionals_codes.split())
        professionals = professionals.split(',')
        for professional in professionals:
            cursor.execute('SELECT id, code '
                'FROM "' + Professional._table + '" '
                'WHERE code = %s '
                'LIMIT 1', (professional,))
            prof = cursor.fetchone()
            if not prof:
                return []
            res.append(prof)
        return res
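
    # check_professionals() looks up each professional's qualification state
    # for the given method: 'qualified' or 'requalified' validates the set,
    # 'training' alone is not enough (a qualified professional must also be
    # present), and a missing qualification record rejects the whole set.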
    def check_professionals(self, professionals, method):
        cursor = Transaction().connection.cursor()
        pool = Pool()
        LabProfessionalMethod = pool.get('lims.lab.professional.method')

        validated = False
        msg = ''
        for professional in professionals:
            cursor.execute('SELECT state '
                'FROM "' + LabProfessionalMethod._table + '" '
                'WHERE professional = %s '
                'AND method = %s '
                'AND type = \'analytical\' '
                'LIMIT 1', (professional[0], method.id))
            qualification = cursor.fetchone()
            if not qualification:
                validated = False
                msg += '%s not qualified for method: %s' % (
                    professional[1], method.code)
                return validated, msg
            elif qualification[0] == 'training':
                if not validated:
                    msg += '%s in training for method: %s. ' \
                        'Add qualified professional' % (
                            professional[1], method.code)
            elif (qualification[0] in ('qualified', 'requalified')):
                validated = True
        return validated, msg

    def transition_confirm(self):
        cursor = Transaction().connection.cursor()
        pool = Pool()
        ModelData = pool.get('ir.model.data')
        NotebookLine = pool.get('lims.notebook.line')
        AnalyticProfessional = pool.get('lims.notebook.line.professional')
        sql_table = NotebookLine.__table__()

        NOW = datetime.now()
        warnings = False
        messages = ''
        export_results = self.start.results_importer.exportResults()
        result_modifier_na = ModelData.get_id('lims', 'result_modifier_na')

        previous_professionals = []
        new_professionals = []
        for line in self.result.result_lines:
            columns = []
            values = []
            prevent_line = False
            outcome = 'OK'
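
            # An imported result of '-1000.0' is treated as a sentinel for
            # "no result": the line is annulled and flagged with the 'na'
            # result modifier instead of receiving a value.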
            if line.imported_result != '-1000.0':
                if not line.imported_end_date:
                    prevent_line = True
                    outcome = gettext('lims_instrument.msg_end_date')
                elif (line.imported_end_date and line.start_date and
                        line.start_date > line.imported_end_date):
                    prevent_line = True
                    outcome = gettext(
                        'lims_instrument.msg_end_date_start_date')
                elif (line.imported_inj_date and line.start_date and
                        line.start_date > line.imported_inj_date):
                    prevent_line = True
                    outcome = gettext(
                        'lims_instrument.msg_inj_date_start_date')
                elif (line.imported_end_date and line.imported_inj_date and
                        line.imported_inj_date > line.imported_end_date):
                    prevent_line = True
                    outcome = gettext('lims_instrument.msg_inj_date_end_date')
                else:
                    columns.append(sql_table.result)
                    values.append(line.imported_result)
                    columns.append(sql_table.end_date)
                    values.append(line.imported_end_date)
                    columns.append(sql_table.injection_date)
                    values.append(line.imported_inj_date)
            else:
                columns.append(sql_table.result)
                values.append(Null)
                columns.append(sql_table.result_modifier)
                values.append(result_modifier_na)
                columns.append(sql_table.report)
                values.append(Literal(False))
                columns.append(sql_table.annulled)
                values.append(Literal(True))
                columns.append(sql_table.annulment_date)
                values.append(NOW)
                columns.append(sql_table.end_date)
                values.append(Null)
                columns.append(sql_table.injection_date)
                values.append(Null)

            columns.append(sql_table.literal_result)
            values.append(line.imported_literal_result)
            columns.append(sql_table.chromatogram)
            values.append(line.imported_chromatogram)
            columns.append(sql_table.device)
            values.append(line.imported_device and
                line.imported_device.id or Null)
            columns.append(sql_table.dilution_factor)
            values.append(line.imported_dilution_factor)
            columns.append(sql_table.rm_correction_formula)
            values.append(line.imported_rm_correction_formula)
            columns.append(sql_table.trace_report)
            values.append(line.imported_trace_report)
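
            # Validate the imported professionals' qualification for the
            # line's method before accepting the result.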
            line_previous_professionals = []
            line_new_professionals = []
            if line.imported_professionals:
                profs = self.get_professionals(line.imported_professionals)
                if profs:
                    validated, msg = self.check_professionals(
                        profs, line.method)
                    if validated:
                        line_previous_professionals = [p for p in
                            line.professionals]
                        line_new_professionals = [{
                            'notebook_line': line.id,
                            'professional': p[0],
                            } for p in profs]
                    else:
                        prevent_line = True
                        outcome = msg
                else:
                    prevent_line = True
                    outcome = gettext('lims_instrument.msg_professionals',
                        code=str(line.imported_professionals))

            if prevent_line:
                warnings = True
                message = '%s [%s] (%s): %s\n' % (
                    line.fraction.number, line.analysis.code, line.repetition,
                    outcome)
                messages += message
                # Update rawresults
                if export_results:
                    rawresults = self.start.results_importer.rawresults
                    number = line.fraction.number
                    if number in rawresults:
                        code = line.analysis.code
                        if code in rawresults[number]:
                            rep = line.repetition
                            if rep in rawresults[number][code]:
                                rawresults[number][code][rep]['outcome'] = (
                                    outcome)
            else:
                previous_professionals.extend(line_previous_professionals)
                new_professionals.extend(line_new_professionals)

                columns.append(sql_table.imported_result)
                values.append(Null)
                columns.append(sql_table.imported_literal_result)
                values.append(Null)
                columns.append(sql_table.imported_end_date)
                values.append(Null)
                columns.append(sql_table.imported_professionals)
                values.append(Null)
                columns.append(sql_table.imported_chromatogram)
                values.append(Null)
                columns.append(sql_table.imported_device)
                values.append(Null)
                columns.append(sql_table.imported_dilution_factor)
                values.append(Null)
                columns.append(sql_table.imported_rm_correction_formula)
                values.append(Null)
                columns.append(sql_table.imported_inj_date)
                values.append(Null)
                columns.append(sql_table.imported_trace_report)
                values.append(Literal(False))

                # Write Results to Notebook lines
                cursor.execute(*sql_table.update(
                    columns, values,
                    where=(sql_table.id == line.id)))

        # Update Professionals
        AnalyticProfessional.delete(previous_professionals)
        AnalyticProfessional.create(new_professionals)

        if warnings:
            self.warning.msg = messages
            return 'warning'

        if export_results:
            return 'end'  # 'export'
        return 'end'
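
    # transition_cancel() discards the staged imported_* values on the
    # selected lines without touching the already-stored results.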
    def transition_cancel(self):
        NotebookLine = Pool().get('lims.notebook.line')
        # Clean results from Notebook lines
        notebook_line_clean = {
            'imported_result': None,
            'imported_literal_result': None,
            'imported_end_date': None,
            'imported_professionals': None,
            'imported_chromatogram': None,
            'imported_device': None,
            'imported_dilution_factor': None,
            'imported_rm_correction_formula': None,
            'imported_inj_date': None,
            'imported_trace_report': False,
            }
        NotebookLine.write(
            list(self.result.result_lines), notebook_line_clean)
        return 'end'

    def default_warning(self, fields):
        defaults = {}
        if self.warning.msg:
            defaults['msg'] = self.warning.msg
        return defaults

    def transition_close(self):
        if self.start.results_importer.exportResults():
            return 'end'  # 'export'
        return 'end'
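
    # default_export() re-opens the original spreadsheet with xlrd, copies it
    # with xlutils, and writes each repetition's outcome string into the
    # (sheet, row, col) recorded by the parser under 'status_cell', so the
    # annotated workbook can be returned through the export view.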
    def default_export(self, fields):
        rawresults = self.start.results_importer.rawresults
        filedata = io.BytesIO(self.start.infile)  # TODO: refactoring
        workbook = xlrd.open_workbook(file_contents=filedata.getvalue(),
            formatting_info=True)
        wb_copy = copy(workbook)
        for fraction in rawresults:
            for analysis in rawresults[fraction]:
                for rep in rawresults[fraction][analysis]:
                    repetition = rawresults[fraction][analysis][rep]
                    if 'outcome' in repetition and 'status_cell' in repetition:
                        sheet, row, col = repetition['status_cell']
                        wb_sheet = wb_copy.get_sheet(sheet)
                        wb_sheet.write(row, col, repetition['outcome'])
        output = io.BytesIO()
        wb_copy.save(output)
        return {'file': bytearray(output.getvalue())}